-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenality.go
112 lines (95 loc) · 2.55 KB
/
genality.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package genality
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
"github.com/doug-martin/goqu/v9/exp"
_ "github.com/lib/pq"
)
// Package-level configuration shared by every query in this file.
var (
	// driverName is the database/sql driver name handed to sql.Open and
	// also used to select the goqu SQL dialect.
	driverName = "postgres"

	// tableName is the table every insert and select in this file targets.
	tableName = "genality"

	// dialect is the goqu query builder bound to driverName.
	dialect = goqu.Dialect(driverName)
)
// Opts carries the settings consumed by New when constructing a store.
type Opts struct {
	// ConnectionString is passed unchanged to sql.Open as the data source name.
	ConnectionString string
}
// New constructs a Descriptor from opts by delegating to newGenality.
// Whatever value and error newGenality produces are handed back unchanged.
func New(opts Opts) (Descriptor, error) {
	store, err := newGenality(opts)
	return store, err
}
// newGenality opens a database handle for opts.ConnectionString using the
// package-level driverName and wraps it in a genality value.
//
// If sql.Open fails, the zero genality is returned together with the error.
// NOTE: no Ping is issued here, and sql.Open may only validate its arguments,
// so connectivity problems can first surface on a later query.
func newGenality(opts Opts) (genality, error) {
	handle, err := sql.Open(driverName, opts.ConnectionString)
	if err != nil {
		return genality{}, err
	}

	var store genality
	store.db = handle
	return store, nil
}
// genality is the database-backed store; its methods run their queries
// through the wrapped handle.
type genality struct {
	// db is the handle obtained from sql.Open in newGenality.
	db *sql.DB
}
// Add stores record stamped with the current time, converted to UTC.
// It returns whatever error the underlying add call reports.
func (m genality) Add(ctx context.Context, record string) error {
	now := time.Now().UTC()
	return m.add(ctx, record, now)
}
// add inserts one row into tableName with the given record and timestamp t.
//
// The statement is built as a prepared (placeholder-based) INSERT over the
// "record" and "time" columns. An error from building the SQL or from
// executing it is returned as-is; nil means the insert was executed.
func (m genality) add(ctx context.Context, record string, t time.Time) error {
	insert := dialect.Insert(tableName).
		Prepared(true).
		Cols("record", "time").
		Vals([]interface{}{record, t})

	query, params, err := insert.ToSQL()
	if err != nil {
		return err
	}

	// The result (rows affected etc.) is not needed, only the error.
	_, err = m.db.ExecContext(ctx, query, params...)
	return err
}
// GetCountFrom returns how many rows in tableName have the given record and
// a "time" value greater than or equal to start.
//
// The query is a prepared SELECT COUNT(*). On any failure (building the SQL,
// running it, or scanning the single result row) it returns 0 and the error.
func (m genality) GetCountFrom(ctx context.Context, record string, start time.Time) (int, error) {
	sameRecord := exp.NewBooleanExpression(exp.EqOp, goqu.L("record"), record)
	notBeforeStart := exp.NewBooleanExpression(exp.GteOp, goqu.L("time"), start)

	query, params, err := dialect.
		Select(goqu.COUNT("*")).
		From(tableName).
		Where(sameRecord, notBeforeStart).
		Prepared(true).
		ToSQL()
	if err != nil {
		return 0, err
	}

	var count int
	if err := m.db.QueryRowContext(ctx, query, params...).Scan(&count); err != nil {
		return 0, err
	}
	return count, nil
}
// BucketResponse is one row of the result produced by GetCountBuckets:
// a bucket timestamp and the number of rows counted in that bucket.
// The JSON field names are "bucket" and "count".
type BucketResponse struct {
	Bucket time.Time `json:"bucket"` // value of the query's "bucket" column
	Count  uint      `json:"count"`  // COUNT(*) for that bucket
}
// GetCountBuckets returns, for the given record, the number of rows whose
// "time" is greater than or equal to start, grouped into time buckets.
//
// bucketSize is turned into a SQL time_bucket(...) expression by
// getBucketSize, which works in whole hours. The query has no ORDER BY, so
// the buckets come back in whatever order the database yields them. When no
// rows match, the result is a nil slice with a nil error.
//
// An error is returned if the SQL cannot be built or executed, if a row
// cannot be scanned, or if row iteration ends early because of an error.
func (m genality) GetCountBuckets(ctx context.Context, record string, start time.Time, bucketSize time.Duration) ([]BucketResponse, error) {
	query, params, err := dialect.Select(goqu.L(getBucketSize(bucketSize)).As("bucket"), goqu.COUNT("*")).
		From(tableName).Where(
		exp.NewBooleanExpression(exp.EqOp, goqu.L("record"), record),
		exp.NewBooleanExpression(exp.GteOp, goqu.L("time"), start),
	).GroupBy("bucket").
		Prepared(true).ToSQL()
	if err != nil {
		return nil, err
	}

	rows, err := m.db.QueryContext(ctx, query, params...)
	if err != nil {
		return nil, err
	}
	// Release the underlying connection on every exit path; without this a
	// failed Scan would return early and leave the result set open.
	defer rows.Close()

	var res []BucketResponse
	for rows.Next() {
		var r BucketResponse
		if err := rows.Scan(&r.Bucket, &r.Count); err != nil {
			return nil, err
		}
		res = append(res, r)
	}
	// Next reports false both at end of data and on failure; Err tells the
	// two apart so a partial result is never returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return res, nil
}
// getBucketSize renders the SQL time_bucket expression used to group the
// "time" column into fixed-width buckets.
//
// The width is expressed in whole hours: d is truncated toward zero to an
// hour count. Durations shorter than one hour (including zero and negative
// values) are clamped to one hour. A '0 hour' width is not a usable bucket
// interval, and converting a negative float to an unsigned integer is
// implementation-dependent in Go, so neither is allowed through.
func getBucketSize(d time.Duration) string {
	hours := int64(d / time.Hour)
	if hours < 1 {
		hours = 1
	}
	return fmt.Sprintf(`time_bucket('%d hour',time)`, hours)
}