Skip to content

Commit

Permalink
Merge branch 'master' into set_num_connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 authored Jul 9, 2024
2 parents d0f627e + a4558df commit 16c0840
Show file tree
Hide file tree
Showing 131 changed files with 4,128 additions and 808 deletions.
4 changes: 4 additions & 0 deletions .github/ACTIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,7 @@ Phrases self-assign, close, or manage labels on an issue:
| `.add-labels` | Add comma separated labels to the issue (e.g. `add-labels l1, 'l2 with spaces'`) |
| `.remove-labels` | Remove comma separated labels to the issue (e.g. `remove-labels l1, 'l2 with spaces'`) |
| `.set-labels` | Sets comma separated labels to the issue and removes any other labels (e.g. `set-labels l1, 'l2 with spaces'`) |

## Security Model

For information on the Beam CI security model, see https://cwiki.apache.org/confluence/display/BEAM/CI+Security+Model
7 changes: 4 additions & 3 deletions .github/actions/setup-environment-action/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ runs:
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
with:
cache-read-only: ${{ inputs.disable-cache }}
cache-disabled: ${{ inputs.disable-cache }}
- name: Install Go
if: ${{ inputs.go-version != '' }}
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: ${{ inputs.go-version == 'default' && '1.21' || inputs.go-version }} # never set patch, to get latest patch releases.
go-version: ${{ inputs.go-version == 'default' && '1.22' || inputs.go-version }} # never set patch, to get latest patch releases.
cache-dependency-path: $${{ inputs.disable-cache && '' || 'sdks/go.sum' }}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ RUN curl -OL https://nodejs.org/dist/v18.16.0/node-v18.16.0-linux-x64.tar.xz &&
mv /usr/local/node-v18.16.0-linux-x64 /usr/local/node
ENV PATH="${PATH}:/usr/local/node/bin"
#Install Go
ARG go_version=1.22.4
ARG go_version=1.22.5
RUN curl -OL https://go.dev/dl/go${go_version}.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go${go_version}.linux-amd64.tar.gz && \
rm go${go_version}.linux-amd64.tar.gz
Expand Down
3 changes: 2 additions & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
2 changes: 1 addition & 1 deletion .github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
- name: run Solace IO IT script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:solace:integrationTest
gradle-command: :sdks:java:io:solace:integrationTest --info
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
Expand Down
5 changes: 1 addition & 4 deletions .github/workflows/build_release_candidate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ on:
env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
publish_java_artifacts:
Expand Down Expand Up @@ -186,6 +184,7 @@ jobs:
uses: ./.github/actions/setup-environment-action
with:
python-version: default
disable-cache: true
- name: Import GPG key
id: import_gpg
uses: crazy-max/ghaction-import-gpg@111c56156bcc6918c056dbef52164cfa583dc549
Expand Down Expand Up @@ -435,8 +434,6 @@ jobs:
- uses: actions/setup-go@v5
with:
go-version: '1.22'
cache-dependency-path: |
sdks/go.sum
- name: Import GPG key
id: import_gpg
uses: crazy-max/ghaction-import-gpg@111c56156bcc6918c056dbef52164cfa583dc549
Expand Down
7 changes: 3 additions & 4 deletions .github/workflows/go_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 2
- uses: actions/setup-go@v5
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
go-version: '1.22'
cache-dependency-path: |
sdks/go.sum
go-version: default
- name: Delete old coverage
run: "cd sdks && rm -rf .coverage.txt || :"
- name: Run coverage
Expand Down
18 changes: 8 additions & 10 deletions .github/workflows/local_env_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
go-version: '1.22'
- uses: actions/setup-python@v5
with:
python-version: '3.8'
go-version: default
python-version: default
- name: "Installing local env dependencies"
run: "sudo ./local-env-setup.sh"
id: local_env_install_ubuntu
Expand All @@ -64,12 +63,11 @@ jobs:
runs-on: macos-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.22'
- uses: actions/setup-python@v5
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
python-version: '3.8'
go-version: default
python-version: default
- name: "Installing local env dependencies"
run: "./local-env-setup.sh"
id: local_env_install_mac
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))
* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
* Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
* Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-store fashion ([#31726](https://github.com/apache/beam/pull/31726))

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ class BeamModulePlugin implements Plugin<Project> {
testcontainers_oracle : "org.testcontainers:oracle-xe:$testcontainers_version",
testcontainers_postgresql : "org.testcontainers:postgresql:$testcontainers_version",
testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version",
testcontainers_solace : "org.testcontainers:solace:$testcontainers_version",
truth : "com.google.truth:truth:1.1.5",
threetenbp : "org.threeten:threetenbp:1.6.8",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
Expand Down Expand Up @@ -2223,7 +2224,7 @@ class BeamModulePlugin implements Plugin<Project> {

// This sets the whole project Go version.
// The latest stable Go version can be checked at https://go.dev/dl/
project.ext.goVersion = "go1.22.4"
project.ext.goVersion = "go1.22.5"

// Minor TODO: Figure out if we can pull out the GOCMD env variable after goPrepare script
// completion, and avoid this GOBIN substitution.
Expand Down
2 changes: 1 addition & 1 deletion dev-support/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest
###
# Install Go
###
ENV DOWNLOAD_GO_VERSION=1.22.4
ENV DOWNLOAD_GO_VERSION=1.22.5
RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz
ENV GOROOT /usr/local/go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,14 @@ message MonitoringInfoTypeUrns {
PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:progress:v1"];

// Represents a set of strings.
//
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,14 @@ public static <KeyT> StateTag<MapState<KeyT, Boolean>> convertToMapTagInternal(
new StructuredId(setTag.getId()), StateSpecs.convertToMapSpecInternal(setTag.getSpec()));
}

public static <KeyT, ValueT> StateTag<MultimapState<KeyT, ValueT>> convertToMultiMapTagInternal(
StateTag<MapState<KeyT, ValueT>> mapTag) {
StateSpec<MapState<KeyT, ValueT>> spec = mapTag.getSpec();
StateSpec<MultimapState<KeyT, ValueT>> multimapSpec =
StateSpecs.convertToMultimapSpecInternal(spec);
return new SimpleStateTag<>(new StructuredId(mapTag.getId()), multimapSpec);
}

private static class StructuredId implements Serializable {
private final StateKind kind;
private final String rawId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
* {@link Iterable}s of counters, distributions, and gauges, and serves queries by applying {@link
* org.apache.beam.sdk.metrics.MetricsFilter}s linearly to them.
* {@link Iterable}s of counters, distributions, gauges, and stringsets, and serves queries by
* applying {@link org.apache.beam.sdk.metrics.MetricsFilter}s linearly to them.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -40,14 +41,17 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges) {
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
}

@Override
Expand All @@ -56,6 +60,8 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
Iterables.filter(counters, counter -> MetricFiltering.matches(filter, counter.getKey())),
Iterables.filter(
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())));
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
})
public abstract class MetricUpdates {

public static final MetricUpdates EMPTY =
MetricUpdates.create(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
* Representation of a single metric update.
Expand All @@ -52,25 +54,33 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
}
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates()) && Iterables.isEmpty(distributionUpdates());
}

/** All of the counter updates. */
/** All the counter updates. */
public abstract Iterable<MetricUpdate<Long>> counterUpdates();

/** All of the distribution updates. */
/** All the distribution updates. */
public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();

/** All of the gauges updates. */
/** All the gauges updates. */
public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();

/** All the sets updates. */
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates);
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates())
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates());
}
}
Loading

0 comments on commit 16c0840

Please sign in to comment.