Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elasticsearch sink): Elasticsearch sink with api_version set to "auto" does not recognize the API version of ES6 as V6 (#17226) #17227

Merged
merged 1 commit into from
May 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ impl ElasticsearchCommon {
)
.await
{
Ok(version) => version,
Ok(version) => {
debug!(message = "Auto-detected Elasticsearch API version.", %version);
version
}
// This error should be fatal, but for now we only emit it as a warning
// to make the transition smoother.
Err(error) => {
Expand All @@ -156,11 +159,11 @@ impl ElasticsearchCommon {
// This is by no means a perfect assumption but it's the best we can
// make with the data we have.
let assumed_version = if config.suppress_type_name { 6 } else { 8 };
debug!(message = "Assumed ElasticsearchApi based on config setting suppress_type_name.",
debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
%assumed_version,
%config.suppress_type_name
);
warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.",
warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
%assumed_version,
%error
);
Expand Down Expand Up @@ -277,28 +280,34 @@ async fn get_version(
proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct ClusterState {
version: Option<usize>,
struct Version {
number: Option<String>,
}
#[derive(Deserialize)]
struct ResponsePayload {
version: Option<Version>,
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(
base_url,
http_auth,
aws_auth,
region,
request,
client,
"/_cluster/state/version",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

let (_, body) = response.into_parts();
let mut body = body::aggregate(body).await?;
let body = body.copy_to_bytes(body.remaining());
let ClusterState { version } = serde_json::from_slice(&body)?;
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
let ResponsePayload { version } = serde_json::from_slice(&body)?;
if let Some(version) = version {
if let Some(number) = version.number {
let v: Vec<&str> = number.split('.').collect();
if !v.is_empty() {
if let Ok(major_version) = v[0].parse::<usize>() {
return Ok(major_version);
}
}
}
}
Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
}

async fn get(
Expand Down