Skip to content

Commit

Permalink
[Improve][Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED (apache#5978)
Browse files Browse the repository at this point in the history
  • Loading branch information
CheneyYin authored Dec 23, 2023
1 parent 2c5b48e commit 456cd17
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 69 deletions.
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
- [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion bug of transform (#4490)
- [Connector-V2] [Common] Remove assert key word (#5915)
- [Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED. (#5978)

### CI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ public static SeaTunnelRuntimeException getCatalogTablesWithUnsupportedType(
GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
}

public static SeaTunnelRuntimeException jsonOperationError(String format, String payload) {
return jsonOperationError(format, payload, null);
public static SeaTunnelRuntimeException jsonOperationError(String identifier, String payload) {
return jsonOperationError(identifier, payload, null);
}

public static SeaTunnelRuntimeException jsonOperationError(
String format, String payload, Throwable cause) {
String identifier, String payload, Throwable cause) {
Map<String, String> params = new HashMap<>();
params.put("format", format);
params.put("identifier", identifier);
params.put("payload", payload);
SeaTunnelErrorCode code = JSON_OPERATION_FAILED;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

/** SeaTunnel connector error code interface */
public enum CommonErrorCode implements SeaTunnelErrorCode {
JSON_OPERATION_FAILED("COMMON-02", "<format> JSON convert/parse '<payload>' operation failed."),
JSON_OPERATION_FAILED(
"COMMON-02", "<identifier> JSON convert/parse '<payload>' operation failed."),

UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of '<field>'"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
public enum CommonErrorCodeDeprecated implements SeaTunnelErrorCode {
FILE_OPERATION_FAILED(
"COMMON-01", "File operation failed, such as (read,list,write,move,copy,sync) etc..."),
JSON_OPERATION_FAILED("COMMON-02", "Json covert/parse operation failed"),
REFLECT_CLASS_OPERATION_FAILED("COMMON-03", "Reflect class operation failed"),
SERIALIZE_OPERATION_FAILED("COMMON-04", "Serialize class operation failed"),
UNSUPPORTED_OPERATION("COMMON-05", "Unsupported operation"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.common.exception.CommonError;

import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand Down Expand Up @@ -51,8 +50,7 @@ public void injectFields(PreparedStatement statement, int index, Object value)
statement.setString(index, value.toString());
}
} catch (JsonProcessingException e) {
throw new ClickhouseConnectorException(
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e.getMessage());
throw CommonError.jsonOperationError("Clickhouse", value.toString(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
Expand Down Expand Up @@ -82,65 +83,82 @@ public String serializeRow(SeaTunnelRow row) {
private String serializeUpsert(SeaTunnelRow row) {
String key = keyExtractor.apply(row);
Map<String, Object> document = toDocumentMap(row);
String documentStr;

try {
if (key != null) {
Map<String, String> upsertMetadata = createMetadata(row, key);
/**
* format example: { "update" : {"_index" : "${your_index}", "_id" :
* "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" :
* true }
*/
return new StringBuilder()
.append("{ \"update\" :")
.append(objectMapper.writeValueAsString(upsertMetadata))
.append("}")
.append("\n")
.append("{ \"doc\" :")
.append(objectMapper.writeValueAsString(document))
.append(", \"doc_as_upsert\" : true }")
.toString();
} else {
Map<String, String> indexMetadata = createMetadata(row);
/**
* format example: { "index" : {"_index" : "${your_index}", "_id" :
* "${your_document_id}"} }\n ${your_document_json}
*/
return new StringBuilder()
.append("{ \"index\" :")
.append(objectMapper.writeValueAsString(indexMetadata))
.append("}")
.append("\n")
.append(objectMapper.writeValueAsString(document))
.toString();
documentStr = objectMapper.writeValueAsString(document);
} catch (JsonProcessingException e) {
throw CommonError.jsonOperationError(
"Elasticsearch", "document:" + document.toString(), e);
}

if (key != null) {
Map<String, String> upsertMetadata = createMetadata(row, key);
String upsertMetadataStr;
try {
upsertMetadataStr = objectMapper.writeValueAsString(upsertMetadata);
} catch (JsonProcessingException e) {
throw CommonError.jsonOperationError(
"Elasticsearch", "upsertMetadata:" + upsertMetadata.toString(), e);
}

/**
* format example: { "update" : {"_index" : "${your_index}", "_id" :
* "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" : true }
*/
return new StringBuilder()
.append("{ \"update\" :")
.append(upsertMetadataStr)
.append(" }")
.append("\n")
.append("{ \"doc\" :")
.append(documentStr)
.append(", \"doc_as_upsert\" : true }")
.toString();
}

Map<String, String> indexMetadata = createMetadata(row);
String indexMetadataStr;
try {
indexMetadataStr = objectMapper.writeValueAsString(indexMetadata);
} catch (JsonProcessingException e) {
throw new ElasticsearchConnectorException(
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
"Object json deserialization exception.",
e);
throw CommonError.jsonOperationError(
"Elasticsearch", "indexMetadata:" + indexMetadata.toString(), e);
}

/**
* format example: { "index" : {"_index" : "${your_index}", "_id" : "${your_document_id}"}
* }\n ${your_document_json}
*/
return new StringBuilder()
.append("{ \"index\" :")
.append(indexMetadataStr)
.append(" }")
.append("\n")
.append(documentStr)
.toString();
}

private String serializeDelete(SeaTunnelRow row) {
String key = keyExtractor.apply(row);
Map<String, String> deleteMetadata = createMetadata(row, key);
String deleteMetadataStr;
try {
/**
* format example: { "delete" : {"_index" : "${your_index}", "_id" :
* "${your_document_id}"} }
*/
return new StringBuilder()
.append("{ \"delete\" :")
.append(objectMapper.writeValueAsString(deleteMetadata))
.append("}")
.toString();
deleteMetadataStr = objectMapper.writeValueAsString(deleteMetadata);
} catch (JsonProcessingException e) {
throw new ElasticsearchConnectorException(
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
"Object json deserialization exception.",
e);
throw CommonError.jsonOperationError(
"Elasticsearch", "deleteMetadata:" + deleteMetadata.toString(), e);
}

/**
* format example: { "delete" : {"_index" : "${your_index}", "_id" : "${your_document_id}"}
* }
*/
return new StringBuilder()
.append("{ \"delete\" :")
.append(deleteMetadataStr)
.append(" }")
.toString();
}

private Map<String, Object> toDocumentMap(SeaTunnelRow row) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;

/**
 * Unit tests for {@code ElasticsearchRowSerializer}: verifies the Elasticsearch bulk-API
 * payloads produced for upsert (with and without a primary key) and delete rows, and the
 * {@code SeaTunnelRuntimeException} raised when a document field cannot be serialized to JSON.
 */
public class ElasticsearchRowSerializerTest {

    /**
     * Builds a serializer over a two-column ({@code id}, {@code name}) string schema,
     * removing the setup duplicated across every test.
     *
     * @param index target Elasticsearch index name
     * @param primaryKey primary-key column name, or {@code null} to configure no primary key
     *     (which makes the serializer emit plain "index" actions instead of upserts)
     * @return a serializer configured against cluster version 8.0.0
     */
    private static ElasticsearchRowSerializer createSerializer(String index, String primaryKey) {
        Map<String, Object> confMap = new HashMap<>();
        confMap.put(SinkConfig.INDEX.key(), index);
        if (primaryKey != null) {
            confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
        }

        Config pluginConf = ConfigFactory.parseMap(confMap);
        ElasticsearchClusterInfo clusterInfo =
                ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
        IndexInfo indexInfo = new IndexInfo(pluginConf);
        SeaTunnelRowType schema =
                new SeaTunnelRowType(
                        new String[] {"id", "name"},
                        new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});

        return new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
    }

    @Test
    public void testSerializeUpsert() {
        String index = "st_index";
        String id = "0001";
        String name = "jack";
        ElasticsearchRowSerializer serializer = createSerializer(index, "id");

        SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name});
        row.setRowKind(RowKind.UPDATE_AFTER);

        // With a primary key configured, an UPDATE_AFTER row becomes an "update" action
        // line followed by a "doc_as_upsert" document line.
        String expected =
                "{ \"update\" :{\"_index\":\""
                        + index
                        + "\",\"_id\":\""
                        + id
                        + "\"} }\n"
                        + "{ \"doc\" :{\"name\":\""
                        + name
                        + "\",\"id\":\""
                        + id
                        + "\"}, \"doc_as_upsert\" : true }";

        Assertions.assertEquals(expected, serializer.serializeRow(row));
    }

    @Test
    public void testSerializeUpsertWithoutKey() {
        String index = "st_index";
        String id = "0001";
        String name = "jack";
        ElasticsearchRowSerializer serializer = createSerializer(index, null);

        SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name});
        row.setRowKind(RowKind.UPDATE_AFTER);

        // Without a primary key the serializer falls back to a plain "index" action
        // (no "_id" in the metadata, no doc_as_upsert wrapper).
        String expected =
                "{ \"index\" :{\"_index\":\""
                        + index
                        + "\"} }\n"
                        + "{\"name\":\""
                        + name
                        + "\",\"id\":\""
                        + id
                        + "\"}";

        Assertions.assertEquals(expected, serializer.serializeRow(row));
    }

    @Test
    public void testSerializeUpsertDocumentError() {
        String index = "st_index";
        String id = "0001";
        ElasticsearchRowSerializer serializer = createSerializer(index, "id");

        // A bare Object has no Jackson serializer, so writing the document must fail
        // and surface as CommonError.jsonOperationError with the document payload.
        Object mockObj = new Object();
        SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, mockObj});
        row.setRowKind(RowKind.UPDATE_AFTER);

        Map<String, Object> expectedMap = new HashMap<>();
        expectedMap.put("id", id);
        expectedMap.put("name", mockObj);

        SeaTunnelRuntimeException expected =
                CommonError.jsonOperationError("Elasticsearch", "document:" + expectedMap);
        SeaTunnelRuntimeException actual =
                Assertions.assertThrows(
                        SeaTunnelRuntimeException.class, () -> serializer.serializeRow(row));
        Assertions.assertEquals(expected.getMessage(), actual.getMessage());
    }

    @Test
    public void testSerializeDelete() {
        String index = "st_index";
        String id = "0001";
        String name = "jack";
        ElasticsearchRowSerializer serializer = createSerializer(index, "id");

        SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name});
        row.setRowKind(RowKind.DELETE);

        // A DELETE row serializes to a single "delete" action line keyed by the primary key.
        String expected = "{ \"delete\" :{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"} }";

        Assertions.assertEquals(expected, serializer.serializeRow(row));
    }
}
Loading

0 comments on commit 456cd17

Please sign in to comment.