Skip to content

Commit

Permalink
Add mocks for bqstream and refactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
koladilip committed Oct 12, 2022
1 parent 6ef200d commit 6926586
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 39 deletions.
64 changes: 64 additions & 0 deletions mocks/services/streammanager/bqstream/mock_bqstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 38 additions & 19 deletions services/streammanager/bqstream/bqstreammanager.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:generate mockgen -destination=../../../mocks/services/streammanager/bqstream/mock_bqstream.go -package mock_bqstream github.com/rudderlabs/rudder-server/services/streammanager/bqstream BQClient

package bqstream

import (
Expand Down Expand Up @@ -25,15 +27,33 @@ type Config struct {
TableId string `json:"tableId"`
}

// https://stackoverflow.com/questions/55951812/insert-into-bigquery-without-a-well-defined-struct
type GenericRecord map[string]bigquery.Value

type BQClient interface {
Put(ctx context.Context, datasetID string, tableID string, records []*GenericRecord) error
Close() error
}

type BQStreamProducer struct {
Opts common.Opts
Client BQClient
}

type Client struct {
bqClient *bigquery.Client
opts common.Opts
}

// https://stackoverflow.com/questions/55951812/insert-into-bigquery-without-a-well-defined-struct
type genericRecord map[string]bigquery.Value
func (c *Client) Put(ctx context.Context, datasetID string, tableID string, records []*GenericRecord) error {
bqInserter := c.bqClient.Dataset(datasetID).Table(tableID).Inserter()
return bqInserter.Put(ctx, records)
}

func (c *Client) Close() error {
return c.bqClient.Close()
}

func (rec genericRecord) Save() (map[string]bigquery.Value, string, error) {
func (rec GenericRecord) Save() (map[string]bigquery.Value, string, error) {
var insertID string
if columnVal, isInsertIdPresent := rec["insertId"]; isInsertIdPresent {
insertID = columnVal.(string)
Expand All @@ -44,12 +64,12 @@ func (rec genericRecord) Save() (map[string]bigquery.Value, string, error) {

var pkgLogger logger.Logger

func init() {
func Init() {
pkgLogger = logger.NewLogger().Child("streammanager").Child("bqstream")
}

type BQStreamProducer struct {
client *Client
func init() {
Init()
}

func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*BQStreamProducer, error) {
Expand Down Expand Up @@ -78,37 +98,37 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*BQStr
if err != nil {
return nil, err
}
return &BQStreamProducer{client: &Client{bqClient: bqClient, opts: o}}, nil
return &BQStreamProducer{Client: &Client{bqClient: bqClient}, Opts: o}, nil
}

func (producer *BQStreamProducer) Produce(jsonData json.RawMessage, _ interface{}) (statusCode int, respStatus, responseMessage string) {
client := producer.client
bqClient := client.bqClient
o := client.opts
client := producer.Client
if client == nil {
return http.StatusBadRequest, "Failure", "[BQStream] error :: invalid client"
}
parsedJSON := gjson.ParseBytes(jsonData)
dsId := parsedJSON.Get("datasetId").String()
tblId := parsedJSON.Get("tableId").String()
props := parsedJSON.Get("properties")

var genericRecs []*genericRecord
var genericRecs []*GenericRecord
if props.IsArray() {
err := json.Unmarshal([]byte(props.String()), &genericRecs)
if err != nil {
return http.StatusBadRequest, "Failure", createErr(err, "error in unmarshalling data").Error()
}
} else {
var genericRec *genericRecord
var genericRec *GenericRecord
err := json.Unmarshal([]byte(props.String()), &genericRec)
if err != nil {
return http.StatusBadRequest, "Failure", createErr(err, "error in unmarshalling data").Error()
}
genericRecs = append(genericRecs, genericRec)
}

bqInserter := bqClient.Dataset(dsId).Table(tblId).Inserter()
ctx, cancel := context.WithTimeout(context.Background(), o.Timeout)
ctx, cancel := context.WithTimeout(context.Background(), producer.Opts.Timeout)
defer cancel()
err := bqInserter.Put(ctx, genericRecs)
err := client.Put(ctx, dsId, tblId, genericRecs)
if err != nil {
if ctx.Err() != nil && errors.Is(err, context.DeadlineExceeded) {
return http.StatusGatewayTimeout, "Failure", createErr(err, "timeout in data insertion").Error()
Expand All @@ -120,13 +140,12 @@ func (producer *BQStreamProducer) Produce(jsonData json.RawMessage, _ interface{
}

func (producer *BQStreamProducer) Close() error {
client := producer.client
client := producer.Client
if client == nil {
return createErr(nil, "error while trying to close the client")
}
bqClient := client.bqClient

err := bqClient.Close()
err := client.Close()
if err != nil {
return createErr(err, "error while closing the client")
}
Expand Down
77 changes: 57 additions & 20 deletions services/streammanager/bqstream/bqstreammanager_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package bqstream
package bqstream_test

import (
"encoding/json"
"errors"
"os"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
mock_logger "github.com/rudderlabs/rudder-server/mocks/utils/logger"
mock_bqstream "github.com/rudderlabs/rudder-server/mocks/services/streammanager/bqstream"
"github.com/rudderlabs/rudder-server/utils/logger"

"github.com/rudderlabs/rudder-server/services/streammanager/bqstream"
"github.com/rudderlabs/rudder-server/services/streammanager/common"
"github.com/stretchr/testify/assert"
)
Expand All @@ -18,12 +24,20 @@ type BigQueryCredentials struct {
Credentials map[string]interface{} `json:"credentials"`
}

func TestTimeout(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockLogger := mock_logger.NewMockLogger(mockCtrl)
mockLogger.EXPECT().Errorf(gomock.Any(), gomock.Any()).AnyTimes()
pkgLogger = mockLogger
var (
once sync.Once
)

func initBQTest() {
once.Do(func() {
config.Reset()
logger.Reset()
bqstream.Init()
})
}

func TestTimeout(t *testing.T) {
initBQTest()
cred := os.Getenv("BIGQUERY_INTEGRATION_TEST_USER_CRED")
if cred == "" {
t.Skip("Skipping bigquery test, since no credentials are available in the environment")
Expand All @@ -40,7 +54,7 @@ func TestTimeout(t *testing.T) {
"ProjectId": bqCredentials.ProjectID,
}
destination := backendconfig.DestinationT{Config: config}
producer, err := NewProducer(&destination, common.Opts{Timeout: 1 * time.Microsecond})
producer, err := bqstream.NewProducer(&destination, common.Opts{Timeout: 1 * time.Microsecond})
if err != nil {
t.Errorf(" %+v", err)
return
Expand Down Expand Up @@ -73,11 +87,7 @@ func TestTimeout(t *testing.T) {
}

func TestUnsupportedCredentials(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockLogger := mock_logger.NewMockLogger(mockCtrl)
mockLogger.EXPECT().Errorf(gomock.Any(), gomock.Any()).AnyTimes()
pkgLogger = mockLogger

initBQTest()
var bqCredentials BigQueryCredentials
var err error
err = json.Unmarshal(
Expand Down Expand Up @@ -105,18 +115,14 @@ func TestUnsupportedCredentials(t *testing.T) {
"ProjectId": bqCredentials.ProjectID,
}
destination := backendconfig.DestinationT{Config: config}
_, err = NewProducer(&destination, common.Opts{Timeout: 1 * time.Microsecond})
_, err = bqstream.NewProducer(&destination, common.Opts{Timeout: 1 * time.Microsecond})

assert.NotNil(t, err)
assert.Contains(t, err.Error(), "incompatible credentials")
}

func TestInvalidCredentials(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockLogger := mock_logger.NewMockLogger(mockCtrl)
mockLogger.EXPECT().Errorf(gomock.Any(), gomock.Any()).AnyTimes()
pkgLogger = mockLogger

initBQTest()
var bqCredentials BigQueryCredentials
var err error
err = json.Unmarshal(
Expand All @@ -134,8 +140,39 @@ func TestInvalidCredentials(t *testing.T) {
"ProjectId": bqCredentials.ProjectID,
}
destination := backendconfig.DestinationT{Config: config}
_, err = NewProducer(&destination, common.Opts{Timeout: 1 * time.Microsecond})
_, err = bqstream.NewProducer(&destination, common.Opts{Timeout: 1 * time.Microsecond})

assert.NotNil(t, err)
assert.EqualError(t, err, "bigquery: constructing client: missing 'type' field in credentials")
}

func TestProduceWithInvalidClient(t *testing.T) {
initBQTest()
invalidProducer := bqstream.BQStreamProducer{}
invalidProducer.Produce([]byte("{}"), map[string]interface{}{})
statusCode, statusMsg, respMsg := invalidProducer.Produce([]byte("{}"), map[string]interface{}{})
assert.Equal(t, 400, statusCode)
assert.Equal(t, "Failure", statusMsg)
assert.Contains(t, respMsg, "invalid client")
}

func TestCloseSuccessfulCase(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := mock_bqstream.NewMockBQClient(ctrl)
producer := &bqstream.BQStreamProducer{Client: mockClient}
mockClient.EXPECT().Close().Return(nil)
assert.Nil(t, producer.Close())
}

func TestCloseFailedCase(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := mock_bqstream.NewMockBQClient(ctrl)
producer := &bqstream.BQStreamProducer{Client: mockClient}
mockClient.EXPECT().Close().Return(errors.New("failed close"))
assert.ErrorContains(t, producer.Close(), "failed close")
}

func TestCloseWithInvalidClient(t *testing.T) {
producer := &bqstream.BQStreamProducer{}
assert.ErrorContains(t, producer.Close(), "error while trying to close the client")
}

0 comments on commit 6926586

Please sign in to comment.