Skip to content

Commit

Permalink
Implement Composable Templates to support Elasticsearch 8.0 (#980)
Browse files Browse the repository at this point in the history
Elasticsearch's /_template API is being deprecated in 7.8
Use the new endpoint /_index_template to manage template if elasticsearch version >= 8
Add a new template 8x with priority set for default usage
Fixed: #944
  • Loading branch information
kaisecheng authored Nov 24, 2020
1 parent 5470c13 commit 853871b
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 76 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 10.7.3
- Added composable index template support for elasticsearch version 8 [#980](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/980)

## 10.7.2
- [DOC] Fixed links to restructured Logstash-to-cloud docs [#975](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/975)

Expand Down
8 changes: 6 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,19 @@ def exists?(path, use_get=false)
end

def template_exists?(name)
exists?("/_template/#{name}")
exists?("/#{template_endpoint}/#{name}")
end

def template_put(name, template)
path = "_template/#{name}"
path = "#{template_endpoint}/#{name}"
logger.info("Installing elasticsearch template to #{path}")
@pool.put(path, nil, LogStash::Json.dump(template))
end

def template_endpoint
maximum_seen_major_version < 8 ? '_template' : '_index_template'
end

# ILM methods

# check whether rollover alias already exists
Expand Down
11 changes: 8 additions & 3 deletions lib/logstash/outputs/elasticsearch/template_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ def self.install(client, template_name, template, template_overwrite)
def self.add_ilm_settings_to_template(plugin, template)
# Overwrite any index patterns, and use the rollover alias. Use 'index_patterns' rather than 'template' for pattern
# definition - remove any existing definition of 'template'
template.delete('template') if template.include?('template')
template.delete('template') if template.include?('template') if plugin.maximum_seen_major_version < 8
template['index_patterns'] = "#{plugin.ilm_rollover_alias}-*"
if template['settings'] && (template['settings']['index.lifecycle.name'] || template['settings']['index.lifecycle.rollover_alias'])
settings = template_settings(plugin, template)
if settings && (settings['index.lifecycle.name'] || settings['index.lifecycle.rollover_alias'])
plugin.logger.info("Overwriting index lifecycle name and rollover alias as ILM is enabled.")
end
template['settings'].update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias})
settings.update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias})
end

def self.template_settings(plugin, template)
plugin.maximum_seen_major_version < 8 ? template['settings']: template['template']['settings']
end

# Template name - if template_name set, use it
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,50 @@
{
"index_patterns" : "logstash-*",
"version" : 80001,
"settings" : {
"index.refresh_interval" : "5s",
"number_of_shards": 1
},
"mappings" : {
"dynamic_templates" : [ {
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
"template" : {
"settings" : {
"index.refresh_interval" : "5s",
"number_of_shards": 1
},
"mappings" : {
"dynamic_templates" : [ {
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
}
}
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}
}
}
}
} ],
"properties" : {
"@timestamp": { "type": "date"},
"@version": { "type": "keyword"},
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "half_float" },
"longitude" : { "type" : "half_float" }
} ],
"properties" : {
"@timestamp": { "type": "date" },
"@version": { "type": "keyword" },
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "half_float" },
"longitude" : { "type" : "half_float" }
}
}
}
}
},
"priority": 200,
"_meta" : {
"description": "index template for logstash-output-elasticsearch"
}
}
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '10.7.2'
s.version = '10.7.3'

s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
Expand Down
44 changes: 32 additions & 12 deletions spec/es_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,10 @@ def todays_date
Time.now.strftime("%Y.%m.%d")
end


def default_mapping_from_mappings(mappings)
if ESHelper.es_version_satisfies?(">=7")
mappings
else
mappings["_default_"]
end
end

def field_properties_from_template(template_name, field)
mappings = @es.indices.get_template(name: template_name)[template_name]["mappings"]
mapping = default_mapping_from_mappings(mappings)
mapping["properties"][field]["properties"]
template = get_template(@es, template_name)
mappings = get_template_mappings(template)
mappings["properties"][field]["properties"]
end

def routing_field_name
Expand Down Expand Up @@ -105,6 +96,7 @@ def self.es_version_satisfies?(*requirement)

def clean(client)
client.indices.delete_template(:name => "*")
client.indices.delete_index_template(:name => "logstash*") rescue nil
# This can fail if there are no indexes, ignore failure.
client.indices.delete(:index => "*") rescue nil
clean_ilm(client) if supports_ilm?(client)
Expand Down Expand Up @@ -182,6 +174,34 @@ def max_age_policy(max_age)
}
}
end

def get_template(client, name)
if ESHelper.es_version_satisfies?(">=8")
t = client.indices.get_index_template(name: name)
t['index_templates'][0]['index_template']
else
t = client.indices.get_template(name: name)
t[name]
end
end

def get_template_settings(template)
if ESHelper.es_version_satisfies?(">=8")
template['template']['settings']
else
template['settings']
end
end

def get_template_mappings(template)
if ESHelper.es_version_satisfies?(">=8")
template['template']['mappings']
elsif ESHelper.es_version_satisfies?(">=7")
template['mappings']
else
template['mappings']["_default_"]
end
end
end

RSpec.configure do |config|
Expand Down
50 changes: 50 additions & 0 deletions spec/fixtures/template-with-policy-es8x.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"index_patterns" : "overwrite-*",
"version" : 80001,
"template" : {
"settings" : {
"index.refresh_interval" : "1s",
"number_of_shards": 1
},
"mappings" : {
"dynamic_templates" : [ {
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
}
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}
}
}
} ],
"properties" : {
"@timestamp": { "type": "date" },
"@version": { "type": "keyword" },
"geoip" : {
"dynamic": true,
"properties" : {
"ip": { "type": "ip" },
"location" : { "type" : "geo_point" },
"latitude" : { "type" : "half_float" },
"longitude" : { "type" : "half_float" }
}
}
}
}
},
"priority": 200,
"_meta" : {
"description": "index template for logstash-output-elasticsearch"
}
}
54 changes: 34 additions & 20 deletions spec/integration/outputs/ilm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
let (:settings) { super.merge("ilm_policy" => ilm_policy_name)}

it 'should rollover when the policy max docs is reached' do
put_policy(@es,ilm_policy_name, policy)
put_policy(@es, ilm_policy_name, policy)
subject.register

subject.multi_receive([
Expand Down Expand Up @@ -108,9 +108,11 @@
it 'should not write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: "logstash")["logstash"]).to have_index_pattern("logstash-*")

template = get_template(@es, "logstash")
expect(template).to have_index_pattern("logstash-*")
if ESHelper.es_version_satisfies?(">= 2")
expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']).to be_nil
expect(get_template_settings(template)['index']['lifecycle']).to be_nil
end
end

Expand Down Expand Up @@ -152,16 +154,17 @@
end

context 'with a custom template name' do
let (:template_name) { "custom_template_name" }
let (:template_name) { "logstash_custom_template_name" }
let (:settings) { super.merge('template_name' => template_name)}

it 'should not write the ILM settings into the template' do
subject.register
sleep(1)

expect(@es.indices.get_template(name: template_name)[template_name]).to have_index_pattern("logstash-*")
template = get_template(@es, template_name)
expect(template).to have_index_pattern("logstash-*")
if ESHelper.es_version_satisfies?(">= 2")
expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']).to be_nil
expect(get_template_settings(template)['index']['lifecycle']).to be_nil
end
end
end
Expand Down Expand Up @@ -387,16 +390,20 @@
it 'should write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: "logstash")["logstash"]).to have_index_pattern("logstash-*")
expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']['name']).to eq("logstash-policy")
expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']['rollover_alias']).to eq("logstash")

template = get_template(@es, "logstash")
expect(template).to have_index_pattern("logstash-*")
expect(get_template_settings(template)['index']['lifecycle']['name']).to eq("logstash-policy")
expect(get_template_settings(template)['index']['lifecycle']['rollover_alias']).to eq("logstash")
end

it_behaves_like 'an ILM enabled Logstash'
end

context 'with a set index and a custom index pattern' do
if ESHelper.es_version_satisfies?(">= 7.0")
if ESHelper.es_version_satisfies?(">= 8.0")
let (:template) { "spec/fixtures/template-with-policy-es8x.json" }
elsif ESHelper.es_version_satisfies?(">= 7.0")
let (:template) { "spec/fixtures/template-with-policy-es7x.json" }
else
let (:template) { "spec/fixtures/template-with-policy-es6x.json" }
Expand All @@ -408,21 +415,25 @@
it 'should not overwrite the index patterns' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: "logstash")["logstash"]).to have_index_pattern("overwrite-*")

template = get_template(@es, "logstash")
expect(template).to have_index_pattern("overwrite-*")
end
end


context 'with a custom template' do
let (:ilm_rollover_alias) { "the_cat_in_the_hat" }
let (:ilm_rollover_alias) { "logstash_the_cat_in_the_hat" }
let (:index) { ilm_rollover_alias }
let(:expected_index) { index }
let (:settings) { super.merge("ilm_policy" => ilm_policy_name,
"template" => template,
"ilm_rollover_alias" => ilm_rollover_alias)}


if ESHelper.es_version_satisfies?(">= 7.0")
if ESHelper.es_version_satisfies?(">= 8.0")
let (:template) { "spec/fixtures/template-with-policy-es8x.json" }
elsif ESHelper.es_version_satisfies?(">= 7.0")
let (:template) { "spec/fixtures/template-with-policy-es7x.json" }
else
let (:template) { "spec/fixtures/template-with-policy-es6x.json" }
Expand Down Expand Up @@ -460,23 +471,26 @@
it 'should write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["settings"]['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["settings"]['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)

template = get_template(@es, ilm_rollover_alias)
expect(template).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(get_template_settings(template)['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(get_template_settings(template)['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)
end

context 'with a different template_name' do
let (:template_name) { "custom_template_name" }
let (:template_name) { "logstash_custom_template_name" }
let (:settings) { super.merge('template_name' => template_name)}

it_behaves_like 'an ILM enabled Logstash'

it 'should write the ILM settings into the template' do
subject.register
sleep(1)
expect(@es.indices.get_template(name: template_name)[template_name]).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)
template = get_template(@es, template_name)
expect(template).to have_index_pattern("#{ilm_rollover_alias}-*")
expect(get_template_settings(template)['index']['lifecycle']['name']).to eq(ilm_policy_name)
expect(get_template_settings(template)['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias)
end
end

Expand Down
Loading

0 comments on commit 853871b

Please sign in to comment.