Skip to content

Commit

Permalink
Merge pull request #33385 Introduce a BoundedTrie metric.
Browse files Browse the repository at this point in the history
Introduce a BoundedTrie metric which is used to efficiently store and aggregate a collection of string sequences (FQNs) with a limited size.

It is recommended to review this PR by commits.

BoundedTrie is a space-saving way to store many string sequences (like FQN/file paths). It acts like a tree with branches, holding sequences within a size limit. It can efficiently add, combine, and search and perform trimming of children when the size increases beyond defined max.

Let's say we want to store these sequences, with a size limit of 3:

"folder1/file1.txt"
"folder1/file2.txt"
"folder2/file3.txt"
Here's how the BoundedTrie might look:

root
  - folder1
     - file1.txt
     - file2.txt
  - folder2
     - file3.txt 
If we try to add "folder1/file4.txt", the trie might trim to "folder1", dropping all children to stay within the size limit.

This will be used to replace the StringSet metric for lineage tracking for very large lineage graphs to overcome the size limits.
  • Loading branch information
robertwb authored Feb 1, 2025
2 parents 3321224 + f68fe75 commit 5e031ab
Show file tree
Hide file tree
Showing 39 changed files with 2,921 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Arrays;
import java.util.Objects;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.MetricName;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Tracks the current value for a {@link BoundedTrie} metric.
*
* <p>This class generally shouldn't be used directly. The only exception is within a runner where a
* counter is being reported for a specific step (rather than the counter in the current context).
* In that case retrieving the underlying cell and reporting directly to it avoids a step of
* indirection.
*/
@SuppressFBWarnings(
value = "IS2_INCONSISTENT_SYNC",
justification = "Some access on purpose are left unsynchronized")
public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {

private final DirtyState dirty = new DirtyState();
private BoundedTrieData value;
private final MetricName name;

public BoundedTrieCell(MetricName name) {
this.name = name;
this.value = new BoundedTrieData();
}

public synchronized void update(BoundedTrieData other) {
// although BoundedTrieData is thread-safe the cell is made thread safe too because combine
// returns a reference to the combined BoundedTrieData and want the reference update here to
// be thread safe too.
this.value = this.value.combine(other);
dirty.afterModification();
}

@Override
public synchronized void reset() {
value.clear();
dirty.reset();
}

@Override
public DirtyState getDirty() {
return dirty;
}

/**
* @return Returns a deep copy of the {@link BoundedTrieData} contained in this {@link
* BoundedTrieCell}.
*/
@Override
public synchronized BoundedTrieData getCumulative() {
return value.getCumulative();
}

@Override
public MetricName getName() {
return name;
}

@Override
public boolean equals(@Nullable Object object) {
if (object instanceof BoundedTrieCell) {
BoundedTrieCell boundedTrieCell = (BoundedTrieCell) object;
return Objects.equals(dirty, boundedTrieCell.dirty)
&& Objects.equals(value, boundedTrieCell.value)
&& Objects.equals(name, boundedTrieCell.name);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(dirty, value, name);
}

@Override
public synchronized void add(Iterable<String> values) {
this.value.add(values);
dirty.afterModification();
}

@Override
public synchronized void add(String... values) {
add(Arrays.asList(values));
}
}
Loading

0 comments on commit 5e031ab

Please sign in to comment.