Skip to content

Commit

Permalink
Merge pull request #1501 from ClickHouse/fix_json_array
Browse files Browse the repository at this point in the history
Add Array() support for Variant, Dynamic, JSON
  • Loading branch information
SpencerTorres authored Feb 16, 2025
2 parents d7e25aa + 697c611 commit a2b16ef
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 24 deletions.
15 changes: 12 additions & 3 deletions lib/column/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,13 @@ func (c *Dynamic) encodeData(buffer *proto.Buffer) {
c.variant.encodeData(buffer)
}

func (c *Dynamic) Encode(buffer *proto.Buffer) {
func (c *Dynamic) WriteStatePrefix(buffer *proto.Buffer) error {
c.encodeHeader(buffer)

return nil
}

func (c *Dynamic) Encode(buffer *proto.Buffer) {
c.encodeData(buffer)
}

Expand Down Expand Up @@ -393,13 +398,17 @@ func (c *Dynamic) decodeData(reader *proto.Reader, rows int) error {
return nil
}

func (c *Dynamic) Decode(reader *proto.Reader, rows int) error {
func (c *Dynamic) ReadStatePrefix(reader *proto.Reader) error {
err := c.decodeHeader(reader)
if err != nil {
return fmt.Errorf("failed to decode dynamic header: %w", err)
}

err = c.decodeData(reader, rows)
return nil
}

func (c *Dynamic) Decode(reader *proto.Reader, rows int) error {
err := c.decodeData(reader, rows)
if err != nil {
return fmt.Errorf("failed to decode dynamic data: %w", err)
}
Expand Down
45 changes: 37 additions & 8 deletions lib/column/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,32 @@ func (c *JSON) encodeStringData(buffer *proto.Buffer) {
c.jsonStrings.Encode(buffer)
}

func (c *JSON) Encode(buffer *proto.Buffer) {
func (c *JSON) WriteStatePrefix(buffer *proto.Buffer) error {
switch c.serializationVersion {
case JSONObjectSerializationVersion:
buffer.PutUInt64(JSONObjectSerializationVersion)
c.encodeObjectHeader(buffer)

return nil
case JSONStringSerializationVersion:
buffer.PutUInt64(JSONStringSerializationVersion)

return nil
default:
// If the column is an array, it can be empty but still require a prefix.
// Use string encoding since it's smaller
buffer.PutUInt64(JSONStringSerializationVersion)

return nil
}
}

func (c *JSON) Encode(buffer *proto.Buffer) {
switch c.serializationVersion {
case JSONObjectSerializationVersion:
c.encodeObjectData(buffer)
return
case JSONStringSerializationVersion:
buffer.PutUInt64(JSONStringSerializationVersion)
c.encodeStringData(buffer)
return
}
Expand Down Expand Up @@ -686,9 +703,7 @@ func (c *JSON) decodeStringData(reader *proto.Reader, rows int) error {
return c.jsonStrings.Decode(reader, rows)
}

func (c *JSON) Decode(reader *proto.Reader, rows int) error {
c.rows = rows

func (c *JSON) ReadStatePrefix(reader *proto.Reader) error {
jsonSerializationVersion, err := reader.UInt64()
if err != nil {
return fmt.Errorf("failed to read json serialization version: %w", err)
Expand All @@ -703,20 +718,34 @@ func (c *JSON) Decode(reader *proto.Reader, rows int) error {
return fmt.Errorf("failed to decode json object header: %w", err)
}

err = c.decodeObjectData(reader, rows)
return nil
case JSONStringSerializationVersion:
return nil
default:
return fmt.Errorf("unsupported JSON serialization version for prefix decode: %d", jsonSerializationVersion)
}
}

func (c *JSON) Decode(reader *proto.Reader, rows int) error {
c.rows = rows

switch c.serializationVersion {
case JSONObjectSerializationVersion:
err := c.decodeObjectData(reader, rows)
if err != nil {
return fmt.Errorf("failed to decode json object data: %w", err)
}

return nil
case JSONStringSerializationVersion:
err = c.decodeStringData(reader, rows)
err := c.decodeStringData(reader, rows)
if err != nil {
return fmt.Errorf("failed to decode json string data: %w", err)
}

return nil
default:
return fmt.Errorf("unsupported JSON serialization version for decode: %d", jsonSerializationVersion)
return fmt.Errorf("unsupported JSON serialization version for decode: %d", c.serializationVersion)
}
}

Expand Down
15 changes: 12 additions & 3 deletions lib/column/variant.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,13 @@ func (c *Variant) encodeData(buffer *proto.Buffer) {
}
}

func (c *Variant) Encode(buffer *proto.Buffer) {
func (c *Variant) WriteStatePrefix(buffer *proto.Buffer) error {
c.encodeHeader(buffer)

return nil
}

func (c *Variant) Encode(buffer *proto.Buffer) {
c.encodeData(buffer)
}

Expand Down Expand Up @@ -336,13 +341,17 @@ func (c *Variant) decodeData(reader *proto.Reader, rows int) error {
return nil
}

func (c *Variant) Decode(reader *proto.Reader, rows int) error {
func (c *Variant) ReadStatePrefix(reader *proto.Reader) error {
err := c.decodeHeader(reader)
if err != nil {
return fmt.Errorf("failed to decode variant header: %w", err)
}

err = c.decodeData(reader, rows)
return nil
}

func (c *Variant) Decode(reader *proto.Reader, rows int) error {
err := c.decodeData(reader, rows)
if err != nil {
return fmt.Errorf("failed to decode variant data: %w", err)
}
Expand Down
73 changes: 70 additions & 3 deletions tests/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/stretchr/testify/require"
"testing"
Expand Down Expand Up @@ -87,7 +86,7 @@ func TestDynamic(t *testing.T) {
rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic")
require.NoError(t, err)

var row chcol.Dynamic
var row clickhouse.Dynamic

require.True(t, rows.Next())
err = rows.Scan(&row)
Expand Down Expand Up @@ -135,6 +134,74 @@ func TestDynamic(t *testing.T) {
require.Equal(t, colMapStringInt64, row.Any())
}

func TestDynamicArray(t *testing.T) {
ctx := context.Background()
conn := setupDynamicTest(t)

const ddl = `
CREATE TABLE IF NOT EXISTS test_dynamic (
c Array(Dynamic)
) Engine = MergeTree() ORDER BY tuple()
`
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_dynamic"))
}()

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_dynamic (c)")
require.NoError(t, err)

batch.Append([]clickhouse.Dynamic{
clickhouse.NewDynamicWithType(int64(42), "Int64"),
clickhouse.NewDynamicWithType(true, "Bool"),
})
require.NoError(t, batch.Send())

rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic")
require.NoError(t, err)

var arrRow []clickhouse.Dynamic

require.True(t, rows.Next())
err = rows.Scan(&arrRow)
require.NoError(t, err)
require.Len(t, arrRow, 2)

require.Equal(t, int64(42), arrRow[0].Any())
require.Equal(t, true, arrRow[1].Any())
}

func TestDynamicEmptyArray(t *testing.T) {
ctx := context.Background()
conn := setupDynamicTest(t)

const ddl = `
CREATE TABLE IF NOT EXISTS test_dynamic (
c Array(Dynamic)
) Engine = MergeTree() ORDER BY tuple()
`
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_dynamic"))
}()

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_dynamic (c)")
require.NoError(t, err)

batch.Append([]clickhouse.Dynamic{})
require.NoError(t, batch.Send())

rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic")
require.NoError(t, err)

var arrRow []clickhouse.Dynamic

require.True(t, rows.Next())
err = rows.Scan(&arrRow)
require.NoError(t, err)
require.Len(t, arrRow, 0)
}

func TestDynamic_ScanWithType(t *testing.T) {
ctx := context.Background()
conn := setupDynamicTest(t)
Expand All @@ -160,7 +227,7 @@ func TestDynamic_ScanWithType(t *testing.T) {
rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic")
require.NoError(t, err)

var row chcol.Dynamic
var row clickhouse.Dynamic

require.True(t, rows.Next())
err = rows.Scan(&row)
Expand Down
100 changes: 97 additions & 3 deletions tests/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/stretchr/testify/require"
"testing"
Expand All @@ -39,7 +38,7 @@ func setupJSONTest(t *testing.T) driver.Conn {
})
require.NoError(t, err)

if !CheckMinServerServerVersion(conn, 24, 9, 0) {
if !CheckMinServerServerVersion(conn, 24, 10, 0) {
t.Skip(fmt.Errorf("unsupported clickhouse version for JSON type"))
return nil
}
Expand Down Expand Up @@ -72,7 +71,7 @@ func TestJSONPaths(t *testing.T) {
rows, err := conn.Query(ctx, "SELECT c FROM test_json")
require.NoError(t, err)

var row chcol.JSON
var row clickhouse.JSON

require.True(t, rows.Next())
err = rows.Scan(&row)
Expand All @@ -99,6 +98,101 @@ func TestJSONPaths(t *testing.T) {
}
}

func TestJSONArray(t *testing.T) {
ctx := context.Background()
conn := setupJSONTest(t)

const ddl = `
CREATE TABLE IF NOT EXISTS test_json (
c Array(JSON)
) Engine = MergeTree() ORDER BY tuple()
`
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json"))
}()

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)")
require.NoError(t, err)

arrJsonRow := []*clickhouse.JSON{clickhouse.NewJSON(), BuildTestJSONPaths()}

require.NoError(t, batch.Append(arrJsonRow))
require.NoError(t, batch.Send())

rows, err := conn.Query(ctx, "SELECT c FROM test_json")
require.NoError(t, err)

var arrRow []*clickhouse.JSON

require.True(t, rows.Next())
err = rows.Scan(&arrRow)
require.NoError(t, err)
require.Len(t, arrRow, 2)

actualValuesByPathEmpty := arrRow[0].ValuesByPath()
for _, actualValue := range actualValuesByPathEmpty {
// Allow Nil func to compare values without Dynamic wrapper
if v, ok := actualValue.(clickhouse.Dynamic); ok {
actualValue = v.Any()
}

require.Nil(t, actualValue)
}

expectedValuesByPath := arrJsonRow[1].ValuesByPath()
actualValuesByPath := arrRow[1].ValuesByPath()
for path, expectedValue := range expectedValuesByPath {
actualValue, ok := actualValuesByPath[path]
if !ok {
t.Fatalf("result JSON is missing path: %s", path)
}

// Allow Equal func to compare values without Dynamic wrapper
if v, ok := expectedValue.(clickhouse.Dynamic); ok {
expectedValue = v.Any()
}

if v, ok := actualValue.(clickhouse.Dynamic); ok {
actualValue = v.Any()
}

require.Equal(t, expectedValue, actualValue)
}
}

func TestJSONEmptyArray(t *testing.T) {
ctx := context.Background()
conn := setupJSONTest(t)

const ddl = `
CREATE TABLE IF NOT EXISTS test_json (
c Array(JSON)
) Engine = MergeTree() ORDER BY tuple()
`
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json"))
}()

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)")
require.NoError(t, err)

var arrJsonRow []*clickhouse.JSON
require.NoError(t, batch.Append(arrJsonRow))
require.NoError(t, batch.Send())

rows, err := conn.Query(ctx, "SELECT c FROM test_json")
require.NoError(t, err)

var arrRow []*clickhouse.JSON

require.True(t, rows.Next())
err = rows.Scan(&arrRow)
require.NoError(t, err)
require.Len(t, arrRow, 0)
}

func TestJSONStruct(t *testing.T) {
ctx := context.Background()
conn := setupJSONTest(t)
Expand Down
Loading

0 comments on commit a2b16ef

Please sign in to comment.