From 6644557f8cce2e7231b201581c98a9519d2ae132 Mon Sep 17 00:00:00 2001 From: Jakub Martin Date: Sat, 15 Oct 2022 12:51:34 +0200 Subject: [PATCH] Add configuration parameters for JSON maximum line size as well as file reader buffer size. --- cmd/root.go | 19 ++++++++------- config/config.go | 46 ++++++++++++++++++++++++++++------- datasources/csv/impl.go | 6 ++--- datasources/json/execution.go | 3 ++- datasources/json/impl.go | 4 +-- datasources/lines/impl.go | 4 +-- datasources/parquet/impl.go | 2 +- execution/files/files.go | 4 ++- physical/physical.go | 4 +-- 9 files changed, 62 insertions(+), 30 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index ac2e99fb..ccf81cb6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,6 +78,12 @@ octosql "SELECT * FROM plugins.plugins"`, defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop() } ctx := cmd.Context() + cfg, err := config.Read() + if err != nil { + return fmt.Errorf("couldn't read config: %w", err) + } + ctx = config.ContextWithConfig(ctx, cfg) + debug.SetGCPercent(1000) logs.InitializeFileLogger() @@ -94,11 +100,6 @@ octosql "SELECT * FROM plugins.plugins"`, } }() - cfg, err := config.Read() - if err != nil { - return fmt.Errorf("couldn't read config: %w", err) - } - installedPlugins, err := pluginManager.ListInstalledPlugins() if err != nil { return fmt.Errorf("couldn't list installed plugins: %w", err) @@ -188,7 +189,7 @@ octosql "SELECT * FROM plugins.plugins"`, if err != nil { return fmt.Errorf("couldn't get file extension handlers: %w", err) } - fileHandlers := map[string]func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error){ + fileHandlers := map[string]func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error){ "csv": csv.Creator(','), "json": json.Creator, "lines": lines.Creator, @@ -196,7 +197,7 @@ octosql "SELECT * FROM plugins.plugins"`, "tsv": csv.Creator('\t'), } for ext, pluginName := range fileExtensionHandlers { - fileHandlers[ext] = func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { + fileHandlers[ext] = func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { db, err := databases[pluginName]() if err != nil { return nil, physical.Schema{}, fmt.Errorf("couldn't get plugin %s database for plugin extensions %s: %w", pluginName, ext, err) @@ -513,7 +514,7 @@ func typecheckExpr(ctx context.Context, expr logical.Expression, env physical.En } type fileTypeDatabaseCreator struct { - creator func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) + creator func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) } func (f *fileTypeDatabaseCreator) ListTables(ctx context.Context) ([]string, error) { @@ -521,5 +522,5 @@ func (f *fileTypeDatabaseCreator) ListTables(ctx context.Context) ([]string, err } func (f *fileTypeDatabaseCreator) GetTable(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { - return f.creator(name, options) + return f.creator(ctx, name, options) } diff --git a/config/config.go b/config/config.go index 1f2728fc..739575bf 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "context" "fmt" "io/ioutil" "log" @@ -23,6 +24,7 @@ var octosqlHomeDir = func() string { type Config struct { Databases []DatabaseConfig `yaml:"databases"` + Files FilesConfig `yaml:"files"` } type DatabaseConfig struct { @@ -32,20 +34,34 @@ type DatabaseConfig struct { Config yaml.Node `yaml:"config"` } +type FilesConfig struct { + JSON JSONConfig `yaml:"json"` + BufferSizeBytes int `yaml:"buffer_size_bytes"` +} + +type JSONConfig struct { + MaxLineSizeBytes int `yaml:"max_line_size_bytes"` +} + func Read() (*Config, error) { + var config Config data, err := ioutil.ReadFile(filepath.Join(octosqlHomeDir, "octosql.yml")) - if err != nil { - if os.IsNotExist(err) { - return &Config{ - Databases: nil, - }, nil - } + if err != nil && os.IsNotExist(err) { + config = Config{} + } else if err != nil { return nil, fmt.Errorf("couldn't read config file: %w", err) + } else { + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("couldn't unmarshal yaml configuration: %w", err) + } } - var config Config - if err := yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("couldn't unmarshal yaml configuration: %w", err) + // TODO: Refactor to a sensibly structured default value system. + if config.Files.BufferSizeBytes == 0 { + config.Files.BufferSizeBytes = 4096 * 1024 + } + if config.Files.JSON.MaxLineSizeBytes == 0 { + config.Files.JSON.MaxLineSizeBytes = 1024 * 1024 } return &config, nil @@ -89,3 +105,15 @@ func (r *PluginReference) UnmarshalText(text []byte) error { func (r *PluginReference) String() string { return fmt.Sprintf("%s/%s", r.Repository, r.Name) } + +// TODO: Using a custom context everywhere would be better. + +type contextKey struct{} + +func ContextWithConfig(ctx context.Context, config *Config) context.Context { + return context.WithValue(ctx, contextKey{}, config) +} + +func FromContext(ctx context.Context) *Config { + return ctx.Value(contextKey{}).(*Config) +} diff --git a/datasources/csv/impl.go b/datasources/csv/impl.go index 0330d250..105bf48b 100644 --- a/datasources/csv/impl.go +++ b/datasources/csv/impl.go @@ -14,9 +14,9 @@ import ( "github.com/cube2222/octosql/physical" ) -func Creator(separator rune) func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { - return func(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { - f, err := files.OpenLocalFile(context.Background(), name, files.WithPreview()) +func Creator(separator rune) func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { + return func(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { + f, err := files.OpenLocalFile(ctx, name, files.WithPreview()) if err != nil { return nil, physical.Schema{}, fmt.Errorf("couldn't open local file: %w", err) } diff --git a/datasources/json/execution.go b/datasources/json/execution.go index 63d78c2a..d9fd7d13 100644 --- a/datasources/json/execution.go +++ b/datasources/json/execution.go @@ -8,6 +8,7 @@ import ( "github.com/valyala/fastjson" + "github.com/cube2222/octosql/config" . "github.com/cube2222/octosql/execution" "github.com/cube2222/octosql/execution/files" "github.com/cube2222/octosql/octosql" @@ -28,7 +29,7 @@ func (d *DatasourceExecuting) Run(ctx ExecutionContext, produce ProduceFn, metaS defer f.Close() sc := bufio.NewScanner(f) - sc.Buffer(nil, 1024*1024) + sc.Buffer(nil, config.FromContext(ctx).Files.JSON.MaxLineSizeBytes) // All async processing in this function and all jobs created by it use this context. // This means that returning from this function will properly clean up all async processors. diff --git a/datasources/json/impl.go b/datasources/json/impl.go index 9d325075..4bd54659 100644 --- a/datasources/json/impl.go +++ b/datasources/json/impl.go @@ -15,8 +15,8 @@ import ( "github.com/cube2222/octosql/physical" ) -func Creator(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { - f, err := files.OpenLocalFile(context.Background(), name, files.WithPreview()) +func Creator(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { + f, err := files.OpenLocalFile(ctx, name, files.WithPreview()) if err != nil { return nil, physical.Schema{}, fmt.Errorf("couldn't open local file: %w", err) } diff --git a/datasources/lines/impl.go b/datasources/lines/impl.go index 639aab09..173bbf36 100644 --- a/datasources/lines/impl.go +++ b/datasources/lines/impl.go @@ -10,8 +10,8 @@ import ( "github.com/cube2222/octosql/physical" ) -func Creator(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { - f, err := files.OpenLocalFile(context.Background(), name, files.WithPreview()) +func Creator(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { + f, err := files.OpenLocalFile(ctx, name, files.WithPreview()) if err != nil { return nil, physical.Schema{}, fmt.Errorf("couldn't open local file: %w", err) } diff --git a/datasources/parquet/impl.go b/datasources/parquet/impl.go index 317b4c1c..8e665cb0 100644 --- a/datasources/parquet/impl.go +++ b/datasources/parquet/impl.go @@ -12,7 +12,7 @@ import ( "github.com/cube2222/octosql/physical" ) -func Creator(name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { +func Creator(ctx context.Context, name string, options map[string]string) (physical.DatasourceImplementation, physical.Schema, error) { f, err := os.Open(name) if err != nil { return nil, physical.Schema{}, fmt.Errorf("couldn't open file: %w", err) diff --git a/execution/files/files.go b/execution/files/files.go index 61767e87..22aa948b 100644 --- a/execution/files/files.go +++ b/execution/files/files.go @@ -10,6 +10,8 @@ import ( "sync" "github.com/nxadm/tail" + + "github.com/cube2222/octosql/config" ) type customCloser struct { @@ -113,7 +115,7 @@ func OpenLocalFile(ctx context.Context, path string, opts ...OpenFileOption) (io return nil, fmt.Errorf("couldn't open file: %w", err) } return &customCloser{ - Reader: bufio.NewReaderSize(f, 4096*1024), + Reader: bufio.NewReaderSize(f, config.FromContext(ctx).Files.BufferSizeBytes), close: f.Close, }, nil } else { diff --git a/physical/physical.go b/physical/physical.go index e0338584..09585882 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -54,7 +54,7 @@ type DatasourceRepository struct { // Bo inaczej będzie na start strasznie dużo rzeczy ładować niepotrzebnych dla wszystkich // skonfigurowanych baz danych. Databases map[string]func() (Database, error) - FileHandlers map[string]func(name string, options map[string]string) (DatasourceImplementation, Schema, error) + FileHandlers map[string]func(ctx context.Context, name string, options map[string]string) (DatasourceImplementation, Schema, error) } type Database interface { @@ -78,7 +78,7 @@ func (dr *DatasourceRepository) GetDatasource(ctx context.Context, name string, if index := strings.LastIndex(name, "."); index != -1 { extension := name[index+1:] if handler, ok := dr.FileHandlers[extension]; ok { - return handler(name, options) + return handler(ctx, name, options) } }