Skip to content

Commit

Permalink
ref: remove table flag on extract
Browse files Browse the repository at this point in the history
  • Loading branch information
thoas committed Dec 28, 2020
1 parent 511289d commit 11f0514
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
3 changes: 1 addition & 2 deletions cmd/mover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ var (

func main() {
flag.StringVar(&query, "query", "", "query to execute")
flag.StringVar(&tableName, "table", "", "table name to export")
flag.StringVar(&path, "path", "", "directory output")
flag.StringVar(&dsn, "dsn", "", "database dsn")
flag.StringVar(&action, "action", "", "action to execute")
Expand Down Expand Up @@ -62,7 +61,7 @@ func main() {

switch action {
case "extract":
if err := engine.Extract(ctx, path, tableName, query); err != nil {
if err := engine.Extract(ctx, path, query); err != nil {
logger.Error("unable to extract data",
zap.Error(err),
zap.String("table_name", tableName),
Expand Down
4 changes: 2 additions & 2 deletions dialect/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ulule/mover/dialect"
)

var fkReg = regexp.MustCompile(`FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)`)
var fkRegexp = regexp.MustCompile(`FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)`)

// NewPGDialect initializes a new PGDialect instance.
func NewPGDialect(ctx context.Context, dsn string) (dialect.Dialect, error) {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (d *PGDialect) ForeignKeys(ctx context.Context, tableName string) (dialect.

foreignKeys := make(dialect.ForeignKeys, len(results))
for i := range foreignKeys {
matches := fkReg.FindStringSubmatch(results[i].Condef)
matches := fkRegexp.FindStringSubmatch(results[i].Condef)

foreignKeys[i] = dialect.ForeignKey{
Name: results[i].Conname,
Expand Down
7 changes: 6 additions & 1 deletion etl/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,14 @@ func (e *Engine) Load(ctx context.Context, outputPath string) error {
}

// Extract extracts data to an output directory with a table name and its query.
func (e *Engine) Extract(ctx context.Context, outputPath, tableName, query string) error {
func (e *Engine) Extract(ctx context.Context, outputPath, query string) error {
extractor := e.newExtractor()

tableName := getQueryTable(query)
if tableName == "" {
return fmt.Errorf("unable to retrieve table from query: %s", query)
}

cache, err := extractor.Handle(ctx, e.schema[tableName], query)
if err != nil {
return fmt.Errorf("unable to extract %s (query %s): %w", tableName, query, err)
Expand Down
24 changes: 24 additions & 0 deletions etl/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,37 @@ import (
"net/url"
"os"
"path/filepath"
"regexp"
"strings"

"golang.org/x/sync/errgroup"

"github.com/ulule/mover/config"
)

var sqlSelectRegexp = regexp.MustCompile(`^(?i)SELECT (?P<columns>.*[^T]) FROM (?P<table>\w+).*`)

func getQueryTable(query string) string {
match := sqlSelectRegexp.FindStringSubmatch(query)
if match == nil {
return ""
}

captures := make(map[string]string)
for i, name := range sqlSelectRegexp.SubexpNames() {
if i == 0 {
continue
}
captures[name] = match[i]
}

if val, ok := captures["table"]; ok {
return val
}

return ""
}

func extractFilenames(schema config.Schema, rows entry) []string {
filenames := make([]string, 0)
for i := range schema.Columns {
Expand Down

0 comments on commit 11f0514

Please sign in to comment.