Skip to content

Commit

Permalink
Merge pull request #11 from jehiah/concurrent_mutations_11
Browse files Browse the repository at this point in the history
bug: fix concurrent same-row mutations
  • Loading branch information
jehiah authored Feb 14, 2024
2 parents 3280dad + e1899ee commit f5c75b0
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 8 deletions.
25 changes: 17 additions & 8 deletions bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
// Read all rows
tbl.rows.Ascend(addRow)
}
gcRules := tbl.gcRules()
gcRules := tbl.gcRulesNoLock()
tbl.mu.RUnlock()

rows := make([]*row, 0, len(rowSet))
Expand Down Expand Up @@ -883,12 +883,15 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt
return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
}
fs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
r := tbl.mutableRow(string(req.RowKey))
if err := applyMutations(tbl, r, req.Mutations, fs); err != nil {
return nil, err
}
// JIT per-row GC
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion
for f, _ := range r.families {
if _, ok := fs[f]; !ok {
Expand Down Expand Up @@ -920,7 +923,8 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
for i, entry := range req.Entries {
r := tbl.mutableRow(string(entry.RowKey))
code, msg := int32(codes.OK), ""
Expand All @@ -932,7 +936,7 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
Index: int64(i),
Status: &statpb.Status{Code: code, Message: msg},
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand All @@ -954,7 +958,8 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
res := &btpb.CheckAndMutateRowResponse{}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
r := tbl.mutableRow(string(req.RowKey))

// Figure out which mutation to apply.
Expand Down Expand Up @@ -982,7 +987,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
if err := applyMutations(tbl, r, muts, cfs); err != nil {
return nil, err
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand Down Expand Up @@ -1119,7 +1124,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
rowKey := string(req.RowKey)
r := tbl.mutableRow(rowKey)
resultRow := newRow(rowKey) // copy of updated cells
Expand Down Expand Up @@ -1177,7 +1183,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
resultFamily.cellsByColumn(col) // create the column
resultFamily.Cells[col] = []cell{newCell} // overwrite the cells
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand Down Expand Up @@ -1318,7 +1324,10 @@ func (t *table) gcRules() map[string]*btapb.GcRule {
// This method doesn't add or remove rows, so we only need a read lock for the table.
t.mu.RLock()
defer t.mu.RUnlock()
return t.gcRulesNoLock()
}

func (t *table) gcRulesNoLock() map[string]*btapb.GcRule {
// Gather GC rules we'll apply.
rules := make(map[string]*btapb.GcRule) // keyed by "fam"
for fam, cf := range t.families {
Expand Down
98 changes: 98 additions & 0 deletions bttest/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,104 @@ func TestConcurrentMutationsReadModifyAndGC(t *testing.T) {
}
}

func TestConcurrentMutations(t *testing.T) {
// 50 concurrent mutations of different cells on the same row
// expect all 50 values after
s := newTestServer(t)
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
defer cancel()
if _, err := s.CreateTable(
ctx,
&btapb.CreateTableRequest{Parent: "c", TableId: "t"}); err != nil {
t.Fatal(err)
}
const name = `c/tables/t`
req := &btapb.ModifyColumnFamiliesRequest{
Name: name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
}},
}
_, err := s.ModifyColumnFamilies(ctx, req)
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
ms := func(i int) []*btpb.Mutation {
return []*btpb.Mutation{{
Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
FamilyName: "cf",
ColumnQualifier: []byte(fmt.Sprintf("%d", i)),
Value: []byte(fmt.Sprintf("%d", i)),
TimestampMicros: 1000,
}},
}}
}

rowKey := []byte("rowkey")
start := make(chan bool)
for i := 0; i < 50; i++ {
i := i
wg.Add(1)
go func(i int) {
defer wg.Done()
<-start
for ctx.Err() == nil {
req := &btpb.MutateRowRequest{
TableName: name,
RowKey: rowKey,
Mutations: ms(i),
}
if _, err := s.MutateRow(ctx, req); err != nil {
panic(err) // can't use t.Fatal in goroutine
}
}
}(i)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
close(start)
select {
case <-done:
case <-time.After(1 * time.Second):
t.Error("Concurrent mutations haven't completed after 1s")
}

// verify
mock := &MockReadRowsServer{}
rreq := &btpb.ReadRowsRequest{TableName: name}
if err = s.ReadRows(rreq, mock); err != nil {
t.Fatalf("ReadRows error: %v", err)
}
if len(mock.responses) != 1 {
t.Fatal("Response count: got 0, want 1")
}
if len(mock.responses[0].Chunks) != 50 {
t.Errorf("Chunk count: got %d, want 50", len(mock.responses[0].Chunks))
}

var gotChunks []*btpb.ReadRowsResponse_CellChunk
for _, res := range mock.responses {
gotChunks = append(gotChunks, res.Chunks...)
}
var seen []string
for i, c := range gotChunks {
if !bytes.Equal(c.RowKey, rowKey) {
t.Fatalf("expected row %q got %q", c.RowKey, rowKey)
}
if !bytes.Equal(c.Qualifier.Value, c.Value) {
t.Fatalf("[%d] expected equal got %q %q", i, c.Qualifier.Value, c.Value)
}
seen = append(seen, string(c.Qualifier.Value))
}
sort.Strings(seen)
t.Logf("seen %#v", seen)
}

func TestCreateTableResponse(t *testing.T) {
// We need to ensure that invoking CreateTable returns
// the ColumnFamilies as well as Granularity.
Expand Down

0 comments on commit f5c75b0

Please sign in to comment.