Skip to content

Commit

Permalink
[BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored May 5, 2022
1 parent 017f846 commit 0af670d
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .test-infra/jenkins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
| beam_LoadTests_Go_GBK_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch_PR/) | `Run Load Tests Go GBK Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/) |
| beam_LoadTests_Go_ParDo_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch_PR/) | `Run Load Tests Go ParDo Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/) |
| beam_LoadTests_Go_ParDo_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch_PR/) | `Run Load Tests Go ParDo Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/) |
| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch suite` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) |
| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) |
| beam_LoadTests_Go_SideInput_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch_PR/) | `Run Load Tests Go SideInput Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/) |
| beam_Java_LoadTests_Smoke | [phrase](https://ci-beam.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | `Run Java Load Tests Smoke` | |
| beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Java CoGBK Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
Expand Down
8 changes: 8 additions & 0 deletions .test-infra/jenkins/job_LoadTests_Combine_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper

import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY


String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))

Expand All @@ -47,6 +49,8 @@ def batchScenarios = {
top_count : 20,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -69,6 +73,8 @@ def batchScenarios = {
top_count : 20,
num_workers : 16,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -91,6 +97,8 @@ def batchScenarios = {
top_count : 20,
num_workers : 16,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
Expand Down
16 changes: 16 additions & 0 deletions .test-infra/jenkins/job_LoadTests_GBK_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper

import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY

String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))

def batchScenarios = {
Expand All @@ -46,6 +48,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -68,6 +72,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -90,6 +96,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -112,6 +120,8 @@ def batchScenarios = {
fanout : 4,
num_workers : 16,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -134,6 +144,8 @@ def batchScenarios = {
fanout : 8,
num_workers : 16,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -158,6 +170,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -182,6 +196,8 @@ def batchScenarios = {
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
]
Expand Down
10 changes: 10 additions & 0 deletions .test-infra/jenkins/job_LoadTests_ParDo_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper

import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY

String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))


Expand All @@ -48,6 +50,8 @@ def batchScenarios = {
number_of_counters : 0,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -71,6 +75,8 @@ def batchScenarios = {
number_of_counters : 0,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -94,6 +100,8 @@ def batchScenarios = {
number_of_counters : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -117,6 +125,8 @@ def batchScenarios = {
number_of_counters : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
Expand Down
6 changes: 6 additions & 0 deletions .test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper

import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY

String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))

def batchScenarios = {
Expand All @@ -45,6 +47,8 @@ def batchScenarios = {
access_percentage: 1,
num_workers : 10,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand All @@ -65,6 +69,8 @@ def batchScenarios = {
'"value_size": 900}\'',
num_workers : 10,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
]
]
Expand Down
10 changes: 10 additions & 0 deletions .test-infra/jenkins/job_LoadTests_coGBK_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import InfluxDBCredentialsHelper

import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY

String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))

def batchScenarios = {
Expand Down Expand Up @@ -53,6 +55,8 @@ def batchScenarios = {
iterations : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand Down Expand Up @@ -82,6 +86,8 @@ def batchScenarios = {
iterations : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand Down Expand Up @@ -111,6 +117,8 @@ def batchScenarios = {
iterations : 4,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
[
Expand Down Expand Up @@ -140,6 +148,8 @@ def batchScenarios = {
iterations : 4,
num_workers : 5,
autoscaling_algorithm: 'NONE',
environment_type : 'DOCKER',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
]
],
]
Expand Down
7 changes: 3 additions & 4 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@
## Bugfixes
* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Fixed Java expansion service to allow specific files to stage ([BEAM-14160](https://issues.apache.org/jira/browse/BEAM-14160)).
## Known Issues
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->

# [2.40.0] - Unreleased

## Highlights
Expand All @@ -68,6 +67,7 @@
## Breaking Changes

* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))

## Deprecations

Expand All @@ -82,8 +82,7 @@

* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).


# [2.39.0] - Unreleased
# [2.39.0] - Unreleased, Cut

## Highlights

Expand Down
22 changes: 11 additions & 11 deletions sdks/go/pkg/beam/io/synthetic/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func DefaultSourceConfig() *SourceConfigBuilder {
// Valid values are in the range of [1, ...] and the default value is 1. Values
// of 0 (and below) are invalid as they result in sources that emit no elements.
func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
b.cfg.NumElements = val
b.cfg.NumElements = int64(val)
return b
}

Expand All @@ -210,7 +210,7 @@ func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
// of 0 (and below) are invalid as they would result in dropping elements that
// are expected to be emitted.
func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
b.cfg.InitialSplits = val
b.cfg.InitialSplits = int64(val)
return b
}

Expand All @@ -219,7 +219,7 @@ func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
//
// Valid values are in the range of [1, ...] and the default value is 8.
func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
b.cfg.KeySize = val
b.cfg.KeySize = int64(val)
return b
}

Expand All @@ -228,7 +228,7 @@ func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
//
// Valid values are in the range of [1, ...] and the default value is 8.
func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
b.cfg.ValueSize = val
b.cfg.ValueSize = int64(val)
return b
}

Expand All @@ -237,7 +237,7 @@ func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
//
// Valid values are in the range of [0, ...] and the default value is 0.
func (b *SourceConfigBuilder) NumHotKeys(val int) *SourceConfigBuilder {
b.cfg.NumHotKeys = val
b.cfg.NumHotKeys = int64(val)
return b
}

Expand Down Expand Up @@ -299,10 +299,10 @@ func (b *SourceConfigBuilder) BuildFromJSON(jsonData []byte) SourceConfig {
// synthetic source. It should be created via a SourceConfigBuilder, not by
// directly initializing it (the fields are public to allow encoding).
type SourceConfig struct {
NumElements int `json:"num_records"`
InitialSplits int `json:"initial_splits"`
KeySize int `json:"key_size"`
ValueSize int `json:"value_size"`
NumHotKeys int `json:"num_hot_keys"`
HotKeyFraction float64 `json:"hot_key_fraction"`
NumElements int64 `json:"num_records" beam:"num_records"`
InitialSplits int64 `json:"initial_splits" beam:"initial_splits"`
KeySize int64 `json:"key_size" beam:"key_size"`
ValueSize int64 `json:"value_size" beam:"value_size"`
NumHotKeys int64 `json:"num_hot_keys" beam:"num_hot_keys"`
HotKeyFraction float64 `json:"hot_key_fraction" beam:"hot_key_fraction"`
}
9 changes: 4 additions & 5 deletions sdks/go/test/load/sideinput/sideinput.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}

type doFn struct {
ElementsToAccess int
ElementsToAccess int64
}

func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
var key []byte
var value []byte
i := 0
var key, value []byte
var i int64
for values(&key, &value) {
if i >= fn.ElementsToAccess {
break
Expand All @@ -75,7 +74,7 @@ func main() {
p, s := beam.NewPipelineWithRoot()

syntheticConfig := parseSyntheticConfig()
elementsToAccess := syntheticConfig.NumElements * *accessPercentage / 100
elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100)

src := synthetic.SourceSingle(s, syntheticConfig)
src = beam.ParDo(s, &load.RuntimeMonitor{}, src)
Expand Down

0 comments on commit 0af670d

Please sign in to comment.