Wrong encoding character inserted by KSQL (due to KSQL 5.0 limited to String as key format) #1865

Closed
larryxia-ddl opened this issue Sep 11, 2018 · 9 comments

Comments

@larryxia-ddl

larryxia-ddl commented Sep 11, 2018

During a POC, we defined a stream test1 on topic poc.api.internal, then created a table ktab4 which aggregates data from test1 with a 5-minute tumbling window. Topic poc.api.ktab4 was created by KSQL. We then created a Kafka-to-Elasticsearch connector to dump the data in topic poc.api.ktab4 to Elasticsearch. We also have another Elasticsearch connector to dump data from topic poc.api.internal to Elasticsearch.

When we pump data into poc.api.internal, we find we can dump the original data to Elasticsearch. But we can't dump the aggregated data from poc.api.ktab4 to Elasticsearch; the exception complains about wrong encoding.

Because there is no encoding issue in the original message, can we assume the encoding issue is introduced by KSQL?

Please advise how to avoid this issue.

The script to create the KSQL stream and table:

create stream test1 (requestId varchar, requesttype varchar, csaDetails  STRUCT< clientName VARCHAR, accountNumbers ARRAY<VARCHAR>, lobs ARRAY<VARCHAR> >) WITH (KAFKA_TOPIC='poc.api.internal', value_format='json', key='requestid');
 
create table ktab4 with (KAFKA_TOPIC='poc.api.ktab4', VALUE_FORMAT='DELIMITED') as select requesttype, count(*) count from test1 window tumbling (size 5 minutes) group by requesttype;

The exception:

{"name":"elasticsearch-sink1","connector":{"state":"RUNNING","worker_id":"10.87.97.155:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\n
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte 0xb5\n at [Source: (byte[])\"omc\u0000\u0000\u0001e�~��\"; line: 1, column: 9]\n
Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte 0xb5\n at [Source: (byte[])\"omc\u0000\u0000\u0001e�~��\"; line: 1, column: 9]\n\t
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidInitial(UTF8StreamJsonParser.java:3539)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._decodeCharForError(UTF8StreamJsonParser.java:3282)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3514)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)\n\tat com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)\n\tat com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n","id":0,"worker_id":"10.87.97.155:8083"}],"type":"sink"}(

The message in poc.api.internal:

ksql> print 'poc.api.internal'  FROM BEGINNING;
Format:STRING

9/10/18 2:22:54 PM EDT , NULL , {\x0A  "sourceAppId" : "OMC_CSA",\x0A  "sourceCorrelationId" : "99",\x0A  "userAcf2id" : "jonbar2s",\x0A  "userRole" : "TDWFP_REQUESTOR",\x0A  "lob" : "DI",\x0A  "wealthPartyId" : "200",\x0A  "requestType" : "omc",\x0A  "requestSubType" : "RRIF",\x0A  "requestId" : "e1ea3999-24d7-440f-b3b5-052566162de6",\x0A  "csaDetails" : {\x0A    "clientName" : "Saturn, Bill",\x0A    "accountNumbers" : [ "9AVG6K" ],\x0A    "lobs" : [ "DI" ]\x0A  }\x0A}

The message in poc.api.ktab4:

ksql> print 'poc.api.ktab4'  FROM BEGINNING;
Format:STRING
9/10/18 12:43:12 PM EDT , omce�Z� , omc,1
@larryxia-ddl
Author

larryxia-ddl commented Sep 13, 2018

It happens only with key.converter=org.apache.kafka.connect.json.JsonConverter.

It works fine with org.apache.kafka.connect.storage.StringConverter.
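For anyone hitting the same thing, here is a minimal sketch of a standalone-mode connector properties file with the key converter overridden explicitly; the connector name, topic, and connection URL below are placeholders, not the exact values from our POC.

name=elasticsearch-sink1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=poc.api.ktab4
connection.url=http://localhost:9200
type.name=kafka-connect
# Set the key converter explicitly so the worker-level JsonConverter default is not used.
key.converter=org.apache.kafka.connect.storage.StringConverter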

@larryxia-ddl
Author

Has anyone taken a look at this issue? It has been reported for a week. BTW, it happened during a POC at TD Bank.

@big-andy-coates
Contributor

Hi @larryxia-ddl,

This is a little strange, as I would have expected it simply to complain that it's not valid JSON. At the moment KSQL only supports string keys. This is something we're aware of, and it's high on our list of things to fix.

So, I'm not surprised that deserialising the keys as JSON isn't working, but the way it's failing is unexpected.

I'm guessing that requestId in the original stream contains a JSON document? However, once you group this in the table statement, the key will contain not only requestId but also information about the window it is part of, e.g. for requestId="{some='json'}" the key might be {some='json'} : Window{start=1234567890,end=2345678900}

While this does not explain why you're getting an invalid UTF-8 char from the JsonConverter, I hope it does explain why what you're trying to do is not working!
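For what it's worth, this also matches the garbled-looking key in the ktab4 print output above: "omc" followed by what looks like a binary timestamp (the \u0000\u0000\u0001e… bytes in the trace). Below is a minimal Java sketch of that layout, assuming the composite key is the UTF-8 group-by key followed by an 8-byte big-endian window-start timestamp in epoch millis; the class name and the timestamp value are illustrative, not taken from this issue.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class WindowedKeySketch {
    public static void main(String[] args) {
        // Illustrative raw key bytes as they might appear in poc.api.ktab4:
        // the UTF-8 group-by key "omc" followed by an 8-byte big-endian
        // window-start timestamp (epoch millis). The timestamp below is an
        // assumed value for a 12:40-12:45 PM EDT window on Sep 10, 2018.
        byte[] rawKey = ByteBuffer.allocate(3 + Long.BYTES)
                .put("omc".getBytes(StandardCharsets.UTF_8))
                .putLong(1536597600000L)
                .array();

        // Split the composite key back into its two parts.
        int keyLen = rawKey.length - Long.BYTES;
        String groupKey = new String(rawKey, 0, keyLen, StandardCharsets.UTF_8);
        long windowStart = ByteBuffer.wrap(rawKey, keyLen, Long.BYTES).getLong();

        System.out.println(groupKey);                          // omc
        System.out.println(Instant.ofEpochMilli(windowStart)); // 2018-09-10T16:40:00Z
    }
}

Bytes like these are not valid JSON, which is why a JsonConverter on the key blows up with an invalid UTF-8 error rather than a parse error.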

Any questions, let me know. Otherwise, if you're happy with the explanation, please close this issue.

@larryxia-ddl
Author

Thank you for the reply. Are you going to open a Jira for this issue? I didn't see this limitation mentioned anywhere in the KSQL documentation, and the exception thrown is really misleading. Even pre-sales from Confluent didn't know this. It was a blocking issue until we found that it works with StringConverter. BTW, JsonConverter is the default key converter of the Kafka Elasticsearch connector, so there is a high chance that people will hit this issue if they don't know about this limitation or bug.

@apurvam
Contributor

apurvam commented Sep 21, 2018

The restrictions on the key format are documented here: https://docs.confluent.io/current/ksql/docs/syntax-reference.html#key-requirements

The issue to track the enhancement to support structured key types is here: #824

@larryxia-ddl
Author

larryxia-ddl commented Sep 24, 2018

I don't think it is the same case. The key is "requesttype", which is VARCHAR, a legal data type that KSQL supports.

@miguno miguno added the question label Oct 2, 2018
@miguno
Contributor

miguno commented Oct 2, 2018

Hi @larryxia-ddl !

The key is "requesttype" which is varchar, it is a legal data type which ksql support.

I think you're saying this because requesttype is being used in the GROUP BY clause, correct?

create table ktab4 with (KAFKA_TOPIC='poc.api.ktab4', VALUE_FORMAT='DELIMITED') as select requesttype, count(*) count from test1 window tumbling (size 5 minutes) group by requesttype;

If so, see what @big-andy-coates wrote above:

However, once you group this in the table statement the key will contain not only requestId but information about the window it is a part of, e.g. for requestId="{some='json'}" the key might be {some='json'} : Window{start=1234567890,end=2345678900}

(Note that Andy mixed up requesttype with requestId, but we can ignore this typo.)

In other words, the create table ktab4 ... query above will not have a simple VARCHAR key (i.e., Kafka message key) but a composite key that includes additional information, and the composite key's format will not be valid JSON.

So what to do next?

  • I agree that we need to clarify this in the KSQL documentation. From your point of view, where would you have expected to be informed about this? Any suggestions where/how we should improve the docs in this regard?
  • Anything else that you'd suggest as follow-up, apart from documentation?

@larryxia-ddl
Author

larryxia-ddl commented Oct 3, 2018

Thank you for the update. I am satisfied with this detailed explanation. It would be great if the KSQL documentation mentioned that we should use StringConverter when moving data from KSQL topics to Elasticsearch. It has to be specified in the connector configuration explicitly, because the default key converter is JsonConverter.

Larry

@miguno miguno changed the title Wrong encoding character inserted by KSQL Wrong encoding character inserted by KSQL (due to KSQL 5.0 limited to String as key format) Nov 15, 2018
@rmoff
Member

rmoff commented Nov 15, 2018

I've raised an internal JIRA for our docs team to add this note re. use with Kafka Connect.
