diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index 7724ae9743d2b..274f4b8c05001 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -11,9 +11,17 @@ import com.maxmind.geoip2.DatabaseReader; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.env.Environment; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -41,11 +49,20 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import java.util.zip.GZIPInputStream; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; import static java.nio.file.StandardOpenOption.WRITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1) public class GeoIpDownloaderIT extends AbstractGeoIpIT { @@ -128,6 +145,130 @@ public void testGeoIpDatabasesDownload() throws Exception { } } + public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { + // setup: + BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startArray("processors"); + { + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-country"); + builder.field("database_file", "GeoLite2-Country.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-asn"); + builder.field("database_file", "GeoLite2-ASN.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get()); + + // verify before updating dbs + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.startArray("docs"); + { + builder.startObject(); + builder.field("_index", "my-index"); + { + builder.startObject("_source"); + builder.field("ip", "89.160.20.128"); + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON); + simulateRequest.setId("_id"); + { + SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet(); + assertThat(simulateResponse.getPipelineId(), equalTo("_id")); + assertThat(simulateResponse.getResults().size(), equalTo(1)); + SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0); + assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba")); + assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB")); + assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden")); + } + + // Enable downloader: + Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + + final List geoipTmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false) + .map(env -> { + Path geoipTmpDir = env.tmpFile().resolve("geoip-databases"); + assertThat(Files.exists(geoipTmpDir), is(true)); + return geoipTmpDir; + }).collect(Collectors.toList()); + assertBusy(() -> { + for (Path geoipTmpDir : geoipTmpDirs) { + try (Stream list = Files.list(geoipTmpDir)) { + List files = list.map(Path::toString).collect(Collectors.toList()); + assertThat(files, containsInAnyOrder(endsWith("GeoLite2-City.mmdb"), endsWith("GeoLite2-Country.mmdb"), + endsWith("GeoLite2-ASN.mmdb"))); + } + } + }); + + // Verify after updating dbs: + assertBusy(() -> { + SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet(); + assertThat(simulateResponse.getPipelineId(), equalTo("_id")); + assertThat(simulateResponse.getResults().size(), equalTo(1)); + SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0); + assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping")); + assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB")); + assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden")); + }); + + // Disable downloader: + settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + + assertBusy(() -> { + for (Path geoipTmpDir : geoipTmpDirs) { + try (Stream list = Files.list(geoipTmpDir)) { + List files = list.map(Path::toString).collect(Collectors.toList()); + assertThat(files, empty()); + } + } + }); + } + @SuppressForbidden(reason = "Maxmind API requires java.io.File") private void parseDatabase(Path tempFile) throws IOException { try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) { diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java index bf5463866af65..2d54ad24aa787 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java @@ -8,15 +8,13 @@ package org.elasticsearch.ingest.geoip; +import org.elasticsearch.client.Client; import org.elasticsearch.common.network.InetAddresses; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import java.io.IOException; @@ -26,6 +24,7 @@ import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +37,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { @@ -49,122 +49,122 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance */ public void test() throws Exception { - ThreadPool threadPool = new TestThreadPool("test"); - Settings settings = Settings.builder().put("resource.reload.interval.high", TimeValue.timeValueMillis(100)).build(); - ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); - try { - final Path geoIpDir = createTempDir(); - copyDatabaseFiles(geoIpDir); - final Path geoIpConfigDir = createTempDir(); - Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), - geoIpConfigDir.resolve("GeoLite2-City.mmdb")); - Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), - geoIpConfigDir.resolve("GeoLite2-City-Test.mmdb")); - - LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(0)); - localDatabases.configDatabases.putAll(localDatabases.initConfigDatabases(localDatabases.geoipConfigDir)); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); - lazyLoadReaders(localDatabases); - - final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field"))); - final GeoIpProcessor processor2 = factory.create(null, "_tag", null, - new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb"))); - - final AtomicBoolean completed = new AtomicBoolean(false); - final int numberOfDatabaseUpdates = randomIntBetween(2, 4); - final AtomicInteger numberOfIngestRuns = new AtomicInteger(); - final int numberOfIngestThreads = randomIntBetween(16, 32); - final Thread[] ingestThreads = new Thread[numberOfIngestThreads]; - final AtomicArray ingestFailures = new AtomicArray<>(numberOfIngestThreads); - for (int i = 0; i < numberOfIngestThreads; i++) { - final int id = i; - ingestThreads[id] = new Thread(() -> { - while (completed.get() == false) { - try { - IngestDocument document1 = - new IngestDocument("index", "id", "routing", 1L, VersionType.EXTERNAL, Map.of("_field", "89.160.20.128")); - processor1.execute(document1); - assertThat(document1.getSourceAndMetadata().get("geoip"), notNullValue()); - IngestDocument document2 = - new IngestDocument("index", "id", "routing", 1L, VersionType.EXTERNAL, Map.of("_field", "89.160.20.128")); - processor2.execute(document2); - assertThat(document2.getSourceAndMetadata().get("geoip"), notNullValue()); - numberOfIngestRuns.incrementAndGet(); - } catch (Exception | AssertionError e) { - logger.error("error in ingest thread after run [" + numberOfIngestRuns.get() + "]", e); - ingestFailures.setOnce(id, e); - break; - } - } - }); - } - - final AtomicReference failureHolder2 = new AtomicReference<>(); - Thread updateDatabaseThread = new Thread(() -> { - for (int i = 0; i < numberOfDatabaseUpdates; i++) { + Path geoIpTmpDir = createTempDir(); + DatabaseRegistry databaseRegistry = createDatabaseRegistry(geoIpTmpDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), + geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), + geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + lazyLoadReaders(databaseRegistry); + + final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field"))); + final GeoIpProcessor processor2 = factory.create(null, "_tag", null, + new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb"))); + + final AtomicBoolean completed = new AtomicBoolean(false); + final int numberOfDatabaseUpdates = randomIntBetween(2, 4); + final AtomicInteger numberOfIngestRuns = new AtomicInteger(); + final int numberOfIngestThreads = randomIntBetween(16, 32); + final Thread[] ingestThreads = new Thread[numberOfIngestThreads]; + final AtomicArray ingestFailures = new AtomicArray<>(numberOfIngestThreads); + for (int i = 0; i < numberOfIngestThreads; i++) { + final int id = i; + ingestThreads[id] = new Thread(() -> { + while (completed.get() == false) { try { - DatabaseReaderLazyLoader previous1 = localDatabases.configDatabases.get("GeoLite2-City.mmdb"); - if (Files.exists(geoIpConfigDir.resolve("GeoLite2-City.mmdb"))) { - localDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), false); - Files.delete(geoIpConfigDir.resolve("GeoLite2-City.mmdb")); - } else { - Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), - geoIpConfigDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING); - localDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), true); - } - DatabaseReaderLazyLoader previous2 = localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb"); - InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" : - "/GeoLite2-City-Test.mmdb"); - Files.copy(source, geoIpConfigDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING); - localDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City-Test.mmdb"), true); - - DatabaseReaderLazyLoader current1 = localDatabases.configDatabases.get("GeoLite2-City.mmdb"); - DatabaseReaderLazyLoader current2 = localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb"); - assertThat(current1, not(sameInstance(previous1))); - assertThat(current2, not(sameInstance(previous2))); - - // lazy load type and reader: - lazyLoadReaders(localDatabases); + IngestDocument document1 = + new IngestDocument("index", "id", "routing", 1L, VersionType.EXTERNAL, Map.of("_field", "89.160.20.128")); + processor1.execute(document1); + assertThat(document1.getSourceAndMetadata().get("geoip"), notNullValue()); + IngestDocument document2 = + new IngestDocument("index", "id", "routing", 1L, VersionType.EXTERNAL, Map.of("_field", "89.160.20.128")); + processor2.execute(document2); + assertThat(document2.getSourceAndMetadata().get("geoip"), notNullValue()); + numberOfIngestRuns.incrementAndGet(); } catch (Exception | AssertionError e) { - logger.error("error in update databases thread after run [" + i + "]", e); - failureHolder2.set(e); + logger.error("error in ingest thread after run [" + numberOfIngestRuns.get() + "]", e); + ingestFailures.setOnce(id, e); break; } } - completed.set(true); }); + } - Arrays.stream(ingestThreads).forEach(Thread::start); - updateDatabaseThread.start(); - Arrays.stream(ingestThreads).forEach(thread -> { + final AtomicReference failureHolder2 = new AtomicReference<>(); + Thread updateDatabaseThread = new Thread(() -> { + for (int i = 0; i < numberOfDatabaseUpdates; i++) { try { - thread.join(); - } catch (InterruptedException e) { - throw new AssertionError(e); + DatabaseReaderLazyLoader previous1 = databaseRegistry.get("GeoLite2-City.mmdb"); + if (Files.exists(geoIpTmpDir.resolve("GeoLite2-City.mmdb"))) { + databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb")); + } else { + Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), + geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + } + DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb"); + InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" : + "/GeoLite2-City-Test.mmdb"); + Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + + DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb"); + DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb"); + assertThat(current1, not(sameInstance(previous1))); + assertThat(current2, not(sameInstance(previous2))); + + // lazy load type and reader: + lazyLoadReaders(databaseRegistry); + } catch (Exception | AssertionError e) { + logger.error("error in update databases thread after run [" + i + "]", e); + failureHolder2.set(e); + break; } - }); - updateDatabaseThread.join(); + } + completed.set(true); + }); + + Arrays.stream(ingestThreads).forEach(Thread::start); + updateDatabaseThread.start(); + Arrays.stream(ingestThreads).forEach(thread -> { + try { + thread.join(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }); + updateDatabaseThread.join(); - ingestFailures.asList().forEach(r -> assertThat(r, nullValue())); - assertThat(failureHolder2.get(), nullValue()); - assertThat(numberOfIngestRuns.get(), greaterThan(0)); + ingestFailures.asList().forEach(r -> assertThat(r, nullValue())); + assertThat(failureHolder2.get(), nullValue()); + assertThat(numberOfIngestRuns.get(), greaterThan(0)); - for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { - assertThat(lazyLoader.current(), equalTo(0)); - } - } finally { - resourceWatcherService.close(); - threadPool.shutdown(); + for (DatabaseReaderLazyLoader lazyLoader : databaseRegistry.getAllDatabases()) { + assertThat(lazyLoader.current(), equalTo(0)); } } - private static void lazyLoadReaders(LocalDatabases localDatabases) throws IOException { - if (localDatabases.configDatabases.get("GeoLite2-City.mmdb") != null) { - localDatabases.configDatabases.get("GeoLite2-City.mmdb").getDatabaseType(); - localDatabases.configDatabases.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216")); + private static DatabaseRegistry createDatabaseRegistry(Path geoIpTmpDir) throws IOException { + Path geoIpDir = createTempDir(); + copyDatabaseFiles(geoIpDir); + GeoIpCache cache = new GeoIpCache(0); + LocalDatabases localDatabases = new LocalDatabases(geoIpDir, createTempDir(), cache); + DatabaseRegistry databaseRegistry = + new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run); + databaseRegistry.initialize(mock(ResourceWatcherService.class), mock(IngestService.class)); + return databaseRegistry; + } + + private static void lazyLoadReaders(DatabaseRegistry databaseRegistry) throws IOException { + if (databaseRegistry.get("GeoLite2-City.mmdb") != null) { + databaseRegistry.get("GeoLite2-City.mmdb").getDatabaseType(); + databaseRegistry.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216")); } - localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb").getDatabaseType(); - localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216")); + databaseRegistry.get("GeoLite2-City-Test.mmdb").getDatabaseType(); + databaseRegistry.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216")); } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 9a8460688cf2e..af2b1d1be566c 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -49,6 +49,7 @@ class DatabaseReaderLazyLoader implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class); + private final String md5; private final GeoIpCache cache; private final Path databasePath; private final CheckedSupplier loader; @@ -57,15 +58,17 @@ class DatabaseReaderLazyLoader implements Closeable { // cache the database type so that we do not re-read it on every pipeline execution final SetOnce databaseType; + private volatile boolean deleteDatabaseFileOnClose; private final AtomicInteger currentUsages = new AtomicInteger(0); - DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath) { - this(cache, databasePath, createDatabaseLoader(databasePath)); + DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5) { + this(cache, databasePath, md5, createDatabaseLoader(databasePath)); } - DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath, final CheckedSupplier loader) { + DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5, CheckedSupplier loader) { this.cache = cache; this.databasePath = Objects.requireNonNull(databasePath); + this.md5 = md5; this.loader = Objects.requireNonNull(loader); this.databaseReader = new SetOnce<>(); this.databaseType = new SetOnce<>(); @@ -202,6 +205,15 @@ DatabaseReader get() throws IOException { return databaseReader.get(); } + String getMd5() { + return md5; + } + + public void close(boolean deleteDatabaseFileOnClose) throws IOException { + this.deleteDatabaseFileOnClose = deleteDatabaseFileOnClose; + close(); + } + @Override public void close() throws IOException { if (currentUsages.updateAndGet(u -> -1 - u) == -1) { @@ -213,6 +225,10 @@ private void doClose() throws IOException { IOUtils.close(databaseReader.get()); int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath); LOGGER.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath); + if (deleteDatabaseFileOnClose) { + LOGGER.info("deleting [{}]", databasePath); + Files.delete(databasePath); + } } private static CheckedSupplier createDatabaseLoader(Path databasePath) { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java new file mode 100644 index 0000000000000..b8e40422e0b28 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -0,0 +1,319 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.ingest.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; +import java.util.zip.GZIPInputStream; + +/** + * A component that is responsible for making the databases maintained by {@link GeoIpDownloader} + * available for ingest processors. + * + * Also provided a lookup mechanism for geoip processors with fallback to {@link LocalDatabases}. + * All databases are downloaded into a geoip tmp directory, which is created at node startup. + * + * The following high level steps are executed after each cluster state update: + * 1) Check which databases are available in {@link GeoIpTaskState}, + * which is part of the geoip downloader persistent task. + * 2) For each database check whether the databases have changed + * by comparing the local and remote md5 hash or are locally missing. + * 3) For each database identified in step 2 start downloading the database + * chunks. Each chunks is appended to a tmp file (inside geoip tmp dir) and + * after all chunks have been downloaded, the database is uncompressed and + * renamed to the final filename.After this the database is loaded and + * if there is an old instance of this database then that is closed. + * 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}. + */ +final class DatabaseRegistry implements Closeable { + + private static final Logger LOGGER = LogManager.getLogger(DatabaseRegistry.class); + + private final Client client; + private final GeoIpCache cache; + private final Path geoipTmpDirectory; + private final LocalDatabases localDatabases; + private final Consumer genericExecutor; + + private final ConcurrentMap databases = new ConcurrentHashMap<>(); + + DatabaseRegistry(Environment environment, Client client, GeoIpCache cache, Consumer genericExecutor) { + this( + environment.tmpFile(), + new OriginSettingClient(client, "geoip"), + cache, + new LocalDatabases(environment, cache), + genericExecutor + ); + } + + DatabaseRegistry(Path tmpDir, + Client client, + GeoIpCache cache, + LocalDatabases localDatabases, + Consumer genericExecutor) { + this.client = client; + this.cache = cache; + this.geoipTmpDirectory = tmpDir.resolve("geoip-databases"); + this.localDatabases = localDatabases; + this.genericExecutor = genericExecutor; + } + + public void initialize(ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException { + localDatabases.initialize(resourceWatcher); + if (Files.exists(geoipTmpDirectory)) { + Files.walk(geoipTmpDirectory) + .filter(Files::isRegularFile) + .peek(path -> LOGGER.info("deleting stale file [{}]", path)) + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } else { + Files.createDirectory(geoipTmpDirectory); + } + LOGGER.info("initialized database registry, using geoip-databases directory [{}]", geoipTmpDirectory); + ingestService.addIngestClusterStateListener(this::checkDatabases); + } + + public DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDefaultDatabases) { + // There is a need for reference counting in order to avoid using an instance + // that gets closed while using it. (this can happen during a database update) + while (true) { + DatabaseReaderLazyLoader instance = + databases.getOrDefault(name, localDatabases.getDatabase(name, fallbackUsingDefaultDatabases)); + if (instance == null || instance.preLookup()) { + return instance; + } + // instance is closed after incrementing its usage, + // drop this instance and fetch another one. + } + } + + List getAllDatabases() { + List all = new ArrayList<>(localDatabases.getAllDatabases()); + this.databases.forEach((key, value) -> all.add(value)); + return all; + } + + // for testing only: + DatabaseReaderLazyLoader get(String key) { + return databases.get(key); + } + + @Override + public void close() throws IOException { + IOUtils.close(databases.values()); + } + + void checkDatabases(ClusterState state) { + DiscoveryNode localNode = state.nodes().getLocalNode(); + if (localNode.isIngestNode() == false) { + return; + } + + PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE); + if (persistentTasks == null) { + return; + } + + IndexRoutingTable databasesIndexRT = state.getRoutingTable().index(GeoIpDownloader.DATABASES_INDEX); + if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { + return; + } + + PersistentTasksCustomMetadata.PersistentTask task = + PersistentTasksCustomMetadata.getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER); + // Empty state will purge stale entries in databases map. + GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState(); + + taskState.getDatabases().forEach((name, metadata) -> { + DatabaseReaderLazyLoader reference = databases.get(name); + String remoteMd5 = metadata.getMd5(); + String localMd5 = reference != null ? reference.getMd5() : null; + if (Objects.equals(localMd5, remoteMd5)) { + LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5); + return; + } + + try { + retrieveAndUpdateDatabase(name, metadata); + } catch (Exception e) { + LOGGER.error((Supplier) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), e); + } + }); + + List staleEntries = new ArrayList<>(databases.keySet()); + staleEntries.removeAll(taskState.getDatabases().keySet()); + removeStaleEntries(staleEntries); + } + + void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata metadata) throws IOException { + final String recordedMd5 = metadata.getMd5(); + + // This acts as a lock, if this method for a specific db is executed later and downloaded for this db is still ongoing then + // FileAlreadyExistsException is thrown and this method silently returns. + // (this method is never invoked concurrently and is invoked by a cluster state applier thread) + final Path databaseTmpGzFile; + try { + databaseTmpGzFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp.gz")); + } catch (FileAlreadyExistsException e) { + LOGGER.debug("database update [{}] already in progress, skipping...", databaseName); + return; + } + + final Path databaseTmpFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp")); + LOGGER.info("downloading geoip database [{}] to [{}]", databaseName, databaseTmpGzFile); + retrieveDatabase( + databaseName, + recordedMd5, + metadata, + bytes -> Files.write(databaseTmpGzFile, bytes, StandardOpenOption.APPEND), + () -> { + LOGGER.debug("decompressing [{}]", databaseTmpGzFile.getFileName()); + decompress(databaseTmpGzFile, databaseTmpFile); + + Path databaseFile = geoipTmpDirectory.resolve(databaseName); + LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile); + Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + updateDatabase(databaseName, recordedMd5, databaseFile); + Files.delete(databaseTmpGzFile); + }, + failure -> { + LOGGER.error((Supplier) () -> new ParameterizedMessage("failed to download database [{}]", databaseName), failure); + try { + Files.delete(databaseTmpFile); + Files.delete(databaseTmpGzFile); + } catch (IOException ioe) { + ioe.addSuppressed(failure); + LOGGER.error("Unable to delete tmp database file after failure", ioe); + } + }); + } + + void updateDatabase(String databaseFileName, String recordedMd5, Path file) { + try { + LOGGER.info("database file changed [{}], reload database...", file); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5); + DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader); + if (existing != null) { + existing.close(); + } + } catch (Exception e) { + LOGGER.error((Supplier) () -> new ParameterizedMessage("failed to update database [{}]", databaseFileName), e); + } + } + + void removeStaleEntries(Collection staleEntries) { + for (String staleEntry : staleEntries) { + try { + LOGGER.info("database [{}] no longer exists, cleaning up...", staleEntry); + DatabaseReaderLazyLoader existing = databases.remove(staleEntry); + assert existing != null; + existing.close(true); + } catch (Exception e) { + LOGGER.error((Supplier) () -> new ParameterizedMessage("failed to clean database [{}]", staleEntry), e); + } + } + } + + void retrieveDatabase(String databaseName, + String expectedMd5, + GeoIpTaskState.Metadata metadata, + CheckedConsumer chunkConsumer, + CheckedRunnable completedHandler, + Consumer failureHandler) { + // Need to run the search from a different thread, since this is executed from cluster state applier thread: + genericExecutor.accept(() -> { + MessageDigest md = MessageDigests.md5(); + int firstChunk = metadata.getFirstChunk(); + int lastChunk = metadata.getLastChunk(); + try { + // TODO: invoke open point in time api when this api is moved from xpack core to server module. + // (so that we have a consistent view of the chunk documents while doing the lookups) + // (the chance that the documents change is rare, given the low frequency of the updates for these databases) + for (int chunk = firstChunk; chunk <= lastChunk; chunk++) { + SearchRequest searchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX); + String id = String.format(Locale.ROOT, "%s_%d", databaseName, chunk); + searchRequest.source().query(new TermQueryBuilder("_id", id)); + + // At most once a day a few searches may be executed to fetch the new files, + // so it is ok if this happens in a blocking manner on a thread from generic thread pool. + // This makes the code easier to understand and maintain. + SearchResponse searchResponse = client.search(searchRequest).actionGet(); + SearchHit[] hits = searchResponse.getHits().getHits(); + assert hits.length == 1 : "expected 1 hit, but instead got [" + hits.length + "]"; + if (searchResponse.getHits().getHits().length == 0) { + failureHandler.accept(new ResourceNotFoundException("chunk document with id [" + id + "] not found")); + return; + } + byte[] data = (byte[]) hits[0].getSourceAsMap().get("data"); + md.update(data); + chunkConsumer.accept(data); + } + String actualMd5 = MessageDigests.toHexString(md.digest()); + if (Objects.equals(expectedMd5, actualMd5)) { + completedHandler.run(); + } else { + failureHandler.accept(new RuntimeException("expected md5 hash [" + expectedMd5 + + "], but got md5 hash [" + actualMd5 + "]")); + } + } catch (Exception e) { + failureHandler.accept(e); + } + }); + } + + static void decompress(Path source, Path target) throws IOException { + try (GZIPInputStream in = new GZIPInputStream(Files.newInputStream(source), 8192)) { + Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING); + } + } + +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 8b85a6ba693bc..d842e3490a15f 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -70,12 +70,13 @@ public final class GeoIpProcessor extends AbstractProcessor { */ GeoIpProcessor( final String tag, - String description, final String field, + final String description, + final String field, final CheckedSupplier supplier, final String targetField, final Set properties, final boolean ignoreMissing, - boolean firstOnly) { + final boolean firstOnly) { super(tag, description); this.field = field; this.targetField = targetField; @@ -340,14 +341,14 @@ public static final class Factory implements Processor.Factory { Property.IP, Property.ASN, Property.ORGANIZATION_NAME, Property.NETWORK )); - private final LocalDatabases localDatabases; + private final DatabaseRegistry databaseRegistry; List getAllDatabases() { - return localDatabases.getAllDatabases(); + return databaseRegistry.getAllDatabases(); } - public Factory(LocalDatabases localDatabases) { - this.localDatabases = localDatabases; + public Factory(DatabaseRegistry databaseRegistry) { + this.databaseRegistry = databaseRegistry; } @Override @@ -361,8 +362,9 @@ public GeoIpProcessor create( List propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true); + boolean fallbackUsingDefaultDatabases = readBooleanProperty(TYPE, processorTag, config, "fallback_to_default_databases", true); - DatabaseReaderLazyLoader lazyLoader = localDatabases.getDatabase(databaseFile); + DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases); if (lazyLoader == null) { throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist"); @@ -398,11 +400,10 @@ public GeoIpProcessor create( } } CheckedSupplier supplier = () -> { - DatabaseReaderLazyLoader loader = localDatabases.getDatabase(databaseFile); + DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases); if (loader == null) { - throw new ResourceNotFoundException("database_file", "database file [" + databaseFile + "] doesn't exist"); + throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist"); } - // Only check whether the suffix has changed and not the entire database type. // To sanity check whether a city db isn't overwriting with a country or asn db. // For example overwriting a geoip lite city db with geoip city db is a valid change, but the db type is slightly different, diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java index 6182f05733d97..08297866bf51f 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java @@ -55,7 +55,7 @@ public static GeoIpTaskState fromXContent(XContentParser parser) throws IOExcept private final Map databases; - private GeoIpTaskState(Map databases) { + GeoIpTaskState(Map databases) { this.databases = Map.copyOf(databases); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 31074cb505ca2..20d7621ddbb4c 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -24,6 +24,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTaskState; @@ -60,7 +61,8 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd static String[] DEFAULT_DATABASE_FILENAMES = new String[]{"GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"}; - private final SetOnce localDatabases = new SetOnce<>(); + private final SetOnce ingestService = new SetOnce<>(); + private final SetOnce databaseRegistry = new SetOnce<>(); @Override public List> getSettings() { @@ -75,11 +77,13 @@ public List> getSettings() { @Override public Map getProcessors(Processor.Parameters parameters) { - final long cacheSize = CACHE_SIZE.get(parameters.env.settings()); - final GeoIpCache cache = new GeoIpCache(cacheSize); - LocalDatabases localDatabases = new LocalDatabases(parameters.env, cache); - this.localDatabases.set(localDatabases); - return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(localDatabases)); + ingestService.set(parameters.ingestService); + + long cacheSize = CACHE_SIZE.get(parameters.env.settings()); + GeoIpCache geoIpCache = new GeoIpCache(cacheSize); + DatabaseRegistry registry = new DatabaseRegistry(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor); + databaseRegistry.set(registry); + return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry)); } @Override @@ -95,16 +99,16 @@ public Collection createComponents(Client client, IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier) { try { - localDatabases.get().initialize(resourceWatcherService); + databaseRegistry.get().initialize(resourceWatcherService, ingestService.get()); } catch (IOException e) { throw new UncheckedIOException(e); - } + } return List.of(); } @Override public void close() throws IOException { - localDatabases.get().close(); + databaseRegistry.get().close(); } @Override diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java index de461fb9f259d..d6d5921a08c4a 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java @@ -46,10 +46,10 @@ final class LocalDatabases implements Closeable { private static final Logger LOGGER = LogManager.getLogger(LocalDatabases.class); private final GeoIpCache cache; - final Path geoipConfigDir; + private final Path geoipConfigDir; private final Map defaultDatabases; - final ConcurrentMap configDatabases; + private final ConcurrentMap configDatabases; LocalDatabases(Environment environment, GeoIpCache cache) { this( @@ -83,20 +83,8 @@ void initialize(ResourceWatcherService resourceWatcher) throws IOException { defaultDatabases.keySet(), configDatabases.keySet(), geoipConfigDir); } - // There is a need for reference counting in order to avoid using an instance - // that gets closed while using it. (this can happen during a database update) - DatabaseReaderLazyLoader getDatabase(String name) { - while (true) { - DatabaseReaderLazyLoader instance = configDatabases.getOrDefault(name, defaultDatabases.get(name)); - if (instance == null) { - return null; - } - if (instance.preLookup()) { - return instance; - } - // instance is closed after incrementing its usage, - // drop this instance and fetch another one. - } + DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDefaultDatabases) { + return configDatabases.getOrDefault(name, fallbackUsingDefaultDatabases ? defaultDatabases.get(name) : null); } List getAllDatabases() { @@ -118,7 +106,7 @@ void updateDatabase(Path file, boolean update) { try { if (update) { LOGGER.info("database file changed [{}], reload database...", file); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, null); DatabaseReaderLazyLoader existing = configDatabases.put(databaseFileName, loader); if (existing != null) { existing.close(); @@ -141,7 +129,7 @@ Map initDefaultDatabases(Path geoipModuleDir) Path source = geoipModuleDir.resolve(filename); assert Files.exists(source); String databaseFileName = source.getFileName().toString(); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, source); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, source, null); databases.put(databaseFileName, loader); } @@ -161,7 +149,7 @@ Map initConfigDatabases(Path geoipConfigDir) t if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { assert Files.exists(databasePath); String databaseFileName = databasePath.getFileName().toString(); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, databasePath); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, databasePath, null); databases.put(databaseFileName, loader); } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java new file mode 100644 index 0000000000000..202c71764b380 --- /dev/null +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java @@ -0,0 +1,349 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.junit.After; +import org.junit.Before; +import org.mockito.ArgumentCaptor; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static org.elasticsearch.ingest.geoip.GeoIpProcessorFactoryTests.copyDatabaseFiles; +import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.TYPE; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory. +public class DatabaseRegistryTests extends ESTestCase { + + private Client client; + private Path geoIpTmpDir; + private ThreadPool threadPool; + private DatabaseRegistry databaseRegistry; + private ResourceWatcherService resourceWatcherService; + + @Before + public void setup() throws IOException { + final Path geoIpDir = createTempDir(); + final Path geoIpConfigDir = createTempDir(); + Files.createDirectories(geoIpConfigDir); + copyDatabaseFiles(geoIpDir); + + threadPool = new TestThreadPool(LocalDatabases.class.getSimpleName()); + Settings settings = Settings.builder().put("resource.reload.interval.high", TimeValue.timeValueMillis(100)).build(); + resourceWatcherService = new ResourceWatcherService(settings, threadPool); + + client = mock(Client.class); + GeoIpCache cache = new GeoIpCache(1000); + LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache); + geoIpTmpDir = createTempDir(); + databaseRegistry = new DatabaseRegistry(geoIpTmpDir, client, cache, localDatabases, Runnable::run); + databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class)); + } + + @After + public void cleanup() { + resourceWatcherService.close(); + threadPool.shutdownNow(); + } + + public void testCheckDatabases() throws Exception { + String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; + PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", + new GeoIpTaskState.Metadata(0L, 5, 14, "8a76ac17c58d8f3f2e15e3f1ec37d473")))); + PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); + + ClusterState state = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build()) + .nodes(new DiscoveryNodes.Builder() + .add(new DiscoveryNode("_id1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_id1")) + .routingTable(createIndexRoutingTable()) + .build(); + + mockSearches("GeoIP2-City.mmdb", 5, 14); + + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + databaseRegistry.checkDatabases(state); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), notNullValue()); + verify(client, times(10)).search(any()); + try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) { + assertThat(files.collect(Collectors.toList()), hasSize(1)); + } + } + + public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Exception { + String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; + PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", + new GeoIpTaskState.Metadata(0L, 0, 9, "8a76ac17c58d8f3f2e15e3f1ec37d473")))); + PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); + + ClusterState state = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build()) + .nodes(new DiscoveryNodes.Builder() + .add(new DiscoveryNode("_name1", "_id1", buildNewFakeTransportAddress(), Map.of(), + Set.of(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT)) + .localNodeId("_id1")) + .routingTable(createIndexRoutingTable()) + .build(); + + mockSearches("GeoIP2-City.mmdb", 0, 9); + + databaseRegistry.checkDatabases(state); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + verify(client, never()).search(any()); + try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) { + assertThat(files.collect(Collectors.toList()), empty()); + } + } + + public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Exception { + String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; + PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", + new GeoIpTaskState.Metadata(0L, 0, 9, "8a76ac17c58d8f3f2e15e3f1ec37d473")))); + PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); + + ClusterState state = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build()) + .nodes(new DiscoveryNodes.Builder() + .add(new DiscoveryNode("_id1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_id1")) + .build(); + + mockSearches("GeoIP2-City.mmdb", 0, 9); + + databaseRegistry.checkDatabases(state); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + verify(client, never()).search(any()); + try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) { + assertThat(files.collect(Collectors.toList()), empty()); + } + } + + public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws Exception { + PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(0L, Map.of()); + + ClusterState state = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build()) + .nodes(new DiscoveryNodes.Builder() + .add(new DiscoveryNode("_id1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_id1")) + .routingTable(createIndexRoutingTable()) + .build(); + + mockSearches("GeoIP2-City.mmdb", 0, 9); + + databaseRegistry.checkDatabases(state); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + verify(client, never()).search(any()); + try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) { + assertThat(files.collect(Collectors.toList()), empty()); + } + } + + public void testRetrieveDatabase() throws Exception { + String md5 = "7a39822e85d3eeb863657b7865597a7a"; + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5); + mockSearches("_name", 0, 29); + + @SuppressWarnings("unchecked") + CheckedConsumer chunkConsumer = mock(CheckedConsumer.class); + @SuppressWarnings("unchecked") + CheckedRunnable completedHandler = mock(CheckedRunnable.class); + @SuppressWarnings("unchecked") + Consumer failureHandler = mock(Consumer.class); + databaseRegistry.retrieveDatabase("_name", md5, metadata, chunkConsumer, completedHandler, failureHandler); + verify(failureHandler, never()).accept(any()); + verify(chunkConsumer, times(30)).accept(any()); + verify(completedHandler, times(1)).run(); + verify(client, times(30)).search(any()); + } + + public void testRetrieveDatabaseCorruption() throws Exception { + String md5 = "different"; + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, md5); + mockSearches("_name", 0, 9); + + @SuppressWarnings("unchecked") + CheckedConsumer chunkConsumer = mock(CheckedConsumer.class); + @SuppressWarnings("unchecked") + CheckedRunnable completedHandler = mock(CheckedRunnable.class); + @SuppressWarnings("unchecked") + Consumer failureHandler = mock(Consumer.class); + databaseRegistry.retrieveDatabase("_name", md5, metadata, chunkConsumer, completedHandler, failureHandler); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(failureHandler, times(1)).accept(exceptionCaptor.capture()); + assertThat(exceptionCaptor.getAllValues().size(), equalTo(1)); + assertThat(exceptionCaptor.getAllValues().get(0).getMessage(), equalTo("expected md5 hash [different], " + + "but got md5 hash [7a39822e85d3eeb863657b7865597a7a]")); + verify(chunkConsumer, times(10)).accept(any()); + verify(completedHandler, times(0)).run(); + verify(client, times(10)).search(any()); + } + + private void mockSearches(String databaseName, int firstChunk, int lastChunk) throws IOException { + String dummyContent = "test: " + databaseName; + List data = gzip(dummyContent, lastChunk - firstChunk + 1); + assertThat(gunzip(data), equalTo(dummyContent)); + + for (int i = firstChunk; i <= lastChunk; i++) { + byte[] chunk = data.get(i - firstChunk); + SearchHit hit = new SearchHit(i); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { + builder.map(Map.of("data", chunk)); + builder.flush(); + ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); + hit.sourceRef(new BytesArray(outputStream.toByteArray())); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + + SearchHits hits = new SearchHits(new SearchHit[] {hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f); + SearchResponse searchResponse = + new SearchResponse(new SearchResponseSections(hits, null, null, false, null, null, 0), null, 1, 1, 0, 1L, null, null); + @SuppressWarnings("unchecked") + ActionFuture actionFuture = mock(ActionFuture.class); + when(actionFuture.actionGet()).thenReturn(searchResponse); + SearchRequest expectedSearchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX); + String id = String.format(Locale.ROOT, "%s_%d", databaseName, i); + expectedSearchRequest.source().query(new TermQueryBuilder("_id", id)); + when(client.search(eq(expectedSearchRequest))).thenReturn(actionFuture); + } + } + + private static RoutingTable createIndexRoutingTable() { + Index index = new Index(GeoIpDownloader.DATABASES_INDEX, UUID.randomUUID().toString()); + ShardRouting shardRouting = ShardRouting.newUnassigned( + new ShardId(index, 0), + true, + RecoverySource.ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + String nodeId = ESTestCase.randomAlphaOfLength(8); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0)).addShard( + shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize()).moveToStarted() + ).build(); + return RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(table).build()).build(); + } + + private static List gzip(String content, int chunks) throws IOException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bytes); + gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8)); + gzipOutputStream.close(); + + byte[] all = bytes.toByteArray(); + int chunkSize = all.length / chunks; + List data = new ArrayList<>(); + + for (int from = 0; from < all.length;) { + int to = from + chunkSize; + if (to > all.length) { + to = all.length; + } + data.add(Arrays.copyOfRange(all, from, to)); + from = to; + } + + if (data.size() > chunks) { + byte[] last = data.remove(data.size() - 1); + byte[] secondLast = data.remove(data.size() -1); + byte[] merged = new byte[secondLast.length + last.length]; + System.arraycopy(secondLast, 0, merged, 0, secondLast.length); + System.arraycopy(last, 0, merged, secondLast.length, last.length); + data.add(merged); + } + + assert data.size() == chunks; + return data; + } + + private static String gunzip(List chunks) throws IOException { + byte[] gzippedContent = new byte[chunks.stream().mapToInt(value -> value.length).sum()]; + int written = 0; + for (byte[] chunk : chunks) { + System.arraycopy(chunk, 0, gzippedContent, written, chunk.length); + written += chunk.length; + } + GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(gzippedContent)); + return Streams.readFully(gzipInputStream).utf8ToString(); + } + +} diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index b2e0390ca5458..4396574a0a9cc 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -10,17 +10,21 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.client.Client; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -36,30 +40,38 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; public class GeoIpProcessorFactoryTests extends ESTestCase { - private static LocalDatabases localDatabases; + private Path geoipTmpDir; + private DatabaseRegistry databaseRegistry; - @BeforeClass - public static void loadDatabaseReaders() throws IOException { + @Before + public void loadDatabaseReaders() throws IOException { final Path geoIpDir = createTempDir(); final Path configDir = createTempDir(); final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); copyDatabaseFiles(geoIpDir); - localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); + + Client client = mock(Client.class); + GeoIpCache cache = new GeoIpCache(1000); + LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); + geoipTmpDir = createTempDir(); + databaseRegistry = new DatabaseRegistry(geoipTmpDir, client, cache, localDatabases, Runnable::run); } - @AfterClass - public static void closeDatabaseReaders() throws IOException { - localDatabases.close(); - localDatabases = null; + @After + public void closeDatabaseReaders() throws IOException { + databaseRegistry.close(); + databaseRegistry = null; } public void testBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); @@ -75,7 +87,7 @@ public void testBuildDefaults() throws Exception { } public void testSetIgnoreMissing() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); @@ -92,7 +104,7 @@ public void testSetIgnoreMissing() throws Exception { } public void testCountryBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); @@ -110,7 +122,7 @@ public void testCountryBuildDefaults() throws Exception { } public void testAsnBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); @@ -128,7 +140,7 @@ public void testAsnBuildDefaults() throws Exception { } public void testBuildTargetField() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); @@ -139,7 +151,7 @@ public void testBuildTargetField() throws Exception { } public void testBuildDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -152,7 +164,7 @@ public void testBuildDbFile() throws Exception { } public void testBuildWithCountryDbAndAsnFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -166,7 +178,7 @@ public void testBuildWithCountryDbAndAsnFields() throws Exception { } public void testBuildWithAsnDbAndCityFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); @@ -180,7 +192,7 @@ public void testBuildWithAsnDbAndCityFields() throws Exception { } public void testBuildNonExistingDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config = new HashMap<>(); config.put("field", "_field"); @@ -190,7 +202,7 @@ public void testBuildNonExistingDbFile() throws Exception { } public void testBuildFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Set properties = EnumSet.noneOf(GeoIpProcessor.Property.class); List fieldNames = new ArrayList<>(); @@ -214,7 +226,7 @@ public void testBuildFields() throws Exception { } public void testBuildIllegalFieldOption() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); Map config1 = new HashMap<>(); config1.put("field", "_field"); @@ -240,8 +252,11 @@ public void testLazyLoading() throws Exception { // Loading another database reader instances, because otherwise we can't test lazy loading as the // database readers used at class level are reused between tests. (we want to keep that otherwise running this // test will take roughly 4 times more time) - LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + Client client = mock(Client.class); + GeoIpCache cache = new GeoIpCache(1000); + LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache); + DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { assertNull(lazyLoader.databaseReader.get()); } @@ -255,10 +270,10 @@ public void testLazyLoading() throws Exception { final GeoIpProcessor city = factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(localDatabases.getDatabase("GeoLite2-City.mmdb").databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb", true).databaseReader.get()); city.execute(document); // the first ingest should trigger a database load - assertNotNull(localDatabases.getDatabase("GeoLite2-City.mmdb").databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb", true).databaseReader.get()); config = new HashMap<>(); config.put("field", "_field"); @@ -266,10 +281,10 @@ public void testLazyLoading() throws Exception { final GeoIpProcessor country = factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(localDatabases.getDatabase("GeoLite2-Country.mmdb").databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb", true).databaseReader.get()); country.execute(document); // the first ingest should trigger a database load - assertNotNull(localDatabases.getDatabase("GeoLite2-Country.mmdb").databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb", true).databaseReader.get()); config = new HashMap<>(); config.put("field", "_field"); @@ -277,10 +292,10 @@ public void testLazyLoading() throws Exception { final GeoIpProcessor asn = factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(localDatabases.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb", true).databaseReader.get()); asn.execute(document); // the first ingest should trigger a database load - assertNotNull(localDatabases.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb", true).databaseReader.get()); } public void testLoadingCustomDatabase() throws IOException { @@ -300,8 +315,11 @@ public void testLoadingCustomDatabase() throws IOException { ThreadPool threadPool = new TestThreadPool("test"); ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); - localDatabases.initialize(resourceWatcherService); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases); + Client client = mock(Client.class); + GeoIpCache cache = new GeoIpCache(1000); + DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run); + databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { assertNull(lazyLoader.databaseReader.get()); } @@ -315,18 +333,81 @@ public void testLoadingCustomDatabase() throws IOException { final GeoIpProcessor city = factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(localDatabases.getDatabase("GeoIP2-City.mmdb").databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb", true).databaseReader.get()); city.execute(document); // the first ingest should trigger a database load - assertNotNull(localDatabases.getDatabase("GeoIP2-City.mmdb").databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb", true).databaseReader.get()); resourceWatcherService.close(); threadPool.shutdown(); } + public void testFallbackUsingDefaultDatabases() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + { + Map config = new HashMap<>(); + config.put("field", "source_field"); + config.put("fallback_to_default_databases", false); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config)); + assertThat(e.getMessage(), equalTo("[database_file] database file [GeoLite2-City.mmdb] doesn't exist")); + } + { + Map config = new HashMap<>(); + config.put("field", "source_field"); + if (randomBoolean()) { + config.put("fallback_to_default_databases", true); + } + GeoIpProcessor processor = factory.create(null, null, null, config); + assertThat(processor, notNullValue()); + } + } + + public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception { + copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb"); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + // fallback_to_default_databases=true, first use default city db then a custom city db: + { + Map config = new HashMap<>(); + config.put("field", "source_field"); + if (randomBoolean()) { + config.put("fallback_to_default_databases", true); + } + GeoIpProcessor processor = factory.create(null, null, null, config); + Map document = new HashMap<>(); + document.put("source_field", "89.160.20.128"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); + assertThat(geoData.get("city_name"), equalTo("Tumba")); + + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); + assertThat(geoData.get("city_name"), equalTo("Linköping")); + } + // fallback_to_default_databases=false, first use a custom city db then remove the custom db and expect failure: + { + Map config = new HashMap<>(); + config.put("field", "source_field"); + config.put("fallback_to_default_databases", false); + GeoIpProcessor processor = factory.create(null, null, null, config); + Map document = new HashMap<>(); + document.put("source_field", "89.160.20.128"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); + assertThat(geoData.get("city_name"), equalTo("Linköping")); + databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb")); + Exception e = expectThrows(ResourceNotFoundException.class, () -> processor.execute(ingestDocument)); + assertThat(e.getMessage(), equalTo("database file [GeoLite2-City.mmdb] doesn't exist")); + } + } + private static void copyDatabaseFile(final Path path, final String databaseFilename) throws IOException { Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/" + databaseFilename)), - path.resolve(databaseFilename)); + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/" + databaseFilename)), + path.resolve(databaseFilename) + ); } static void copyDatabaseFiles(final Path path) throws IOException { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 83d622a82eaaa..c3f71da2db813 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -304,7 +304,7 @@ private CheckedSupplier loader(final Stri final CheckedSupplier loader = () -> new DatabaseReader.Builder(databaseInputStreamSupplier.get()).build(); final GeoIpCache cache = new GeoIpCache(1000); - DatabaseReaderLazyLoader lazyLoader = new DatabaseReaderLazyLoader(cache, PathUtils.get(path), loader) { + DatabaseReaderLazyLoader lazyLoader = new DatabaseReaderLazyLoader(cache, PathUtils.get(path), null, loader) { @Override long databaseFileSize() throws IOException { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java index a497527cebd79..4fca6e86be636 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java @@ -51,13 +51,13 @@ public void testLocalDatabasesEmptyConfig() throws Exception { assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(0)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb"); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); } @@ -71,16 +71,16 @@ public void testDatabasesConfigDir() throws Exception { assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(2)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb"); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); - loader = localDatabases.getDatabase("GeoIP2-City.mmdb"); + loader = localDatabases.getDatabase("GeoIP2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City")); } @@ -90,13 +90,13 @@ public void testDatabasesDynamicUpdateConfigDir() throws Exception { localDatabases.initialize(resourceWatcherService); { assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb"); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); } @@ -105,16 +105,16 @@ public void testDatabasesDynamicUpdateConfigDir() throws Exception { assertBusy(() -> { assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(2)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb"); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb"); + loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); - loader = localDatabases.getDatabase("GeoIP2-City.mmdb"); + loader = localDatabases.getDatabase("GeoIP2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City")); }); } @@ -130,12 +130,11 @@ public void testDatabasesUpdateExistingConfDatabase() throws Exception { assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(1)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128")); assertThat(cityResponse.getCity().getName(), equalTo("Tumba")); assertThat(cache.count(), equalTo(1)); - loader.postLookup(); } Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"), @@ -145,12 +144,11 @@ public void testDatabasesUpdateExistingConfDatabase() throws Exception { assertThat(localDatabases.getConfigDatabases().size(), equalTo(1)); assertThat(cache.count(), equalTo(0)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128")); assertThat(cityResponse.getCity().getName(), equalTo("Linköping")); assertThat(cache.count(), equalTo(1)); - loader.postLookup(); }); Files.delete(configDir.resolve("GeoLite2-City.mmdb"));