diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 101a3d590a11a..c4da722838543 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -145,7 +145,10 @@ impl ElasticsearchCommon { ) .await { - Ok(version) => version, + Ok(version) => { + debug!(message = "Auto-detected Elasticsearch API version.", %version); + version + } // This error should be fatal, but for now we only emit it as a warning // to make the transition smoother. Err(error) => { @@ -156,11 +159,11 @@ impl ElasticsearchCommon { // This is by no means a perfect assumption but it's the best we can // make with the data we have. let assumed_version = if config.suppress_type_name { 6 } else { 8 }; - debug!(message = "Assumed ElasticsearchApi based on config setting suppress_type_name.", + debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.", %assumed_version, %config.suppress_type_name ); - warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.", + warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.", %assumed_version, %error ); @@ -277,28 +280,34 @@ async fn get_version( proxy_config: &ProxyConfig, ) -> crate::Result { #[derive(Deserialize)] - struct ClusterState { - version: Option, + struct Version { + number: Option, + } + #[derive(Deserialize)] + struct ResponsePayload { + version: Option, } let client = HttpClient::new(tls_settings.clone(), proxy_config)?; - let response = get( - base_url, - http_auth, - aws_auth, - region, - request, - client, - "/_cluster/state/version", - ) - .await - .map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?; + let response = get(base_url, http_auth, aws_auth, region, request, client, "/") + .await + .map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?; let (_, body) = response.into_parts(); let mut body = body::aggregate(body).await?; let body = body.copy_to_bytes(body.remaining()); - let ClusterState { version } = serde_json::from_slice(&body)?; - version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into()) + let ResponsePayload { version } = serde_json::from_slice(&body)?; + if let Some(version) = version { + if let Some(number) = version.number { + let v: Vec<&str> = number.split('.').collect(); + if !v.is_empty() { + if let Ok(major_version) = v[0].parse::() { + return Ok(major_version); + } + } + } + } + Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into()) } async fn get(