Skip to content

Commit

Permalink
testing: add multiplex integration test (#7237)
Browse files Browse the repository at this point in the history
* testing: add multiplex integration test

This adds a quick validation test that writes to multiple tables in an
interleaved fashion.  Until the multiplexing is enabled this represents
writes on three independent connections.  With multiplexing enabled this
ensures we're correctly processing writes on the same connection.
  • Loading branch information
shollyman authored Jan 11, 2023
1 parent 97f3be6 commit c6a3da2
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,3 +1159,133 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
t.Errorf("error in response: %v", err)
}
}

func TestIntegration_MultiplexWrites(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()

dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

wantWrites := 10

testTables := []struct {
tbl *bigquery.Table
schema bigquery.Schema
dp *descriptorpb.DescriptorProto
sampleRow []byte
constraints []constraintOption
}{
{
tbl: dataset.Table(tableIDs.New()),
schema: testdata.SimpleMessageSchema,
dp: func() *descriptorpb.DescriptorProto {
m := &testdata.SimpleMessageProto2{}
dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
return dp
}(),
sampleRow: func() []byte {
msg := &testdata.SimpleMessageProto2{
Name: proto.String("sample_name"),
Value: proto.Int64(1001),
}
b, _ := proto.Marshal(msg)
return b
}(),
constraints: []constraintOption{
withExactRowCount(int64(wantWrites)),
withStringValueCount("name", "sample_name", int64(wantWrites)),
},
},
{
tbl: dataset.Table(tableIDs.New()),
schema: testdata.ValidationBaseSchema,
dp: func() *descriptorpb.DescriptorProto {
m := &testdata.ValidationP2Optional{}
dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
return dp
}(),
sampleRow: func() []byte {
msg := &testdata.ValidationP2Optional{
Int64Field: proto.Int64(69),
StringField: proto.String("validation_string"),
}
b, _ := proto.Marshal(msg)
return b
}(),
constraints: []constraintOption{
withExactRowCount(int64(wantWrites)),
withStringValueCount("string_field", "validation_string", int64(wantWrites)),
},
},
{
tbl: dataset.Table(tableIDs.New()),
schema: testdata.GithubArchiveSchema,
dp: func() *descriptorpb.DescriptorProto {
m := &testdata.GithubArchiveMessageProto2{}
dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
return dp
}(),
sampleRow: func() []byte {
msg := &testdata.GithubArchiveMessageProto2{
Payload: proto.String("payload_string"),
Id: proto.String("some_id"),
}
b, _ := proto.Marshal(msg)
return b
}(),
constraints: []constraintOption{
withExactRowCount(int64(wantWrites)),
withStringValueCount("payload", "payload_string", int64(wantWrites)),
},
},
}

// setup tables
for _, testTable := range testTables {
if err := testTable.tbl.Create(ctx, &bigquery.TableMetadata{Schema: testTable.schema}); err != nil {
t.Fatalf("failed to create test table %q: %v", testTable.tbl.FullyQualifiedName(), err)
}
}

var results []*AppendResult
for i := 0; i < wantWrites; i++ {
for k, testTable := range testTables {
// create a writer and send a single append
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.tbl.ProjectID, testTable.tbl.DatasetID, testTable.tbl.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(testTable.dp),
)
defer ms.Close() // we won't clean these up until the end of the test, rather than per use.
if err != nil {
t.Fatalf("failed to create ManagedStream for table %d on iteration %d: %v", k, i, err)
}
res, err := ms.AppendRows(ctx, [][]byte{testTable.sampleRow})
if err != nil {
t.Errorf("failed to append to table %d on iteration %d: %v", k, i, err)
}
results = append(results, res)
}
}

// drain results
for k, res := range results {
if _, err := res.GetResult(ctx); err != nil {
t.Errorf("result %d yielded error: %v", k, err)
}
}

// validate the tables
for _, testTable := range testTables {
validateTableConstraints(ctx, t, bqClient, testTable.tbl, "", testTable.constraints...)
}

}

0 comments on commit c6a3da2

Please sign in to comment.