Skip to content

Commit

Permalink
api: support iproto feature discovery
Browse files Browse the repository at this point in the history
Since version 2.10.0 Tarantool supports feature discovery [1]. Client
can send client protocol version and supported features and receive
server protocol version and supported features information to tune
its behavior.

After this patch, the request will be sent on `dial`, before
authentication is performed. Connector stores server info in connection
internals. User can also set option RequiredProtocolVersion and
RequiredFeatures to fast fail on connect in server (or even client) does
not provide some expected feature.

Feature check iterates over lists to check if feature is
enabled. It seems that iterating over a small list is way faster than
building a map, see [2]. Benchmark tests show that this check is rather
fast (0.5 ns for both client and server check on HP ProBook 440 G5) so
it is not necessary to cache it in any way.

Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4
could be found in Tarantool source code but they were removed in the
following commits before the release and treated like they never
existed. We also ignore them here too. See [3] for more info.

1. tarantool/tarantool#6253
2. https://stackoverflow.com/a/52710077/11646599
3. tarantool/tarantool-python#262

Closes #120
  • Loading branch information
DifferentialOrange committed Nov 15, 2022
1 parent c367122 commit 7330a14
Show file tree
Hide file tree
Showing 12 changed files with 558 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Support iproto feature discovery (#120).

### Changed

### Fixed
Expand Down
123 changes: 123 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ type Connection struct {
lenbuf [PacketLengthBytes]byte

lastStreamId uint64

clientProtocolVersion ProtocolVersion
clientFeatures []Feature
serverProtocolVersion ProtocolVersion
serverFeatures []Feature
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -269,6 +274,14 @@ type Opts struct {
Transport string
// SslOpts is used only if the Transport == 'ssl' is set.
Ssl SslOpts
// Minimal protocol version that should be supported by
// Go connection client and Tarantool server. By default
// it is equal to tarantool.NoProtocolVersion (no restrictions)
RequiredProtocolVersion ProtocolVersion
// List of features that should be supported by
// Go connection client and Tarantool server. By default
// it is an empty list (no restrictions)
RequiredFeatures []Feature
}

// SslOpts is a way to configure ssl transport.
Expand Down Expand Up @@ -351,6 +364,17 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}

conn.clientProtocolVersion = ClientProtocolVersion
conn.clientFeatures = ClientFeatures

if err = checkProtocolVersion(opts.RequiredProtocolVersion, conn.clientProtocolVersion); err != nil {
return nil, fmt.Errorf("client: %w", err)
}

if err = checkFeatures(opts.RequiredFeatures, conn.clientFeatures); err != nil {
return nil, fmt.Errorf("client: %w", err)
}

if conn.opts.Logger == nil {
conn.opts.Logger = defaultLogger{}
}
Expand Down Expand Up @@ -502,6 +526,23 @@ func (conn *Connection) dial() (err error) {
conn.Greeting.Version = bytes.NewBuffer(greeting[:64]).String()
conn.Greeting.auth = bytes.NewBuffer(greeting[64:108]).String()

// IPROTO_ID requests can be processed without authentication.
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/requests/#iproto-id
if err = conn.identify(w, r); err != nil {
connection.Close()
return err
}

if err = checkProtocolVersion(opts.RequiredProtocolVersion, conn.serverProtocolVersion); err != nil {
connection.Close()
return fmt.Errorf("server: %w", err)
}

if err = checkFeatures(opts.RequiredFeatures, conn.serverFeatures); err != nil {
connection.Close()
return fmt.Errorf("server: %w", err)
}

// Auth
if opts.User != "" {
scr, err := scramble(conn.Greeting.auth, opts.Pass)
Expand Down Expand Up @@ -608,6 +649,18 @@ func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err
return nil
}

func (conn *Connection) writeIdRequest(w *bufio.Writer, version ProtocolVersion,
features []Feature) (err error) {
req := newIdRequest(version, features)

err = conn.writeRequest(w, req)
if err != nil {
return fmt.Errorf("id: %w", err)
}

return nil
}

func (conn *Connection) readResponse(r io.Reader) (resp Response, err error) {
respBytes, err := conn.read(r)
if err != nil {
Expand Down Expand Up @@ -639,6 +692,15 @@ func (conn *Connection) readAuthResponse(r io.Reader) (err error) {
return nil
}

func (conn *Connection) readIdResponse(r io.Reader) (resp Response, err error) {
resp, err = conn.readResponse(r)
if err != nil {
return resp, fmt.Errorf("id: %w", err)
}

return resp, nil
}

func (conn *Connection) createConnection(reconnect bool) (err error) {
var reconnects uint
for conn.c == nil && conn.state == connDisconnected {
Expand Down Expand Up @@ -1182,3 +1244,64 @@ func (conn *Connection) NewStream() (*Stream, error) {
Conn: conn,
}, nil
}

// identify sends info about client protocol, receives info
// about server protocol in response and store it in the connection.
func (conn *Connection) identify(w *bufio.Writer, r *bufio.Reader) error {
werr := conn.writeIdRequest(w, conn.clientProtocolVersion, conn.clientFeatures)
if werr != nil {
return werr
}

resp, rerr := conn.readIdResponse(r)
if rerr != nil {
if resp.Code == ErrUnknownRequestType {
// IPROTO_ID requests are not supported by server.
conn.serverProtocolVersion = NoProtocolVersion
conn.serverFeatures = []Feature{}

return nil
}

return rerr
}

if len(resp.Data) == 0 {
return fmt.Errorf("identify: unexpected response: no data")
}

serverIdInfo, ok := resp.Data[0].(idInfo)
if !ok {
return fmt.Errorf("identify: unexpected response: wrong data")
}
conn.serverProtocolVersion = serverIdInfo.version
conn.serverFeatures = serverIdInfo.features

return nil
}

// ServerProtocolVersion returns protocol version supported by
// connected Tarantool server.
// Since 1.10.0
func (conn *Connection) ServerProtocolVersion() ProtocolVersion {
return conn.serverProtocolVersion
}

// ClientProtocolVersion returns protocol version supported by
// Go connection client.
// Since 1.10.0
func (conn *Connection) ClientProtocolVersion() ProtocolVersion {
return conn.clientProtocolVersion
}

// ServerFeatures returns features supported by connected Tarantool server.
// Since 1.10.0
func (conn *Connection) ServerFeatures() []Feature {
return conn.serverFeatures
}

// ClientFeatures returns features supported by Go connection client.
// Since 1.10.0
func (conn *Connection) ClientFeatures() []Feature {
return conn.clientFeatures
}
40 changes: 37 additions & 3 deletions connection_pool/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ func examplePool(roles []bool) (*connection_pool.ConnectionPool, error) {
return connPool, nil
}

func examplePoolWithOpts(roles []bool, connOpts tarantool.Opts) (*connection_pool.ConnectionPool, error) {
err := test_helpers.SetClusterRO(servers, connOpts, roles)
if err != nil {
return nil, fmt.Errorf("ConnectionPool is not established")
}
connPool, err := connection_pool.Connect(servers, connOpts)
if err != nil || connPool == nil {
return nil, fmt.Errorf("ConnectionPool is not established")
}

return connPool, nil
}

func ExampleConnectionPool_Select() {
pool, err := examplePool(testRoles)
if err != nil {
Expand Down Expand Up @@ -586,7 +599,14 @@ func ExampleCommitRequest() {
return
}

pool, err := examplePool(testRoles)
// Assert that server supports expected features
txnOpts := connOpts
txnOpts.RequiredFeatures = []tarantool.Feature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
}

pool, err := examplePoolWithOpts(testRoles, txnOpts)
if err != nil {
fmt.Println(err)
return
Expand Down Expand Up @@ -672,8 +692,15 @@ func ExampleRollbackRequest() {
return
}

// Assert that server supports expected features
txnOpts := connOpts
txnOpts.RequiredFeatures = []tarantool.Feature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
}

// example pool has only one rw instance
pool, err := examplePool(testRoles)
pool, err := examplePoolWithOpts(testRoles, txnOpts)
if err != nil {
fmt.Println(err)
return
Expand Down Expand Up @@ -758,8 +785,15 @@ func ExampleBeginRequest_TxnIsolation() {
return
}

// Assert that server supports expected features
txnOpts := connOpts
txnOpts.RequiredFeatures = []tarantool.Feature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
}

// example pool has only one rw instance
pool, err := examplePool(testRoles)
pool, err := examplePoolWithOpts(testRoles, txnOpts)
if err != nil {
fmt.Println(err)
return
Expand Down
3 changes: 3 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
RollbackRequestCode = 16
PingRequestCode = 64
SubscribeRequestCode = 66
IdRequestCode = 73

KeyCode = 0x00
KeySync = 0x01
Expand All @@ -41,6 +42,8 @@ const (
KeySQLBind = 0x41
KeySQLInfo = 0x42
KeyStmtID = 0x43
KeyVersion = 0x54
KeyFeatures = 0x55
KeyTimeout = 0x56
KeyTxnIsolation = 0x59

Expand Down
35 changes: 32 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func example_connect() *tarantool.Connection {
return conn
}

func example_connect_with_opts(opts tarantool.Opts) *tarantool.Connection {
conn, err := tarantool.Connect(server, opts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
return conn
}

// Example demonstrates how to use SSL transport.
func ExampleSslOpts() {
var opts = tarantool.Opts{
Expand Down Expand Up @@ -331,7 +339,14 @@ func ExampleCommitRequest() {
return
}

conn := example_connect()
// Assert that server supports expected features
txnOpts := opts
txnOpts.RequiredFeatures = []tarantool.Feature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
}

conn := example_connect_with_opts(txnOpts)
defer conn.Close()

stream, _ := conn.NewStream()
Expand Down Expand Up @@ -407,7 +422,14 @@ func ExampleRollbackRequest() {
return
}

conn := example_connect()
// Assert that server supports expected features
txnOpts := opts
txnOpts.RequiredFeatures = []tarantool.Feature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
}

conn := example_connect_with_opts(txnOpts)
defer conn.Close()

stream, _ := conn.NewStream()
Expand Down Expand Up @@ -483,7 +505,14 @@ func ExampleBeginRequest_TxnIsolation() {
return
}

conn := example_connect()
// Assert that server supports expected features
txnOpts := opts
txnOpts.RequiredFeatures = []tarantool.Feature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
}

conn := example_connect_with_opts(txnOpts)
defer conn.Close()

stream, _ := conn.NewStream()
Expand Down
6 changes: 6 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func RefImplRollbackBody(enc *encoder) error {
return fillRollback(enc)
}

// RefImplIdBody is reference implementation for filling of an id
// request's body.
func RefImplIdBody(enc *encoder, version ProtocolVersion, features []Feature) error {
return fillId(enc, version, features)
}

func NewEncoder(w io.Writer) *encoder {
return newEncoder(w)
}
Loading

0 comments on commit 7330a14

Please sign in to comment.