Skip to content

Commit

Permalink
apacheGH-37720: [Go][FlightSQL] Add prepared statement handle to DoPu…
Browse files Browse the repository at this point in the history
…t result (apache#40311)

### Rationale for this change
See discussion on apache#37720 and mailing list: https://lists.apache.org/thread/3kb82ypx99q96g84qv555l6x8r0bppyq

### What changes are included in this PR?

Changes the Go FlightSQL client and server implementations to support returning an updated prepared statement handle to the client as part of the `DoPut(PreparedStatement)` RPC call.

### Are these changes tested?

### Are there any user-facing changes?

See parent issue and docs PR apache#40243  for details of user facing changes.

**This PR includes breaking changes to public APIs.**

* GitHub Issue: apache#37720

Lead-authored-by: Adam Curtis <[email protected]>
Co-authored-by: David Li <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
2 people authored and rok committed May 8, 2024
1 parent ec60df9 commit 0092eec
Show file tree
Hide file tree
Showing 3 changed files with 600 additions and 537 deletions.
59 changes: 50 additions & 9 deletions go/arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,9 +1119,22 @@ func (p *PreparedStatement) Execute(ctx context.Context, opts ...grpc.CallOption
return nil, err
}

desc, err = p.bindParameters(ctx, desc, opts...)
if err != nil {
return nil, err
if p.hasBindParameters() {
pstream, err := p.client.Client.DoPut(ctx, opts...)
if err != nil {
return nil, err
}
wr, err := p.writeBindParameters(pstream, desc)
if err != nil {
return nil, err
}
if err = wr.Close(); err != nil {
return nil, err
}
pstream.CloseSend()
if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return nil, err
}
}
return p.client.getFlightInfo(ctx, desc, opts...)
}
Expand All @@ -1142,9 +1155,23 @@ func (p *PreparedStatement) ExecutePut(ctx context.Context, opts ...grpc.CallOpt
return err
}

_, err = p.bindParameters(ctx, desc, opts...)
if err != nil {
return err
if p.hasBindParameters() {
pstream, err := p.client.Client.DoPut(ctx, opts...)
if err != nil {
return err
}

wr, err := p.writeBindParameters(pstream, desc)
if err != nil {
return err
}
if err = wr.Close(); err != nil {
return err
}
pstream.CloseSend()
if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -1172,9 +1199,23 @@ func (p *PreparedStatement) ExecutePoll(ctx context.Context, retryDescriptor *fl
}

if retryDescriptor == nil {
desc, err = p.bindParameters(ctx, desc, opts...)
if err != nil {
return nil, err
if p.hasBindParameters() {
pstream, err := p.client.Client.DoPut(ctx, opts...)
if err != nil {
return nil, err
}

wr, err := p.writeBindParameters(pstream, desc)
if err != nil {
return nil, err
}
if err = wr.Close(); err != nil {
return nil, err
}
pstream.CloseSend()
if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return nil, err
}
}
}
return p.client.Client.PollFlightInfo(ctx, desc, opts...)
Expand Down
10 changes: 5 additions & 5 deletions go/arrow/flight/flightsql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,9 @@ func (s *FlightSqlClientSuite) TestPreparedStatementExecuteParamBinding() {
expectedDesc := getDesc(&pb.CommandPreparedStatementQuery{PreparedStatementHandle: []byte(handle)})

// mocked DoPut result
doPutPreparedStatementResult := &pb.DoPutPreparedStatementResult{PreparedStatementHandle: []byte(updatedHandle)}
doPutPreparedStatementResult := &pb.DoPutPreparedStatementResult{PreparedStatementHandle: []byte(updatedHandle)}
resdata, _ := proto.Marshal(doPutPreparedStatementResult)
putResult := &pb.PutResult{AppMetadata: resdata}
putResult := &pb.PutResult{ AppMetadata: resdata }

// mocked client stream for DoPut
mockedPut := &mockDoPutClient{}
Expand All @@ -461,7 +461,7 @@ func (s *FlightSqlClientSuite) TestPreparedStatementExecuteParamBinding() {
mockedPut.On("CloseSend").Return(nil)
mockedPut.On("Recv").Return(putResult, nil)

infoCmd := &pb.CommandPreparedStatementQuery{PreparedStatementHandle: []byte(updatedHandle)}
infoCmd := &pb.CommandPreparedStatementQuery{PreparedStatementHandle: []byte(handle)}
desc := getDesc(infoCmd)
s.mockClient.On("GetFlightInfo", desc.Type, desc.Cmd, s.callOpts).Return(&emptyFlightInfo, nil)

Expand Down Expand Up @@ -525,9 +525,9 @@ func (s *FlightSqlClientSuite) TestPreparedStatementExecuteReaderBinding() {
expectedDesc := getDesc(&pb.CommandPreparedStatementQuery{PreparedStatementHandle: []byte(query)})

// mocked DoPut result
doPutPreparedStatementResult := &pb.DoPutPreparedStatementResult{PreparedStatementHandle: []byte(query)}
doPutPreparedStatementResult := &pb.DoPutPreparedStatementResult{PreparedStatementHandle: []byte(query)}
resdata, _ := proto.Marshal(doPutPreparedStatementResult)
putResult := &pb.PutResult{AppMetadata: resdata}
putResult := &pb.PutResult{ AppMetadata: resdata }

// mocked client stream for DoPut
mockedPut := &mockDoPutClient{}
Expand Down
Loading

0 comments on commit 0092eec

Please sign in to comment.