private SinkDataEntry convertToSinkDataEntry(MessageExt message) {
    Map<String, String> properties = message.getProperties();
    String queueName;
    EntryType entryType;
    Schema schema;
    Long timestamp;
    Object[] datas = new Object[1];
    if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
        queueName = properties.get(RuntimeConfigDefine.CONNECT_TOPICNAME);
        String connectEntryType = properties.get(RuntimeConfigDefine.CONNECT_ENTRYTYPE);
        entryType = StringUtils.isNotEmpty(connectEntryType) ? EntryType.valueOf(connectEntryType) : null;
        String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
        timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
        String connectSchema = properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
        schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
        datas = new Object[1];
        datas[0] = new String(message.getBody());
    } else {
        final byte[] messageBody = message.getBody();
        final SourceDataEntry sourceDataEntry = JSON.parseObject(new String(messageBody), SourceDataEntry.class);
        final Object[] payload = sourceDataEntry.getPayload();
        final byte[] decodeBytes = Base64.getDecoder().decode((String) payload[0]);
        Object recodeObject;
        if (recordConverter instanceof JsonConverter) {
            JsonConverter jsonConverter = (JsonConverter) recordConverter;
            jsonConverter.setClazz(Object[].class);
            recodeObject = recordConverter.byteToObject(decodeBytes);
            datas = (Object[]) recodeObject;
        }
        schema = sourceDataEntry.getSchema();
        entryType = sourceDataEntry.getEntryType();
        queueName = sourceDataEntry.getQueueName();
        timestamp = sourceDataEntry.getTimestamp();
    }
    DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
    dataEntryBuilder.entryType(entryType);
    dataEntryBuilder.queue(queueName);
    dataEntryBuilder.timestamp(timestamp);
    List<Field> fields = schema.getFields();
    if (null != fields && !fields.isEmpty()) {
        for (Field field : fields) {
            dataEntryBuilder.putFiled(field.getName(), datas[field.getIndex()]);
        }
    }
    SinkDataEntry sinkDataEntry = dataEntryBuilder.buildSinkDataEntry(message.getQueueOffset());
    return sinkDataEntry;
}
datas[0] = new String(message.getBody());
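IMO, this line will cause problems with encoding and decoding: `new String(byte[])` without a charset argument decodes the body with the JVM's default charset, which may not match the charset the producer used to encode it.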
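To illustrate the concern about the quoted line, here is a minimal, self-contained sketch. It is not taken from the project: the class name `BodyCharsetSketch` and both helper methods are hypothetical, and it assumes the producer encoded the message body as UTF-8. It compares decoding with an explicit charset against decoding with the JVM default, which is what `new String(message.getBody())` does.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not part of the connector runtime.
public class BodyCharsetSketch {

    // Decodes with an explicit charset, so the result is the same on every JVM.
    // Assumption: the producer wrote the body as UTF-8.
    static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Mirrors the reported line: the result depends on the JVM's default charset.
    static String decodeBodyWithDefault(byte[] body) {
        return new String(body);
    }

    public static void main(String[] args) {
        // "\u6d88\u606f" is a two-character Chinese string, written with
        // escapes so the source file itself stays ASCII.
        byte[] body = "\u6d88\u606f".getBytes(StandardCharsets.UTF_8);

        System.out.println("default charset: " + Charset.defaultCharset());
        System.out.println("explicit UTF-8 : " + decodeBody(body));
        System.out.println("default decode : " + decodeBodyWithDefault(body));

        // Simulates a JVM whose default charset is ISO-8859-1:
        // the same bytes decode to different, garbled characters.
        String misdecoded = new String(body, StandardCharsets.ISO_8859_1);
        System.out.println("round trip ok? : " + misdecoded.equals("\u6d88\u606f"));
    }
}
```

If this assumption holds, one possible fix would be to pass an explicit charset wherever the body is turned into a string (and, symmetrically, wherever a string is turned into bytes), so that both sides agree regardless of the platform they run on.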