Skip to content

Commit

Permalink
fix(elasticsearch sink): Elasticsearch sink with api_version set to "…
Browse files Browse the repository at this point in the history
…auto"

  does not recognize the API version of ES6 as V6 (#17226)
  • Loading branch information
syedriko committed May 1, 2023
1 parent 752d424 commit 13be8fd
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ impl ElasticsearchCommon {
)
.await
{
Ok(version) => version,
Ok(version) => {
debug!(message = "Auto-detected Elasticsearch API version.", %version);
version
}
// This error should be fatal, but for now we only emit it as a warning
// to make the transition smoother.
Err(error) => {
Expand All @@ -156,11 +159,11 @@ impl ElasticsearchCommon {
// This is by no means a perfect assumption but it's the best we can
// make with the data we have.
let assumed_version = if config.suppress_type_name { 6 } else { 8 };
debug!(message = "Assumed ElasticsearchApi based on config setting suppress_type_name.",
debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
%assumed_version,
%config.suppress_type_name
);
warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.",
warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
%assumed_version,
%error
);
Expand Down Expand Up @@ -277,28 +280,34 @@ async fn get_version(
proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct ClusterState {
version: Option<usize>,
struct Version {
number: Option<String>,
}
#[derive(Deserialize)]
struct ResponsePayload {
version: Option<Version>,
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(
base_url,
http_auth,
aws_auth,
region,
request,
client,
"/_cluster/state/version",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

let (_, body) = response.into_parts();
let mut body = body::aggregate(body).await?;
let body = body.copy_to_bytes(body.remaining());
let ClusterState { version } = serde_json::from_slice(&body)?;
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
let ResponsePayload { version } = serde_json::from_slice(&body)?;
if let Some(version) = version {
if let Some(number) = version.number {
let v: Vec<&str> = number.split('.').collect();
if !v.is_empty() {
if let Ok(major_version) = v[0].parse::<usize>() {
return Ok(major_version);
}
}
}
}
Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
}

async fn get(
Expand Down

0 comments on commit 13be8fd

Please sign in to comment.