Skip to content

Commit

Permalink
customized key column per table, minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
adranwit committed Apr 20, 2019
1 parent 7dae902 commit 6eca7cb
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 111 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## April 18 2019 0.5.0
* Added keyName customized per table
* Minor refactoring

## Jul 1 2016 (Alpha)

Expand Down
41 changes: 40 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[![Datastore Connectivity library for Aerospike in Go.](https://goreportcard.com/badge/github.com/viant/asc)](https://goreportcard.com/report/github.com/viant/asc)
[![GoDoc](https://godoc.org/github.com/viant/asc?status.svg)](https://godoc.org/github.com/viant/asc)

This library is compatible with Go 1.5+
This library is compatible with Go 1.11+


Please refer to [`CHANGELOG.md`](CHANGELOG.md) if you encounter breaking changes.

Expand All @@ -13,7 +14,45 @@ Please refer to [`CHANGELOG.md`](CHANGELOG.md) if you encounter breaking changes
- [Credits and Acknowledgements](#Credits-and-Acknowledgements)


#### Configuration parameters

###### aerospike client/policy config params
- timeoutMs
- connectionTimeout
- serverSocketTimeout
- scanPct
- host
- port
- namespace
- sleepBetweenRetries
- batchSize


###### keyColumn, keyColumnName

Defines name of column used as record key ('id' by default)

It can be specified per table i.e

events.keyColumn = code


###### excludedColumns

List of columns to be excluded from record (i.e: id - in case we need it only as record key)


###### dateFormat

ISO date format used to time.Time conversion


###### optimizeLargeScan

Experimental feature that first scan all keys and write then to disk
and then separate go routines scan data using the dumped keys

You can only specify _scanBaseDirectory_


## Usage:
Expand Down
84 changes: 84 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package asc

import (
"fmt"
"github.com/viant/dsc"
"strings"
"time"
)

const (
pkColumnKey = "keyColumn"
pkColumnNameKey = "keyColumnName"
inheritIdFromPKKey = "inheritIdFromPK"
generationColumnKey = "generationColumn"
excludedColumnsKey = "excludedColumns"

serverSocketTimeoutKey = "serverSocketTimeout"
connectionTimeoutMsKey = "connectionTimeoutMs"
timeoutMsKey = "timeoutMs"
maxRetriesKey = "maxRetries"
sleepBetweenRetriesMsKey = "sleepBetweenRetriesMs"

namespaceKey = "namespace"
hostKey = "host"
portKey = "port"
scanPctKey = "scanPct"
batchSizeKey = "batchSize"

optimizeLargeScanKey = "optimizeLargeScan"
scanBaseDirectoryKey = "scanBaseDirectory"
)

type config struct {
*dsc.Config
keyColumn string
connectionTimeout time.Duration
timeout time.Duration
serverSocketTimeout time.Duration
scanPct int
maxRetries int
sleepBetweenRetries time.Duration
generationColumn string
namespace string
excludedColumns []string
inheritIdFromPK bool
batchSize int
optimizeLargeScan bool
scanBaseDirectory string
}

func (m *config) getKeyColumn(table string) string {
if keyColumn := m.GetString(table+"."+pkColumnKey, ""); keyColumn != "" {
return keyColumn
}
return m.keyColumn
}

func newConfig(conf *dsc.Config) (*config, error) {
result := &config{
Config: conf,

scanPct: conf.GetInt(scanPctKey, 100),
serverSocketTimeout: conf.GetDuration(serverSocketTimeoutKey, time.Millisecond, 30000*time.Millisecond),
connectionTimeout: conf.GetDuration(connectionTimeoutMsKey, time.Millisecond, 5000*time.Millisecond),
timeout: conf.GetDuration(timeoutMsKey, time.Millisecond, 5000*time.Millisecond),
sleepBetweenRetries: conf.GetDuration(sleepBetweenRetriesMsKey, time.Millisecond, 100*time.Millisecond),
maxRetries: conf.GetInt(maxRetriesKey, 3),
namespace: conf.Get(namespaceKey),
generationColumn: conf.GetString(generationColumnKey, ""),
keyColumn: conf.GetString(pkColumnKey, ""),
inheritIdFromPK: conf.GetBoolean(inheritIdFromPKKey, true),
excludedColumns: strings.Split(conf.Get(excludedColumnsKey), ","),
batchSize: conf.GetInt(batchSizeKey, 256),
optimizeLargeScan: conf.GetBoolean(optimizeLargeScanKey, false),
scanBaseDirectory: conf.GetString(scanBaseDirectoryKey, ""),
}
if result.namespace == "" {
return nil, fmt.Errorf("namespaceKey was empty")
}
if result.keyColumn = conf.GetString(pkColumnKey, ""); result.keyColumn == "" {
result.keyColumn = conf.GetString(pkColumnNameKey, "id")
}
return result, nil
}
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type connectionProvider struct {

func (p *connectionProvider) NewConnection() (dsc.Connection, error) {
config := p.ConnectionProvider.Config()
client, err := aerospike.NewClient(config.Get(hostKey), config.GetInt(portKey, 3000))
client, err := aerospike.NewClient(config.GetString(hostKey, "127.0.0.1"), config.GetInt(portKey, 3000))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestNewConnection(t *testing.T) {
config := dsc.NewConfig("aerospike", "", "host:127.0.0.1,port:3000,namespace:test,generationColumnName:generation,dateLayout:2006-01-02 15:04:05.000,connectionTimeout:1000")
config := dsc.NewConfig("aerospike", "", "host:127.0.0.1,port:3000,namespace:test,generationColumn:generation,dateLayout:2006-01-02 15:04:05.000,connectionTimeout:1000")
factory := dsc.NewManagerFactory()
manager, _ := factory.Create(config)
provider := manager.ConnectionProvider()
Expand Down
15 changes: 7 additions & 8 deletions dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"github.com/viant/toolbox"
)

var defaulConnectionTimeout = 500 * time.Millisecond
var defaultConnectionTimeout = 500 * time.Millisecond

type dialect struct{ dsc.DatastoreDialect }

func getConnection(config *dsc.Config) (*aerospike.Connection, error) {
if !config.Has(hostKey) || !config.Has(portKey) {
return nil, errors.New("port or hostKey are not present")
}
connectionTimeoutInMs := defaulConnectionTimeout
connectionTimeoutInMs := defaultConnectionTimeout
if config.Has(connectionTimeoutMsKey) {
timeout := toolbox.AsInt(config.Get(connectionTimeoutMsKey))
connectionTimeoutInMs = time.Duration(timeout) * time.Millisecond
Expand All @@ -32,13 +32,12 @@ func getConnection(config *dsc.Config) (*aerospike.Connection, error) {

//GetKeyName returns a name of column name that is a key, or coma separated list if complex key

func (d *dialect) GetKeyName(manager dsc.Manager, datastore, table string) string {
config := manager.Config()
var keyName = pkColumnNameDefaultValue
if config.Has(pkColumnNameKey) {
keyName = config.Get(pkColumnNameKey)
func (d *dialect) GetKeyName(mgr dsc.Manager, datastore, table string) string {
config := mgr.Config()
if manager, ok := mgr.(*manager); ok {
return manager.config.getKeyColumn(table)
}
return keyName
return config.GetString(pkColumnNameKey, "id")
}

func (d *dialect) SendAdminCommand(manager dsc.Manager, command string) (map[string]string, error) {
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)
{
config := dsc.NewConfig("aerospike", "", "host:127.0.0.1,port:3000,namespace:test,generationColumnName:generation,dateLayout:2006-01-02 15:04:05.000")
config := dsc.NewConfig("aerospike", "", "host:127.0.0.1,port:3000,namespace:test,generationColumn:generation,dateLayout:2006-01-02 15:04:05.000")
factory := dsc.NewManagerFactory()
manager, err := factory.Create(config)
}
Expand Down
Loading

0 comments on commit 6eca7cb

Please sign in to comment.