-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Connect Avro Support Part 2: AvroEventBatchEncoder #733
Conversation
cdc/puller/mock_puller.go
Outdated
pm: m, | ||
startTs: startTs, | ||
} | ||
//return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove it.
|
||
package codec | ||
|
||
import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import order should be
std libs
<new line>
third-party libs
cdc/sink/codec/avro_test.go
Outdated
"time" | ||
) | ||
|
||
type AvroBatchEncoderSuite struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type AvroBatchEncoderSuite struct { | |
type avroBatchEncoderSuite struct { |
Please avoid unnecessary exports.
@@ -44,7 +44,7 @@ type mockRegistrySchema struct { | |||
ID int | |||
} | |||
|
|||
func (s *AvroSchemaRegistrySuite) SetUpSuite(c *check.C) { | |||
func StartHTTPInterceptForTestingRegistry(c *check.C) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func StartHTTPInterceptForTestingRegistry(c *check.C) { | |
func startHTTPInterceptForTestingRegistry(c *check.C) { |
@@ -140,10 +140,18 @@ func (s *AvroSchemaRegistrySuite) SetUpSuite(c *check.C) { | |||
|
|||
} | |||
|
|||
func (s *AvroSchemaRegistrySuite) TearDownSuite(c *check.C) { | |||
func StopHTTPInterceptForTestingRegistry() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func StopHTTPInterceptForTestingRegistry() { | |
func stopHTTPInterceptForTestingRegistry() { |
cdc/sink/codec/avro.go
Outdated
return "long.timestamp-millis" | ||
default: | ||
log.Warn("getAvroDataTypeName: unknown type") | ||
return "errorType" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is "errorType" valid in Avro?
/lgtm |
@amyangfei,Thanks for your review. |
Schema string | ||
SchemaID int64 | ||
Table string | ||
ColumnInfo []*ColumnInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could use a TableInfo
with a table name, schema name, column info, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm |
/merge |
/run-all-tests |
@liuzix merge failed. |
Please fix the build error @liuzix |
/merge |
/run-all-tests |
/merge |
1 similar comment
/merge |
@liuzix merge failed. |
/run-all-tests |
What problem does this PR solve?
#660
What is changed and how it works?
Add AvroEventBatchEncoder which handles RowChangedEvent and DDLEvent.
Check List
Tests
Code changes
TODO:
Release note