Skip to content

Commit

Permalink
Fix handling of malformed log messages (#2693)
Browse files Browse the repository at this point in the history
  • Loading branch information
akkomar authored Nov 6, 2024
1 parent 0079132 commit d4990f2
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,28 @@ public PubsubMessage apply(PubsubMessage message) {
final Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

logEntry = Json.readObjectNode(message.getPayload());
ObjectNode jsonPayload = (ObjectNode) logEntry.path("jsonPayload");
ObjectNode fields = (ObjectNode) jsonPayload.path("Fields");
JsonNode ipAddress = fields.remove("ip_address");
if (ipAddress != null) {
// if ipAddress is null, it means it wasn't present in the payload
String clientIpAddress = ipAddress.textValue();
attributes.put(Attribute.X_FORWARDED_FOR, clientIpAddress);
countIpExtracted.inc();
JsonNode maybeJsonPayloadNode = logEntry.path("jsonPayload");
if (maybeJsonPayloadNode.isObject()) {
ObjectNode jsonPayload = (ObjectNode) maybeJsonPayloadNode;
JsonNode maybeFieldsNode = jsonPayload.path("Fields");
if (maybeFieldsNode.isObject()) {
ObjectNode fields = (ObjectNode) maybeFieldsNode;
JsonNode ipAddress = fields.remove("ip_address");
if (ipAddress != null) {
// if ipAddress is null, it means it wasn't present in the payload
String clientIpAddress = ipAddress.textValue();
attributes.put(Attribute.X_FORWARDED_FOR, clientIpAddress);
countIpExtracted.inc();

jsonPayload.replace("Fields", fields);
logEntry.replace("jsonPayload", jsonPayload);
byte[] sanitizedPayload = logEntry.toString().getBytes(StandardCharsets.UTF_8);
jsonPayload.replace("Fields", fields);
logEntry.replace("jsonPayload", jsonPayload);
byte[] sanitizedPayload = logEntry.toString().getBytes(StandardCharsets.UTF_8);

return new PubsubMessage(sanitizedPayload, attributes);
} else {
return message;
return new PubsubMessage(sanitizedPayload, attributes);
}
}
}
return message;

} catch (IOException e) {
throw new ParseLogEntry.InvalidLogEntryException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,22 @@ public void testExtractIpAndScrub() {

pipeline.run();
}

/**
 * Feeds the malformed log entry fixture through {@code ExtractIpFromLogEntry} and checks that
 * the text-encoded output consists of exactly the expected JSON payload.
 */
@Test
public void canHandleMalformedMessage() {
  // The fixture holds a single message without the expected `Fields` object
  final String malformedFixture = Resources.getResource("testdata").getPath()
      + "/logentry_malformed.ndjson";

  final PCollection<PubsubMessage> extracted = pipeline
      .apply(new Read.FileInput(malformedFixture, InputFileFormat.json))
      .apply(ExtractIpFromLogEntry.of());
  final PCollection<String> textPayloads = extracted.apply("encodeText",
      OutputFileFormat.text.encode());

  final String expectedPayload = "{\"insertId\":\"i7ymfvfsay14vgp7\",\"jsonPayload\":{\"EnvVersion\":\"2.0\",\"Logger\":\"fxa-oauth-server\",\"Pid\":26,\"Severity\":6,\"Timestamp\":1.6879373570750001e+18,\"Type\":\"glean-server-event\"},\"labels\":{\"compute.googleapis.com/resource_name\":\"gke-custom-fluentbit-default-pool-3fa9e570-j753\",\"k8s-pod/component\":\"test-js-logger\",\"k8s-pod/pod-template-hash\":\"584d9fc78c\"},\"logName\":\"projects/akomar-server-telemetry-poc/logs/stdout\",\"receiveTimestamp\":\"2023-06-28T07:29:17.288291899Z\",\"resource\":{\"labels\":{\"cluster_name\":\"custom-fluentbit\",\"container_name\":\"test-js-logger\",\"location\":\"us-east1-b\",\"namespace_name\":\"default\",\"pod_name\":\"test-js-logger-584d9fc78c-pz9x9\",\"project_id\":\"akomar-server-telemetry-poc\"},\"type\":\"k8s_container\"},\"severity\":\"INFO\",\"timestamp\":\"2023-06-28T07:29:17.075926141Z\"}";
  PAssert.that(textPayloads).containsInAnyOrder(Arrays.asList(expectedPayload));

  // Running the pipeline must not throw on the malformed message
  pipeline.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import com.mozilla.telemetry.options.InputFileFormat;
import com.mozilla.telemetry.options.OutputFileFormat;
import com.mozilla.telemetry.util.Json;
import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.WithFailures;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

@SuppressWarnings("checkstyle:lineLength")
public class ParseLogEntryTest {

@Rule
Expand Down Expand Up @@ -48,13 +51,39 @@ public void testParse() throws Exception {
.put("app_display_version", "0.0.0").put("app_channel", "development").build())
.build());

PCollection<PubsubMessage> output = pipeline.apply(new FileInput(input, InputFileFormat.json))
.apply(ParseLogEntry.of()).output();
WithFailures.Result<PCollection<PubsubMessage>, PubsubMessage> outputWithFailures = pipeline
.apply(new FileInput(input, InputFileFormat.json)).apply(ParseLogEntry.of());

PAssert.that(output.apply(OutputFileFormat.text.encode()))
PAssert.that(outputWithFailures.output().apply(OutputFileFormat.text.encode()))
.containsInAnyOrder(ImmutableList.of(expected));

PAssert.that(outputWithFailures.failures()).empty();

pipeline.run();
}

/**
 * Runs the malformed log entry fixture through {@code ParseLogEntry} and checks that nothing
 * reaches the main output while exactly one message, carrying the expected
 * {@code InvalidLogEntryException} details, lands in the failures collection.
 */
@Test
public void canHandleMalformedMessage() {
  String inputPath = Resources.getResource("testdata").getPath();
  // This file contains a single message without the expected `Fields` object
  String input = inputPath + "/logentry_malformed.ndjson";

  WithFailures.Result<PCollection<PubsubMessage>, PubsubMessage> outputWithFailures = pipeline
      .apply(new FileInput(input, InputFileFormat.json)).apply(ParseLogEntry.of());

  // output should be empty
  PAssert.that(outputWithFailures.output()).empty();

  // message should go to error stream
  PAssert.that(outputWithFailures.failures() //
      .apply("EncodeTextError", OutputFileFormat.json.encode())).satisfies(collection -> {
        List<String> errors = ImmutableList.copyOf(collection);
        // Throw AssertionError explicitly rather than using the `assert` keyword: `assert` is a
        // no-op when the JVM runs without -ea, and it reports no detail on failure.
        if (errors.size() != 1) {
          throw new AssertionError(
              "Expected exactly 1 message in the error stream but found " + errors.size());
        }
        String expectedFragment = "{\"attributeMap\":{\"error_message\":\"com.mozilla.telemetry.decoder.ParseLogEntry$InvalidLogEntryException: Message has no submission_timestamp but is not in a known LogEntry format\",\"exception_class\":\"com.mozilla.telemetry.decoder.ParseLogEntry$InvalidLogEntryException\"";
        if (!errors.get(0).contains(expectedFragment)) {
          throw new AssertionError(
              "Error message does not contain the expected attributes: " + errors.get(0));
        }
        return null;
      });

  pipeline.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"attributeMap":{"logging.googleapis.com/timestamp":"2023-06-28T07:29:17.075926141Z"},"payload":"eyJpbnNlcnRJZCI6Imk3eW1mdmZzYXkxNHZncDciLCJqc29uUGF5bG9hZCI6eyJFbnZWZXJzaW9uIjoiMi4wIiwiTG9nZ2VyIjoiZnhhLW9hdXRoLXNlcnZlciIsIlBpZCI6MjYsIlNldmVyaXR5Ijo2LCJUaW1lc3RhbXAiOjEuNjg3OTM3MzU3MDc1MDAwMWUrMTgsIlR5cGUiOiJnbGVhbi1zZXJ2ZXItZXZlbnQifSwibGFiZWxzIjp7ImNvbXB1dGUuZ29vZ2xlYXBpcy5jb20vcmVzb3VyY2VfbmFtZSI6ImdrZS1jdXN0b20tZmx1ZW50Yml0LWRlZmF1bHQtcG9vbC0zZmE5ZTU3MC1qNzUzIiwiazhzLXBvZC9jb21wb25lbnQiOiJ0ZXN0LWpzLWxvZ2dlciIsIms4cy1wb2QvcG9kLXRlbXBsYXRlLWhhc2giOiI1ODRkOWZjNzhjIn0sImxvZ05hbWUiOiJwcm9qZWN0cy9ha29tYXItc2VydmVyLXRlbGVtZXRyeS1wb2MvbG9ncy9zdGRvdXQiLCJyZWNlaXZlVGltZXN0YW1wIjoiMjAyMy0wNi0yOFQwNzoyOToxNy4yODgyOTE4OTlaIiwicmVzb3VyY2UiOnsibGFiZWxzIjp7ImNsdXN0ZXJfbmFtZSI6ImN1c3RvbS1mbHVlbnRiaXQiLCJjb250YWluZXJfbmFtZSI6InRlc3QtanMtbG9nZ2VyIiwibG9jYXRpb24iOiJ1cy1lYXN0MS1iIiwibmFtZXNwYWNlX25hbWUiOiJkZWZhdWx0IiwicG9kX25hbWUiOiJ0ZXN0LWpzLWxvZ2dlci01ODRkOWZjNzhjLXB6OXg5IiwicHJvamVjdF9pZCI6ImFrb21hci1zZXJ2ZXItdGVsZW1ldHJ5LXBvYyJ9LCJ0eXBlIjoiazhzX2NvbnRhaW5lciJ9LCJzZXZlcml0eSI6IklORk8iLCJ0aW1lc3RhbXAiOiIyMDIzLTA2LTI4VDA3OjI5OjE3LjA3NTkyNjE0MVoifQ=="}

0 comments on commit d4990f2

Please sign in to comment.