Skip to content

Commit

Permalink
api: support iproto feature discovery
Browse files Browse the repository at this point in the history
Since version 2.10.0 Tarantool supports feature discovery [1]. Client
can send the schema version and supported features and receive
server-side schema version and supported features information to tune
its behavior.

After this patch, the request will be send on `Connect`. Connector
stores server info in connection internals. After that, user may call
`IsProtocolVersionSupported` and `IsProtocolFeatureSupported` handles to
check if it is possible to use a feature.

`IsProtocolFeatureSupported` iterates over lists to check if feature is
enabled. It seems that iterating over a small list is way faster than
building a map, see [3]. Benchmark tests show that this check is rather
fast (0.5 ns on HP ProBook 440 G5) so it is not necessary to cache it in
any way.

Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4
could be found in Tarantool source code but they were removed in the
following commits before the release and treated like they never
existed. We also ignore them here too. See [2] for more info.

1. tarantool/tarantool#6253
2. tarantool/tarantool-python#262
3. https://stackoverflow.com/a/52710077/11646599

Closes #120
  • Loading branch information
DifferentialOrange committed Nov 14, 2022
1 parent 48cf0c7 commit 6afeb6a
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Support IProto feature discovery (#120).

### Changed

### Fixed
Expand Down
64 changes: 64 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ type Connection struct {
lenbuf [PacketLengthBytes]byte

lastStreamId uint64

serverProtocolInfo protocolInfo
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -391,6 +393,13 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}

if err = conn.loadProtocolInfo(); err != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.closeConnection(err, true)
return nil, err
}

return conn, err
}

Expand Down Expand Up @@ -687,6 +696,10 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.closeConnection(neterr, false)
if err := conn.createConnection(true); err != nil {
conn.closeConnection(err, true)
} else {
if err = conn.loadProtocolInfo(); err != nil {
conn.closeConnection(err, true)
}
}
}
} else {
Expand Down Expand Up @@ -1163,3 +1176,54 @@ func (conn *Connection) NewStream() (*Stream, error) {
Conn: conn,
}, nil
}

// loadProtocolInfo sends info about client protocol,
// receives info about server protocol in response
// and store in in connection serverProtocolInfo.
func (conn *Connection) loadProtocolInfo() error {
var ok bool

resp, err := conn.exchangeProtocolInfo(
ClientProtocolVersion,
ClientProtocolFeatures)

if err != nil {
if resp.Code == ErrUnknownRequestType {
// IPROTO_ID requests are not supported by server.
conn.serverProtocolInfo = protocolInfo{
version: ProtocolVersionUnsupported,
features: []ProtocolFeature{},
}
return nil
}

return err
}

if len(resp.Data) == 0 {
return fmt.Errorf("Unexpected response on protocol info exchange: no data")
}

conn.serverProtocolInfo, ok = resp.Data[0].(protocolInfo)
if !ok {
return fmt.Errorf("Unexpected response on protocol info exchange: wrong data")
}

return nil
}

// IsProtocolVersionSupported checks if expected protocol version
// is supported by both server and client.
// Since 1.10.0
func (conn *Connection) IsProtocolVersionSupported(version ProtocolVersion) bool {
return (version <= ClientProtocolVersion) &&
(version <= conn.serverProtocolInfo.version)
}

// IsProtocolFeatureSupported checks if expected protocol feature
// is supported by both server and client.
// Since 1.10.0
func (conn *Connection) IsProtocolFeatureSupported(feature ProtocolFeature) bool {
return isFeatureSupported(feature, ClientProtocolFeatures) &&
isFeatureSupported(feature, conn.serverProtocolInfo.features)
}
3 changes: 3 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
RollbackRequestCode = 16
PingRequestCode = 64
SubscribeRequestCode = 66
IdRequestCode = 73

KeyCode = 0x00
KeySync = 0x01
Expand All @@ -41,6 +42,8 @@ const (
KeySQLBind = 0x41
KeySQLInfo = 0x42
KeyStmtID = 0x43
KeyVersion = 0x54
KeyFeatures = 0x55
KeyTimeout = 0x56
KeyTxnIsolation = 0x59

Expand Down
30 changes: 30 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,18 @@ func ExampleCommitRequest() {
conn := example_connect()
defer conn.Close()

if conn.IsProtocolFeatureSupported(tarantool.ProtocolFeatureStreams) != true {
fmt.Printf("Streams are not supported")
return
}

stream, _ := conn.NewStream()

if conn.IsProtocolFeatureSupported(tarantool.ProtocolFeatureTransactions) != true {
fmt.Printf("Transactions are not supported")
return
}

// Begin transaction
req = tarantool.NewBeginRequest()
resp, err = stream.Do(req).Get()
Expand Down Expand Up @@ -410,8 +420,18 @@ func ExampleRollbackRequest() {
conn := example_connect()
defer conn.Close()

if conn.IsProtocolFeatureSupported(tarantool.ProtocolFeatureStreams) != true {
fmt.Printf("Streams are not supported")
return
}

stream, _ := conn.NewStream()

if conn.IsProtocolFeatureSupported(tarantool.tarantool.ProtocolFeatureTransactions) != true {
fmt.Printf("Transactions are not supported")
return
}

// Begin transaction
req = tarantool.NewBeginRequest()
resp, err = stream.Do(req).Get()
Expand Down Expand Up @@ -486,8 +506,18 @@ func ExampleBeginRequest_TxnIsolation() {
conn := example_connect()
defer conn.Close()

if conn.IsProtocolFeatureSupported(tarantool.ProtocolFeatureStreams) != true {
fmt.Printf("Streams are not supported")
return
}

stream, _ := conn.NewStream()

if conn.IsProtocolFeatureSupported(tarantool.ProtocolFeatureTransactions) != true {
fmt.Printf("Transactions are not supported")
return
}

// Begin transaction
req = tarantool.NewBeginRequest().
TxnIsolation(tarantool.ReadConfirmedLevel).
Expand Down
44 changes: 44 additions & 0 deletions protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tarantool

type ProtocolVersion uint64
type ProtocolFeature uint64

type protocolInfo struct {
version ProtocolVersion
features []ProtocolFeature
}

const ProtocolVersionUnsupported ProtocolVersion = 0

const (
// Streams support.
ProtocolFeatureStreams ProtocolFeature = 0
// Interactive tranactions support.
ProtocolFeatureTransactions ProtocolFeature = 1
// Support of MP_ERROR object over MessagePack.
ProtocolFeatureErrorExtension ProtocolFeature = 2
// Support of watchers.
ProtocolFeatureWatchers ProtocolFeature = 3
)

// Protocol version supported by connector. Version 3
// was introduced in Tarantool 2.10.0 and used in latest 2.10.4.
const ClientProtocolVersion ProtocolVersion = 3

// Protocol features supported by connector.
var ClientProtocolFeatures = []ProtocolFeature{
ProtocolFeatureStreams,
ProtocolFeatureTransactions,
}

func isFeatureSupported(feature ProtocolFeature, supportedFeatures []ProtocolFeature) bool {
// It seems that iterating over a small list is way faster
// than building a map: https://stackoverflow.com/a/52710077/11646599
for _, supportedFeature := range supportedFeatures {
if feature == supportedFeature {
return true
}
}

return false
}
65 changes: 65 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,68 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
req.ctx = ctx
return req
}

// protocolInfoRequest informs the server about supported protocol
// version and features.
type protocolInfoRequest struct {
baseRequest
protocolInfo
}

// newProtocolInfoRequest returns a new protocolInfoRequest.
func newProtocolInfoRequest(protocolVersion ProtocolVersion,
protocolFeatures []ProtocolFeature) *protocolInfoRequest {
req := new(protocolInfoRequest)
req.requestCode = IdRequestCode
req.version = protocolVersion
req.features = protocolFeatures
return req
}

// exchangeProtocolInfo sends info about client protocol
// and receives info about server protocol in response.
func (conn *Connection) exchangeProtocolInfo(version ProtocolVersion,
features []ProtocolFeature) (resp *Response, err error) {
req := newProtocolInfoRequest(version, features)
return conn.Do(req).Get()
}

// Body fills an encoder with the protocol version request body.
func (req *protocolInfoRequest) Body(res SchemaResolver, enc *encoder) error {
return req.fillProtocolInfoRequest(enc)
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *protocolInfoRequest) Context(ctx context.Context) *protocolInfoRequest {
req.ctx = ctx
return req
}

func (req *protocolInfoRequest) fillProtocolInfoRequest(enc *encoder) error {
enc.EncodeMapLen(2)

encodeUint(enc, KeyVersion)
if err := enc.Encode(req.version); err != nil {
return err
}

encodeUint(enc, KeyFeatures)

t := len(req.features)
if err := enc.EncodeArrayLen(t); err != nil {
return err
}

for _, feature := range req.features {
if err := enc.Encode(feature); err != nil {
return err
}
}

return nil
}
32 changes: 31 additions & 1 deletion response.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,15 @@ func (resp *Response) decodeBody() (err error) {
offset := resp.buf.Offset()
defer resp.buf.Seek(offset)

var l int
var l, larr int
var stmtID, bindCount uint64
var serverProtocolInfo protocolInfo = protocolInfo{
version: ProtocolVersionUnsupported,
features: []ProtocolFeature{},
}

var protocolFeature ProtocolFeature
isProtocolInfoResponse := false

d := newDecoder(&resp.buf)

Expand Down Expand Up @@ -190,6 +197,24 @@ func (resp *Response) decodeBody() (err error) {
if bindCount, err = d.DecodeUint64(); err != nil {
return err
}
case KeyVersion:
isProtocolInfoResponse = true
if err = d.Decode(&serverProtocolInfo.version); err != nil {
return err
}
case KeyFeatures:
isProtocolInfoResponse = true
if larr, err = d.DecodeArrayLen(); err != nil {
return err
}

serverProtocolInfo.features = make([]ProtocolFeature, larr)
for i := 0; i < larr; i++ {
if err = d.Decode(&protocolFeature); err != nil {
return err
}
serverProtocolInfo.features[i] = protocolFeature
}
default:
if err = d.Skip(); err != nil {
return err
Expand All @@ -204,6 +229,11 @@ func (resp *Response) decodeBody() (err error) {
}
resp.Data = []interface{}{stmt}
}

if isProtocolInfoResponse {
resp.Data = []interface{}{serverProtocolInfo}
}

if resp.Code != OkCode && resp.Code != PushCode {
resp.Code &^= ErrorCodeBit
err = Error{resp.Code, resp.Error}
Expand Down
44 changes: 44 additions & 0 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,50 @@ func TestStream_DoWithClosedConn(t *testing.T) {
}
}

func TestConnectionProtocolInfo(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)

isProtocolInfoUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
if err != nil {
t.Fatalf("Unexpected error has been caught: %s", err.Error())
}
// First Tarantool protocol version (1) was introduced between
// 2.10.0-beta1 and 2.10.0-beta2. Versions 2 and 3 were also
// introduced between 2.10.0-beta1 and 2.10.0-beta2. Version 4
// was introduced between 2.10.0-beta2 and 2.10.0-rc1 and reverted
// back to version 3 in the same version interval.
// So each release Tarantool >= 2.10 have protocol version >= 3.
// (Tarantool 2.10.4 still has version 3.)
minProtocolVersion := ProtocolVersion(1)
tarantool210ProtocolVersion := ProtocolVersion(3)

if isProtocolInfoUnsupported {
require.Equal(t, conn.IsProtocolVersionSupported(minProtocolVersion), false)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureStreams), false)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions), false)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureErrorExtension), false)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureWatchers), false)
} else {
require.Equal(t, conn.IsProtocolVersionSupported(tarantool210ProtocolVersion), true)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureStreams), true)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions), true)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureErrorExtension), false)
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureWatchers), false)
}
}

func BenchmarkConnectionProtocolFeature(b *testing.B) {
conn := test_helpers.ConnectWithValidation(b, server, opts)
defer conn.Close()

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = conn.IsProtocolFeatureSupported(ProtocolFeatureStreams)
}
})
}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
Expand Down

0 comments on commit 6afeb6a

Please sign in to comment.