Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Composable Templates to support Elasticsearch 8.0 #980

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 10.7.3
- Added composable index template support for elasticsearch version 8 [#980](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/980)

## 10.7.2
- [DOC] Fixed links to restructured Logstash-to-cloud docs [#975](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/975)

Expand Down
8 changes: 6 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,19 @@ def exists?(path, use_get=false)
end

def template_exists?(name)
exists?("/_template/#{name}")
exists?("/#{template_endpoint}/#{name}")
end

def template_put(name, template)
path = "_template/#{name}"
path = "#{template_endpoint}/#{name}"
logger.info("Installing elasticsearch template to #{path}")
@pool.put(path, nil, LogStash::Json.dump(template))
end

def template_endpoint
maximum_seen_major_version < 8 ? '_template' : '_index_template'
end

# ILM methods

# check whether rollover alias already exists
Expand Down
11 changes: 8 additions & 3 deletions lib/logstash/outputs/elasticsearch/template_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ def self.install(client, template_name, template, template_overwrite)
def self.add_ilm_settings_to_template(plugin, template)
# Overwrite any index patterns, and use the rollover alias. Use 'index_patterns' rather than 'template' for pattern
# definition - remove any existing definition of 'template'
template.delete('template') if template.include?('template')
template.delete('template') if template.include?('template') if plugin.maximum_seen_major_version < 8
template['index_patterns'] = "#{plugin.ilm_rollover_alias}-*"
if template['settings'] && (template['settings']['index.lifecycle.name'] || template['settings']['index.lifecycle.rollover_alias'])
settings = template_settings(plugin, template)
if settings && (settings['index.lifecycle.name'] || settings['index.lifecycle.rollover_alias'])
plugin.logger.info("Overwriting index lifecycle name and rollover alias as ILM is enabled.")
end
template['settings'].update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias})
settings.update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias})
end

def self.template_settings(plugin, template)
plugin.maximum_seen_major_version < 8 ? template['settings']: template['template']['settings']
end

# Template name - if template_name set, use it
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,50 @@
{
"index_patterns" : "logstash-*",
"version" : 80001,
"settings" : {
"index.refresh_interval" : "5s",
"number_of_shards": 1
},
"mappings" : {
"dynamic_templates" : [ {
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
"template" : {
"settings" : {
"index.refresh_interval" : "5s",
"number_of_shards": 1
},
"mappings" : {
"dynamic_templates" : [ {
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
}
}
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}
}
}
}
} ],
"properties" : {
"@timestamp": { "type": "date"},
"@version": { "type": "keyword"},
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "half_float" },
"longitude" : { "type" : "half_float" }
} ],
"properties" : {
"@timestamp": { "type": "date" },
"@version": { "type": "keyword" },
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "half_float" },
"longitude" : { "type" : "half_float" }
}
}
}
}
},
"priority": 200,
"_meta" : {
"description": "index template for logstash-output-elasticsearch"
}
}
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '10.7.2'
s.version = '10.7.3'

s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
Expand Down
44 changes: 32 additions & 12 deletions spec/es_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,10 @@ def todays_date
Time.now.strftime("%Y.%m.%d")
end


def default_mapping_from_mappings(mappings)
if ESHelper.es_version_satisfies?(">=7")
mappings
else
mappings["_default_"]
end
end

def field_properties_from_template(template_name, field)
mappings = @es.indices.get_template(name: template_name)[template_name]["mappings"]
mapping = default_mapping_from_mappings(mappings)
mapping["properties"][field]["properties"]
template = get_template(@es, template_name)
mappings = get_template_mappings(template)
mappings["properties"][field]["properties"]
end

def routing_field_name
Expand Down Expand Up @@ -105,6 +96,7 @@ def self.es_version_satisfies?(*requirement)

def clean(client)
client.indices.delete_template(:name => "*")
client.indices.delete_index_template(:name => "logstash*") rescue nil
# This can fail if there are no indexes, ignore failure.
client.indices.delete(:index => "*") rescue nil
clean_ilm(client) if supports_ilm?(client)
Expand Down Expand Up @@ -182,6 +174,34 @@ def max_age_policy(max_age)
}
}
end

def get_template(client, name)
if ESHelper.es_version_satisfies?(">=8")
t = client.indices.get_index_template(name: name)
t['index_templates'][0]['index_template']
else
t = client.indices.get_template(name: name)
t[name]
end
end

def get_template_settings(template)
if ESHelper.es_version_satisfies?(">=8")
template['template']['settings']
else
template['settings']
end
end

def get_template_mappings(template)
if ESHelper.es_version_satisfies?(">=8")
template['template']['mappings']
elsif ESHelper.es_version_satisfies?(">=7")
template['mappings']
else
template['mappings']["_default_"]
end
end
end

RSpec.configure do |config|
Expand Down
50 changes: 50 additions & 0 deletions spec/fixtures/template-with-policy-es8x.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"index_patterns" : "overwrite-*",
"version" : 80001,
"template" : {
"settings" : {
"index.refresh_interval" : "1s",
"number_of_shards": 1
},
"mappings" : {
"dynamic_templates" : [ {
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
}
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}
}
}
} ],
"properties" : {
"@timestamp": { "type": "date" },
"@version": { "type": "keyword" },
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "half_float" },
"longitude" : { "type" : "half_float" }
}
}
}
}
},
"priority": 200,
"_meta" : {
"description": "index template for logstash-output-elasticsearch"
}
}
54 changes: 34 additions & 20 deletions spec/integration/outputs/ilm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
let (:settings) { super.merge("ilm_policy" => ilm_policy_name)}

it 'should rollover when the policy max docs is reached' do
put_policy(@es,ilm_policy_name, policy)
put_policy(@es, ilm_policy_name, policy)
subject.register

subject.multi_receive([
Expand Down Expand Up @@ -108,9 +108,11 @@
it 'should not write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: "logstash")["logstash"]).to have_index_pattern("logstash-*")

template = get_template(@es, "logstash")
expect(template).to have_index_pattern("logstash-*")
if ESHelper.es_version_satisfies?(">= 2")
expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']).to be_nil
expect(get_template_settings(template)['index']['lifecycle']).to be_nil
end
end

Expand Down Expand Up @@ -152,16 +154,17 @@
end

context 'with a custom template name' do
let (:template_name) { "custom_template_name" }
let (:template_name) { "logstash_custom_template_name" }
let (:settings) { super.merge('template_name' => template_name)}

it 'should not write the ILM settings into the template' do
subject.register
sleep(1)

expect(@es.indices.get_template(name: template_name)[template_name]).to have_index_pattern("logstash-*")
template = get_template(@es, template_name)
expect(template).to have_index_pattern("logstash-*")
if ESHelper.es_version_satisfies?(">= 2")
expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']).to be_nil
expect(get_template_settings(template)['index']['lifecycle']).to be_nil
end
end
end
Expand Down Expand Up @@ -387,16 +390,20 @@
it 'should write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: "logstash")["logstash"]).to have_index_pattern("logstash-*")
expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']['name']).to eq("logstash-policy")
expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']['rollover_alias']).to eq("logstash")

template = get_template(@es, "logstash")
expect(template).to have_index_pattern("logstash-*")
expect(get_template_settings(template)['index']['lifecycle']['name']).to eq("logstash-policy")
expect(get_template_settings(template)['index']['lifecycle']['rollover_alias']).to eq("logstash")
end

it_behaves_like 'an ILM enabled Logstash'
end

context 'with a set index and a custom index pattern' do
if ESHelper.es_version_satisfies?(">= 7.0")
if ESHelper.es_version_satisfies?(">= 8.0")
let (:template) { "spec/fixtures/template-with-policy-es8x.json" }
elsif ESHelper.es_version_satisfies?(">= 7.0")
let (:template) { "spec/fixtures/template-with-policy-es7x.json" }
else
let (:template) { "spec/fixtures/template-with-policy-es6x.json" }
Expand All @@ -408,21 +415,25 @@
it 'should not overwrite the index patterns' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: "logstash")["logstash"]).to have_index_pattern("overwrite-*")

template = get_template(@es, "logstash")
expect(template).to have_index_pattern("overwrite-*")
end
end


context 'with a custom template' do
let (:ilm_rollover_alias) { "the_cat_in_the_hat" }
let (:ilm_rollover_alias) { "logstash_the_cat_in_the_hat" }
let (:index) { ilm_rollover_alias }
let(:expected_index) { index }
let (:settings) { super.merge("ilm_policy" => ilm_policy_name,
"template" => template,
"ilm_rollover_alias" => ilm_rollover_alias)}


if ESHelper.es_version_satisfies?(">= 7.0")
if ESHelper.es_version_satisfies?(">= 8.0")
let (:template) { "spec/fixtures/template-with-policy-es8x.json" }
elsif ESHelper.es_version_satisfies?(">= 7.0")
let (:template) { "spec/fixtures/template-with-policy-es7x.json" }
else
let (:template) { "spec/fixtures/template-with-policy-es6x.json" }
Expand Down Expand Up @@ -460,23 +471,26 @@
it 'should write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["settings"]['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["settings"]['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)

template = get_template(@es, ilm_rollover_alias)
expect(template).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(get_template_settings(template)['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(get_template_settings(template)['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)
end

context 'with a different template_name' do
let (:template_name) { "custom_template_name" }
let (:template_name) { "logstash_custom_template_name" }
let (:settings) { super.merge('template_name' => template_name)}

it_behaves_like 'an ILM enabled Logstash'

it 'should write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: template_name)[template_name]).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)
template = get_template(@es, template_name)
expect(template).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(get_template_settings(template)['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(get_template_settings(template)['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)
end
end

Expand Down
Loading