This repository has been archived by the owner on Feb 27, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmapping_to_db.go
163 lines (134 loc) · 4.16 KB
/
mapping_to_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package dataloading
import (
"strconv"
"github.com/gocodo/bloomdb"
)
var sqlTypes = map[string]string{
"datetime": "timestamp",
"bigint": "bigint",
"int": "int",
"decimal": "decimal",
"string": "character varying",
"boolean": "boolean",
}
func MappingToTableOnly(mapping *SourceMapping) string {
sources := (*mapping).Sources
var create string
for _, source := range sources {
for _, destination := range source.Destinations {
create += "CREATE TABLE " + destination.Name + "(\n"
for fieldIndex, field := range destination.Fields {
var sqlType string
switch field.Source.(type) {
case string:
if field.Type == "" {
sqlType = sqlTypes["string"]
} else {
sqlType = sqlTypes[field.Type]
}
if sqlType == "character varying" {
if field.MaxLength != 0 {
sqlType += "(" + strconv.Itoa(field.MaxLength) + ")"
} else {
sqlType += "(255)"
}
}
case []interface{}:
sqlType = "uuid"
case []string:
sqlType = "uuid"
}
create += field.Dest + " " + sqlType
if fieldIndex + 1 != len(destination.Fields) {
create += ",\n"
} else {
create += ",\n"
create += "bloom_created_at timestamp\n"
}
}
create += ");\n"
}
for _, destination := range source.Destinations {
create += "CREATE TABLE " + destination.Name + "_revisions(\n"
for fieldIndex, field := range destination.Fields {
var sqlType string
switch field.Source.(type) {
case string:
if field.Type == "" {
sqlType = sqlTypes["string"]
} else {
sqlType = sqlTypes[field.Type]
}
if sqlType == "character varying" {
if field.MaxLength != 0 {
sqlType += "(" + strconv.Itoa(field.MaxLength) + ")"
} else {
sqlType += "(255)"
}
}
case []interface{}:
sqlType = "uuid"
case []string:
sqlType = "uuid"
}
create += field.Dest + " " + sqlType
if fieldIndex + 1 != len(destination.Fields) {
create += ",\n"
} else {
create += ",\n"
create += "bloom_created_at timestamp,\n"
create += "bloom_updated_at timestamp,\n"
create += "bloom_action character varying(255)\n"
}
}
create += ");\n"
}
}
return create
}
func MappingToCreate(mapping *SourceMapping) string {
sources := (*mapping).Sources
create := MappingToTableOnly(mapping)
for _, source := range sources {
source_id := bloomdb.MakeKey(source.Name)
for _, destination := range source.Destinations {
table_id := bloomdb.MakeKey(source.Name, destination.Name)
create += "INSERT INTO source_tables (id, source_id, name) VALUES ('" + table_id + "', '" + source_id + "', '" + destination.Name + "');\n"
}
create += "INSERT INTO sources (id, name) VALUES ('" + source_id + "', '" + source.Name + "');\n";
}
return create;
}
func MappingToDrop(mapping *SourceMapping) string {
sources := (*mapping).Sources
var drop string
for _, source := range sources {
for _, destination := range source.Destinations {
drop += "DROP TABLE IF EXISTS " + destination.Name + ";\n"
drop += "DROP TABLE IF EXISTS " + destination.Name + "_revisions;\n"
}
drop += "DELETE FROM source_versions USING sources WHERE sources.id = source_versions.source_id AND sources.name = '" + source.Name + "';\n"
drop += "DELETE FROM sources WHERE sources.name = '" + source.Name + "';\n"
}
return drop
}
func MappingToIndex(mapping *SourceMapping) string {
sources := (*mapping).Sources
var index string
for _, source := range sources {
for _, destination := range source.Destinations {
for _, field := range destination.Fields {
switch field.Source.(type) {
case []interface{}:
index += "CREATE INDEX ON " + destination.Name + " (" + field.Dest + ");\n"
index += "CREATE INDEX ON " + destination.Name + "_revisions (" + field.Dest + ");\n"
}
}
index += "CREATE INDEX ON " + destination.Name + " (bloom_created_at);\n"
index += "CREATE INDEX ON " + destination.Name + "_revisions (bloom_created_at);\n"
index += "CREATE INDEX ON " + destination.Name + "_revisions (bloom_action);\n"
index += "CREATE INDEX ON " + destination.Name + "_revisions (bloom_updated_at);\n"
}
}
return index
}