Skip to content

Commit

Permalink
Hook common interface
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Mar 18, 2024
1 parent e7949db commit 5396fd9
Show file tree
Hide file tree
Showing 26 changed files with 975 additions and 1,078 deletions.
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "tests/interop/questdb-client-test"]
path = tests/interop/questdb-client-test
[submodule "test/interop/questdb-client-test"]
path = test/interop/questdb-client-test
url = https://github.com/questdb/questdb-client-test.git
58 changes: 29 additions & 29 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,19 @@ import (
"time"
)

const (
DefaultBufferCapacity = 128 * 1024
DefaultFileNameLimit = 127
)

// ErrInvalidMsg indicates a failed attempt to construct an ILP
// errInvalidMsg indicates a failed attempt to construct an ILP
// message, e.g. duplicate calls to Table method or illegal
// chars found in table or column name.
var ErrInvalidMsg = errors.New("invalid message")
var errInvalidMsg = errors.New("invalid message")

// buffer is a wrapper on top of bytes.Buffer. It extends the
// original struct with methods for writing int64 and float64
// numbers without unnecessary allocations.
type buffer struct {
bytes.Buffer

BufCap int
FileNameLimit int
initBufSize int
fileNameLimit int

lastMsgPos int
lastErr error
Expand All @@ -62,11 +57,16 @@ type buffer struct {
msgCount int
}

func newBuffer() buffer {
return buffer{
BufCap: DefaultBufferCapacity,
FileNameLimit: DefaultFileNameLimit,
}
func newBuffer(initBufSize int, fileNameLimit int) buffer {
var b buffer
b.initBufSize = initBufSize
b.fileNameLimit = fileNameLimit
b.ResetSize()
return b
}

func (b *buffer) ResetSize() {
b.Buffer = *bytes.NewBuffer(make([]byte, 0, b.initBufSize))
}

func (b *buffer) HasTable() bool {
Expand Down Expand Up @@ -136,12 +136,12 @@ func (b *buffer) WriteTo(w io.Writer) (int64, error) {

func (buf *buffer) writeTableName(str string) error {
if str == "" {
return fmt.Errorf("table name cannot be empty: %w", ErrInvalidMsg)
return fmt.Errorf("table name cannot be empty: %w", errInvalidMsg)
}
// We use string length in bytes as an approximation. That's to
// avoid calculating the number of runes.
if len(str) > buf.FileNameLimit {
return fmt.Errorf("table name length exceeds the limit: %w", ErrInvalidMsg)
if len(str) > buf.fileNameLimit {
return fmt.Errorf("table name length exceeds the limit: %w", errInvalidMsg)
}
// Since we're interested in ASCII chars, it's fine to iterate
// through bytes instead of runes.
Expand All @@ -154,13 +154,13 @@ func (buf *buffer) writeTableName(str string) error {
buf.WriteByte('\\')
case '.':
if i == 0 || i == len(str)-1 {
return fmt.Errorf("table name contains '.' char at the start or end: %s: %w", str, ErrInvalidMsg)
return fmt.Errorf("table name contains '.' char at the start or end: %s: %w", str, errInvalidMsg)
}
default:
if illegalTableNameChar(b) {
return fmt.Errorf("table name contains an illegal char: "+
"'\\n', '\\r', '?', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', '*' '%%', '~', or a non-printable char: %s: %w",
str, ErrInvalidMsg)
str, errInvalidMsg)
}
}
buf.WriteByte(b)
Expand Down Expand Up @@ -236,12 +236,12 @@ func illegalTableNameChar(ch byte) bool {

func (buf *buffer) writeColumnName(str string) error {
if str == "" {
return fmt.Errorf("column name cannot be empty: %w", ErrInvalidMsg)
return fmt.Errorf("column name cannot be empty: %w", errInvalidMsg)
}
// We use string length in bytes as an approximation. That's to
// avoid calculating the number of runes.
if len(str) > buf.FileNameLimit {
return fmt.Errorf("column name length exceeds the limit: %w", ErrInvalidMsg)
if len(str) > buf.fileNameLimit {
return fmt.Errorf("column name length exceeds the limit: %w", errInvalidMsg)
}
// Since we're interested in ASCII chars, it's fine to iterate
// through bytes instead of runes.
Expand All @@ -256,7 +256,7 @@ func (buf *buffer) writeColumnName(str string) error {
if illegalColumnNameChar(b) {
return fmt.Errorf("column name contains an illegal char: "+
"'\\n', '\\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+', '-', '*' '%%', '~', or a non-printable char: %s: %w",
str, ErrInvalidMsg)
str, errInvalidMsg)
}
}
buf.WriteByte(b)
Expand Down Expand Up @@ -373,7 +373,7 @@ func (b *buffer) prepareForField() bool {
return false
}
if !b.hasTable {
b.lastErr = fmt.Errorf("table name was not provided: %w", ErrInvalidMsg)
b.lastErr = fmt.Errorf("table name was not provided: %w", errInvalidMsg)
return false
}
if !b.hasFields {
Expand Down Expand Up @@ -404,7 +404,7 @@ func (b *buffer) Table(name string) *buffer {
return b
}
if b.hasTable {
b.lastErr = fmt.Errorf("table name already provided: %w", ErrInvalidMsg)
b.lastErr = fmt.Errorf("table name already provided: %w", errInvalidMsg)
return b
}
b.lastErr = b.writeTableName(name)
Expand All @@ -420,11 +420,11 @@ func (b *buffer) Symbol(name, val string) *buffer {
return b
}
if !b.hasTable {
b.lastErr = fmt.Errorf("table name was not provided: %w", ErrInvalidMsg)
b.lastErr = fmt.Errorf("table name was not provided: %w", errInvalidMsg)
return b
}
if b.hasFields {
b.lastErr = fmt.Errorf("symbols have to be written before any other column: %w", ErrInvalidMsg)
b.lastErr = fmt.Errorf("symbols have to be written before any other column: %w", errInvalidMsg)
return b
}
b.WriteByte(',')
Expand Down Expand Up @@ -565,11 +565,11 @@ func (b *buffer) At(ts time.Time, sendTs bool) error {
}
if !b.hasTable {
b.DiscardPendingMsg()
return fmt.Errorf("table name was not provided: %w", ErrInvalidMsg)
return fmt.Errorf("table name was not provided: %w", errInvalidMsg)
}
if !b.hasTags && !b.hasFields {
b.DiscardPendingMsg()
return fmt.Errorf("no symbols or columns were provided: %w", ErrInvalidMsg)
return fmt.Errorf("no symbols or columns were provided: %w", errInvalidMsg)
}

if sendTs {
Expand Down
36 changes: 17 additions & 19 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import (
"github.com/stretchr/testify/assert"
)

type writerFn func(b *qdb.Buffer) error
type bufWriterFn func(b *qdb.Buffer) error

func TestValidWrites(t *testing.T) {
testCases := []struct {
name string
writerFn writerFn
writerFn bufWriterFn
expectedLines []string
}{
{
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestValidWrites(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := tc.writerFn(&buf)
assert.NoError(t, err)
Expand All @@ -99,7 +99,7 @@ func TestTimestampSerialization(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := buf.Table(testTable).TimestampColumn("a_col", tc.val).At(time.Time{}, false)
assert.NoError(t, err)
Expand All @@ -126,7 +126,7 @@ func TestInt64Serialization(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := buf.Table(testTable).Int64Column("a_col", tc.val).At(time.Time{}, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestLong256Column(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

newVal, _ := big.NewInt(0).SetString(tc.val, 16)
err := buf.Table(testTable).Long256Column("a_col", newVal).At(time.Time{}, false)
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestFloat64Serialization(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := buf.Table(testTable).Float64Column("a_col", tc.val).At(time.Time{}, false)
assert.NoError(t, err)
Expand All @@ -211,7 +211,7 @@ func TestErrorOnLengthyNames(t *testing.T) {

testCases := []struct {
name string
writerFn writerFn
writerFn bufWriterFn
expectedErrMsg string
}{
{
Expand All @@ -232,8 +232,7 @@ func TestErrorOnLengthyNames(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf.FileNameLimit = nameLimit
buf := qdb.NewBuffer(128*1024, nameLimit)

err := tc.writerFn(&buf)
assert.ErrorContains(t, err, tc.expectedErrMsg)
Expand All @@ -246,7 +245,7 @@ func TestErrorOnLengthyNames(t *testing.T) {
func TestErrorOnMissingTableCall(t *testing.T) {
testCases := []struct {
name string
writerFn writerFn
writerFn bufWriterFn
}{
{
"At",
Expand Down Expand Up @@ -294,7 +293,7 @@ func TestErrorOnMissingTableCall(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := tc.writerFn(&buf)

Expand All @@ -306,7 +305,7 @@ func TestErrorOnMissingTableCall(t *testing.T) {
}

func TestErrorOnMultipleTableCalls(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := buf.Table(testTable).Table(testTable).At(time.Time{}, false)

Expand All @@ -315,7 +314,7 @@ func TestErrorOnMultipleTableCalls(t *testing.T) {
}

func TestErrorOnNegativeLong256(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := buf.Table(testTable).Long256Column("long256_col", big.NewInt(-42)).At(time.Time{}, false)

Expand All @@ -324,7 +323,7 @@ func TestErrorOnNegativeLong256(t *testing.T) {
}

func TestErrorOnLargerLong256(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

bigVal, _ := big.NewInt(0).SetString("fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", 16)
err := buf.Table(testTable).Long256Column("long256_col", bigVal).At(time.Time{}, false)
Expand All @@ -336,7 +335,7 @@ func TestErrorOnLargerLong256(t *testing.T) {
func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
testCases := []struct {
name string
writerFn writerFn
writerFn bufWriterFn
}{
{
"string column",
Expand Down Expand Up @@ -372,19 +371,18 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

err := tc.writerFn(&buf)

assert.ErrorContains(t, err, "symbols have to be written before any other column")
assert.Empty(t, buf.Messages())

})
}
}

func TestInvalidMessageGetsDiscarded(t *testing.T) {
buf := qdb.NewBuffer()
buf := qdb.NewBuffer(128*1024, 127)

// Write a valid message.
err := buf.Table(testTable).StringColumn("foo", "bar").At(time.Time{}, false)
Expand Down
Loading

0 comments on commit 5396fd9

Please sign in to comment.