Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): minor ease-of-use improvements (#…
Browse files Browse the repository at this point in the history
…5660)

* feat(bigquery/storage/managedwriter): minor ease-of-use improvements

This PR updates the reference for default streams to use the newer
.../streams/_default form (though both are accepted), and adds a
utility function for constructing parent references easily.


Co-authored-by: Steffany Brown <[email protected]>
  • Loading branch information
shollyman and steffnay authored Feb 15, 2022
1 parent 5fa9819 commit d253c24
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
8 changes: 7 additions & 1 deletion bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient

if ms.streamSettings.streamID == "" {
// not instantiated with a stream, construct one.
streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
streamName := fmt.Sprintf("%s/streams/_default", ms.destinationTable)
if ms.streamSettings.streamType != DefaultStream {
// For everything but a default stream, we create a new stream on behalf of the user.
req := &storagepb.CreateWriteStreamRequest{
Expand Down Expand Up @@ -222,3 +222,9 @@ func TableParentFromStreamName(streamName string) string {
}
return strings.Join(parts[:6], "/")
}

// TableParentFromParts constructs a table identifier using individual identifiers and
// returns a string in the form "projects/{project}/datasets/{dataset}/tables/{table}".
func TableParentFromParts(projectID, datasetID, tableID string) string {
return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}
16 changes: 8 additions & 8 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -240,7 +240,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
md, descriptorProto := setupDynamicDescriptors(t, testdata.SimpleMessageSchema)

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -302,7 +302,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(BufferedStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -363,7 +363,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -408,7 +408,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(PendingStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -489,7 +489,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -560,7 +560,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
Expand Down Expand Up @@ -746,7 +746,7 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(dp),
)
Expand Down

0 comments on commit d253c24

Please sign in to comment.