Skip to content

Commit

Permalink
[Improve] Remove useless ReadonlyConfig flatten feature (#5612)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Wenjun Ruan <[email protected]>
  • Loading branch information
Hisoka-X and ruanwenjun authored Oct 12, 2023
1 parent 38764f1 commit 243edfe
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 569 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ public interface ConfigAdapter {
String[] extensionIdentifiers();

/**
* Converter config file to path_key-value Map (FlattenedMap) in HOCON
* Converter config file to path_key-value Map in HOCON
*
* @see org.apache.seatunnel.api.configuration.util.ConfigUtil#flatteningMap(Map)
* @param configFilePath config file path.
* @return FlattenedMap
* @return Map
*/
Map<String, Object> loadConfig(Path configFilePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import static org.apache.seatunnel.api.configuration.util.ConfigUtil.convertToJsonString;
import static org.apache.seatunnel.api.configuration.util.ConfigUtil.convertValue;
import static org.apache.seatunnel.api.configuration.util.ConfigUtil.flatteningMap;
import static org.apache.seatunnel.api.configuration.util.ConfigUtil.treeMap;

@Slf4j
public class ReadonlyConfig implements Serializable {
Expand All @@ -50,7 +48,7 @@ private ReadonlyConfig(Map<String, Object> confData) {
}

public static ReadonlyConfig fromMap(Map<String, Object> map) {
return new ReadonlyConfig(treeMap(map));
return new ReadonlyConfig(map);
}

public static ReadonlyConfig fromConfig(Config config) {
Expand All @@ -65,68 +63,38 @@ public static ReadonlyConfig fromConfig(Config config) {
}

public <T> T get(Option<T> option) {
return get(option, true);
}

public <T> T get(Option<T> option, boolean flatten) {
return getOptional(option, flatten).orElseGet(option::defaultValue);
}

public Map<String, String> toMap() {
return toMap(true);
}

public Config toConfig() {
return toConfig(true);
return getOptional(option).orElseGet(option::defaultValue);
}

/**
* Transform to Config todo: This method should be removed after we remove Config
*
* @return Config
*/
public Config toConfig(boolean flatten) {
if (flatten) {
return ConfigFactory.parseMap(flatteningMap(confData));
}
public Config toConfig() {
return ConfigFactory.parseMap(confData);
}

public Map<String, String> toMap(boolean flatten) {
public Map<String, String> toMap() {
if (confData.isEmpty()) {
return Collections.emptyMap();
}

Map<String, String> result = new LinkedHashMap<>();
toMap(result, flatten);
toMap(result);
return result;
}

public void toMap(Map<String, String> result) {
toMap(result, true);
}

public void toMap(Map<String, String> result, boolean flatten) {
if (confData.isEmpty()) {
return;
}
Map<String, Object> map;
if (flatten) {
map = flatteningMap(confData);
} else {
map = confData;
}
for (Map.Entry<String, Object> entry : map.entrySet()) {
for (Map.Entry<String, Object> entry : confData.entrySet()) {
result.put(entry.getKey(), convertToJsonString(entry.getValue()));
}
}

public <T> Optional<T> getOptional(Option<T> option) {
return getOptional(option, true);
}

@SuppressWarnings("unchecked")
public <T> Optional<T> getOptional(Option<T> option, boolean flatten) {
if (option == null) {
throw new NullPointerException("Option not be null.");
}
Expand All @@ -146,24 +114,28 @@ public <T> Optional<T> getOptional(Option<T> option, boolean flatten) {
if (value == null) {
return Optional.empty();
}
return Optional.of(convertValue(value, option, flatten));
return Optional.of(convertValue(value, option));
}

private Object getValue(String key) {
String[] keys = key.split("\\.");
Map<String, Object> data = this.confData;
Object value = null;
for (int i = 0; i < keys.length; i++) {
value = data.get(keys[i]);
if (i < keys.length - 1) {
if (!(value instanceof Map)) {
return null;
} else {
data = (Map<String, Object>) value;
if (this.confData.containsKey(key)) {
return this.confData.get(key);
} else {
String[] keys = key.split("\\.");
Map<String, Object> data = this.confData;
Object value = null;
for (int i = 0; i < keys.length; i++) {
value = data.get(keys[i]);
if (i < keys.length - 1) {
if (!(value instanceof Map)) {
return null;
} else {
data = (Map<String, Object>) value;
}
}
}
return value;
}
return value;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,233 +20,27 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class ConfigUtil {
private static final JavaPropsMapper PROPERTIES_MAPPER = new JavaPropsMapper();
private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();

/**
*
*
* <pre>
* poll.timeout = 1000
* ==>> poll : {timeout = 1000, interval = 500}
* poll.interval = 500
* </pre>
*/
public static Map<String, Object> treeMap(Map<String, Object> rawMap) {
try {
Map<List<String>, String> properties =
Arrays.stream(PROPERTIES_MAPPER.writeValueAsString(rawMap).split("\n"))
.filter(StringUtils::isNoneEmpty)
.map(line -> line.split("=", 2))
.collect(
Collectors.toMap(
kv -> Arrays.asList(kv[0].split("\\.")),
kv -> kv[1],
(o, n) -> o,
LinkedHashMap::new));
Map<String, Object> result = loadPropertiesStyleMap(properties);
// Special case, we shouldn't change key in schema config.
// TODO we should not hard code it, it should be as a config.
if (rawMap.containsKey(TableSchemaOptions.SCHEMA.key())) {
result.put(
TableSchemaOptions.SCHEMA.key(),
rawMap.get(TableSchemaOptions.SCHEMA.key()));
}
return result;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Json parsing exception.");
}
}

private static Map<String, Object> loadPropertiesStyleMap(
Map<List<String>, String> properties) {
Map<String, Object> propertiesMap = new LinkedHashMap<>();
Map<List<String>, String> temp = new LinkedHashMap<>();
String tempPrefix = null;
for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
String key = entry.getKey().get(0);
if (!key.equals(tempPrefix)) {
putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
tempPrefix = key;
}
if (entry.getKey().size() > 1) {
temp.put(entry.getKey().subList(1, entry.getKey().size()), entry.getValue());
} else if (!temp.isEmpty()) {
temp.put(Collections.singletonList(""), entry.getValue());
} else {
temp.put(null, entry.getValue());
}
}
putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
return propertiesMap;
}

private static void putKeyValueToMapCheck(
Map<String, Object> propertiesMap, Map<List<String>, String> temp, String tempPrefix) {
if (!temp.isEmpty()) {
if (propertiesMap.containsKey(tempPrefix)) {
if (temp.containsKey(null)) {
((Map) propertiesMap.get(tempPrefix)).put("", temp.get(null));
} else if (propertiesMap.get(tempPrefix) instanceof String) {
loadPropertiesStyleMap(temp).put("", propertiesMap.get(tempPrefix));
} else {
mergeTwoMap((Map) propertiesMap.get(tempPrefix), loadPropertiesStyleMap(temp));
}
} else {
propertiesMap.put(tempPrefix, loadPropertiesStyleObject(temp));
}
temp.clear();
}
}

private static void mergeTwoMap(Map<String, Object> base, Map<String, Object> merged) {
for (Map.Entry<String, Object> entry : merged.entrySet()) {
if (base.containsKey(entry.getKey())) {
if (base.get(entry.getKey()) instanceof Map && entry.getValue() instanceof Map) {
mergeTwoMap((Map) base.get(entry.getKey()), (Map) entry.getValue());
} else if (base.get(entry.getKey()) instanceof Map) {
((Map) base.get(entry.getKey())).put("", entry.getValue());
} else if (entry.getValue() instanceof Map) {
Map<String, Object> child = new LinkedHashMap<>();
child.put("", base.get(entry.getKey()));
child.putAll((Map) entry.getValue());
base.put(entry.getKey(), child);
} else {
throw new IllegalArgumentException(
String.format(
"Duplicate key '%s' in config file, value '%s' and value '%s'",
entry.getKey(), base.get(entry.getKey()), entry.getValue()));
}
} else {
base.put(entry.getKey(), entry.getValue());
}
}
}

private static List<Object> loadPropertiesStyleList(Map<List<String>, String> properties) {
List<Object> propertiesList = new ArrayList<>();
Map<List<String>, String> temp = new LinkedHashMap<>();
int tempIndex = -1;
for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
int index = Integer.parseInt(entry.getKey().get(0));
if (index != tempIndex) {
if (!temp.isEmpty()) {
propertiesList.add(loadPropertiesStyleObject(temp));
temp.clear();
}
tempIndex = index;
}
if (entry.getKey().size() == 1) {
temp.put(null, entry.getValue());
} else {
temp.put(entry.getKey().subList(1, entry.getKey().size()), entry.getValue());
}
}
if (!temp.isEmpty()) {
propertiesList.add(loadPropertiesStyleObject(temp));
}
return propertiesList;
}

private static Object loadPropertiesStyleObject(Map<List<String>, String> properties) {
if (properties.containsKey(null) && properties.size() == 1) {
return StringEscapeUtils.unescapeJava(properties.get(null));
} else if (properties.containsKey(null)) {
if (properties.containsKey(null)) {
properties.put(Collections.singletonList(""), properties.get(null));
properties.remove(null);
}
return loadPropertiesStyleMap(properties);
} else if (properties.entrySet().stream().anyMatch(kv -> kv.getKey().get(0).equals("1"))) {
return loadPropertiesStyleList(properties);
} else {
return loadPropertiesStyleMap(properties);
}
}

@SuppressWarnings("unchecked")
static Object flatteningMap(
Object rawValue, Map<String, Object> newMap, List<String> keys, boolean nestedMap) {
if (rawValue == null) {
return null;
}
if (!(rawValue instanceof List) && !(rawValue instanceof Map)) {
if (newMap == null) {
return rawValue;
}
newMap.put(String.join(".", keys), rawValue);
return newMap;
}

if (rawValue instanceof List) {
List<Object> rawList = (List<Object>) rawValue;
rawList.replaceAll(value -> flatteningMap(value, null, null, false));
if (newMap != null) {
newMap.put(String.join(".", keys), rawList);
return newMap;
}
return rawList;
} else {
Map<String, Object> rawMap = (Map<String, Object>) rawValue;
if (!nestedMap) {
keys = new ArrayList<>();
newMap = new LinkedHashMap<>(rawMap.size());
}
for (Map.Entry<String, Object> entry : rawMap.entrySet()) {
keys.add(entry.getKey());
flatteningMap(entry.getValue(), newMap, keys, true);
keys.remove(keys.size() - 1);
}
return newMap;
}
}

/**
*
*
* <pre>
* poll.timeout = 1000
* poll : {timeout = 1000, interval = 500} ==>>
* poll.interval = 500
* </pre>
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> flatteningMap(Map<String, Object> treeMap) {
return (Map<String, Object>) flatteningMapWithObject(treeMap);
}

static Object flatteningMapWithObject(Object rawValue) {
return flatteningMap(rawValue, null, null, false);
}
private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();

@SuppressWarnings("unchecked")
public static <T> T convertValue(Object rawValue, Option<T> option, boolean flatten) {
public static <T> T convertValue(Object rawValue, Option<T> option) {
TypeReference<T> typeReference = option.typeReference();
rawValue = flatten ? flatteningMapWithObject(rawValue) : rawValue;
if (typeReference.getType() instanceof Class) {
// simple type
Class<T> clazz = (Class<T>) typeReference.getType();
Expand Down Expand Up @@ -404,6 +198,9 @@ static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
}

public static String convertToJsonString(Object o) {
if (o == null) {
return null;
}
if (o instanceof String) {
return (String) o;
}
Expand Down
Loading

0 comments on commit 243edfe

Please sign in to comment.