diff --git a/docker-compose.yml b/docker-compose.yml index 1074499b59..2608ca1869 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,15 +5,25 @@ services: - "9000-9004" ports: - "0.0.0.0:9000-9004:9000-9004" - - "0.0.0.0:7201:7201" - "0.0.0.0:2379-2380:2379-2380" networks: - backend build: context: . - dockerfile: Dockerfile - image: m3db_dbnode01:latest + dockerfile: ./docker/m3dbnode/Dockerfile + image: m3dbnode01:latest volumes: - "~/m3db:/var/lib/m3db" + coordinator01: + expose: + - "7201" + ports: + - "0.0.0.0:7201:7201" + networks: + - backend + build: + context: . + dockerfile: ./docker/m3coordinator/Dockerfile + image: m3coordinator01:latest networks: backend: diff --git a/docker/m3coordinator/Dockerfile b/docker/m3coordinator/Dockerfile new file mode 100644 index 0000000000..d9d974c7a6 --- /dev/null +++ b/docker/m3coordinator/Dockerfile @@ -0,0 +1,27 @@ +# stage 1: build +FROM golang:1.10-alpine AS builder +LABEL maintainer="The M3DB Authors " + +# Install Glide +RUN apk add --update glide git make bash + +# Add source code +RUN mkdir -p /go/src/github.com/m3db/m3db +ADD . /go/src/github.com/m3db/m3db + +# Build m3coordinator binary +RUN cd /go/src/github.com/m3db/m3db/ && \ + git submodule update --init && \ + make m3coordinator-linux-amd64 + +# stage 2: lightweight "release" +FROM alpine:latest +LABEL maintainer="The M3DB Authors " + +EXPOSE 7201/tcp 7203/tcp + +COPY --from=builder /go/src/github.com/m3db/m3db/bin/m3coordinator /bin/ +COPY --from=builder /go/src/github.com/m3db/m3db/src/coordinator/config/m3coordinator-local-etcd.yml /etc/m3coordinator/m3coordinator.yml + +ENTRYPOINT [ "/bin/m3coordinator" ] +CMD [ "-f", "/etc/m3coordinator/m3coordinator.yml" ] diff --git a/Dockerfile b/docker/m3dbnode/Dockerfile similarity index 80% rename from Dockerfile rename to docker/m3dbnode/Dockerfile index 0b998884ac..233d646f7d 100644 --- a/Dockerfile +++ b/docker/m3dbnode/Dockerfile @@ -21,9 +21,7 @@ LABEL maintainer="The M3DB Authors " EXPOSE 2379/tcp 2380/tcp 7201/tcp 9000-9004/tcp COPY --from=builder /go/src/github.com/m3db/m3db/bin/m3dbnode /bin/ -COPY --from=builder /go/src/github.com/m3db/m3db/src/dbnode/config/m3dbnode-local-etcd.yml \ - /go/src/github.com/m3db/m3db/src/dbnode/config/m3dbnode-local.yml \ - /etc/m3dbnode/ +COPY --from=builder /go/src/github.com/m3db/m3db/src/dbnode/config/m3dbnode-local-etcd.yml /etc/m3dbnode/m3dbnode.yml ENTRYPOINT [ "/bin/m3dbnode" ] -CMD [ "-f", "/etc/m3dbnode/m3dbnode-local-etcd.yml" ] +CMD [ "-f", "/etc/m3dbnode/m3dbnode.yml" ] diff --git a/scripts/integration-tests/docker-integration-test.sh b/scripts/integration-tests/docker-integration-test.sh index 486a1f58d2..2ba8688686 100755 --- a/scripts/integration-tests/docker-integration-test.sh +++ b/scripts/integration-tests/docker-integration-test.sh @@ -4,7 +4,7 @@ set -xe echo "Build docker image" -docker build -t "m3dbnode:$(git rev-parse HEAD)" -f Dockerfile . +docker build -t "m3dbnode:$(git rev-parse HEAD)" -f ./docker/m3dbnode/Dockerfile . echo "Run docker container" diff --git a/scripts/integration-tests/prometheus/docker-compose.yml b/scripts/integration-tests/prometheus/docker-compose.yml index f72aed6783..7f14476908 100644 --- a/scripts/integration-tests/prometheus/docker-compose.yml +++ b/scripts/integration-tests/prometheus/docker-compose.yml @@ -3,23 +3,33 @@ services: dbnode01: expose: - "9000-9004" - - "7201" - - "7203" - "2379-2380" ports: - "0.0.0.0:9000-9004:9000-9004" - - "0.0.0.0:7201:7201" - - "0.0.0.0:7203:7203" - "0.0.0.0:2379-2380:2379-2380" networks: - backend build: context: ../../../ - dockerfile: Dockerfile - image: m3db_dbnode01:latest + dockerfile: ./docker/m3dbnode/Dockerfile + image: m3dbnode01:latest volumes: - - "./:/etc/m3dbnode/" - "/tmp/m3dbdata:/var/lib/m3db" + coordinator01: + expose: + - "7201" + - "7203" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + networks: + - backend + build: + context: ../../../ + dockerfile: ./docker/m3coordinator/Dockerfile + image: m3coordinator01:latest + volumes: + - "./:/etc/m3coordinator/" prometheus01: expose: - "9090" diff --git a/scripts/integration-tests/prometheus/m3coordinator.yml b/scripts/integration-tests/prometheus/m3coordinator.yml new file mode 100644 index 0000000000..d223b2fb55 --- /dev/null +++ b/scripts/integration-tests/prometheus/m3coordinator.yml @@ -0,0 +1,45 @@ +listenAddress: 0.0.0.0:7201 + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3db/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +clusters: + - namespaces: + - namespace: prometheus_metrics + storageMetricsType: unaggregated + retention: 48h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + writeTimeout: 10s + fetchTimeout: 15s + connectTimeout: 20s + writeRetry: + initialBackoff: 500ms + backoffFactor: 3 + maxRetries: 2 + jitter: true + fetchRetry: + initialBackoff: 500ms + backoffFactor: 2 + maxRetries: 3 + jitter: true + backgroundHealthCheckFailLimit: 4 + backgroundHealthCheckFailThrottleFactor: 0.5 diff --git a/scripts/integration-tests/prometheus/m3dbnode-local-etcd.yml b/scripts/integration-tests/prometheus/m3dbnode-local-etcd.yml deleted file mode 100644 index 4500a7b7a1..0000000000 --- a/scripts/integration-tests/prometheus/m3dbnode-local-etcd.yml +++ /dev/null @@ -1,209 +0,0 @@ -coordinator: - listenAddress: 0.0.0.0:7201 - - local: - namespace: prometheus_metrics - retention: 48h - - metrics: - scope: - prefix: "coordinator" - prometheus: - handlerPath: /metrics - listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3db/issues/682 is resolved - sanitization: prometheus - samplingRate: 1.0 - extended: none - -db: - logging: - level: info - - metrics: - prometheus: - handlerPath: /metrics - sanitization: prometheus - samplingRate: 1.0 - extended: detailed - - listenAddress: 0.0.0.0:9000 - clusterListenAddress: 0.0.0.0:9001 - httpNodeListenAddress: 0.0.0.0:9002 - httpClusterListenAddress: 0.0.0.0:9003 - debugListenAddress: 0.0.0.0:9004 - - hostID: - resolver: config - value: m3db_local - - client: - writeConsistencyLevel: majority - readConsistencyLevel: unstrict_majority - writeTimeout: 10s - fetchTimeout: 15s - connectTimeout: 20s - writeRetry: - initialBackoff: 500ms - backoffFactor: 3 - maxRetries: 2 - jitter: true - fetchRetry: - initialBackoff: 500ms - backoffFactor: 2 - maxRetries: 3 - jitter: true - backgroundHealthCheckFailLimit: 4 - backgroundHealthCheckFailThrottleFactor: 0.5 - - gcPercentage: 100 - - writeNewSeriesAsync: true - writeNewSeriesLimitPerSecond: 1048576 - writeNewSeriesBackoffDuration: 2ms - - bootstrap: - bootstrappers: - - filesystem - - commitlog - fs: - numProcessorsPerCPU: 0.125 - - commitlog: - flushMaxBytes: 524288 - flushEvery: 1s - queue: - calculationType: fixed - size: 2097152 - retentionPeriod: 24h - blockSize: 10m - - fs: - filePathPrefix: /var/lib/m3db - writeBufferSize: 65536 - dataReadBufferSize: 65536 - infoReadBufferSize: 128 - seekReadBufferSize: 4096 - throughputLimitMbps: 100.0 - throughputCheckEvery: 128 - - repair: - enabled: false - interval: 2h - offset: 30m - jitter: 1h - throttle: 2m - checkInterval: 1m - - pooling: - blockAllocSize: 16 - type: simple - seriesPool: - size: 262144 - lowWatermark: 0.7 - highWatermark: 1.0 - blockPool: - size: 262144 - lowWatermark: 0.7 - highWatermark: 1.0 - encoderPool: - size: 262144 - lowWatermark: 0.7 - highWatermark: 1.0 - closersPool: - size: 104857 - lowWatermark: 0.7 - highWatermark: 1.0 - contextPool: - size: 262144 - lowWatermark: 0.7 - highWatermark: 1.0 - segmentReaderPool: - size: 16384 - lowWatermark: 0.7 - highWatermark: 1.0 - iteratorPool: - size: 2048 - lowWatermark: 0.7 - highWatermark: 1.0 - fetchBlockMetadataResultsPool: - size: 65536 - capacity: 32 - lowWatermark: 0.7 - highWatermark: 1.0 - fetchBlocksMetadataResultsPool: - size: 32 - capacity: 4096 - lowWatermark: 0.7 - highWatermark: 1.0 - hostBlockMetadataSlicePool: - size: 131072 - capacity: 3 - lowWatermark: 0.7 - highWatermark: 1.0 - blockMetadataPool: - size: 65536 - lowWatermark: 0.7 - highWatermark: 1.0 - blockMetadataSlicePool: - size: 65536 - capacity: 32 - lowWatermark: 0.7 - highWatermark: 1.0 - blocksMetadataPool: - size: 65536 - lowWatermark: 0.7 - highWatermark: 1.0 - blocksMetadataSlicePool: - size: 32 - capacity: 4096 - lowWatermark: 0.7 - highWatermark: 1.0 - identifierPool: - size: 262144 - lowWatermark: 0.7 - highWatermark: 1.0 - bytesPool: - buckets: - - capacity: 16 - size: 524288 - lowWatermark: 0.7 - highWatermark: 1.0 - - capacity: 32 - size: 262144 - lowWatermark: 0.7 - highWatermark: 1.0 - - capacity: 64 - size: 131072 - lowWatermark: 0.7 - highWatermark: 1.0 - - capacity: 128 - size: 65536 - lowWatermark: 0.7 - highWatermark: 1.0 - - capacity: 256 - size: 65536 - lowWatermark: 0.7 - highWatermark: 1.0 - - capacity: 1440 - size: 16384 - lowWatermark: 0.7 - highWatermark: 1.0 - - capacity: 4096 - size: 8192 - lowWatermark: 0.7 - highWatermark: 1.0 - - config: - service: - env: default_env - zone: embedded - service: m3db - cacheDir: /var/lib/m3kv - etcdClusters: - - zone: embedded - endpoints: - - 127.0.0.1:2379 - seedNodes: - initialCluster: - - hostID: m3db_local - endpoint: http://127.0.0.1:2380 diff --git a/scripts/integration-tests/prometheus/prometheus-integration-test.sh b/scripts/integration-tests/prometheus/prometheus-integration-test.sh index faf53cccb3..2fbcc89dd8 100755 --- a/scripts/integration-tests/prometheus/prometheus-integration-test.sh +++ b/scripts/integration-tests/prometheus/prometheus-integration-test.sh @@ -5,15 +5,16 @@ set -xe rm -rf /tmp/m3dbdata/ mkdir -p /tmp/m3dbdata/ -echo "Build M3DB docker image" +echo "Build docker images" docker-compose -f docker-compose.yml build -echo "Run M3DB docker container" +echo "Run m3dbnode and m3coordinator containers" docker-compose -f docker-compose.yml up -d dbnode01 +docker-compose -f docker-compose.yml up -d coordinator01 -echo "Sleeping for a bit to ensure db" +echo "Sleeping for a bit to ensure db up" sleep 10 # TODO Replace sleeps with logic to determine when to proceed diff --git a/src/coordinator/api/v1/httpd/handler.go b/src/coordinator/api/v1/httpd/handler.go index cf5b80efe2..f539d5c33f 100644 --- a/src/coordinator/api/v1/httpd/handler.go +++ b/src/coordinator/api/v1/httpd/handler.go @@ -21,7 +21,9 @@ package httpd import ( + "encoding/json" "log" + "net/http" "net/http/pprof" "os" @@ -45,7 +47,8 @@ import ( ) const ( - pprofURL = "/debug/pprof/profile" + pprofURL = "/debug/pprof/profile" + routesURL = "/routes" ) var ( @@ -105,14 +108,15 @@ func (h *Handler) RegisterRoutes() error { h.Router.HandleFunc(native.PromReadURL, logged(native.NewPromReadHandler(h.engine)).ServeHTTP).Methods(native.PromReadHTTPMethod) h.Router.HandleFunc(handler.SearchURL, logged(handler.NewSearchHandler(h.storage)).ServeHTTP).Methods(handler.SearchHTTPMethod) - h.registerProfileEndpoints() - if h.clusterClient != nil { placement.RegisterRoutes(h.Router, h.clusterClient, h.config) namespace.RegisterRoutes(h.Router, h.clusterClient) database.RegisterRoutes(h.Router, h.clusterClient, h.config, h.embeddedDbCfg) } + h.registerProfileEndpoints() + h.registerRoutesEndpoint() + return nil } @@ -120,3 +124,28 @@ func (h *Handler) RegisterRoutes() error { func (h *Handler) registerProfileEndpoints() { h.Router.HandleFunc(pprofURL, pprof.Profile) } + +// Endpoints useful for viewing routes directory +func (h *Handler) registerRoutesEndpoint() { + h.Router.HandleFunc(routesURL, func(w http.ResponseWriter, r *http.Request) { + var routes []string + err := h.Router.Walk( + func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { + str, err := route.GetPathTemplate() + if err != nil { + return err + } + routes = append(routes, str) + return nil + }) + if err != nil { + handler.Error(w, err, http.StatusInternalServerError) + return + } + json.NewEncoder(w).Encode(struct { + Routes []string `json:"routes"` + }{ + Routes: routes, + }) + }) +} diff --git a/src/coordinator/api/v1/httpd/handler_test.go b/src/coordinator/api/v1/httpd/handler_test.go index 1d3324c0fe..e26510cc4b 100644 --- a/src/coordinator/api/v1/httpd/handler_test.go +++ b/src/coordinator/api/v1/httpd/handler_test.go @@ -21,6 +21,7 @@ package httpd import ( + "encoding/json" "net/http" "net/http/httptest" "testing" @@ -33,6 +34,7 @@ import ( "github.com/m3db/m3db/src/coordinator/util/logging" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) @@ -98,3 +100,35 @@ func TestPromNativeReadPost(t *testing.T) { h.Router.ServeHTTP(res, req) require.Equal(t, res.Code, http.StatusMethodNotAllowed, "POST method not defined") } + +func TestRoutesGet(t *testing.T) { + logging.InitWithCores(nil) + + req, _ := http.NewRequest("GET", routesURL, nil) + res := httptest.NewRecorder() + ctrl := gomock.NewController(t) + storage, _ := local.NewStorageAndSession(t, ctrl) + + h, err := NewHandler(storage, executor.NewEngine(storage), nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) + require.NoError(t, err, "unable to setup handler") + h.RegisterRoutes() + h.Router.ServeHTTP(res, req) + + require.Equal(t, res.Code, http.StatusOK) + + response := &struct { + Routes []string `json:"routes"` + }{} + + err = json.NewDecoder(res.Body).Decode(response) + require.NoError(t, err) + + foundRoutesURL := false + for _, route := range response.Routes { + if route == routesURL { + foundRoutesURL = true + break + } + } + assert.True(t, foundRoutesURL, "routes URL not served by routes endpoint") +} diff --git a/src/coordinator/config/m3coordinator-local-etcd.yml b/src/coordinator/config/m3coordinator-local-etcd.yml new file mode 100644 index 0000000000..28ba29c826 --- /dev/null +++ b/src/coordinator/config/m3coordinator-local-etcd.yml @@ -0,0 +1,50 @@ +listenAddress: 0.0.0.0:7201 + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3db/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +clusters: + - namespaces: + - namespace: default + storageMetricsType: unaggregated + retention: 48h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - 127.0.0.1:2379 + seedNodes: + initialCluster: + - hostID: m3db_local + endpoint: http://127.0.0.1:2380 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + writeTimeout: 10s + fetchTimeout: 15s + connectTimeout: 20s + writeRetry: + initialBackoff: 500ms + backoffFactor: 3 + maxRetries: 2 + jitter: true + fetchRetry: + initialBackoff: 500ms + backoffFactor: 2 + maxRetries: 3 + jitter: true + backgroundHealthCheckFailLimit: 4 + backgroundHealthCheckFailThrottleFactor: 0.5 + diff --git a/src/coordinator/services/m3coordinator/server/server.go b/src/coordinator/services/m3coordinator/server/server.go index f362674d74..72d5dda566 100644 --- a/src/coordinator/services/m3coordinator/server/server.go +++ b/src/coordinator/services/m3coordinator/server/server.go @@ -31,7 +31,7 @@ import ( "time" clusterclient "github.com/m3db/m3cluster/client" - "github.com/m3db/m3cluster/client/etcd" + etcdclient "github.com/m3db/m3cluster/client/etcd" "github.com/m3db/m3db/src/cmd/services/m3coordinator/config" dbconfig "github.com/m3db/m3db/src/cmd/services/m3dbnode/config" "github.com/m3db/m3db/src/coordinator/api/v1/httpd" @@ -117,23 +117,37 @@ func Run(runOpts RunOptions) { clusterClientCh = runOpts.ClusterClient } - if clusterClientCh == nil && cfg.ClusterManagement != nil { - // We resolved an etcd configuration for cluster management endpoints - etcdCfg := cfg.ClusterManagement.Etcd - clusterSvcClientOpts := etcdCfg.NewOptions() - clusterClient, err := etcd.NewConfigServiceClient(clusterSvcClientOpts) - if err != nil { - logger.Fatal("unable to create cluster management etcd client", zap.Any("error", err)) + if clusterClientCh == nil { + var etcdCfg *etcdclient.Configuration + switch { + case cfg.ClusterManagement != nil: + etcdCfg = &cfg.ClusterManagement.Etcd + + case len(cfg.Clusters) == 1 && + cfg.Clusters[0].Client.EnvironmentConfig.Service != nil: + etcdCfg = cfg.Clusters[0].Client.EnvironmentConfig.Service } - clusterClientSendableCh := make(chan clusterclient.Client, 1) - clusterClientSendableCh <- clusterClient - clusterClientCh = clusterClientSendableCh + if etcdCfg != nil { + // We resolved an etcd configuration for cluster management endpoints + clusterSvcClientOpts := etcdCfg.NewOptions() + clusterClient, err := etcdclient.NewConfigServiceClient(clusterSvcClientOpts) + if err != nil { + logger.Fatal("unable to create cluster management etcd client", zap.Any("error", err)) + } + + clusterClientSendableCh := make(chan clusterclient.Client, 1) + clusterClientSendableCh <- clusterClient + clusterClientCh = clusterClientSendableCh + } } var clusters local.Clusters if len(cfg.Clusters) > 0 { - clusters, err = cfg.Clusters.NewClusters() + opts := local.ClustersStaticConfigurationOptions{ + AsyncSessions: true, + } + clusters, err = cfg.Clusters.NewClusters(opts) if err != nil { logger.Fatal("unable to connect to clusters", zap.Any("error", err)) } @@ -144,7 +158,7 @@ func Run(runOpts RunOptions) { } dbClientCh := runOpts.DBClient if dbClientCh == nil { - logger.Fatal("no clusters configured and not running local embedded cluster") + logger.Fatal("no clusters configured and not running local cluster") } session := m3db.NewAsyncSession(func() (client.Client, error) { return <-dbClientCh, nil @@ -159,6 +173,11 @@ func Run(runOpts RunOptions) { } } + for _, namespace := range clusters.ClusterNamespaces() { + logger.Info("resolved cluster namespace", + zap.String("namespace", namespace.NamespaceID().String())) + } + workerPoolCount := cfg.DecompressWorkerPoolCount if workerPoolCount == 0 { workerPoolCount = defaultWorkerPoolCount diff --git a/src/coordinator/storage/local/cluster_test.go b/src/coordinator/storage/local/cluster_test.go index 3f7e3a1f59..f770da5aef 100644 --- a/src/coordinator/storage/local/cluster_test.go +++ b/src/coordinator/storage/local/cluster_test.go @@ -97,7 +97,7 @@ func TestNewClustersFromConfig(t *testing.T) { }, } - clusters, err := cfg.NewClusters() + clusters, err := cfg.NewClusters(ClustersStaticConfigurationOptions{}) require.NoError(t, err) // Resolve expected clusters and check attributes diff --git a/src/coordinator/storage/local/config.go b/src/coordinator/storage/local/config.go index 83b1b5b900..a1abbfb7bd 100644 --- a/src/coordinator/storage/local/config.go +++ b/src/coordinator/storage/local/config.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3db/src/coordinator/storage" + "github.com/m3db/m3db/src/coordinator/stores/m3db" "github.com/m3db/m3db/src/dbnode/client" "github.com/m3db/m3x/ident" ) @@ -46,8 +47,8 @@ type newClientFromConfig func( // ClusterStaticConfiguration is a static cluster configuration. type ClusterStaticConfiguration struct { newClientFromConfig newClientFromConfig - Client client.Configuration `yaml:"client"` Namespaces []ClusterStaticNamespaceConfiguration `yaml:"namespaces"` + Client client.Configuration `yaml:"client"` } func (c ClusterStaticConfiguration) newClient( @@ -86,8 +87,16 @@ type clusterConnectResult struct { err error } +// ClustersStaticConfigurationOptions are options to use when +// constructing clusters from config. +type ClustersStaticConfigurationOptions struct { + AsyncSessions bool +} + // NewClusters instantiates a new Clusters instance. -func (c ClustersStaticConfiguration) NewClusters() (Clusters, error) { +func (c ClustersStaticConfiguration) NewClusters( + opts ClustersStaticConfigurationOptions, +) (Clusters, error) { var ( numUnaggregatedClusterNamespaces int numAggregatedClusterNamespaces int @@ -147,14 +156,26 @@ func (c ClustersStaticConfiguration) NewClusters() (Clusters, error) { go func() { defer wg.Done() cfg := unaggregatedClusterNamespaceCfg - cfg.result.session, cfg.result.err = cfg.client.DefaultSession() + if !opts.AsyncSessions { + cfg.result.session, cfg.result.err = cfg.client.DefaultSession() + } else { + cfg.result.session = m3db.NewAsyncSession(func() (client.Client, error) { + return cfg.client, nil + }, nil) + } }() for _, cfg := range aggregatedClusterNamespacesCfgs { cfg := cfg // Capture var wg.Add(1) go func() { defer wg.Done() - cfg.result.session, cfg.result.err = cfg.client.DefaultSession() + if !opts.AsyncSessions { + cfg.result.session, cfg.result.err = cfg.client.DefaultSession() + } else { + cfg.result.session = m3db.NewAsyncSession(func() (client.Client, error) { + return cfg.client, nil + }, nil) + } }() } diff --git a/src/coordinator/stores/m3db/async_session.go b/src/coordinator/stores/m3db/async_session.go index 16f6f82f5d..486b1007fa 100644 --- a/src/coordinator/stores/m3db/async_session.go +++ b/src/coordinator/stores/m3db/async_session.go @@ -76,7 +76,7 @@ func NewAsyncSession(fn NewClientFn, done chan<- struct{}) *AsyncSession { return } - session, err := c.NewSession() + session, err := c.DefaultSession() asyncSession.Lock() defer asyncSession.Unlock() diff --git a/src/coordinator/stores/m3db/async_session_test.go b/src/coordinator/stores/m3db/async_session_test.go index 7f5141f5ba..6dcd87f278 100644 --- a/src/coordinator/stores/m3db/async_session_test.go +++ b/src/coordinator/stores/m3db/async_session_test.go @@ -60,7 +60,7 @@ func TestAsyncSessionError(t *testing.T) { customErr := errors.New("some error") expectedErrStr := fmt.Sprintf(errNewSessionFailFmt, customErr) - mockClient.EXPECT().NewSession().Return(nil, customErr) + mockClient.EXPECT().DefaultSession().Return(nil, customErr) done := make(chan struct{}, 1) asyncSession := NewAsyncSession(func() (client.Client, error) { return mockClient, nil @@ -92,7 +92,7 @@ func TestAsyncSessionUninitialized(t *testing.T) { mockClient, _ := SetupAsyncSessionTest(t) // Sleep one minute after a NewSession call to ensure we get an "uninitialized" error - mockClient.EXPECT().NewSession().Do(func() { time.Sleep(time.Minute) }).Return(nil, errors.New("some error")) + mockClient.EXPECT().DefaultSession().Do(func() { time.Sleep(time.Minute) }).Return(nil, errors.New("some error")) asyncSession := NewAsyncSession(func() (client.Client, error) { return mockClient, nil }, nil) @@ -121,7 +121,7 @@ func TestAsyncSessionUninitialized(t *testing.T) { func TestAsyncSessionInitialized(t *testing.T) { mockClient, mockSession := SetupAsyncSessionTest(t) - mockClient.EXPECT().NewSession().Return(mockSession, nil) + mockClient.EXPECT().DefaultSession().Return(mockSession, nil) done := make(chan struct{}, 1) asyncSession := NewAsyncSession(func() (client.Client, error) { return mockClient, nil