From fb35bab5468da9edc5c144480a6dc0d49631674b Mon Sep 17 00:00:00 2001 From: Florent Messa Date: Sun, 20 Dec 2020 12:44:23 +0100 Subject: [PATCH] chore: cleanup --- Makefile | 3 ++ README.md | 15 ++++--- etl/{etl.go => engine.go} | 87 +++++++++++++++++++-------------------- etl/extractor.go | 21 +++++----- etl/util.go | 17 +++++--- 5 files changed, 77 insertions(+), 66 deletions(-) rename etl/{etl.go => engine.go} (83%) diff --git a/Makefile b/Makefile index 152d20b..4a57079 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ lint: golangci-lint run +build: + go build -o mover ./cmd/mover/ + test: go test ./... -v diff --git a/README.md b/README.md index c6e478a..703c58f 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,17 @@ # mover -`mover` is a simple utility to extract data from a remote database server and load them in another environment. +`mover` is used to extract data from a remote database server and load them in another environment. -It uses the underlying introspection API from the RDMS to retrieve relationships from the extracted results. +It uses the underlying introspection API from the RDBMS to retrieve relationships +(foreign keys, reference keys, etc.) based on results from your database schema. ## How do we use it internally? Instead of harcoding fixtures for each workflows, we export data from our production database by sanitizing sensible data (password, personal user information, etc.). -Thanks to this tool, we don't have to maintain anymore fixtures and we can -quickly reply production bugs in our local environment. +Thanks to this tool, we don't have to maintain fixtures anymore and we can +quickly replicate a production bug in our local environment. 
## Usage @@ -29,11 +30,13 @@ mkdir -p output Extract data from backup database: ```console -go run cmd/mover/main.go -dsn "postgresql://user:password@remote.server:5432/dbname" -path output -action extract -query "SELECT * FROM user WHERE id = 1" -table "user" +export REMOTE_DSN="postgresql://user:password@remote.server:5432/dbname" +go run cmd/mover/main.go -dsn $REMOTE_DSN -path output -action extract -query "SELECT * FROM user WHERE id = 1" -table "user" ``` Load data to your local database: ```console -go run cmd/mover/main.go -dsn "postgresql://user:password@localhost:5432/dbname" -path output -action load +export LOCAL_DSN="postgresql://user:password@localhost:5432/dbname" +go run cmd/mover/main.go -dsn $LOCAL_DSN -path output -action load ``` diff --git a/etl/etl.go b/etl/engine.go similarity index 83% rename from etl/etl.go rename to etl/engine.go index 0fc18af..27cfc82 100644 --- a/etl/etl.go +++ b/etl/engine.go @@ -17,6 +17,40 @@ import ( "github.com/ulule/mover/dialect/postgres" ) +// copySchemaTables copies tables from database to schema configuration. +func copySchemaTables(schema []config.Schema, tables []dialect.Table) map[string]config.Schema { + schemas := make(map[string]config.Schema, len(tables)) + for i := range tables { + tableName := tables[i].Name + found := false + for j := range schema { + if tables[i].Name == schema[j].TableName { + found = true + schema[j].Table = tables[i] + schemas[tableName] = schema[j] + + } + } + + if !found { + schemas[tableName] = config.Schema{ + TableName: tables[i].Name, + Table: tables[i], + } + } + + for k := range schemas[tableName].Queries { + for j := range tables { + if tables[j].Name == schemas[tableName].Queries[k].TableName { + schemas[tableName].Queries[k].Table = tables[j] + } + } + } + } + + return schemas +} + // Engine extracts and loads data from database with specific dialect. 
type Engine struct { schema map[string]config.Schema @@ -33,55 +67,24 @@ type jsonPayload struct { // NewEngine returns a new Engine instance. func NewEngine(ctx context.Context, cfg config.Config, dsn string, logger *zap.Logger) (*Engine, error) { - etl := &Engine{ - config: cfg, - logger: logger, - } - - dialect, err := etl.newDialect(ctx, dsn) + dialect, err := postgres.NewPGDialect(ctx, dsn) if err != nil { return nil, err } - etl.dialect = dialect - tables, err := dialect.Tables(ctx) if err != nil { return nil, err } - schema := make(map[string]config.Schema, len(tables)) - for i := range tables { - tableName := tables[i].Name - found := false - for j := range cfg.Schema { - if tables[i].Name == cfg.Schema[j].TableName { - found = true - cfg.Schema[j].Table = tables[i] - schema[tableName] = cfg.Schema[j] - - } - } + schema := copySchemaTables(cfg.Schema, tables) - if !found { - schema[tableName] = config.Schema{ - TableName: tables[i].Name, - Table: tables[i], - } - } - - for k := range schema[tableName].Queries { - for j := range tables { - if tables[j].Name == schema[tableName].Queries[k].TableName { - schema[tableName].Queries[k].Table = tables[j] - } - } - } - } - - etl.schema = schema - - return etl, nil + return &Engine{ + config: cfg, + logger: logger, + dialect: dialect, + schema: schema, + }, nil } // Describe returns a table from its name. 
@@ -164,7 +167,7 @@ func (e *Engine) extract(ctx context.Context, outputPath string, schema config.S zap.String("files", strings.Join(filenames, " ")), zap.String("output_path", outputPath)) - if err := downloadFiles(ctx, filenames, path.Join(outputPath, "media")); err != nil { + if err := downloadFiles(ctx, filenames, path.Join(outputPath, "media"), 10); err != nil { e.logger.Error("unable to download files", zap.Error(err)) } } @@ -172,10 +175,6 @@ func (e *Engine) extract(ctx context.Context, outputPath string, schema config.S return nil } -func (e *Engine) newDialect(ctx context.Context, dsn string) (dialect.Dialect, error) { - return postgres.NewPGDialect(ctx, dsn) -} - func (e *Engine) newLoader() *loader { return &loader{ dialect: e.dialect, diff --git a/etl/extractor.go b/etl/extractor.go index c7864df..64dd352 100644 --- a/etl/extractor.go +++ b/etl/extractor.go @@ -12,16 +12,17 @@ import ( "github.com/ulule/mover/dialect" ) -type resultSet []map[string]interface{} -type extract map[string]entry -type entry map[string]resultSet - -type extractor struct { - extract extract - dialect dialect.Dialect - schema map[string]config.Schema - logger *zap.Logger -} +type ( + resultSet []map[string]interface{} + extract map[string]entry + entry map[string]resultSet + extractor struct { + extract extract + dialect dialect.Dialect + schema map[string]config.Schema + logger *zap.Logger + } +) func depthF(depth int, msg string, args ...interface{}) string { return strings.Repeat("\t", depth+1) + fmt.Sprintf(msg, args...) 
diff --git a/etl/util.go b/etl/util.go index ff42194..75fa7ce 100644 --- a/etl/util.go +++ b/etl/util.go @@ -58,9 +58,14 @@ func chunkStrings(slice []string, chunkSize int) [][]string { return chunks } -func downloadFiles(ctx context.Context, filenames []string, outputPath string) error { +func downloadFiles(ctx context.Context, filenames []string, outputPath string, chunkSize int) error { g, _ := errgroup.WithContext(ctx) - chunks := chunkStrings(filenames, 10) + var chunks [][]string + if chunkSize == 0 { + chunks = [][]string{filenames} + } else { + chunks = chunkStrings(filenames, chunkSize) + } for i := range chunks { for j := range chunks[i] { g.Go((func(filename string) func() error { @@ -81,7 +86,7 @@ func downloadFiles(ctx context.Context, filenames []string, outputPath string) e return nil } -func downloadFile(absoluteURL string, output string) error { +func downloadFile(absoluteURL string, outputDir string) error { res, err := http.Get(absoluteURL) if err != nil { return fmt.Errorf("unable to retrieve %s: %w", absoluteURL, err) @@ -89,7 +94,7 @@ func downloadFile(absoluteURL string, output string) error { defer res.Body.Close() if res.StatusCode != 200 { - return fmt.Errorf("unable to download %s to %s: received %d HTTP code", absoluteURL, output, res.StatusCode) + return fmt.Errorf("unable to download %s to %s: received %d HTTP code", absoluteURL, outputDir, res.StatusCode) } u, err := url.Parse(absoluteURL) @@ -97,12 +102,12 @@ func downloadFile(absoluteURL string, output string) error { return fmt.Errorf("unable to parse %s: %w", absoluteURL, err) } - path := filepath.Join(output, filepath.Dir(u.Path)) + path := filepath.Join(outputDir, filepath.Dir(u.Path)) if err := os.MkdirAll(path, os.ModePerm); err != nil { return fmt.Errorf("unable to create directory %s: %w", path, err) } - file, err := os.Create(filepath.Join(output, u.Path)) + file, err := os.Create(filepath.Join(outputDir, u.Path)) if err != nil { return fmt.Errorf("unable to create file %s: 
%w", path, err) }