diff --git a/.buildkite/hooks/pre-command b/.buildkite/hooks/pre-command index a014542c9c38..7144331fc284 100644 --- a/.buildkite/hooks/pre-command +++ b/.buildkite/hooks/pre-command @@ -61,7 +61,6 @@ ENABLED_BEATS_PIPELINES_SLUGS=( "beats-metricbeat" "beats-winlogbeat" "beats-winlogbeat" - "beats-xpack-packetbeat" "beats-xpack-winlogbeat" "beats-xpack-dockerlogbeat" "beats-xpack-auditbeat" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 7deb787b6afd..00ce04b71d21 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -367,6 +367,33 @@ steps: - BUILDKITE_PULL_REQUEST_BASE_BRANCH=${BUILDKITE_PULL_REQUEST_BASE_BRANCH} - GITHUB_PR_LABELS=${GITHUB_PR_LABELS} + - label: "Trigger x-pack/packetbeat" + plugins: + - monorepo-diff#v1.0.1: + diff: "git diff --name-only origin/${GITHUB_PR_TARGET_BRANCH}...HEAD" + watch: + - path: + - x-pack/packetbeat/ + - .buildkite/x-pack/pipeline.xpack.packetbeat.yml + - .buildkite/scripts/ + - .buildkite/hooks/ + # x-pack + - go.mod + - pytest.ini + - dev-tools/ + - libbeat/ + - testing/ + - x-pack/libbeat/ + config: + trigger: "beats-xpack-packetbeat" + build: + commit: "${BUILDKITE_COMMIT}" + branch: "${BUILDKITE_BRANCH}" + env: + - BUILDKITE_PULL_REQUEST=${BUILDKITE_PULL_REQUEST} + - BUILDKITE_PULL_REQUEST_BASE_BRANCH=${BUILDKITE_PULL_REQUEST_BASE_BRANCH} + - GITHUB_PR_LABELS=${GITHUB_PR_LABELS} + - label: "Trigger Winlogbeat" plugins: - monorepo-diff#v1.0.1: diff --git a/.buildkite/pull-requests.json b/.buildkite/pull-requests.json index c98c1f07578d..b79646acd1bd 100644 --- a/.buildkite/pull-requests.json +++ b/.buildkite/pull-requests.json @@ -47,22 +47,6 @@ "skip_target_branches": [ ], "skip_ci_on_only_changed": [ ], "always_require_ci_on_changed": ["^winlogbeat/.*", ".buildkite/winlogbeat/.*", "^go.mod", "^pytest.ini", "^dev-tools/.*", "^libbeat/.*", "^testing/.*"] - }, - { - "enabled": true, - "pipelineSlug": "beats-xpack-packetbeat", - "allow_org_users": true, - "allowed_repo_permissions": ["admin", "write"], - "allowed_list": [ ], - "set_commit_status": true, - "build_on_commit": true, - "build_on_comment": true, - "trigger_comment_regex": "^/test x-pack/packetbeat$", - "always_trigger_comment_regex": "^/test x-pack/packetbeat$", - "skip_ci_labels": [ ], - "skip_target_branches": [ ], - "skip_ci_on_only_changed": [ ], - "always_require_ci_on_changed": ["^x-pack/packetbeat/.*", "^.buildkite/.*", "^go.mod", "^pytest.ini", "^dev-tools/.*", "^libbeat/.*", "^testing/.*", "^x-pack/libbeat/.*"] } ] } \ No newline at end of file diff --git a/.buildkite/scripts/common.sh b/.buildkite/scripts/common.sh index 2aebe4b5638c..9bd5cf93540c 100755 --- a/.buildkite/scripts/common.sh +++ b/.buildkite/scripts/common.sh @@ -14,15 +14,12 @@ XPACK_MODULE_PATTERN="^x-pack\\/[a-z0-9]+beat\\/module\\/([^\\/]+)\\/.*" # define if needed run the whole pipeline for the particular beat [ -z "${run_filebeat+x}" ] && run_filebeat="$(buildkite-agent meta-data get run_filebeat --default "false")" [ -z "${run_xpack_metricbeat+x}" ] && run_xpack_metricbeat="$(buildkite-agent meta-data get run_xpack_metricbeat --default "false")" -[ -z "${run_xpack_packetbeat+x}" ] && run_xpack_packetbeat="$(buildkite-agent meta-data get run_xpack_packetbeat --default "false")" # define if needed run ARM platform-specific tests for the particular beat [ -z "${run_filebeat_arm_tests+x}" ] && run_filebeat_arm_tests="$(buildkite-agent meta-data get run_filebeat_arm_tests --default "false")" -[ -z "${run_xpack_packetbeat_arm_tests+x}" ] && run_xpack_packetbeat_arm_tests="$(buildkite-agent meta-data get run_xpack_packetbeat_arm_tests --default "false")" # define if needed run MacOS platform-specific tests for the particular beat [ -z "${run_xpack_metricbeat_macos_tests+x}" ] && run_xpack_metricbeat_macos_tests="$(buildkite-agent meta-data get run_xpack_metricbeat_macos_tests --default "false")" -[ -z "${run_xpack_packetbeat_macos_tests+x}" ] && run_xpack_packetbeat_macos_tests="$(buildkite-agent meta-data get run_xpack_packetbeat_macos_tests --default "false")" # define if needed run cloud-specific tests for the particular beat [ -z "${run_xpack_metricbeat_aws_tests+x}" ] && run_xpack_metricbeat_aws_tests="$(buildkite-agent meta-data get run_xpack_metricbeat_aws_tests --default "false")" @@ -35,10 +32,6 @@ xpack_dockerlogbeat_changeset=( "^x-pack/dockerlogbeat/.*" ) -xpack_packetbeat_changeset=( - "^x-pack/packetbeat/.*" - ) - ci_changeset=( "^.buildkite/.*" ) @@ -73,9 +66,6 @@ case "${BUILDKITE_PIPELINE_SLUG}" in "beats-xpack-metricbeat") BEAT_CHANGESET_REFERENCE=${xpack_metricbeat_changeset[@]} ;; - "beats-xpack-packetbeat") - BEAT_CHANGESET_REFERENCE=${xpack_packetbeat_changeset[@]} - ;; *) echo "~~~ The changeset for the ${BUILDKITE_PIPELINE_SLUG} pipeline hasn't been defined yet." ;; diff --git a/.buildkite/scripts/generate_xpack_packetbeat_pipeline.sh b/.buildkite/scripts/generate_xpack_packetbeat_pipeline.sh deleted file mode 100644 index 97f37ca734bf..000000000000 --- a/.buildkite/scripts/generate_xpack_packetbeat_pipeline.sh +++ /dev/null @@ -1,285 +0,0 @@ -#!/usr/bin/env bash - -source .buildkite/scripts/common.sh - -set -euo pipefail - -pipelineName="pipeline.xpack-packetbeat-dynamic.yml" - -echo "Add the mandatory and extended tests without additional conditions into the pipeline" -if are_conditions_met_mandatory_tests; then - cat > $pipelineName <<- YAML - -steps: - - - group: "Mandatory Tests" - key: "mandatory-tests" - steps: - - label: ":linux: Ubuntu Unit Tests" - key: "mandatory-linux-unit-test" - command: "cd $BEATS_PROJECT_NAME && mage build unitTest" - agents: - provider: "gcp" - image: "${IMAGE_UBUNTU_X86_64}" - machineType: "${GCP_DEFAULT_MACHINE_TYPE}" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Ubuntu Unit Tests" - - - label: ":linux: Ubuntu System Tests" - key: "mandatory-linux-system-test" - command: "cd $BEATS_PROJECT_NAME && mage systemTest" - agents: - provider: "gcp" - image: "${IMAGE_UBUNTU_X86_64}" - machineType: "${GCP_DEFAULT_MACHINE_TYPE}" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Ubuntu System Tests" - - - label: ":rhel: RHEL9 Unit Tests" - key: "mandatory-rhel9-unit-test" - command: "cd $BEATS_PROJECT_NAME && mage build unitTest" - agents: - provider: "gcp" - image: "${IMAGE_RHEL9_X86_64}" - machineType: "${GCP_DEFAULT_MACHINE_TYPE}" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: RHEL9 Unit Tests" - - - label: ":windows: Windows 2016 Unit Tests" - command: | - Set-Location -Path $BEATS_PROJECT_NAME - mage build unitTest - key: "mandatory-win-2016-unit-tests" - agents: - provider: "gcp" - image: "${IMAGE_WIN_2016}" - machine_type: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Windows 2016 Unit Tests" - - - label: ":windows: Windows 2022 Unit Tests" - command: | - Set-Location -Path $BEATS_PROJECT_NAME - mage build unitTest - key: "mandatory-win-2022-unit-tests" - agents: - provider: "gcp" - image: "${IMAGE_WIN_2022}" - machine_type: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Windows 2022 Unit Tests" - - - label: ":windows: Windows 2022 System Tests" - skip: "see https://github.com/elastic/beats/issues/38142" - key: "mandatory-win-2022-system-tests" - command: ".buildkite/scripts/win_unit_tests.ps1 systemtest" - agents: - provider: "gcp" - image: "${IMAGE_WIN_2022}" - machineType: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - -## TODO: this condition will be changed in the Phase 3 of the Migration Plan https://docs.google.com/document/d/1IPNprVtcnHlem-uyGZM0zGzhfUuFAh4LeSl9JFHMSZQ/edit#heading=h.sltz78yy249h - - - group: "Extended Windows Tests" - key: "extended-win-tests" - steps: - - label: ":windows: Windows 10 Unit Tests" - command: | - Set-Location -Path $BEATS_PROJECT_NAME - mage build unitTest - key: "extended-win-10-unit-tests" - agents: - provider: "gcp" - image: "${IMAGE_WIN_10}" - machineType: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Windows 10 Unit Tests" - - - label: ":windows: Windows 11 Unit Tests" - command: | - Set-Location -Path $BEATS_PROJECT_NAME - mage build unitTest - key: "extended-win-11-unit-tests" - agents: - provider: "gcp" - image: "${IMAGE_WIN_11}" - machineType: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Windows 11 Unit Tests" - - - label: ":windows: Windows 2019 Unit Tests" - command: | - Set-Location -Path $BEATS_PROJECT_NAME - mage build unitTest - key: "extended-win-2019-unit-tests" - agents: - provider: "gcp" - image: "${IMAGE_WIN_2019}" - machineType: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Windows 2019 Unit Tests" - - - label: ":windows: Windows 10 System Tests" - skip: "see https://github.com/elastic/beats/issues/38142" - key: "extended-win-10-system-tests" - command: ".buildkite/scripts/win_unit_tests.ps1 systemtest" - agents: - provider: "gcp" - image: "${IMAGE_WIN_10}" - machineType: "${GCP_WIN_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - -YAML -else - echo "The conditions don't match to requirements for generating pipeline steps." - exit 0 -fi - -if are_conditions_met_arm_tests || are_conditions_met_macos_tests ; then - cat >> $pipelineName <<- YAML - - - group: "Extended Tests" - key: "extended-tests" - steps: - -YAML -fi - -if are_conditions_met_macos_tests; then - cat >> $pipelineName <<- YAML - - - label: ":mac: MacOS Unit Tests" - key: "extended-macos-unit-tests" - command: ".buildkite/scripts/unit_tests.sh" - agents: - provider: "orka" - imagePrefix: "${IMAGE_MACOS_X86_64}" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: MacOS Unit Tests" - -YAML -fi - -if are_conditions_met_arm_tests; then - cat >> $pipelineName <<- YAML - - label: ":linux: Ubuntu ARM Unit Tests" - key: "extended-arm64-unit-test" - command: "cd $BEATS_PROJECT_NAME && mage build unitTest" - agents: - provider: "aws" - imagePrefix: "${IMAGE_UBUNTU_ARM_64}" - instanceType: "${AWS_ARM_INSTANCE_TYPE}" - artifact_paths: - - "$BEATS_PROJECT_NAME/build/*.xml" - - "$BEATS_PROJECT_NAME/build/*.json" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Ubuntu ARM Unit Tests" - -YAML -fi - -echo "Check and add the Packaging into the pipeline" -if are_conditions_met_packaging; then - cat >> $pipelineName <<- YAML - - - wait: ~ - depends_on: - - step: "mandatory-tests" - allow_failure: false - - - group: "Packaging" # TODO: check conditions for future the main pipeline migration: https://github.com/elastic/beats/pull/28589 - key: "packaging" - steps: - - label: ":linux: Packaging Linux" - key: "packaging-linux" - command: "cd $BEATS_PROJECT_NAME && mage package" - agents: - provider: "gcp" - image: "${IMAGE_UBUNTU_X86_64}" - machineType: "${GCP_HI_PERF_MACHINE_TYPE}" - disk_size: 100 - disk_type: "pd-ssd" - env: - PLATFORMS: "${PACKAGING_PLATFORMS}" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Packaging Linux" - - - label: ":linux: Packaging ARM" - key: "packaging-arm" - command: "cd $BEATS_PROJECT_NAME && mage package" - agents: - provider: "aws" - imagePrefix: "${IMAGE_UBUNTU_ARM_64}" - instanceType: "${AWS_ARM_INSTANCE_TYPE}" - env: - PLATFORMS: "${PACKAGING_ARM_PLATFORMS}" - PACKAGES: "docker" - notify: - - github_commit_status: - context: "$BEATS_PROJECT_NAME: Packaging Linux ARM" - -YAML -fi - -echo "+++ Printing dynamic steps" -cat $pipelineName | yq . -P - -echo "--- Loading dynamic steps" -buildkite-agent pipeline upload $pipelineName diff --git a/.buildkite/x-pack/pipeline.xpack.packetbeat.yml b/.buildkite/x-pack/pipeline.xpack.packetbeat.yml index 5e3ce87e2bf4..77fdf2af8483 100644 --- a/.buildkite/x-pack/pipeline.xpack.packetbeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.packetbeat.yml @@ -3,7 +3,6 @@ name: "beats-xpack-packetbeat" env: AWS_ARM_INSTANCE_TYPE: "t4g.xlarge" - BEATS_PROJECT_NAME: "x-pack/packetbeat" GCP_DEFAULT_MACHINE_TYPE: "c2d-highcpu-8" GCP_HI_PERF_MACHINE_TYPE: "c2d-highcpu-16" GCP_WIN_MACHINE_TYPE: "n2-standard-8" @@ -18,54 +17,288 @@ env: IMAGE_WIN_2019: "family/platform-ingest-beats-windows-2019" IMAGE_WIN_2022: "family/platform-ingest-beats-windows-2022" - #Packaging - PACKAGING_ARM_PLATFORMS: "linux/arm64" - PACKAGING_PLATFORMS: "+all linux/amd64 linux/arm64 windows/amd64 darwin/amd64 darwin/arm64" - #Deps ASDF_MAGE_VERSION: 1.15.0 steps: + - group: "x-pack/packetbeat Mandatory Tests" + key: "x-pack-packetbeat-mandatory-tests" + steps: + - label: ":linux: Ubuntu Unit Tests" + key: "mandatory-linux-unit-test" + command: | + cd x-pack/packetbeat + mage build unitTest + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "${GCP_DEFAULT_MACHINE_TYPE}" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Ubuntu Unit Tests" + + - label: ":linux: Ubuntu System Tests" + key: "mandatory-linux-system-test" + command: | + cd x-pack/packetbeat + mage systemTest + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "${GCP_DEFAULT_MACHINE_TYPE}" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Ubuntu System Tests" + + - label: ":rhel: RHEL9 Unit Tests" + key: "mandatory-rhel9-unit-test" + command: | + cd x-pack/packetbeat + mage build unitTest + agents: + provider: "gcp" + image: "${IMAGE_RHEL9_X86_64}" + machineType: "${GCP_DEFAULT_MACHINE_TYPE}" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: RHEL9 Unit Tests" + + - label: ":windows: Windows 2016 Unit Tests" + command: | + Set-Location -Path x-pack/packetbeat + mage build unitTest + key: "mandatory-win-2016-unit-tests" + agents: + provider: "gcp" + image: "${IMAGE_WIN_2016}" + machine_type: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 2016 Unit Tests" + + - label: ":windows: Windows 2022 Unit Tests" + command: | + Set-Location -Path x-pack/packetbeat + mage build unitTest + key: "mandatory-win-2022-unit-tests" + agents: + provider: "gcp" + image: "${IMAGE_WIN_2022}" + machine_type: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 2022 Unit Tests" + + - label: ":windows: Windows 2022 System Tests" + key: "mandatory-win-2022-system-tests" + skip: "skipping due to elastic/beats#38142" + command: | + Set-Location -Path x-pack/packetbeat + mage systemTest + agents: + provider: "gcp" + image: "${IMAGE_WIN_2022}" + machineType: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 2022 System Tests" + + - group: "x-pack/packetbeat Extended Windows Tests" + key: "x-pack-packetbeat-extended-win-tests" + if: build.env("BUILDKITE_PULL_REQUEST") == "false" || build.env("GITHUB_PR_LABELS") =~ /.*[Ww]indows.*/ + steps: + - label: ":windows: Windows 10 Unit Tests" + command: | + Set-Location -Path x-pack/packetbeat + mage build unitTest + key: "extended-win-10-unit-tests" + agents: + provider: "gcp" + image: "${IMAGE_WIN_10}" + machineType: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 10 Unit Tests" + + - label: ":windows: Windows 11 Unit Tests" + command: | + Set-Location -Path x-pack/packetbeat + mage build unitTest + key: "extended-win-11-unit-tests" + agents: + provider: "gcp" + image: "${IMAGE_WIN_11}" + machineType: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 11 Unit Tests" + + - label: ":windows: Windows 2019 Unit Tests" + command: | + Set-Location -Path x-pack/packetbeat + mage build unitTest + key: "extended-win-2019-unit-tests" + agents: + provider: "gcp" + image: "${IMAGE_WIN_2019}" + machineType: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 2019 Unit Tests" + + - label: ":windows: Windows 10 System Tests" + key: "extended-win-10-system-tests" + skip: "skipping due to elastic/beats#38142" + command: | + Set-Location -Path x-pack/packetbeat + mage systemTest + agents: + provider: "gcp" + image: "${IMAGE_WIN_10}" + machineType: "${GCP_WIN_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Windows 2022 System Tests" + + - group: "x-pack/packetbeat Linux arm Extended Tests" + key: "x-pack-packetbeat-extended-linux-arm-tests" + if: build.env("BUILDKITE_PULL_REQUEST") == "false" || build.env("GITHUB_PR_LABELS") =~ /.*arm.*/ + steps: + - label: ":linux: Ubuntu ARM Unit Tests" + key: "extended-arm64-unit-test" + command: | + cd x-pack/packetbeat + mage build unitTest + if: build.env("GITHUB_PR_LABELS") =~ /.*arm.*/ + agents: + provider: "aws" + imagePrefix: "${IMAGE_UBUNTU_ARM_64}" + instanceType: "${AWS_ARM_INSTANCE_TYPE}" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Ubuntu ARM Unit Tests" + + - group: "x-pack/packetbeat MacOS Extended Tests" + key: "x-pack-packetbeat-extended-macos-tests" + if: build.env("BUILDKITE_PULL_REQUEST") == "false" || build.env("GITHUB_PR_LABELS") =~ /.*macOS.*/ + steps: + - label: ":mac: MacOS Unit Tests" + key: "extended-macos-unit-tests" + command: | + set -euo pipefail + source .buildkite/scripts/install_macos_tools.sh + cd x-pack/packetbeat + mage build unitTest + agents: + provider: "orka" + imagePrefix: "${IMAGE_MACOS_X86_64}" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: MacOS Unit Tests" + + - label: ":mac: MacOS arm64 Unit Tests" + key: "macos-arm64-unit-tests-extended" + command: | + set -euo pipefail + source .buildkite/scripts/install_macos_tools.sh + cd x-pack/packetbeat + mage build unitTest + agents: + provider: "orka" + imagePrefix: "${IMAGE_MACOS_ARM}" + artifact_paths: + - "x-pack/packetbeat/build/*.xml" + - "x-pack/packetbeat/build/*.json" + notify: + - github_commit_status: + context: "x-pack/packetbeat: MacOS arm64 Unit Tests" + + - group: "x-pack/packetbeat Packaging" + key: "x-pack-packetbeat-packaging" + if: build.env("BUILDKITE_PULL_REQUEST") != "false" + depends_on: + - step: "x-pack-packetbeat-mandatory-tests" + allow_failure: false + steps: + - label: ":linux: Packaging Linux" + key: "packaging-linux" + command: | + cd x-pack/packetbeat + mage package + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "${GCP_HI_PERF_MACHINE_TYPE}" + disk_size: 100 + disk_type: "pd-ssd" + env: + PLATFORMS: "+all linux/amd64 linux/arm64 windows/amd64 darwin/amd64 darwin/arm64" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Packaging Linux" - - input: "Input Parameters" - key: "force-run-stages" - fields: - - select: "Packetbeat - run_xpack_packetbeat" - key: "run_xpack_packetbeat" - options: - - label: "True" - value: "true" - - label: "False" - value: "false" - default: "false" - - select: "Packetbeat - run_xpack_packetbeat_macos_tests" - key: "run_xpack_packetbeat_macos_tests" - options: - - label: "True" - value: "true" - - label: "False" - value: "false" - default: "false" - - select: "Packetbeat - run_xpack_packetbeat_arm_tests" - key: "run_xpack_packetbeat_arm_tests" - options: - - label: "True" - value: "true" - - label: "False" - value: "false" - default: "false" - - if: "build.source == 'ui'" - - - wait: ~ - if: "build.source == 'ui'" - allow_dependency_failure: false - - - label: ":linux: Load dynamic x-pack packetbeat pipeline" - key: "packetbeat-pipeline" - command: ".buildkite/scripts/generate_xpack_packetbeat_pipeline.sh" - agents: - image: "docker.elastic.co/ci-agent-images/platform-ingest/buildkite-agent-beats-ci:latest" - notify: - - github_commit_status: - context: "${BEATS_PROJECT_NAME}: Load dynamic pipeline's steps" + - label: ":linux: Packaging ARM" + key: "packaging-arm" + command: | + cd x-pack/packetbeat + mage package + agents: + provider: "aws" + imagePrefix: "${IMAGE_UBUNTU_ARM_64}" + instanceType: "${AWS_ARM_INSTANCE_TYPE}" + env: + PLATFORMS: "linux/arm64" + PACKAGES: "docker" + notify: + - github_commit_status: + context: "x-pack/packetbeat: Packaging Linux ARM" diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8875b834e66a..f74e4a72782d 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -70,6 +70,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. ==== Bugfixes +- Handle the starting of namespace and node watchers for metadata enrichment according to `add_resource_metadata` configuration.{pull}38762[38762] - Fix multiple metricbeat instances reporting same metrics when using autodiscover with provider kubernetes, and ensure leader elector is always running in autodiscover mode.{pull}38471[38471] - Fix how Prometheus histograms are calculated when percentiles are provide.{pull}36537[36537] - Stop using `mage:import` in community beats. This was ignoring the vendorized beats directory for some mage targets, using the code available in GOPATH, this causes inconsistencies and compilation problems if the version of the code in the GOPATH is different to the vendored one. Use of `mage:import` will continue to be unsupported in custom beats till beats is migrated to go modules, or mage supports vendored dependencies. {issue}13998[13998] {pull}14162[14162] diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0b46c100e5dd..9a42fddc663b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] +- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953] - Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] - entity-analytics input: Improve structured logging. {pull}38990[38990] - Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962] diff --git a/dev-tools/mage/crossbuild.go b/dev-tools/mage/crossbuild.go index 94b972ea25e0..ddeabbfdb79b 100644 --- a/dev-tools/mage/crossbuild.go +++ b/dev-tools/mage/crossbuild.go @@ -333,6 +333,19 @@ func (b GolangCrossBuilder) Build() error { "--env", fmt.Sprintf("SNAPSHOT=%v", Snapshot), "-v", repoInfo.RootDir+":"+mountPoint, "-w", workDir, + ) + + // Ensure the proper platform is passed + // This fixes an issue where during arm64 linux build for the currently used docker image + // docker.elastic.co/beats-dev/golang-crossbuild:1.21.9-arm the image for amd64 arch is pulled + // and causes problems when using native arch tools on the binaries that are built for arm64 arch. + if strings.HasPrefix(b.Platform, "linux/") { + args = append(args, + "--platform", b.Platform, + ) + } + + args = append(args, image, // Arguments for docker crossbuild entrypoint. For details see diff --git a/dev-tools/packaging/package_test.go b/dev-tools/packaging/package_test.go index fff920b429c2..308610b4760e 100644 --- a/dev-tools/packaging/package_test.go +++ b/dev-tools/packaging/package_test.go @@ -263,7 +263,7 @@ func checkConfigPermissionsWithMode(t *testing.T, p *packageFile, expectedMode o return } } - t.Errorf("no config file found matching %v", configFilePattern) + t.Logf("no config file found matching %v", configFilePattern) }) } @@ -288,7 +288,7 @@ func checkConfigOwner(t *testing.T, p *packageFile, expectRoot bool) { return } } - t.Errorf("no config file found matching %v", configFilePattern) + t.Logf("no config file found matching %v", configFilePattern) }) } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index b1482033c911..a3d2c82f6ec8 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -29,6 +29,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/fleetmode" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/paths" @@ -149,7 +150,10 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, f stat, err := os.Stat(modulesPath) if err != nil || !stat.IsDir() { log := logp.NewLogger(logName) - log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) + if !fleetmode.Enabled() { + // When run under agent via agentbeat there is no modules directory and this is expected. + log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) + } return &ModuleRegistry{log: log}, nil //nolint:nilerr // empty registry, no error } diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go index f816e285eb32..cdfde85c846e 100644 --- a/filebeat/input/v2/input.go +++ b/filebeat/input/v2/input.go @@ -18,6 +18,9 @@ package v2 import ( + "context" + "time" + "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -111,3 +114,19 @@ type Canceler interface { Done() <-chan struct{} Err() error } + +type cancelerCtx struct { + Canceler +} + +func GoContextFromCanceler(c Canceler) context.Context { + return cancelerCtx{c} +} + +func (c cancelerCtx) Deadline() (deadline time.Time, ok bool) { + return time.Time{}, false +} + +func (c cancelerCtx) Value(_ any) any { + return nil +} diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index d849039a66e4..569b2d21cd4d 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -59,7 +59,7 @@ type pod struct { func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) { logger := logp.NewLogger("autodiscover.pod") - var replicaSetWatcher, jobWatcher kubernetes.Watcher + var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher config := defaultConfig() err := cfg.Unpack(&config) @@ -96,22 +96,27 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err) } - options := kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Node: config.Node, - Namespace: config.Namespace, - } - metaConf := config.AddResourceMetadata - nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) - if err != nil { - logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + + if metaConf.Node.Enabled() || config.Hints.Enabled() { + options := kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Node: config.Node, + Namespace: config.Namespace, + } + nodeWatcher, err = kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } } - namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + + if metaConf.Namespace.Enabled() || config.Hints.Enabled() { + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } } // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 4cc2d8bb3933..3a60342444a4 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -2108,6 +2108,113 @@ func TestNodePodUpdater(t *testing.T) { } } +func TestPodEventer_Namespace_Node_Watcher(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uuid, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + cfg mapstr.M + expectedNil bool + name string + msg string + }{ + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + "node.enabled": false, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: true, + name: "add_resource_metadata.namespace and add_resource_metadata.node disabled and hints disabled.", + msg: "Watcher should be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + "node.enabled": false, + }, + "hints.enabled": true, + }, + expectedNil: false, + name: "add_resource_metadata.namespace and add_resource_metadata.node disabled and hints enabled.", + msg: "Watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": true, + "node.enabled": true, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata.namespace and add_resource_metadata.node enabled and hints disabled.", + msg: "Watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata default and hints default.", + msg: "Watcher should not be nil.", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config := conf.MustNewConfigFrom(&test.cfg) + c := defaultConfig() + err = config.Unpack(&c) + assert.NoError(t, err) + + eventer, err := NewPodEventer(uuid, config, client, nil) + if err != nil { + t.Fatal(err) + } + + namespaceWatcher := eventer.(*pod).namespaceWatcher + nodeWatcher := eventer.(*pod).nodeWatcher + + if test.expectedNil { + assert.Equalf(t, nil, namespaceWatcher, "Namespace "+test.msg) + assert.Equalf(t, nil, nodeWatcher, "Node "+test.msg) + } else { + assert.NotEqualf(t, nil, namespaceWatcher, "Namespace "+test.msg) + assert.NotEqualf(t, nil, nodeWatcher, "Node "+test.msg) + } + }) + } +} + type mockUpdaterHandler struct { objects []interface{} } diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 5a0c6b3cc3f5..de6287f74662 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -70,17 +70,19 @@ func NewServiceEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publis var namespaceMeta metadata.MetaGen var namespaceWatcher kubernetes.Watcher - metaConf := metadata.GetDefaultResourceMetadataConfig() - namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - }, nil) - if err != nil { - return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err) + metaConf := config.AddResourceMetadata + + if metaConf.Namespace.Enabled() || config.Hints.Enabled() { + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + }, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err) + } + namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) } - namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) - p := &service{ config: config, uuid: uuid, diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 510ac6ebd0d4..90ff678e11ce 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -432,6 +432,104 @@ func TestEmitEvent_Service(t *testing.T) { } } +func TestServiceEventer_NamespaceWatcher(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uuid, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + cfg mapstr.M + expectedNil bool + name string + msg string + }{ + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: true, + name: "add_resource_metadata.namespace disabled and hints disabled.", + msg: "Namespace watcher should be nil.", + }, + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": true, + }, + expectedNil: false, + name: "add_resource_metadata.namespace disabled and hints enabled.", + msg: "Namespace watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": true, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata.namespace enabled and hints disabled.", + msg: "Namespace watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata default and hints default.", + msg: "Watcher should not be nil.", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config := conf.MustNewConfigFrom(&test.cfg) + + eventer, err := NewServiceEventer(uuid, config, client, nil) + if err != nil { + t.Fatal(err) + } + + namespaceWatcher := eventer.(*service).namespaceWatcher + + if test.expectedNil { + assert.Equalf(t, nil, namespaceWatcher, test.msg) + } else { + assert.NotEqualf(t, nil, namespaceWatcher, test.msg) + } + }) + } +} + func NewMockServiceEventerManager(svc *service) EventManager { em := &eventerManager{} em.eventer = svc diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 954a59ab3f12..f9143cdf289b 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -27,13 +27,14 @@ import ( k8sclient "k8s.io/client-go/kubernetes" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" ) const ( @@ -144,7 +145,7 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig, func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { k.initOnce.Do(func() { - var replicaSetWatcher, jobWatcher kubernetes.Watcher + var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) if err != nil { @@ -203,15 +204,20 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { Namespace: config.Namespace, } - nodeWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil) - if err != nil { - k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + if metaConf.Node.Enabled() { + nodeWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil) + if err != nil { + k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } } - namespaceWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + + if metaConf.Namespace.Enabled() { + namespaceWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } } // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to diff --git a/metricbeat/docker-compose.yml b/metricbeat/docker-compose.yml index ab1ee57979f2..4eec70d9bb0b 100644 --- a/metricbeat/docker-compose.yml +++ b/metricbeat/docker-compose.yml @@ -17,11 +17,11 @@ services: # Used by base tests elasticsearch: - image: docker.elastic.co/integrations-ci/beats-elasticsearch:${ELASTICSEARCH_VERSION:-8.12.1}-1 + image: docker.elastic.co/integrations-ci/beats-elasticsearch:${ELASTICSEARCH_VERSION:-8.13.2}-1 build: context: ./module/elasticsearch/_meta args: - ELASTICSEARCH_VERSION: ${ELASTICSEARCH_VERSION:-8.12.1} + ELASTICSEARCH_VERSION: ${ELASTICSEARCH_VERSION:-8.13.2} environment: - "ES_JAVA_OPTS=-Xms256m -Xmx256m" - "transport.host=127.0.0.1" @@ -38,11 +38,11 @@ services: # Used by base tests kibana: - image: docker.elastic.co/integrations-ci/beats-kibana:${KIBANA_VERSION:-8.12.1}-1 + image: docker.elastic.co/integrations-ci/beats-kibana:${KIBANA_VERSION:-8.13.2}-1 build: context: ./module/kibana/_meta args: - KIBANA_VERSION: ${KIBANA_VERSION:-8.12.1} + KIBANA_VERSION: ${KIBANA_VERSION:-8.13.2} healthcheck: test: ["CMD-SHELL", "curl -u beats:testing -s http://localhost:5601/api/status?v8format=true | grep -q '\"overall\":{\"level\":\"available\"'"] retries: 600 @@ -53,11 +53,11 @@ services: # Used by base tests metricbeat: - image: docker.elastic.co/integrations-ci/beats-metricbeat:${BEAT_VERSION:-8.12.1}-1 + image: docker.elastic.co/integrations-ci/beats-metricbeat:${BEAT_VERSION:-8.13.2}-1 build: context: ./module/beat/_meta args: - BEAT_VERSION: ${BEAT_VERSION:-8.12.1} + BEAT_VERSION: ${BEAT_VERSION:-8.13.2} command: '-e' ports: - 5066:5066 diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index d89ca006f0e6..a0c409ca14e3 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -166,7 +166,14 @@ func getResource(resourceName string) kubernetes.Resource { func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddResourceMetadataConfig) []string { switch resourceName { case PodResource: - extra := []string{NamespaceResource, NodeResource} + extra := []string{} + if addResourceMetadata.Node.Enabled() { + extra = append(extra, NodeResource) + } + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + // We need to create watchers for ReplicaSets and Jobs that it might belong to, // in order to be able to retrieve 2nd layer Owner metadata like in case of: // Deployment -> Replicaset -> Pod @@ -179,23 +186,55 @@ func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddReso } return extra case ServiceResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case DeploymentResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case ReplicaSetResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case StatefulSetResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case DaemonSetResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case JobResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case CronJobResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case PersistentVolumeResource: return []string{} case PersistentVolumeClaimResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case StorageClassResource: return []string{} case NodeResource: diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index b4e528100a66..8235e73a2775 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -265,6 +265,12 @@ func TestCreateMetaGenSpecific(t *testing.T) { require.NoError(t, err) log := logp.NewLogger("test") + + namespaceConfig, err := conf.NewConfigFrom(map[string]interface{}{ + "enabled": true, + }) + require.NoError(t, err) + config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -272,6 +278,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { AddResourceMetadata: &metadata.AddResourceMetadataConfig{ CronJob: false, Deployment: true, + Namespace: namespaceConfig, }, } client := k8sfake.NewSimpleClientset() @@ -326,6 +333,10 @@ func TestBuildMetadataEnricher_Start_Stop(t *testing.T) { resourceWatchers.lock.Unlock() funcs := mockFuncs{} + namespaceConfig, err := conf.NewConfigFrom(map[string]interface{}{ + "enabled": true, + }) + require.NoError(t, err) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -333,6 +344,7 @@ func TestBuildMetadataEnricher_Start_Stop(t *testing.T) { AddResourceMetadata: &metadata.AddResourceMetadataConfig{ CronJob: false, Deployment: false, + Namespace: namespaceConfig, }, } diff --git a/x-pack/agentbeat/dev-tools/packaging/packages.yml b/x-pack/agentbeat/dev-tools/packaging/packages.yml index b753d88bf2a6..98c5c0f82898 100644 --- a/x-pack/agentbeat/dev-tools/packaging/packages.yml +++ b/x-pack/agentbeat/dev-tools/packaging/packages.yml @@ -24,9 +24,6 @@ shared: NOTICE.txt: source: '{{ repo.RootDir }}/NOTICE.txt' mode: 0644 - README.md: - template: '{{ elastic_beats_dir }}/dev-tools/packaging/templates/common/README.md.tmpl' - mode: 0644 .build_hash.txt: content: > {{ commit }} @@ -35,32 +32,6 @@ shared: source: '{{.BeatName}}.spec.yml' mode: 0644 - - &config_files - 'auditbeat.yml': - source: '{{ repo.RootDir }}/x-pack/auditbeat/auditbeat.yml' - mode: 0600 - config: true - 'filebeat.yml': - source: '{{ repo.RootDir }}/x-pack/filebeat/filebeat.yml' - mode: 0600 - config: true - 'heartbeat.yml': - source: '{{ repo.RootDir }}/x-pack/heartbeat/heartbeat.yml' - mode: 0600 - config: true - 'metricbeat.yml': - source: '{{ repo.RootDir }}/x-pack/metricbeat/metricbeat.yml' - mode: 0600 - config: true - 'osquerybeat.yml': - source: '{{ repo.RootDir }}/x-pack/osquerybeat/osquerybeat.yml' - mode: 0600 - config: true - 'packetbeat.yml': - source: '{{ repo.RootDir }}/x-pack/packetbeat/packetbeat.yml' - mode: 0600 - config: true - - &unix_osquery_files 'osquery-extension.ext': source: '{{ repo.RootDir }}/x-pack/osquerybeat/ext/osquery-extension/build/golang-crossbuild/osquery-extension-{{.GOOS}}-{{.Platform.Arch}}{{.BinaryExt}}' @@ -76,14 +47,12 @@ shared: <<: *common files: <<: *binary_files - <<: *config_files <<: *unix_osquery_files - &windows_binary_spec <<: *common files: <<: *binary_files - <<: *config_files <<: *windows_osquery_files # License modifiers for the Elastic License diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index ca54721bd279..d85480891a0a 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -14,61 +14,69 @@ import ( awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" ) type cloudwatchPoller struct { - numberOfWorkers int - apiSleep time.Duration + config config region string - logStreams []*string - logStreamPrefix string - startTime int64 - endTime int64 - workerSem *awscommon.Sem log *logp.Logger metrics *inputMetrics workersListingMap *sync.Map workersProcessingMap *sync.Map + + // When a worker is ready for its next task, it should + // send to workRequestChan and then read from workResponseChan. + // The worker can cancel the request based on other context + // cancellations, but if the write succeeds it _must_ read from + // workResponseChan to avoid deadlocking the main loop. + workRequestChan chan struct{} + workResponseChan chan workResponse + + workerWg sync.WaitGroup +} + +type workResponse struct { + logGroup string + startTime, endTime time.Time } func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, - awsRegion string, apiSleep time.Duration, - numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller { + awsRegion string, config config) *cloudwatchPoller { if metrics == nil { metrics = newInputMetrics("", nil) } return &cloudwatchPoller{ - numberOfWorkers: numberOfWorkers, - apiSleep: apiSleep, - region: awsRegion, - logStreams: logStreams, - logStreamPrefix: logStreamPrefix, - startTime: int64(0), - endTime: int64(0), - workerSem: awscommon.NewSem(numberOfWorkers), log: log, metrics: metrics, + region: awsRegion, + config: config, workersListingMap: new(sync.Map), workersProcessingMap: new(sync.Map), + // workRequestChan is unbuffered to guarantee that + // the worker and main loop agree whether a request + // was sent. workerResponseChan is buffered so the + // main loop doesn't have to block on the workers + // while distributing new data. + workRequestChan: make(chan struct{}), + workResponseChan: make(chan workResponse, 10), } } -func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { +func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) { err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) if err != nil { var errRequestCanceled *awssdk.RequestCanceledError if errors.As(err, &errRequestCanceled) { - p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) + p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled) } p.log.Error("getLogEventsFromCloudWatch failed: ", err) } } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error { // construct FilterLogEventsInput filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput) @@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) - time.Sleep(p.apiSleep) + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep) + time.Sleep(p.config.APISleep) p.log.Debug("done sleeping") p.log.Debugf("Processing #%v events", len(logEvents)) @@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client return nil } -func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ LogGroupName: awssdk.String(logGroup), - StartTime: awssdk.Int64(startTime), - EndTime: awssdk.Int64(endTime), + StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)), + EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)), } - if len(p.logStreams) > 0 { - for _, stream := range p.logStreams { + if len(p.config.LogStreams) > 0 { + for _, stream := range p.config.LogStreams { filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream) } } - if p.logStreamPrefix != "" { - filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) + if p.config.LogStreamPrefix != "" { + filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix) } return filterLogEventsInput } + +func (p *cloudwatchPoller) startWorkers( + ctx context.Context, + svc *cloudwatchlogs.Client, + logProcessor *logProcessor, +) { + for i := 0; i < p.config.NumberOfWorkers; i++ { + p.workerWg.Add(1) + go func() { + defer p.workerWg.Done() + for { + var work workResponse + select { + case <-ctx.Done(): + return + case p.workRequestChan <- struct{}{}: + work = <-p.workResponseChan + } + + p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup) + p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor) + p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup) + } + }() + } +} + +// receive implements the main run loop that distributes tasks to the worker +// goroutines. It accepts a "clock" callback (which on a live input should +// equal time.Now) to allow deterministic unit tests. +func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) { + defer p.workerWg.Wait() + // startTime and endTime are the bounds of the current scanning interval. + // If we're starting at the end of the logs, advance the start time to the + // most recent scan window + var startTime time.Time + endTime := clock().Add(-p.config.Latency) + if p.config.StartPosition == "end" { + startTime = endTime.Add(-p.config.ScanFrequency) + } + for ctx.Err() == nil { + for _, lg := range logGroupNames { + select { + case <-ctx.Done(): + return + case <-p.workRequestChan: + p.workResponseChan <- workResponse{ + logGroup: lg, + startTime: startTime, + endTime: endTime, + } + } + } + + // Delay for ScanFrequency after finishing a time span + p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency) + select { + case <-time.After(p.config.ScanFrequency): + case <-ctx.Done(): + } + p.log.Debug("done sleeping") + + // Advance to the next time span + startTime, endTime = endTime, clock().Add(-p.config.Latency) + } +} diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go new file mode 100644 index 000000000000..2f8198c021dd --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go @@ -0,0 +1,207 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type clock struct { + time time.Time +} + +func (c *clock) now() time.Time { + return c.time +} + +type receiveTestStep struct { + expected []workResponse + nextTime time.Time +} + +type receiveTestCase struct { + name string + logGroups []string + configOverrides func(*config) + startTime time.Time + steps []receiveTestStep +} + +func TestReceive(t *testing.T) { + // We use a mocked clock so scan frequency can be any positive value. + const defaultScanFrequency = time.Microsecond + t0 := time.Time{} + t1 := t0.Add(time.Hour) + t2 := t1.Add(time.Minute) + t3 := t2.Add(time.Hour) + testCases := []receiveTestCase{ + { + name: "Default config with one log group", + logGroups: []string{"a"}, + startTime: t1, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + }, + nextTime: t3, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t2, endTime: t3}, + }, + }, + }, + }, + { + name: "Default config with two log groups", + logGroups: []string{"a", "b"}, + startTime: t1, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + // start/end times for the second log group should be the same + // even though the clock has changed. + {logGroup: "b", startTime: t0, endTime: t1}, + }, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + {logGroup: "b", startTime: t1, endTime: t2}, + }, + nextTime: t3, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t2, endTime: t3}, + {logGroup: "b", startTime: t2, endTime: t3}, + }, + }, + }, + }, + { + name: "One log group with start_position: end", + logGroups: []string{"a"}, + startTime: t1, + configOverrides: func(c *config) { + c.StartPosition = "end" + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + }, + }, + }, + }, + { + name: "Two log group with start_position: end and latency", + logGroups: []string{"a", "b"}, + startTime: t1, + configOverrides: func(c *config) { + c.StartPosition = "end" + c.Latency = time.Second + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + }, + }, + }, + }, + { + name: "Three log groups with latency", + logGroups: []string{"a", "b", "c"}, + startTime: t1, + configOverrides: func(c *config) { + c.Latency = time.Second + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroup: "b", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroup: "c", startTime: t0, endTime: t1.Add(-time.Second)}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + }, + }, + }, + }, + } + clock := &clock{} + for stepIndex, test := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + p := &cloudwatchPoller{ + workRequestChan: make(chan struct{}), + // Unlike the live cwPoller, we make workResponseChan unbuffered, + // so we can guarantee that clock updates happen when cwPoller has already + // decided on its output + workResponseChan: make(chan workResponse), + log: logp.NewLogger("test"), + } + + p.config = defaultConfig() + p.config.ScanFrequency = defaultScanFrequency + if test.configOverrides != nil { + test.configOverrides(&p.config) + } + clock.time = test.startTime + go p.receive(ctx, test.logGroups, clock.now) + for _, step := range test.steps { + for i, expected := range step.expected { + p.workRequestChan <- struct{}{} + if i+1 == len(step.expected) && !step.nextTime.Equal(time.Time{}) { + // On the last request of the step, we advance the clock if a + // time is set + clock.time = step.nextTime + } + response := <-p.workResponseChan + assert.Equalf(t, expected, response, "%v: step %v response %v doesn't match", test.name, stepIndex, i) + } + } + cancel() + } +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 4ee9daa05ad0..f274fb5fcc99 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -6,10 +6,8 @@ package awscloudwatch import ( "context" - "errors" "fmt" "strings" - "sync" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -96,24 +94,10 @@ func (in *cloudwatchInput) Test(ctx v2.TestContext) error { } func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { - var err error - - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) // Create client for publishing events and receive notification of their ACKs. - client, err := pipeline.ConnectWith(beat.ClientConfig{ - EventListener: awscommon.NewEventACKHandler(), - }) + client, err := pipeline.ConnectWith(beat.ClientConfig{}) if err != nil { return fmt.Errorf("failed to create pipeline client: %w", err) } @@ -137,82 +121,12 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) log.Named("cloudwatch_poller"), in.metrics, in.awsConfig.Region, - in.config.APISleep, - in.config.NumberOfWorkers, - in.config.LogStreams, - in.config.LogStreamPrefix) + in.config) logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx) cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) - return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) -} - -func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { - // This loop tries to keep the workers busy as much as possible while - // honoring the number in config opposed to a simpler loop that does one - // listing, sequentially processes every object and then does another listing - start := true - workerWg := new(sync.WaitGroup) - lastLogGroupOffset := 0 - for ctx.Err() == nil { - if !start { - cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) - time.Sleep(in.config.ScanFrequency) - cwPoller.log.Debug("done sleeping") - } - start = false - - currentTime := time.Now() - cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency) - cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) - availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) - if err != nil { - break - } - - if availableWorkers == 0 { - continue - } - - workerWg.Add(availableWorkers) - logGroupNamesLength := len(logGroupNames) - runningGoroutines := 0 - - for i := lastLogGroupOffset; i < logGroupNamesLength; i++ { - if runningGoroutines >= availableWorkers { - break - } - - runningGoroutines++ - lastLogGroupOffset = i + 1 - if lastLogGroupOffset >= logGroupNamesLength { - // release unused workers - cwPoller.workerSem.Release(availableWorkers - runningGoroutines) - for j := 0; j < availableWorkers-runningGoroutines; j++ { - workerWg.Done() - } - lastLogGroupOffset = 0 - } - - lg := logGroupNames[i] - go func(logGroup string, startTime int64, endTime int64) { - defer func() { - cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) - workerWg.Done() - cwPoller.workerSem.Release(1) - }() - cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) - cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) - }(lg, cwPoller.startTime, cwPoller.endTime) - } - } - - // Wait for all workers to finish. - workerWg.Wait() - if errors.Is(ctx.Err(), context.Canceled) { - // A canceled context is a normal shutdown. - return nil - } - return ctx.Err() + cwPoller.startWorkers(ctx, svc, logProcessor) + cwPoller.receive(ctx, logGroupNames, time.Now) + return nil } func parseARN(logGroupARN string) (string, string, error) { @@ -256,24 +170,3 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log } return logGroupNames, nil } - -func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { - if latency != 0 { - // add latency if config is not 0 - currentTime = currentTime.Add(latency * -1) - } - - switch startPosition { - case "beginning": - if endTime != int64(0) { - return endTime, currentTime.UnixNano() / int64(time.Millisecond) - } - return 0, currentTime.UnixNano() / int64(time.Millisecond) - case "end": - if endTime != int64(0) { - return endTime, currentTime.UnixNano() / int64(time.Millisecond) - } - return currentTime.Add(-scanFrequency).UnixNano() / int64(time.Millisecond), currentTime.UnixNano() / int64(time.Millisecond) - } - return 0, 0 -} diff --git a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go index f3a45fb5c40a..3a5aa179cf0e 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go @@ -32,7 +32,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -163,12 +162,6 @@ func TestInputWithLogGroupNamePrefix(t *testing.T) { client := pubtest.NewChanClient(0) defer close(client.Channel) - go func() { - for event := range client.Channel { - // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*awscommon.EventACKTracker).ACK() - } - }() var errGroup errgroup.Group errGroup.Go(func() error { diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index c51c6a072f4f..4f9754c6a130 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -15,109 +15,6 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -func TestGetStartPosition(t *testing.T) { - currentTime := time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC) - cases := []struct { - title string - startPosition string - prevEndTime int64 - scanFrequency time.Duration - latency time.Duration - expectedStartTime int64 - expectedEndTime int64 - }{ - { - "startPosition=beginning", - "beginning", - int64(0), - 30 * time.Second, - 0, - int64(0), - int64(1590969600000), - }, - { - "startPosition=end", - "end", - int64(0), - 30 * time.Second, - 0, - int64(1590969570000), - int64(1590969600000), - }, - { - "startPosition=typo", - "typo", - int64(0), - 30 * time.Second, - 0, - int64(0), - int64(0), - }, - { - "startPosition=beginning with prevEndTime", - "beginning", - int64(1590000000000), - 30 * time.Second, - 0, - int64(1590000000000), - int64(1590969600000), - }, - { - "startPosition=end with prevEndTime", - "end", - int64(1590000000000), - 30 * time.Second, - 0, - int64(1590000000000), - int64(1590969600000), - }, - { - "startPosition=beginning with latency", - "beginning", - int64(0), - 30 * time.Second, - 10 * time.Minute, - int64(0), - int64(1590969000000), - }, - { - "startPosition=beginning with prevEndTime and latency", - "beginning", - int64(1590000000000), - 30 * time.Second, - 10 * time.Minute, - int64(1590000000000), - int64(1590969000000), - }, - { - "startPosition=end with latency", - "end", - int64(0), - 30 * time.Second, - 10 * time.Minute, - int64(1590968970000), - int64(1590969000000), - }, - { - "startPosition=end with prevEndTime and latency", - "end", - int64(1590000000000), - 30 * time.Second, - 10 * time.Minute, - int64(1590000000000), - int64(1590969000000), - }, - } - - for _, c := range cases { - t.Run(c.title, func(t *testing.T) { - startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency, c.latency) - assert.Equal(t, c.expectedStartTime, startTime) - assert.Equal(t, c.expectedEndTime, endTime) - }) - } -} - func TestCreateEvent(t *testing.T) { logEvent := &types.FilteredLogEvent{ EventId: awssdk.String("id-1"), diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go index 999cad4d7f0f..0ac2bc244d58 100644 --- a/x-pack/filebeat/input/awscloudwatch/processor.go +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -11,7 +11,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -20,7 +19,6 @@ type logProcessor struct { log *logp.Logger metrics *inputMetrics publisher beat.Client - ack *awscommon.EventACKTracker } func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { @@ -31,24 +29,17 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli log: log, metrics: metrics, publisher: publisher, - ack: awscommon.NewEventACKTracker(ctx), } } func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) { for _, logEvent := range logEvents { event := createEvent(logEvent, logGroup, regionName) - p.publish(p.ack, &event) + p.metrics.cloudwatchEventsCreatedTotal.Inc() + p.publisher.Publish(event) } } -func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { - ack.Add() - event.Private = ack - p.metrics.cloudwatchEventsCreatedTotal.Inc() - p.publisher.Publish(*event) -} - func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event { event := beat.Event{ Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 855403e5dc46..733de949f298 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -114,17 +114,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { return fmt.Errorf("can not start persistent store: %w", err) } - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) if in.config.QueueURL != "" { regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) diff --git a/x-pack/metricbeat/docker-compose.yml b/x-pack/metricbeat/docker-compose.yml index 42b946e42214..0bd47337aa10 100644 --- a/x-pack/metricbeat/docker-compose.yml +++ b/x-pack/metricbeat/docker-compose.yml @@ -24,11 +24,11 @@ services: kibana: # Copied configuration from OSS metricbeat because services with depends_on # cannot be extended with extends - image: docker.elastic.co/integrations-ci/beats-kibana:${KIBANA_VERSION:-8.12.1}-1 + image: docker.elastic.co/integrations-ci/beats-kibana:${KIBANA_VERSION:-8.13.2}-1 build: context: ../../metricbeat/module/kibana/_meta args: - KIBANA_VERSION: ${KIBANA_VERSION:-8.12.1} + KIBANA_VERSION: ${KIBANA_VERSION:-8.13.2} depends_on: - elasticsearch ports: diff --git a/x-pack/osquerybeat/magefile.go b/x-pack/osquerybeat/magefile.go index ae5c4c9874be..d8a9dee310bb 100644 --- a/x-pack/osquerybeat/magefile.go +++ b/x-pack/osquerybeat/magefile.go @@ -7,16 +7,23 @@ package main import ( + "context" "fmt" "os" "path/filepath" "runtime" + "strings" "time" "github.com/magefile/mage/mg" devtools "github.com/elastic/beats/v7/dev-tools/mage" "github.com/elastic/beats/v7/dev-tools/mage/target/build" + + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/command" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/distro" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/fileutil" + osquerybeat "github.com/elastic/beats/v7/x-pack/osquerybeat/scripts/mage" // mage:import @@ -87,9 +94,95 @@ func Clean() error { return devtools.Clean(paths) } +func execCommand(ctx context.Context, name string, args ...string) error { + ps := strings.Join(append([]string{name}, args...), " ") + fmt.Println(ps) + output, err := command.Execute(ctx, name, args...) + if err != nil { + fmt.Println(ps, ", failed: ", err) + return err + } + fmt.Print(output) + return err +} + +// stripLinuxOsqueryd Strips osqueryd binary, that is not stripped in linux tar.gz distro +func stripLinuxOsqueryd() error { + if os.Getenv("GOOS") != "linux" { + return nil + } + + // Check that this step is called during x-pack/osquerybeat/ext/osquery-extension build + cwd, err := os.Getwd() + if err != nil { + return err + } + + // Strip osqueryd only once when osquery-extension is built + // There are two build paths at the moment both through GolangCrossBuild + // 1. Standlone osquerybeat package (this function is called twice: for osquerybeat and osquery-extension) + // 2. Agentbeat package, this function is only called once for osquery-extension + if !strings.HasSuffix(cwd, "/osquery-extension") { + return nil + } + + ctx := context.Background() + + osArchs := osquerybeat.OSArchs(devtools.Platforms) + + strip := func(oquerydPath string) error { + ok, err := fileutil.FileExists(oquerydPath) + if err != nil { + return err + } + if ok { + if err := execCommand(ctx, "strip", oquerydPath); err != nil { + return err + } + } + return nil + } + + for _, osarch := range osArchs { + // Skip everything but matching linux arch + if osarch.OS != os.Getenv("GOOS") || osarch.Arch != os.Getenv("GOARCH") { + continue + } + + // Strip osqueryd + // There are two scenarios where the build path is created depending on the type of build + // 1. Standlone osquerybeat build: the osqueryd binaries are downloaded into osquerybeat/build/data/install/[GOOS]/[GOARCH] + // 2. Agentbeat build: the osqueryd binaries are downloaded agentbeat/build/data/install/[GOOS]/[GOARCH] + + // This returns something like build/data/install/linux/amd64/osqueryd + querydRelativePath := distro.OsquerydPath(distro.GetDataInstallDir(osarch)) + + // Checking and stripping osqueryd binary and both paths osquerybeat/build and agentbeat/build + // because at the moment it's unclear if this step was initiated from osquerybeat or agentbeat build + osquerybeatPath := filepath.Clean(filepath.Join(cwd, "../..", querydRelativePath)) + err = strip(osquerybeatPath) + if err != nil { + return err + } + + agentbeatPath := filepath.Clean(filepath.Join(cwd, "../../../agentbeat", querydRelativePath)) + err = strip(agentbeatPath) + if err != nil { + return err + } + } + + return nil +} + // GolangCrossBuild build the Beat binary inside of the golang-builder. // Do not use directly, use crossBuild instead. func GolangCrossBuild() error { + // Strip linux osqueryd binary + if err := stripLinuxOsqueryd(); err != nil { + return err + } + return devtools.GolangCrossBuild(devtools.DefaultGolangCrossBuildArgs()) }