Skip to content

Commit

Permalink
Merge pull request #12 from Arekkusuva/master
Browse files Browse the repository at this point in the history
Add Context support
  • Loading branch information
koblas authored May 10, 2019
2 parents 124a62c + 911973f commit 84e5f9c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 29 deletions.
15 changes: 12 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package impalathing
import (
"context"
"fmt"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/koblas/impalathing/services/beeswax"
impala "github.com/koblas/impalathing/services/impalaservice"
"time"
)

type Options struct {
Expand Down Expand Up @@ -76,17 +77,25 @@ func (c *Connection) Close() error {
return nil
}

func (c *Connection) Query(query string) (RowSet, error) {
func (c *Connection) query(ctx context.Context, query string) (RowSet, error) {
bquery := beeswax.Query{}

bquery.Query = query
bquery.Configuration = []string{}

handle, err := c.client.Query(context.Background(), &bquery)
handle, err := c.client.Query(ctx, &bquery)

if err != nil {
return nil, err
}

return newRowSet(c.client, handle, c.options), nil
}

func (c *Connection) Query(query string) (RowSet, error) {
return c.query(context.Background(), query)
}

func (c *Connection) QueryWithContext(ctx context.Context, query string) (RowSet, error) {
return c.query(ctx, query)
}
101 changes: 75 additions & 26 deletions rowset.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ type rowSet struct {
// from the prior operation.
type RowSet interface {
Columns() []string
ColumnsWithContext(ctx context.Context) []string
Next() bool
NextWithContext(ctx context.Context) bool
Scan(dest ...interface{}) error
Poll() (*Status, error)
PollWithContext(ctx context.Context) (*Status, error)
Wait() (*Status, error)
WaitWithContext(ctx context.Context) (*Status, error)
FetchAll() []map[string]interface{}
FetchAllWithContext(ctx context.Context) []map[string]interface{}
MapScan(dest map[string]interface{}) error
}

Expand All @@ -68,10 +73,8 @@ func (s *Status) IsComplete() bool {
return s.state == beeswax.QueryState_FINISHED
}

// Issue a thrift call to check for the job's current status.
func (r *rowSet) Poll() (*Status, error) {
state, err := r.client.GetState(context.Background(), r.handle)

func (r *rowSet) poll(ctx context.Context) (*Status, error) {
state, err := r.client.GetState(ctx, r.handle)
if err != nil {
return nil, fmt.Errorf("Error getting status: %v", err)
}
Expand All @@ -83,10 +86,19 @@ func (r *rowSet) Poll() (*Status, error) {
return &Status{state, nil}, nil
}

// Wait until the job is complete, one way or another, returning Status and error.
func (r *rowSet) Wait() (*Status, error) {
// Issue a thrift call to check for the job's current status.
func (r *rowSet) Poll() (*Status, error) {
return r.poll(context.Background())
}

// Same as the Poll() but additionally takes the context.
func (r *rowSet) PollWithContext(ctx context.Context) (*Status, error) {
return r.poll(ctx)
}

func (r *rowSet) wait(ctx context.Context) (*Status, error) {
for {
status, err := r.Poll()
status, err := r.poll(ctx)

if err != nil {
return nil, err
Expand All @@ -104,9 +116,19 @@ func (r *rowSet) Wait() (*Status, error) {
}
}

func (r *rowSet) waitForSuccess() error {
// Wait until the job is complete, one way or another, returning Status and error.
func (r *rowSet) Wait() (*Status, error) {
return r.wait(context.Background())
}

// Same as the Wait() but additionally takes the context.
func (r *rowSet) WaitWithContext(ctx context.Context) (*Status, error) {
return r.wait(ctx)
}

func (r *rowSet) waitForSuccess(ctx context.Context) error {
if !r.ready {
status, err := r.Wait()
status, err := r.wait(ctx)
if err != nil {
return err
}
Expand All @@ -118,13 +140,8 @@ func (r *rowSet) waitForSuccess() error {
return nil
}

// Prepares a row for scanning into memory, by reading data from hive if
// the operation is successful, blocking until the operation is
// complete, if necessary.
// Returns true is a row is available to Scan(), and false if the
// results are empty or any other error occurs.
func (r *rowSet) Next() bool {
if err := r.waitForSuccess(); err != nil {
func (r *rowSet) next(ctx context.Context) bool {
if err := r.waitForSuccess(ctx); err != nil {
return false
}

Expand All @@ -133,14 +150,14 @@ func (r *rowSet) Next() bool {
return false
}

resp, err := r.client.Fetch(context.Background(), r.handle, false, 1000000)
resp, err := r.client.Fetch(ctx, r.handle, false, 1000000)
if err != nil {
log.Printf("FetchResults failed: %v\n", err)
return false
}

if r.metadata == nil {
r.metadata, err = r.client.GetResultsMetadata(context.Background(), r.handle)
r.metadata, err = r.client.GetResultsMetadata(ctx, r.handle)
if err != nil {
log.Printf("GetResultsMetadata failed: %v\n", err)
}
Expand All @@ -166,6 +183,20 @@ func (r *rowSet) Next() bool {
return true
}

// Prepares a row for scanning into memory, by reading data from hive if
// the operation is successful, blocking until the operation is
// complete, if necessary.
// Returns true is a row is available to Scan(), and false if the
// results are empty or any other error occurs.
func (r *rowSet) Next() bool {
return r.next(context.Background())
}

// Same as the Next() but additionally takes the context.
func (r *rowSet) NextWithContext(ctx context.Context) bool {
return r.next(ctx)
}

// Scan the last row prepared via Next() into the destination(s) provided,
// which must be pointers to value types, as in database.sql. Further,
// only pointers of the following types are supported:
Expand Down Expand Up @@ -243,11 +274,9 @@ func (r *rowSet) convertRawValue(raw string, hiveType string) (interface{}, erro
}
}

//Fetch all rows and convert to a []map[string]interface{} with
//appropriate type conversion already carried out
func (r *rowSet) FetchAll() []map[string]interface{} {
func (r *rowSet) fetchAll(ctx context.Context) []map[string]interface{} {
response := make([]map[string]interface{}, 0)
for r.Next() {
for r.next(ctx) {
row := make(map[string]interface{})
for i, val := range r.nextRow {
conv, err := r.convertRawValue(val, r.metadata.Schema.FieldSchemas[i].Type)
Expand All @@ -261,18 +290,38 @@ func (r *rowSet) FetchAll() []map[string]interface{} {
return response
}

// Returns the names of the columns for the given operation,
// blocking if necessary until the information is available.
func (r *rowSet) Columns() []string {
//Fetch all rows and convert to a []map[string]interface{} with
//appropriate type conversion already carried out
func (r *rowSet) FetchAll() []map[string]interface{} {
return r.fetchAll(context.Background())
}

// Same as the FetchAll() but additionally takes the context.
func (r *rowSet) FetchAllWithContext(ctx context.Context) []map[string]interface{} {
return r.fetchAll(ctx)
}

func (r *rowSet) columns(ctx context.Context) []string {
if r.columnNames == nil {
if err := r.waitForSuccess(); err != nil {
if err := r.waitForSuccess(ctx); err != nil {
return nil
}
}

return r.columnNames
}

// Returns the names of the columns for the given operation,
// blocking if necessary until the information is available.
func (r *rowSet) Columns() []string {
return r.columns(context.Background())
}

// Same as the Columns() but additionally takes the context.
func (r *rowSet) ColumnsWithContext(ctx context.Context) []string {
return r.columns(ctx)
}

// MapScan scans a single Row into the dest map[string]interface{}.
func (r *rowSet) MapScan(row map[string]interface{}) error {
for i, val := range r.nextRow {
Expand Down

0 comments on commit 84e5f9c

Please sign in to comment.