Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Import field value #840

Merged
merged 6 commits into from
Sep 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,94 @@ func (c *Client) importNode(ctx context.Context, node *Node, buf []byte) error {
return nil
}

// ImportValue bulk imports field values for a single slice to a host.
// The values are marshaled once and sent to every node that owns the slice.
// Returns ErrIndexRequired or ErrFrameRequired if the corresponding name is empty.
func (c *Client) ImportValue(ctx context.Context, index, frame, field string, slice uint64, vals []FieldValue) error {
	if index == "" {
		return ErrIndexRequired
	} else if frame == "" {
		return ErrFrameRequired
	}

	// Marshal the values into a single protobuf payload shared by all nodes.
	// NOTE: error string lowercased per Go convention (was "Error Creating Payload").
	buf, err := MarshalImportValuePayload(index, frame, field, slice, vals)
	if err != nil {
		return fmt.Errorf("creating payload: %s", err)
	}

	// Retrieve a list of nodes that own the slice.
	nodes, err := c.FragmentNodes(ctx, index, slice)
	if err != nil {
		return fmt.Errorf("slice nodes: %s", err)
	}

	// Import to each owning node so every replica receives the data.
	for _, node := range nodes {
		if err := c.importValueNode(ctx, node, buf); err != nil {
			return fmt.Errorf("import node: host=%s, err=%s", node.Host, err)
		}
	}

	return nil
}

// MarshalImportValuePayload marshals the import parameters into a protobuf byte slice.
func MarshalImportValuePayload(index, frame, field string, slice uint64, vals []FieldValue) ([]byte, error) {
	// Split column IDs and values into parallel slices to reduce allocations
	// in the encoded message.
	fvs := FieldValues(vals)

	// Encode the request as a protobuf ImportValueRequest.
	payload, err := proto.Marshal(&internal.ImportValueRequest{
		Index:     index,
		Frame:     frame,
		Slice:     slice,
		Field:     field,
		ColumnIDs: fvs.ColumnIDs(),
		Values:    fvs.Values(),
	})
	if err != nil {
		return nil, fmt.Errorf("marshal import request: %s", err)
	}
	return payload, nil
}

// importValueNode sends a pre-marshaled import request to a node.
func (c *Client) importValueNode(ctx context.Context, node *Node, buf []byte) error {
	// Build the import-value endpoint URL for this node.
	endpoint := url.URL{Scheme: "http", Host: node.Host, Path: "/import-value"}

	req, err := http.NewRequest("POST", endpoint.String(), bytes.NewReader(buf))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Length", strconv.Itoa(len(buf)))
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Accept", "application/x-protobuf")
	req.Header.Set("User-Agent", "pilosa/"+Version)

	// Execute the request against the node, honoring ctx cancellation.
	resp, err := c.HTTPClient.Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Read the full body before inspecting the status code so that a non-200
	// response body can be surfaced as the error message.
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return errors.New(string(body))
	}

	// Decode the protobuf response and surface any server-side error string.
	var importResp internal.ImportResponse
	if err := proto.Unmarshal(body, &importResp); err != nil {
		return fmt.Errorf("unmarshal import response: %s", err)
	}
	if msg := importResp.Err; msg != "" {
		return errors.New(msg)
	}

	return nil
}

// ExportCSV bulk exports data for a single slice from a host to CSV format.
func (c *Client) ExportCSV(ctx context.Context, index, frame, view string, slice uint64, w io.Writer) error {
if index == "" {
Expand Down Expand Up @@ -1090,6 +1178,57 @@ func (p Bits) GroupBySlice() map[uint64][]Bit {
return m
}

// FieldValue represents the value for a column within a
// range-encoded frame. (Doc comment fixed: it previously named the slice
// type "FieldValues" instead of this struct.)
type FieldValue struct {
	ColumnID uint64
	Value    uint64
}

// FieldValues represents a slice of field values.
// It implements sort.Interface, ordering by ascending ColumnID.
type FieldValues []FieldValue

func (p FieldValues) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p FieldValues) Len() int      { return len(p) }

func (p FieldValues) Less(i, j int) bool {
	return p[i].ColumnID < p[j].ColumnID
}

// ColumnIDs returns a slice of all the column IDs, in the same order as p.
func (p FieldValues) ColumnIDs() []uint64 {
	other := make([]uint64, len(p))
	for i := range p {
		other[i] = p[i].ColumnID
	}
	return other
}

// Values returns a slice of all the values, in the same order as p.
func (p FieldValues) Values() []uint64 {
	other := make([]uint64, len(p))
	for i := range p {
		other[i] = p[i].Value
	}
	return other
}

// GroupBySlice returns a map of field values keyed by slice, with each
// group sorted by ascending column ID.
func (p FieldValues) GroupBySlice() map[uint64][]FieldValue {
	groups := make(map[uint64][]FieldValue)

	// Bucket each value by the slice that owns its column.
	for _, fv := range p {
		s := fv.ColumnID / SliceWidth
		groups[s] = append(groups[s], fv)
	}

	// Sort each group in place so consumers see ordered columns.
	for s := range groups {
		sort.Sort(FieldValues(groups[s]))
	}

	return groups
}

// BitsByPos represents a slice of bits sorted by internal position.
// (Its sort.Interface methods are defined elsewhere in this file.)
type BitsByPos []Bit

Expand Down
47 changes: 47 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,53 @@ func TestClient_ImportInverseEnabled(t *testing.T) {
}
}

// Ensure client can bulk import value data.
func TestClient_ImportValue(t *testing.T) {
	holder := test.MustOpenHolder()
	defer holder.Close()

	// Define an int field with a bounded range for the range-encoded frame.
	fld := pilosa.Field{
		Name: "fld",
		Type: pilosa.FieldTypeInt,
		Min:  0,
		Max:  100,
	}

	// Load bitmap into cache to ensure cache gets updated.
	index := holder.MustCreateIndexIfNotExists("i", pilosa.IndexOptions{})
	frame, err := index.CreateFrameIfNotExists("f", pilosa.FrameOptions{RangeEnabled: true, Fields: []*pilosa.Field{&fld}})
	if err != nil {
		t.Fatal(err)
	}

	// Start a single-node test server backed by the holder.
	srv := test.NewServer()
	defer srv.Close()
	srv.Handler.Host = srv.Host()
	srv.Handler.Cluster = test.NewCluster(1)
	srv.Handler.Cluster.Nodes[0].Host = srv.Host()
	srv.Handler.Holder = holder.Holder

	// Send import request.
	client := test.MustNewClient(srv.Host())
	values := []pilosa.FieldValue{
		{ColumnID: 1, Value: 10},
		{ColumnID: 2, Value: 20},
		{ColumnID: 3, Value: 40},
	}
	if err := client.ImportValue(context.Background(), "i", "f", fld.Name, 0, values); err != nil {
		t.Fatal(err)
	}

	sum, cnt, err := frame.FieldSum(nil, fld.Name)
	if err != nil {
		t.Fatal(err)
	}

	// Verify data.
	if sum != 70 || cnt != 3 {
		t.Fatalf("unexpected values: got sum=%v, count=%v; expected sum=70, cnt=3", sum, cnt)
	}
}

// Ensure client backup and restore a frame.
func TestClient_BackupRestore(t *testing.T) {
hldr := test.MustOpenHolder()
Expand Down
1 change: 1 addition & 0 deletions cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ omitted. If it is present then its format should be YYYY-MM-DDTHH:MM.
flags.StringVarP(&Importer.Host, "host", "", "localhost:10101", "host:port of Pilosa.")
flags.StringVarP(&Importer.Index, "index", "i", "", "Pilosa index to import into.")
flags.StringVarP(&Importer.Frame, "frame", "f", "", "Frame to import into.")
flags.StringVarP(&Importer.Field, "field", "", "", "Field to import into.")
flags.IntVarP(&Importer.BufferSize, "buffer-size", "s", 10000000, "Number of bits to buffer/sort before importing.")
flags.BoolVarP(&Importer.Sort, "sort", "", false, "Enables sorting before import.")
flags.BoolVarP(&Importer.CreateSchema, "create", "e", false, "Create the schema if it does not exist before import.")
Expand Down
114 changes: 113 additions & 1 deletion ctl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type ImportCommand struct {
// CreateSchema ensures the schema exists before import
CreateSchema bool

// For Range-Encoded fields, name of the Field to import into.
Field string `json:"field"`

// Filenames to import from.
Paths []string `json:"paths"`

Expand Down Expand Up @@ -122,6 +125,16 @@ func (cmd *ImportCommand) ensureSchema(ctx context.Context) error {

// importPath parses a path into bits (or, when a field is configured,
// field values) and imports it to the server.
func (cmd *ImportCommand) importPath(ctx context.Context, path string) error {
	// If a field is provided, treat the import data as values to be range-encoded.
	if cmd.Field != "" {
		return cmd.bufferFieldValues(ctx, path)
	}
	// Go idiom: no else after a terminating if (golint indent-err-flow).
	return cmd.bufferBits(ctx, path)
}

// bufferBits buffers slices of bits to be imported as a batch.
func (cmd *ImportCommand) bufferBits(ctx context.Context, path string) error {
a := make([]pilosa.Bit, 0, cmd.BufferSize)

var r *csv.Reader
Expand Down Expand Up @@ -204,7 +217,7 @@ func (cmd *ImportCommand) importPath(ctx context.Context, path string) error {
return nil
}

// importPath parses a path into bits and imports it to the server.
// importBits sends batches of bits to the server.
func (cmd *ImportCommand) importBits(ctx context.Context, bits []pilosa.Bit) error {
logger := log.New(cmd.Stderr, "", log.LstdFlags)

Expand All @@ -224,5 +237,104 @@ func (cmd *ImportCommand) importBits(ctx context.Context, bits []pilosa.Bit) err
}
}

return nil

}

// bufferFieldValues reads CSV rows of (columnID, value) pairs from path
// (or Stdin when path is "-") and imports them to the server in batches
// of at most cmd.BufferSize.
func (cmd *ImportCommand) bufferFieldValues(ctx context.Context, path string) error {
	a := make([]pilosa.FieldValue, 0, cmd.BufferSize)

	var r *csv.Reader
	if path != "-" {
		// Open file for reading.
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()

		// Read rows as field values.
		r = csv.NewReader(f)
	} else {
		// A path of "-" reads records from standard input.
		r = csv.NewReader(cmd.Stdin)
	}

	// Allow a variable number of fields per record; each row is validated below.
	r.FieldsPerRecord = -1

	rnum := 0
	for {
		rnum++

		// Read CSV row.
		record, err := r.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		// Ignore blank rows.
		if record[0] == "" {
			continue
		} else if len(record) < 2 {
			return fmt.Errorf("bad column count on row %d: col=%d", rnum, len(record))
		}

		var val pilosa.FieldValue

		// Parse column id.
		columnID, err := strconv.ParseUint(record[0], 10, 64)
		if err != nil {
			return fmt.Errorf("invalid column id on row %d: %q", rnum, record[0])
		}
		val.ColumnID = columnID

		// Parse field value.
		value, err := strconv.ParseUint(record[1], 10, 64)
		if err != nil {
			return fmt.Errorf("invalid value on row %d: %q", rnum, record[1])
		}
		val.Value = value

		a = append(a, val)

		// If we've reached the buffer size then import field values,
		// keeping the backing array for reuse.
		if len(a) == cmd.BufferSize {
			if err := cmd.importFieldValues(ctx, a); err != nil {
				return err
			}
			a = a[:0]
		}
	}

	// Flush any remaining buffered values. Skip the call entirely when the
	// buffer is empty to avoid a pointless grouping pass and log noise
	// (the original flushed unconditionally).
	if len(a) > 0 {
		if err := cmd.importFieldValues(ctx, a); err != nil {
			return err
		}
	}

	return nil
}

// importFieldValues groups the given field values by slice and sends one
// ImportValue request per slice to the server.
func (cmd *ImportCommand) importFieldValues(ctx context.Context, vals []pilosa.FieldValue) error {
	logger := log.New(cmd.Stderr, "", log.LstdFlags)

	// Group vals by slice.
	logger.Printf("grouping %d vals", len(vals))
	valsBySlice := pilosa.FieldValues(vals).GroupBySlice()

	// Import each slice's group of values.
	// NOTE(review): GroupBySlice already sorts each group by column ID, so
	// this conditional sort looks redundant (harmless) — confirm before removing.
	for slice, vals := range valsBySlice {
		if cmd.Sort {
			sort.Sort(pilosa.FieldValues(vals))
		}

		logger.Printf("importing slice: %d, n=%d", slice, len(vals))
		if err := cmd.Client.ImportValue(ctx, cmd.Index, cmd.Frame, cmd.Field, slice, vals); err != nil {
			return err
		}
	}

	return nil
}
Loading