Skip to content

Commit

Permalink
Merge branch '6.x' into ccr-6.x
Browse files Browse the repository at this point in the history
* 6.x:
  Improve message when JAVA_HOME not set (#32022)
  INGEST: Make a few Processors callable by Painless (#32170) (#32261)
  INGEST: Extend KV Processor (#31789) (#32232) (#32262)
  • Loading branch information
dnhatn committed Jul 22, 2018
2 parents 931d3ca + 9f55f03 commit 566f9b4
Show file tree
Hide file tree
Showing 17 changed files with 610 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ class BuildPlugin implements Plugin<Project> {
// IntelliJ does not set JAVA_HOME, so we use the JDK that Gradle was run with
return Jvm.current().javaHome
} else {
throw new GradleException("JAVA_HOME must be set to build Elasticsearch")
throw new GradleException(
"JAVA_HOME must be set to build Elasticsearch. " +
"Note that if the variable was just set you might have to run `./gradlew --stop` for " +
"it to be picked up. See https://github.com/elastic/elasticsearch/issues/31399 details."
)
}
}
return javaHome
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,10 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
| `exclude_keys` | no | `null` | List of keys to exclude from document
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
| `prefix` | no | `null` | Prefix to be added to extracted keys
| `trim_key` | no | `null` | String of characters to trim from extracted keys
| `trim_value` | no | `null` | String of characters to trim from extracted values
| `strip_brackets` | no | `false` | If `true` strip brackets `()`, `<>`, `[]` as well as quotes `'` and `"` from extracted values
|======


Expand Down
6 changes: 6 additions & 0 deletions modules/ingest-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
esplugin {
description 'Module for ingest processors that do not require additional security permissions or have large dependencies and resources'
classname 'org.elasticsearch.ingest.common.IngestCommonPlugin'
extendedPlugins = ['lang-painless']
}

dependencies {
compileOnly project(':modules:lang-painless')
compile project(':libs:grok')
}

compileJava.options.compilerArgs << "-Xlint:-unchecked,-rawtypes"
compileTestJava.options.compilerArgs << "-Xlint:-unchecked,-rawtypes"

integTestCluster {
module project(':modules:lang-painless')
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ public final class BytesProcessor extends AbstractStringProcessor {
super(processorTag, field, ignoreMissing, targetField);
}

public static long apply(String value) {
return ByteSizeValue.parseBytesSizeValue(value, null, "Ingest Field").getBytes();
}

@Override
protected Long process(String value) {
return ByteSizeValue.parseBytesSizeValue(value, null, getField()).getBytes();
return apply(value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ boolean isAddToRoot() {
return addToRoot;
}

@Override
public void execute(IngestDocument document) throws Exception {
Object fieldValue = document.getFieldValue(field, Object.class);
BytesReference bytesRef = (fieldValue == null) ? new BytesArray("null") : new BytesArray(fieldValue.toString());
public static Object apply(Object fieldValue) {
BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
try (InputStream stream = bytesRef.streamInput();
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
XContentParser.Token token = parser.nextToken();
Object value = null;
if (token == XContentParser.Token.VALUE_NULL) {
Expand All @@ -91,20 +89,32 @@ public void execute(IngestDocument document) throws Exception {
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
throw new IllegalArgumentException("cannot read binary value");
}
if (addToRoot && (value instanceof Map)) {
for (Map.Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) {
document.setFieldValue(entry.getKey(), entry.getValue());
}
} else if (addToRoot) {
throw new IllegalArgumentException("cannot add non-map fields to root of document");
} else {
document.setFieldValue(targetField, value);
}
return value;
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

public static void apply(Map<String, Object> ctx, String fieldName) {
Object value = apply(ctx.get(fieldName));
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) value;
ctx.putAll(map);
} else {
throw new IllegalArgumentException("cannot add non-map fields to root of document");
}
}

@Override
public void execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
}
}

@Override
public String getType() {
return TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
Expand All @@ -38,16 +41,20 @@ public final class KeyValueProcessor extends AbstractProcessor {

public static final String TYPE = "kv";

private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");

private final String field;
private final String fieldSplit;
private final String valueSplit;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final boolean ignoreMissing;
private final Consumer<IngestDocument> execution;

KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
Set<String> excludeKeys, String targetField, boolean ignoreMissing) {
Set<String> excludeKeys, String targetField, boolean ignoreMissing,
String trimKey, String trimValue, boolean stripBrackets, String prefix) {
super(tag);
this.field = field;
this.targetField = targetField;
Expand All @@ -56,6 +63,92 @@ public final class KeyValueProcessor extends AbstractProcessor {
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
this.ignoreMissing = ignoreMissing;
this.execution = buildExecution(
fieldSplit, valueSplit, field, includeKeys, excludeKeys, targetField, ignoreMissing, trimKey, trimValue,
stripBrackets, prefix
);
}

private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, String field,
Set<String> includeKeys, Set<String> excludeKeys,
String targetField, boolean ignoreMissing,
String trimKey, String trimValue, boolean stripBrackets,
String prefix) {
final Predicate<String> keyFilter;
if (includeKeys == null) {
if (excludeKeys == null) {
keyFilter = key -> true;
} else {
keyFilter = key -> excludeKeys.contains(key) == false;
}
} else {
if (excludeKeys == null) {
keyFilter = includeKeys::contains;
} else {
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
}
}
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField == null) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = targetField + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);
return document -> {
String value = document.getFieldValue(field, String.class, ignoreMissing);
if (value == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
}
for (String part : fieldSplitter.apply(value)) {
String[] kv = valueSplitter.apply(part);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
}
String key = keyTrimmer.apply(kv[0]);
if (keyFilter.test(key)) {
append(document, keyPrefixer.apply(key), valueTrimmer.apply(bracketStrip.apply(kv[1])));
}
}
};
}

private static Function<String, String> buildTrimmer(String trim) {
if (trim == null) {
return val -> val;
} else {
Pattern pattern = Pattern.compile("(^([" + trim + "]+))|([" + trim + "]+$)");
return val -> pattern.matcher(val).replaceAll("");
}
}

private static Function<String, String[]> buildSplitter(String split, boolean fields) {
int limit = fields ? 0 : 2;
if (split.length() > 2 || split.length() == 2 && split.charAt(0) != '\\') {
Pattern splitPattern = Pattern.compile(split);
return val -> splitPattern.split(val, limit);
} else {
return val -> val.split(split, limit);
}
}

String getField() {
Expand Down Expand Up @@ -86,7 +179,7 @@ boolean isIgnoreMissing() {
return ignoreMissing;
}

public void append(IngestDocument document, String targetField, String value) {
private static void append(IngestDocument document, String targetField, String value) {
if (document.hasField(targetField)) {
document.appendFieldValue(targetField, value);
} else {
Expand All @@ -96,27 +189,7 @@ public void append(IngestDocument document, String targetField, String value) {

@Override
public void execute(IngestDocument document) {
String oldVal = document.getFieldValue(field, String.class, ignoreMissing);

if (oldVal == null && ignoreMissing) {
return;
} else if (oldVal == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
}

String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
Arrays.stream(oldVal.split(fieldSplit))
.map((f) -> {
String[] kv = f.split(valueSplit, 2);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
}
return kv;
})
.filter((p) ->
(includeKeys == null || includeKeys.contains(p[0])) &&
(excludeKeys == null || excludeKeys.contains(p[0]) == false))
.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
execution.accept(document);
}

@Override
Expand All @@ -132,6 +205,11 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
String trimValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_value");
String prefix = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "prefix");
boolean stripBrackets =
ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "strip_brackets", false);
Set<String> includeKeys = null;
Set<String> excludeKeys = null;
List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
Expand All @@ -143,7 +221,10 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing);
return new KeyValueProcessor(
processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing,
trimKey, trimValue, stripBrackets, prefix
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ public final class LowercaseProcessor extends AbstractStringProcessor {
super(processorTag, field, ignoreMissing, targetField);
}

public static String apply(String value) {
return value.toLowerCase(Locale.ROOT);
}

@Override
protected String process(String value) {
return value.toLowerCase(Locale.ROOT);
return apply(value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import java.util.Map;

public final class Processors {

public static long bytes(String value) {
return BytesProcessor.apply(value);
}

public static String lowercase(String value) {
return LowercaseProcessor.apply(value);
}

public static String uppercase(String value) {
return UppercaseProcessor.apply(value);
}

public static Object json(Object fieldValue) {
return JsonProcessor.apply(fieldValue);
}

public static void json(Map<String, Object> ctx, String field) {
JsonProcessor.apply(ctx, field);
}

public static String urlDecode(String value) {
return URLDecodeProcessor.apply(value);
}
}
Loading

0 comments on commit 566f9b4

Please sign in to comment.