diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go index be5b96d6e489..4247030bc3e1 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -17,6 +17,7 @@ package main import ( "context" "crypto/x509" + "errors" "flag" "fmt" "log" @@ -346,6 +347,27 @@ func statusFromError(err error) *statpb.Status { return st } +// parseTableID extracts a table ID from a table name. +// For example, a table ID is in the format projects//instances//tables/ +// +// Note that this function does not check all variants and edge cases. It assumes +// that the test suite used with the test proxy sends *generally* correct requests. +func parseTableID(tableName string) (tableID string, _ error) { + paths := strings.Split(tableName, "/") + + if len(paths) < 6 { + return "", errors.New("table resource name does not have the correct format") + } + + tableID = paths[len(paths)-1] + var err error + if tableID == "" { + err = errors.New("cannot read tableID from table name") + } + + return tableID, err +} + // testClient contains a bigtable.Client object, cancel functions for the calls // made using the client, an appProfileID (optionally), and a // perOperationTimeout (optionally). @@ -507,15 +529,15 @@ type goTestProxyServer struct { // client retrieves a testClient from the clientIDs map. You must lock clientsLock before calling // this method. -func (s *goTestProxyServer) client(clientID string) (*testClient, bool) { +func (s *goTestProxyServer) client(clientID string) (*testClient, error) { client, ok := s.clientIDs[clientID] if !ok { - return nil, false + return nil, fmt.Errorf("client ID %s does not exist", clientID) } if !client.isOpen { - return nil, false + return nil, fmt.Errorf("client ID %s is closed to new requests", clientID) } - return client, true + return client, nil } // CreateClient responds to the CreateClient RPC. This method adds a new client @@ -571,10 +593,9 @@ func (s *goTestProxyServer) CloseClient(ctx context.Context, req *pb.CloseClient s.clientsLock.Lock() defer s.clientsLock.Unlock() - btc, exists := s.client(clientID) - if !exists { - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + btc, err := s.client(clientID) + if err != nil { + return nil, err } btc.isOpen = false @@ -608,16 +629,17 @@ func (s *goTestProxyServer) RemoveClient(ctx context.Context, req *pb.RemoveClie // data for a single row in the Table. func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest) (*pb.RowResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) + if err != nil { + return nil, err + } s.clientsLock.RUnlock() - if !exists { - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + tid, err := parseTableID(req.TableName) + if err != nil { + return nil, err } - - tName := req.TableName - t := btc.c.Open(tName) + t := btc.c.Open(tid) res := &pb.RowResult{ Status: &statpb.Status{ @@ -652,13 +674,11 @@ func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest) // data for a set of rows, a range of rows, or the entire table. func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsRequest) (*pb.RowsResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) s.clientsLock.RUnlock() - if !exists { - log.Printf("bad client ID: %v\n", req.ClientId) - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + if err != nil { + return nil, err } rrq := req.GetRequest() @@ -669,7 +689,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques } - t := btc.c.Open(rrq.TableName) + tid, err := parseTableID(rrq.TableName) + if err != nil { + return nil, err + } + t := btc.c.Open(tid) rowPbs := rrq.Rows rs := rowSetFromProto(rowPbs) @@ -687,7 +711,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques var c int32 var rowsPb []*btpb.Row lim := req.GetCancelAfterRows() - err := t.ReadRows(ctx, rs, func(r bigtable.Row) bool { + err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool { c++ if c == lim { @@ -728,12 +752,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques // changes (or deletions) to a single row in a table. func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequest) (*pb.MutateRowResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) s.clientsLock.RUnlock() - if !exists { - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + if err != nil { + return nil, err } rrq := req.GetRequest() @@ -744,7 +767,11 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ mPbs := rrq.Mutations m := mutationFromProto(mPbs) - t := btc.c.Open(rrq.TableName) + tid, err := parseTableID(rrq.TableName) + if err != nil { + return nil, err + } + t := btc.c.Open(tid) row := rrq.RowKey res := &pb.MutateRowResult{ @@ -756,7 +783,7 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ ctx, cancel := btc.timeout(ctx) defer cancel() - err := t.Apply(ctx, string(row), m) + err = t.Apply(ctx, string(row), m) if err != nil { res.Status = statusFromError(err) return res, nil @@ -769,13 +796,11 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ // series of changes or deletions to multiple rows in a single call. func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRowsRequest) (*pb.MutateRowsResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) s.clientsLock.RUnlock() - if !exists { - log.Printf("received invalid client ID: %s\n", req.ClientId) - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + if err != nil { + return nil, err } rrq := req.GetRequest() @@ -785,7 +810,11 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo } mrs := rrq.Entries - t := btc.c.Open(rrq.TableName) + tid, err := parseTableID(rrq.TableName) + if err != nil { + return nil, err + } + t := btc.c.Open(tid) keys := make([]string, len(mrs)) muts := make([]*bigtable.Mutation, len(mrs)) @@ -840,13 +869,11 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo // one mutation if a condition is true and another mutation if it is false. func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.CheckAndMutateRowRequest) (*pb.CheckAndMutateRowResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) s.clientsLock.RUnlock() - if !exists { - log.Printf("received invalid ClientID: %s\n", req.ClientId) - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + if err != nil { + return nil, err } rrq := req.GetRequest() @@ -873,7 +900,11 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check }, } - t := btc.c.Open(rrq.TableName) + tid, err := parseTableID(rrq.TableName) + if err != nil { + return nil, err + } + t := btc.c.Open(tid) rowKey := string(rrq.RowKey) var matched bool @@ -882,7 +913,7 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check ctx, cancel := btc.timeout(ctx) defer cancel() - err := t.Apply(ctx, rowKey, c, ao) + err = t.Apply(ctx, rowKey, c, ao) if err != nil { log.Printf("received error from Table.Apply: %v", err) res.Status = statusFromError(err) @@ -900,13 +931,11 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check // of the keys available in a table. func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRowKeysRequest) (*pb.SampleRowKeysResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) s.clientsLock.RUnlock() - if !exists { - log.Printf("received invalid client ID: %s\n", req.ClientId) - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + if err != nil { + return nil, err } rrq := req.GetRequest() @@ -924,7 +953,11 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow ctx, cancel := btc.timeout(ctx) defer cancel() - t := btc.c.Open(rrq.TableName) + tid, err := parseTableID(rrq.TableName) + if err != nil { + return nil, err + } + t := btc.c.Open(tid) keys, err := t.SampleRowKeys(ctx) if err != nil { log.Printf("received error from Table.SampleRowKeys(): %v\n", err) @@ -949,13 +982,11 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow // applies a non-idempotent change to a row. func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.ReadModifyWriteRowRequest) (*pb.RowResult, error) { s.clientsLock.RLock() - btc, exists := s.client(req.ClientId) + btc, err := s.client(req.ClientId) s.clientsLock.RUnlock() - if !exists { - log.Printf("received invalid client ID: %s\n", req.ClientId) - return nil, stat.Error(codes.InvalidArgument, - fmt.Sprintf("%s: ClientID does not exist", logLabel)) + if err != nil { + return nil, err } rrq := req.GetRequest() @@ -984,7 +1015,11 @@ func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.Read }, } - t := btc.c.Open(rrq.TableName) + tid, err := parseTableID(rrq.TableName) + if err != nil { + return nil, err + } + t := btc.c.Open(tid) k := string(rrq.RowKey) ctx, cancel := btc.timeout(ctx) diff --git a/bigtable/internal/testproxy/proxy_test.go b/bigtable/internal/testproxy/proxy_test.go index ee1cd07483c5..bfd46ef165c2 100644 --- a/bigtable/internal/testproxy/proxy_test.go +++ b/bigtable/internal/testproxy/proxy_test.go @@ -36,7 +36,8 @@ import ( const ( buffer = 1024 * 1024 - tableName = "table" + tableName = "projects/my-project/instances/my-instance/tables/table" + tableID = "table" columnFamily = "cf" testProxyClient = "testProxyClient" testProxyAddress = "localhost:9990" @@ -70,7 +71,7 @@ func populateTable(bts *bttest.Server) error { } defer adminClient.Close() - if err := adminClient.CreateTable(ctx, tableName); err != nil { + if err := adminClient.CreateTable(ctx, tableID); err != nil { return fmt.Errorf("testproxy setup: can't create table: %v", err) } @@ -78,7 +79,7 @@ func populateTable(bts *bttest.Server) error { count := 3 for i := 0; i < count; i++ { cfName := fmt.Sprintf("%s%d", columnFamily, i) - if err := adminClient.CreateColumnFamily(ctx, tableName, cfName); err != nil { + if err := adminClient.CreateColumnFamily(ctx, tableID, cfName); err != nil { return fmt.Errorf("testproxy setup: can't create column family: %s", cfName) } } @@ -90,7 +91,7 @@ func populateTable(bts *bttest.Server) error { } defer dataClient.Close() - t := dataClient.Open(tableName) + t := dataClient.Open(tableID) for fc := 0; fc < count; fc++ { for cc := count; cc > 0; cc-- {