-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathrows.go
136 lines (116 loc) · 3.11 KB
/
rows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package ep
import (
"context"
"database/sql/driver"
"io"
"strings"
)
// Rows creates a new Runner that also implements driver.Rows, useful for cases
// when we need to execute the Runner in a database library. Calling .Next() for
// the first time will Run the underlying Runner with the provided context. This
// runner cannot be distributed and thus should only be used at the top-level
// client-facing code. It's safe to cast the returned Runner to driver.Rows.
func Rows(ctx context.Context, r Runner) Runner {
return &rows{Runner: r, Ctx: ctx}
}
// rows is a runner that also implements driver.Rows
type rows struct {
Runner
Ctx context.Context
CancelFunc context.CancelFunc
Out chan Dataset
Buff [][]string // buffer of received rows waiting for Next()
Err error
}
func (r *rows) Equals(other interface{}) bool {
o, ok := other.(*rows)
return ok && r.Runner.Equals(o.Runner)
}
func (r *rows) Run(ctx context.Context, inp, out chan Dataset) error {
if r.Out != nil {
if r.Out != out {
panic("rows already have out")
}
} else {
r.Out = out // save it for Next()
}
r.Ctx, r.CancelFunc = context.WithCancel(ctx) // for Close()
return r.Runner.Run(r.Ctx, inp, out)
}
// see driver.Rows
func (r *rows) Columns() []string {
var cols []string
for _, t := range r.Returns() {
alias := GetAlias(t)
if alias == "" {
alias = UnnamedColumn
}
cols = append(cols, alias)
}
return cols
}
// see driver.Rows. Blocks until the runner has indeed finished
func (r *rows) Close() error {
if r.CancelFunc == nil {
return nil // it never started.
}
r.CancelFunc()
// wait for it to end.
return nil // return the error.
}
// see driver.Rows
func (r *rows) Next(dest []driver.Value) error {
// Not running? start it.
if r.Out == nil {
r.Out = make(chan Dataset)
// kick it off with one empty input
inp := make(chan Dataset)
close(inp)
go func() {
defer close(r.Out)
r.Err = r.Run(r.Ctx, inp, r.Out)
}()
}
// if we still have buffered rows, emit the first one and remove it
if len(r.Buff) > 0 {
row := r.Buff[0]
for i := range dest {
dest[i] = row[i]
}
r.Buff = r.Buff[1:]
return nil
}
// read the next batch of rows
data, ok := <-r.Out
if !ok && r.Err != nil {
// This is intentionally not at the beginning of this function, in order
// to emit all of the buffered rows that arrived before the error
return r.Err
} else if !ok {
return io.EOF
}
// build the batch of data
columnar := ColumnStrings(data)
// transpose the columnar strings to rows of strings for the buffer.
rows := make([][]string, data.Len())
dataWidth := data.Width()
for i := range rows {
row := make([]string, dataWidth)
for j := range row {
row[j] = columnar[j][i]
}
rows[i] = row
}
r.Buff = rows
// now that we have the new buffer, try again.
return r.Next(dest)
}
// see driver.ColumnTypeDatabaseTypeName
func (r *rows) ColumnTypeDatabaseTypeName(index int) string {
types := r.Returns()
return strings.ToUpper(types[index].Name())
}
// see driver.RowsColumnTypeNullable
func (r *rows) ColumnTypeNullable(index int) (nullable, ok bool) {
return true, true
}