Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Remove useless ReadonlyConfig flatten feature #5612

Merged
merged 11 commits into from
Oct 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ public interface ConfigAdapter {
String[] extensionIdentifiers();

/**
* Converter config file to path_key-value Map (FlattenedMap) in HOCON
* Converter config file to path_key-value Map in HOCON
*
* @see org.apache.seatunnel.api.configuration.util.ConfigUtil#flatteningMap(Map)
* @param configFilePath config file path.
* @return FlattenedMap
* @return Map
*/
Map<String, Object> loadConfig(Path configFilePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import static org.apache.seatunnel.api.configuration.util.ConfigUtil.convertToJsonString;
import static org.apache.seatunnel.api.configuration.util.ConfigUtil.convertValue;
import static org.apache.seatunnel.api.configuration.util.ConfigUtil.flatteningMap;
import static org.apache.seatunnel.api.configuration.util.ConfigUtil.treeMap;

@Slf4j
public class ReadonlyConfig implements Serializable {
Expand All @@ -50,7 +48,7 @@ private ReadonlyConfig(Map<String, Object> confData) {
}

public static ReadonlyConfig fromMap(Map<String, Object> map) {
return new ReadonlyConfig(treeMap(map));
return new ReadonlyConfig(map);
}

public static ReadonlyConfig fromConfig(Config config) {
Expand All @@ -65,68 +63,38 @@ public static ReadonlyConfig fromConfig(Config config) {
}

public <T> T get(Option<T> option) {
return get(option, true);
}

public <T> T get(Option<T> option, boolean flatten) {
return getOptional(option, flatten).orElseGet(option::defaultValue);
}

public Map<String, String> toMap() {
return toMap(true);
}

public Config toConfig() {
return toConfig(true);
return getOptional(option).orElseGet(option::defaultValue);
}

/**
* Transform to Config todo: This method should be removed after we remove Config
*
* @return Config
*/
public Config toConfig(boolean flatten) {
if (flatten) {
return ConfigFactory.parseMap(flatteningMap(confData));
}
public Config toConfig() {
return ConfigFactory.parseMap(confData);
}

public Map<String, String> toMap(boolean flatten) {
public Map<String, String> toMap() {
if (confData.isEmpty()) {
return Collections.emptyMap();
}

Map<String, String> result = new LinkedHashMap<>();
toMap(result, flatten);
toMap(result);
return result;
}

public void toMap(Map<String, String> result) {
toMap(result, true);
}

public void toMap(Map<String, String> result, boolean flatten) {
if (confData.isEmpty()) {
return;
}
Map<String, Object> map;
if (flatten) {
map = flatteningMap(confData);
} else {
map = confData;
}
for (Map.Entry<String, Object> entry : map.entrySet()) {
for (Map.Entry<String, Object> entry : confData.entrySet()) {
result.put(entry.getKey(), convertToJsonString(entry.getValue()));
}
}

public <T> Optional<T> getOptional(Option<T> option) {
return getOptional(option, true);
}

@SuppressWarnings("unchecked")
public <T> Optional<T> getOptional(Option<T> option, boolean flatten) {
if (option == null) {
throw new NullPointerException("Option not be null.");
}
Expand All @@ -146,24 +114,28 @@ public <T> Optional<T> getOptional(Option<T> option, boolean flatten) {
if (value == null) {
return Optional.empty();
}
return Optional.of(convertValue(value, option, flatten));
return Optional.of(convertValue(value, option));
}

private Object getValue(String key) {
String[] keys = key.split("\\.");
Map<String, Object> data = this.confData;
Object value = null;
for (int i = 0; i < keys.length; i++) {
value = data.get(keys[i]);
if (i < keys.length - 1) {
if (!(value instanceof Map)) {
return null;
} else {
data = (Map<String, Object>) value;
if (this.confData.containsKey(key)) {
return this.confData.get(key);
} else {
String[] keys = key.split("\\.");
Map<String, Object> data = this.confData;
Object value = null;
for (int i = 0; i < keys.length; i++) {
value = data.get(keys[i]);
if (i < keys.length - 1) {
if (!(value instanceof Map)) {
return null;
} else {
data = (Map<String, Object>) value;
}
}
}
return value;
}
return value;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,233 +20,27 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class ConfigUtil {
private static final JavaPropsMapper PROPERTIES_MAPPER = new JavaPropsMapper();
private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();

/**
*
*
* <pre>
* poll.timeout = 1000
* ==>> poll : {timeout = 1000, interval = 500}
* poll.interval = 500
* </pre>
*/
public static Map<String, Object> treeMap(Map<String, Object> rawMap) {
try {
Map<List<String>, String> properties =
Arrays.stream(PROPERTIES_MAPPER.writeValueAsString(rawMap).split("\n"))
.filter(StringUtils::isNoneEmpty)
.map(line -> line.split("=", 2))
.collect(
Collectors.toMap(
kv -> Arrays.asList(kv[0].split("\\.")),
kv -> kv[1],
(o, n) -> o,
LinkedHashMap::new));
Map<String, Object> result = loadPropertiesStyleMap(properties);
// Special case, we shouldn't change key in schema config.
// TODO we should not hard code it, it should be as a config.
if (rawMap.containsKey(TableSchemaOptions.SCHEMA.key())) {
result.put(
TableSchemaOptions.SCHEMA.key(),
rawMap.get(TableSchemaOptions.SCHEMA.key()));
}
return result;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Json parsing exception.");
}
}

private static Map<String, Object> loadPropertiesStyleMap(
Map<List<String>, String> properties) {
Map<String, Object> propertiesMap = new LinkedHashMap<>();
Map<List<String>, String> temp = new LinkedHashMap<>();
String tempPrefix = null;
for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
String key = entry.getKey().get(0);
if (!key.equals(tempPrefix)) {
putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
tempPrefix = key;
}
if (entry.getKey().size() > 1) {
temp.put(entry.getKey().subList(1, entry.getKey().size()), entry.getValue());
} else if (!temp.isEmpty()) {
temp.put(Collections.singletonList(""), entry.getValue());
} else {
temp.put(null, entry.getValue());
}
}
putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
return propertiesMap;
}

private static void putKeyValueToMapCheck(
Map<String, Object> propertiesMap, Map<List<String>, String> temp, String tempPrefix) {
if (!temp.isEmpty()) {
if (propertiesMap.containsKey(tempPrefix)) {
if (temp.containsKey(null)) {
((Map) propertiesMap.get(tempPrefix)).put("", temp.get(null));
} else if (propertiesMap.get(tempPrefix) instanceof String) {
loadPropertiesStyleMap(temp).put("", propertiesMap.get(tempPrefix));
} else {
mergeTwoMap((Map) propertiesMap.get(tempPrefix), loadPropertiesStyleMap(temp));
}
} else {
propertiesMap.put(tempPrefix, loadPropertiesStyleObject(temp));
}
temp.clear();
}
}

private static void mergeTwoMap(Map<String, Object> base, Map<String, Object> merged) {
for (Map.Entry<String, Object> entry : merged.entrySet()) {
if (base.containsKey(entry.getKey())) {
if (base.get(entry.getKey()) instanceof Map && entry.getValue() instanceof Map) {
mergeTwoMap((Map) base.get(entry.getKey()), (Map) entry.getValue());
} else if (base.get(entry.getKey()) instanceof Map) {
((Map) base.get(entry.getKey())).put("", entry.getValue());
} else if (entry.getValue() instanceof Map) {
Map<String, Object> child = new LinkedHashMap<>();
child.put("", base.get(entry.getKey()));
child.putAll((Map) entry.getValue());
base.put(entry.getKey(), child);
} else {
throw new IllegalArgumentException(
String.format(
"Duplicate key '%s' in config file, value '%s' and value '%s'",
entry.getKey(), base.get(entry.getKey()), entry.getValue()));
}
} else {
base.put(entry.getKey(), entry.getValue());
}
}
}

private static List<Object> loadPropertiesStyleList(Map<List<String>, String> properties) {
List<Object> propertiesList = new ArrayList<>();
Map<List<String>, String> temp = new LinkedHashMap<>();
int tempIndex = -1;
for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
int index = Integer.parseInt(entry.getKey().get(0));
if (index != tempIndex) {
if (!temp.isEmpty()) {
propertiesList.add(loadPropertiesStyleObject(temp));
temp.clear();
}
tempIndex = index;
}
if (entry.getKey().size() == 1) {
temp.put(null, entry.getValue());
} else {
temp.put(entry.getKey().subList(1, entry.getKey().size()), entry.getValue());
}
}
if (!temp.isEmpty()) {
propertiesList.add(loadPropertiesStyleObject(temp));
}
return propertiesList;
}

private static Object loadPropertiesStyleObject(Map<List<String>, String> properties) {
if (properties.containsKey(null) && properties.size() == 1) {
return StringEscapeUtils.unescapeJava(properties.get(null));
} else if (properties.containsKey(null)) {
if (properties.containsKey(null)) {
properties.put(Collections.singletonList(""), properties.get(null));
properties.remove(null);
}
return loadPropertiesStyleMap(properties);
} else if (properties.entrySet().stream().anyMatch(kv -> kv.getKey().get(0).equals("1"))) {
return loadPropertiesStyleList(properties);
} else {
return loadPropertiesStyleMap(properties);
}
}

@SuppressWarnings("unchecked")
static Object flatteningMap(
Object rawValue, Map<String, Object> newMap, List<String> keys, boolean nestedMap) {
if (rawValue == null) {
return null;
}
if (!(rawValue instanceof List) && !(rawValue instanceof Map)) {
if (newMap == null) {
return rawValue;
}
newMap.put(String.join(".", keys), rawValue);
return newMap;
}

if (rawValue instanceof List) {
List<Object> rawList = (List<Object>) rawValue;
rawList.replaceAll(value -> flatteningMap(value, null, null, false));
if (newMap != null) {
newMap.put(String.join(".", keys), rawList);
return newMap;
}
return rawList;
} else {
Map<String, Object> rawMap = (Map<String, Object>) rawValue;
if (!nestedMap) {
keys = new ArrayList<>();
newMap = new LinkedHashMap<>(rawMap.size());
}
for (Map.Entry<String, Object> entry : rawMap.entrySet()) {
keys.add(entry.getKey());
flatteningMap(entry.getValue(), newMap, keys, true);
keys.remove(keys.size() - 1);
}
return newMap;
}
}

/**
*
*
* <pre>
* poll.timeout = 1000
* poll : {timeout = 1000, interval = 500} ==>>
* poll.interval = 500
* </pre>
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> flatteningMap(Map<String, Object> treeMap) {
return (Map<String, Object>) flatteningMapWithObject(treeMap);
}

static Object flatteningMapWithObject(Object rawValue) {
return flatteningMap(rawValue, null, null, false);
}
private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();

@SuppressWarnings("unchecked")
public static <T> T convertValue(Object rawValue, Option<T> option, boolean flatten) {
public static <T> T convertValue(Object rawValue, Option<T> option) {
TypeReference<T> typeReference = option.typeReference();
rawValue = flatten ? flatteningMapWithObject(rawValue) : rawValue;
if (typeReference.getType() instanceof Class) {
// simple type
Class<T> clazz = (Class<T>) typeReference.getType();
Expand Down Expand Up @@ -404,6 +198,9 @@ static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
}

public static String convertToJsonString(Object o) {
if (o == null) {
return null;
}
if (o instanceof String) {
return (String) o;
}
Expand Down
Loading