From 048d67e867240ebad7276c8bfcb96418ad14024c Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Wed, 24 Feb 2021 09:11:39 +0100 Subject: [PATCH] [7.x] GeoIP database downloader (#68424) (#69481) This change adds component that will download new GeoIP databases from infra service New databases are downloaded in chunks and stored in .geoip_databases index Downloads are verified against MD5 checksum provided by the server Current state of all stored databases is stored in cluster state in persistent task state Relates to #68920 --- modules/ingest-geoip/build.gradle | 55 ++- .../ingest/geoip/AbstractGeoIpIT.java | 71 ++++ .../ingest/geoip/GeoIpDownloaderIT.java | 169 ++++++++ .../geoip/GeoIpProcessorNonIngestNodeIT.java | 48 +-- .../ingest/geoip/GeoIpDownloader.java | 227 +++++++++++ .../geoip/GeoIpDownloaderTaskExecutor.java | 113 ++++++ .../ingest/geoip/GeoIpTaskParams.java | 67 ++++ .../ingest/geoip/GeoIpTaskState.java | 216 ++++++++++ .../ingest/geoip/HttpClient.java | 102 +++++ .../ingest/geoip/IngestGeoIpPlugin.java | 110 +++++- .../plugin-metadata/plugin-security.policy | 1 + .../ingest/geoip/GeoIpDownloaderTests.java | 370 ++++++++++++++++++ .../GeoIpTaskStateSerializationTests.java | 38 ++ settings.gradle | 1 + test/fixtures/geoip-fixture/Dockerfile | 15 + test/fixtures/geoip-fixture/build.gradle | 28 ++ .../fixtures/geoip-fixture/docker-compose.yml | 13 + .../java/fixture/geoip/GeoIpHttpFixture.java | 73 ++++ .../main/resources/GeoIP2-City-Test.mmdb.gz | Bin 0 -> 10141 bytes .../src/main/resources/data.json | 20 + 20 files changed, 1675 insertions(+), 62 deletions(-) create mode 100644 modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java create mode 100644 modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java create mode 100644 modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java create mode 100644 modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java create mode 100644 test/fixtures/geoip-fixture/Dockerfile create mode 100644 test/fixtures/geoip-fixture/build.gradle create mode 100644 test/fixtures/geoip-fixture/docker-compose.yml create mode 100644 test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java create mode 100644 test/fixtures/geoip-fixture/src/main/resources/GeoIP2-City-Test.mmdb.gz create mode 100644 test/fixtures/geoip-fixture/src/main/resources/data.json diff --git a/modules/ingest-geoip/build.gradle b/modules/ingest-geoip/build.gradle index 2f92951df28e1..a9e1f5a502629 100644 --- a/modules/ingest-geoip/build.gradle +++ b/modules/ingest-geoip/build.gradle @@ -24,6 +24,7 @@ dependencies { api('com.maxmind.db:maxmind-db:1.3.1') testImplementation 'org.elasticsearch:geolite2-databases:20191119' + internalClusterTestImplementation project(path: ":modules:reindex") } restResources { @@ -32,6 +33,30 @@ restResources { } } +def useFixture 
= providers.environmentVariable("geoip_use_service") + .forUseAtConfigurationTime() + .map { s -> Boolean.parseBoolean(s) == false } + .getOrElse(true) + +def fixtureAddress = { + assert useFixture: 'closure should not be used without a fixture' + int ephemeralPort = tasks.getByPath(":test:fixtures:geoip-fixture:postProcessFixture").ext."test.fixtures.geoip-fixture.tcp.80" + assert ephemeralPort > 0 + return "http://127.0.0.1:${ephemeralPort}" +} + +if (useFixture) { + apply plugin: 'elasticsearch.test.fixtures' + testFixtures.useFixture(':test:fixtures:geoip-fixture', 'geoip-fixture') +} + +tasks.named("internalClusterTest").configure { + systemProperty "es.geoip_v2_feature_flag_enabled", "true" + if (useFixture) { + nonInputProperties.systemProperty "geoip_endpoint", "${-> fixtureAddress()}" + } +} + tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) { from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) } into "${project.buildDir}/ingest-geoip" @@ -47,21 +72,21 @@ tasks.named("bundlePlugin").configure { tasks.named("thirdPartyAudit").configure { ignoreMissingClasses( - // geoip WebServiceClient needs apache http client, but we're not using WebServiceClient: - 'org.apache.http.HttpEntity', - 'org.apache.http.HttpHost', - 'org.apache.http.HttpResponse', - 'org.apache.http.StatusLine', - 'org.apache.http.auth.UsernamePasswordCredentials', - 'org.apache.http.client.config.RequestConfig$Builder', - 'org.apache.http.client.config.RequestConfig', - 'org.apache.http.client.methods.CloseableHttpResponse', - 'org.apache.http.client.methods.HttpGet', - 'org.apache.http.client.utils.URIBuilder', - 'org.apache.http.impl.auth.BasicScheme', - 'org.apache.http.impl.client.CloseableHttpClient', - 'org.apache.http.impl.client.HttpClientBuilder', - 'org.apache.http.util.EntityUtils' + // geoip WebServiceClient needs apache http client, but we're not using WebServiceClient: + 'org.apache.http.HttpEntity', + 'org.apache.http.HttpHost', + 'org.apache.http.HttpResponse', + 'org.apache.http.StatusLine', + 'org.apache.http.auth.UsernamePasswordCredentials', + 'org.apache.http.client.config.RequestConfig$Builder', + 'org.apache.http.client.config.RequestConfig', + 'org.apache.http.client.methods.CloseableHttpResponse', + 'org.apache.http.client.methods.HttpGet', + 'org.apache.http.client.utils.URIBuilder', + 'org.apache.http.impl.auth.BasicScheme', + 'org.apache.http.impl.client.CloseableHttpClient', + 'org.apache.http.impl.client.HttpClientBuilder', + 'org.apache.http.util.EntityUtils' ) } diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java new file mode 100644 index 0000000000000..2eac9d558f38d --- /dev/null +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.StreamsUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public abstract class AbstractGeoIpIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class); + } + + + @Override + protected Settings nodeSettings(final int nodeOrdinal) { + final Path databasePath = createTempDir(); + try { + Files.createDirectories(databasePath); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), + databasePath.resolve("GeoLite2-City.mmdb")); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), + databasePath.resolve("GeoLite2-Country.mmdb")); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), + databasePath.resolve("GeoLite2-ASN.mmdb")); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + return Settings.builder() + .put("ingest.geoip.database_path", databasePath) + .put(super.nodeSettings(nodeOrdinal)) + .build(); + } + + public static class IngestGeoIpSettingsPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope)); + } + } +} diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java new file mode 100644 index 0000000000000..af223265597ad --- /dev/null +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import com.maxmind.geoip2.DatabaseReader; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Set; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; + +@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1) +public class GeoIpDownloaderIT extends AbstractGeoIpIT { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false); + String endpoint = System.getProperty("geoip_endpoint"); + if (endpoint != null) { + settings.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), endpoint); + } + return settings.build(); + } + + public void testGeoIpDatabasesDownload() throws Exception { + ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + assertBusy(() -> { + PersistentTasksCustomMetadata.PersistentTask task = getTask(); + assertNotNull(task); + GeoIpTaskState state = (GeoIpTaskState) task.getState(); + assertNotNull(state); + assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); + }, 2, TimeUnit.MINUTES); + + GeoIpTaskState state = (GeoIpTaskState) getTask().getState(); + for (String id : org.elasticsearch.common.collect.List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) { + assertBusy(() -> { + GeoIpTaskState.Metadata metadata = state.get(id); + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(new MatchQueryBuilder("name", id)) + .filter(new RangeQueryBuilder("chunk") + .from(metadata.getFirstChunk()) + .to(metadata.getLastChunk(), true)); + int size = metadata.getLastChunk() - metadata.getFirstChunk() + 1; + SearchResponse res = client().prepareSearch(GeoIpDownloader.DATABASES_INDEX) + .setSize(size) + .setQuery(queryBuilder) + .addSort("chunk", SortOrder.ASC) + .get(); + TotalHits totalHits = res.getHits().getTotalHits(); + assertEquals(TotalHits.Relation.EQUAL_TO, totalHits.relation); + 
assertEquals(size, totalHits.value); + assertEquals(size, res.getHits().getHits().length); + + List data = new ArrayList<>(); + + for (SearchHit hit : res.getHits().getHits()) { + data.add((byte[]) hit.getSourceAsMap().get("data")); + } + + GZIPInputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data)); + Path tempFile = createTempFile(); + try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))) { + byte[] bytes = new byte[4096]; + int read; + while ((read = stream.read(bytes)) != -1) { + os.write(bytes, 0, read); + } + } + + parseDatabase(tempFile); + }); + } + } + + @SuppressForbidden(reason = "Maxmind API requires java.io.File") + private void parseDatabase(Path tempFile) throws IOException { + try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) { + assertNotNull(databaseReader.getMetadata()); + } + } + + private PersistentTasksCustomMetadata.PersistentTask getTask() { + return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER); + } + + private static class MultiByteArrayInputStream extends InputStream { + + private final Iterator data; + private ByteArrayInputStream current; + + private MultiByteArrayInputStream(List data) { + this.data = data.iterator(); + } + + @Override + public int read() { + if (current == null) { + if (data.hasNext() == false) { + return -1; + } + + current = new ByteArrayInputStream(data.next()); + } + int read = current.read(); + if (read == -1) { + current = null; + return read(); + } + return read; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (current == null) { + if (data.hasNext() == false) { + return -1; + } + + current = new ByteArrayInputStream(data.next()); + } + int read = current.read(b, off, len); + if (read == -1) { + current = null; + return read(b, off, len); + } + return read; + } + } +} diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java index c64f180d6a683..4d420f527b77a 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java @@ -13,69 +13,27 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.NodeRoles; -import org.elasticsearch.test.StreamsUtils; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.List; import static 
org.elasticsearch.test.NodeRoles.nonIngestNode; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase { - - public static class IngestGeoIpSettingsPlugin extends Plugin { - - @Override - public List> getSettings() { - return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope)); - } - } +public class GeoIpProcessorNonIngestNodeIT extends AbstractGeoIpIT { @Override - protected Collection> nodePlugins() { - return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class); - } - - @Override - protected Settings nodeSettings(final int nodeOrdinal) { - final Path databasePath = createTempDir(); - try { - Files.createDirectories(databasePath); - Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), - databasePath.resolve("GeoLite2-City.mmdb")); - Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), - databasePath.resolve("GeoLite2-Country.mmdb")); - Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), - databasePath.resolve("GeoLite2-ASN.mmdb")); - } catch (final IOException e) { - throw new UncheckedIOException(e); - } - return Settings.builder() - .put("ingest.geoip.database_path", databasePath) - .put(nonIngestNode()) - .put(super.nodeSettings(nodeOrdinal)) - .build(); + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nonIngestNode()).build(); } /** diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java new file mode 100644 index 0000000000000..574738affedc6 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Main component responsible for downloading new GeoIP databases. 
+ * New databases are downloaded in chunks and stored in .geoip_databases index + * Downloads are verified against MD5 checksum provided by the server + * Current state of all stored databases is stored in cluster state in persistent task state + */ +class GeoIpDownloader extends AllocatedPersistentTask { + + private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); + + public static final boolean GEOIP_V2_FEATURE_FLAG_ENABLED = "true".equals(System.getProperty("es.geoip_v2_feature_flag_enabled")); + + public static final Setting POLL_INTERVAL_SETTING = Setting.timeSetting("geoip.downloader.poll.interval", + TimeValue.timeValueDays(3), TimeValue.timeValueDays(1), Property.Dynamic, Property.NodeScope); + public static final Setting ENDPOINT_SETTING = Setting.simpleString("geoip.downloader.endpoint", + "https://paisano.elastic.dev/v1/geoip/database", Property.NodeScope); + + public static final String GEOIP_DOWNLOADER = "geoip-downloader"; + static final String DATABASES_INDEX = ".geoip_databases"; + static final int MAX_CHUNK_SIZE = 1024 * 1024; + + private final Client client; + private final HttpClient httpClient; + private final ThreadPool threadPool; + private final String endpoint; + + //visible for testing + protected volatile GeoIpTaskState state; + private volatile TimeValue pollInterval; + private volatile Scheduler.ScheduledCancellable scheduled; + + GeoIpDownloader(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, Settings settings, + long id, String type, String action, String description, TaskId parentTask, + Map headers) { + super(id, type, action, description, parentTask, headers); + this.httpClient = httpClient; + this.client = client; + this.threadPool = threadPool; + endpoint = ENDPOINT_SETTING.get(settings); + pollInterval = POLL_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(POLL_INTERVAL_SETTING, this::setPollInterval); + } + + public void setPollInterval(TimeValue pollInterval) { + this.pollInterval = pollInterval; + if (scheduled != null && scheduled.cancel()) { + scheduleNextRun(new TimeValue(1)); + } + } + + //visible for testing + void updateDatabases() throws IOException { + logger.info("updating geoip databases"); + List> response = fetchDatabasesOverview(); + for (Map res : response) { + processDatabase(res); + } + } + + @SuppressWarnings("unchecked") + private List fetchDatabasesOverview() throws IOException { + byte[] data = httpClient.getBytes(endpoint + "?key=11111111-1111-1111-1111-111111111111"); + try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, data)) { + return (List) parser.list(); + } + } + + //visible for testing + void processDatabase(Map databaseInfo) { + String name = databaseInfo.get("name").toString().replace(".gz", ""); + String md5 = (String) databaseInfo.get("md5_hash"); + if (state.contains(name) && Objects.equals(md5, state.get(name).getMd5())) { + updateTimestamp(name, state.get(name)); + return; + } + logger.info("updating geoip database [" + name + "]"); + String url = databaseInfo.get("url").toString(); + try (InputStream is = httpClient.get(url)) { + int firstChunk = state.contains(name) ? 
state.get(name).getLastChunk() + 1 : 0; + int lastChunk = indexChunks(name, is, firstChunk, md5); + if (lastChunk > firstChunk) { + state = state.put(name, new Metadata(System.currentTimeMillis(), firstChunk, lastChunk - 1, md5)); + updateTaskState(); + logger.info("updated geoip database [" + name + "]"); + deleteOldChunks(name, firstChunk); + } + } catch (Exception e) { + logger.error("error updating geoip database [" + name + "]", e); + } + } + + //visible for testing + void deleteOldChunks(String name, int firstChunk) { + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(new MatchQueryBuilder("name", name)) + .filter(new RangeQueryBuilder("chunk").to(firstChunk, false)); + DeleteByQueryRequest request = new DeleteByQueryRequest(); + request.indices(DATABASES_INDEX); + request.setQuery(queryBuilder); + client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(r -> { + }, e -> logger.warn("could not delete old chunks for geoip database [" + name + "]", e))); + } + + //visible for testing + protected void updateTimestamp(String name, Metadata old) { + logger.info("geoip database [" + name + "] is up to date, updated timestamp"); + state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5())); + updateTaskState(); + } + + void updateTaskState() { + PlainActionFuture> future = PlainActionFuture.newFuture(); + updatePersistentTaskState(state, future); + state = ((GeoIpTaskState) future.actionGet().getState()); + } + + //visible for testing + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) throws IOException { + MessageDigest md = MessageDigests.md5(); + for (byte[] buf = getChunk(is); buf.length != 0; buf = getChunk(is)) { + md.update(buf); + client.prepareIndex(DATABASES_INDEX, "_doc").setId(name + "_" + chunk) + .setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setWaitForActiveShards(ActiveShardCount.ALL) + .get(); + chunk++; + } + String actualMd5 = MessageDigests.toHexString(md.digest()); + if (Objects.equals(expectedMd5, actualMd5) == false) { + throw new IOException("md5 checksum mismatch, expected [" + expectedMd5 + "], actual [" + actualMd5 + "]"); + } + return chunk; + } + + //visible for testing + byte[] getChunk(InputStream is) throws IOException { + byte[] buf = new byte[MAX_CHUNK_SIZE]; + int chunkSize = 0; + while (chunkSize < MAX_CHUNK_SIZE) { + int read = is.read(buf, chunkSize, MAX_CHUNK_SIZE - chunkSize); + if (read == -1) { + break; + } + chunkSize += read; + } + if (chunkSize < MAX_CHUNK_SIZE) { + buf = Arrays.copyOf(buf, chunkSize); + } + return buf; + } + + void setState(GeoIpTaskState state) { + this.state = state; + } + + void runDownloader() { + if (isCancelled() || isCompleted()) { + return; + } + try { + updateDatabases(); + } catch (Exception e) { + logger.error("exception during geoip databases update", e); + } + scheduleNextRun(pollInterval); + } + + @Override + protected void onCancelled() { + if (scheduled != null) { + scheduled.cancel(); + } + } + + private void scheduleNextRun(TimeValue time) { + scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC); + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java new file mode 100644 index 0000000000000..306489e3d798f --- 
/dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; + +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_V2_FEATURE_FLAG_ENABLED; + +/** + * Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node. + * Also bootstraps GeoIP download task on clean cluster and handles changes to the 'geoip.downloader.enabled' setting + */ +final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor implements ClusterStateListener { + + public static final Setting ENABLED_SETTING = Setting.boolSetting("geoip.downloader.enabled", GEOIP_V2_FEATURE_FLAG_ENABLED, + Setting.Property.Dynamic, Setting.Property.NodeScope); + + private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); + + private final Client client; + private final HttpClient httpClient; + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final Settings settings; + private final PersistentTasksService persistentTasksService; + + GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, + Settings settings) { + super(GEOIP_DOWNLOADER, ThreadPool.Names.GENERIC); + this.client = client; + this.httpClient = httpClient; + this.clusterService = clusterService; + this.threadPool = threadPool; + this.settings = settings; + persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + if (ENABLED_SETTING.get(settings)) { + clusterService.addListener(this); + } + clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); + } + + private void setEnabled(boolean enabled) { + if (enabled) { + if (clusterService.state().nodes().isLocalNodeElectedMaster()) { + startTask(() -> { + }); + } + } else { + persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.wrap(r -> { + }, e -> logger.error("failed to remove geoip task", e))); + } + } + + @Override + protected void 
nodeOperation(AllocatedPersistentTask task, GeoIpTaskParams params, PersistentTaskState state) { + GeoIpDownloader downloader = (GeoIpDownloader) task; + GeoIpTaskState geoIpTaskState = state == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) state; + downloader.setState(geoIpTaskState); + downloader.runDownloader(); + } + + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTasksCustomMetadata.PersistentTask taskInProgress, + Map headers) { + return new GeoIpDownloader(client, httpClient, clusterService, threadPool, settings, id, type, action, + getDescription(taskInProgress), parentTaskId, headers); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + //bootstrap downloader after first cluster start + clusterService.removeListener(this); + if (event.localNodeMaster() && ENABLED_SETTING.get(event.state().getMetadata().settings())) { + startTask(() -> clusterService.addListener(this)); + } + } + + private void startTask(Runnable onFailure) { + persistentTasksService.sendStartRequest(GEOIP_DOWNLOADER, GEOIP_DOWNLOADER, new GeoIpTaskParams(), ActionListener.wrap(r -> { + }, e -> { + if (e instanceof ResourceAlreadyExistsException == false) { + logger.error("failed to create geoip downloader task", e); + onFailure.run(); + } + })); + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java new file mode 100644 index 0000000000000..896becc6b0d8e --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTaskParams; + +import java.io.IOException; + +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; + +class GeoIpTaskParams implements PersistentTaskParams { + + public static final ObjectParser PARSER = new ObjectParser<>(GEOIP_DOWNLOADER, true, GeoIpTaskParams::new); + + GeoIpTaskParams() { + } + + GeoIpTaskParams(StreamInput in) { + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return GEOIP_DOWNLOADER; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_7_13_0; + } + + @Override + public void writeTo(StreamOutput out) { + } + + public static GeoIpTaskParams fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof GeoIpTaskParams; + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java new file mode 100644 index 0000000000000..066f45b19311e --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java @@ -0,0 +1,216 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.Version; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTaskState; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; + +class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable { + + private static final ParseField DATABASES = new ParseField("databases"); + + static final GeoIpTaskState EMPTY = new GeoIpTaskState(Collections.emptyMap()); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(GEOIP_DOWNLOADER, true, + args -> { + List> databases = (List>) args[0]; + return new GeoIpTaskState(databases.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2))); + }); + + static { + PARSER.declareNamedObjects(constructorArg(), (p, c, name) -> Tuple.tuple(name, Metadata.fromXContent(p)), DATABASES); + } + + public static GeoIpTaskState fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Map databases; + + private GeoIpTaskState(Map databases) { + this.databases = Collections.unmodifiableMap(new HashMap<>(databases)); + } + + GeoIpTaskState(StreamInput input) throws IOException { + databases = Collections.unmodifiableMap(input.readMap(StreamInput::readString, + in -> new Metadata(in.readLong(), in.readVInt(), in.readVInt(), in.readString()))); + } + + public GeoIpTaskState put(String name, Metadata metadata) { + HashMap newDatabases = new HashMap<>(databases); + newDatabases.put(name, metadata); + return new GeoIpTaskState(newDatabases); + } + + public Map getDatabases() { + return databases; + } + + public boolean contains(String name) { + return databases.containsKey(name); + } + + public Metadata get(String name) { + return databases.get(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GeoIpTaskState that = (GeoIpTaskState) o; + return databases.equals(that.databases); + } + + @Override + public int hashCode() { + return Objects.hash(databases); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.startObject("databases"); + for (Map.Entry e : databases.entrySet()) { + builder.field(e.getKey(), e.getValue()); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return "geoip-downloader"; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_7_13_0; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + 
out.writeMap(databases, StreamOutput::writeString, (o, v) -> { + o.writeLong(v.lastUpdate); + o.writeVInt(v.firstChunk); + o.writeVInt(v.lastChunk); + o.writeString(v.md5); + }); + } + + static class Metadata implements ToXContentObject { + + static final String NAME = GEOIP_DOWNLOADER + "-metadata"; + private static final ParseField LAST_UPDATE = new ParseField("last_update"); + private static final ParseField FIRST_CHUNK = new ParseField("first_chunk"); + private static final ParseField LAST_CHUNK = new ParseField("last_chunk"); + private static final ParseField MD5 = new ParseField("md5"); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, true, + args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3])); + + static { + PARSER.declareLong(constructorArg(), LAST_UPDATE); + PARSER.declareInt(constructorArg(), FIRST_CHUNK); + PARSER.declareInt(constructorArg(), LAST_CHUNK); + PARSER.declareString(constructorArg(), MD5); + } + + public static Metadata fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private final long lastUpdate; + private final int firstChunk; + private final int lastChunk; + private final String md5; + + Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5) { + this.lastUpdate = lastUpdate; + this.firstChunk = firstChunk; + this.lastChunk = lastChunk; + this.md5 = Objects.requireNonNull(md5); + } + + public long getLastUpdate() { + return lastUpdate; + } + + public int getFirstChunk() { + return firstChunk; + } + + public int getLastChunk() { + return lastChunk; + } + + public String getMd5() { + return md5; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Metadata metadata = (Metadata) o; + return lastUpdate == metadata.lastUpdate + && firstChunk == metadata.firstChunk + && lastChunk == metadata.lastChunk + && md5.equals(metadata.md5); + } + + @Override + public int hashCode() { + return Objects.hash(lastUpdate, firstChunk, lastChunk, md5); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(LAST_UPDATE.getPreferredName(), lastUpdate); + builder.field(FIRST_CHUNK.getPreferredName(), firstChunk); + builder.field(LAST_CHUNK.getPreferredName(), lastChunk); + builder.field(MD5.getPreferredName(), md5); + } + builder.endObject(); + return builder; + } + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java new file mode 100644 index 0000000000000..78f0bfcc18aa8 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.rest.RestStatus; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; + +import static java.net.HttpURLConnection.HTTP_MOVED_PERM; +import static java.net.HttpURLConnection.HTTP_MOVED_TEMP; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_SEE_OTHER; + +class HttpClient { + + byte[] getBytes(String url) throws IOException { + byte[] bytes = new byte[4096]; + int read; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (InputStream is = get(url)) { + while ((read = is.read(bytes)) != -1) { + byteArrayOutputStream.write(bytes, 0, read); + } + } + return byteArrayOutputStream.toByteArray(); + } + + InputStream get(String urlToGet) throws IOException { + return doPrivileged(() -> { + String url = urlToGet; + HttpURLConnection conn = createConnection(url); + + int redirectsCount = 0; + while (true) { + switch (conn.getResponseCode()) { + case HTTP_OK: + return new BufferedInputStream(getInputStream(conn)); + case HTTP_MOVED_PERM: + case HTTP_MOVED_TEMP: + case HTTP_SEE_OTHER: + if (redirectsCount++ > 50) { + throw new IllegalStateException("too many redirects connection to [" + urlToGet + "]"); + } + String location = conn.getHeaderField("Location"); + URL base = new URL(url); + URL next = new URL(base, location); // Deal with relative URLs + url = next.toExternalForm(); + conn = createConnection(url); + break; + case HTTP_NOT_FOUND: + throw new ResourceNotFoundException("{} not found", urlToGet); + default: + int responseCode = conn.getResponseCode(); + throw new ElasticsearchStatusException("error during downloading {}", RestStatus.fromCode(responseCode), urlToGet); + } + } + }); + } + + @SuppressForbidden(reason = "we need socket connection to download data from internet") + private InputStream getInputStream(HttpURLConnection conn) throws IOException { + return conn.getInputStream(); + } + + private HttpURLConnection createConnection(String url) throws IOException { + HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + conn.setDoOutput(false); + conn.setInstanceFollowRedirects(false); + return conn; + } + + private static R doPrivileged(CheckedSupplier supplier) throws IOException { + SpecialPermission.check(); + try { + return AccessController.doPrivileged((PrivilegedExceptionAction) supplier::get); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index e1aa2eeb73f55..a841814cc55cb 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ 
b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -9,17 +9,29 @@ package org.elasticsearch.ingest.geoip; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -28,13 +40,21 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Supplier; -public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_V2_FEATURE_FLAG_ENABLED; + +public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemIndexPlugin, Closeable, PersistentTaskPlugin { public static final Setting CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); @@ -44,7 +64,13 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable @Override public List> getSettings() { - return Collections.singletonList(CACHE_SIZE); + List> settings = new ArrayList<>(Arrays.asList(CACHE_SIZE, + GeoIpDownloader.ENDPOINT_SETTING, + GeoIpDownloader.POLL_INTERVAL_SETTING)); + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + settings.add(GeoIpDownloaderTaskExecutor.ENABLED_SETTING); + } + return settings; } @Override @@ -81,4 +107,84 @@ public void close() throws IOException { localDatabases.get().close(); } + @Override + public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, + Client client, SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver) { + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + Settings settings = settingsModule.getSettings(); + return Collections.singletonList(new GeoIpDownloaderTaskExecutor(client, new 
HttpClient(), clusterService, threadPool, + settings)); + } else { + return Collections.emptyList(); + } + } + + @Override + public List getNamedXContent() { + return Arrays.asList(new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(GEOIP_DOWNLOADER), + GeoIpTaskParams::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskState::fromXContent)); + } + + @Override + public List getNamedWriteables() { + return Arrays.asList(new NamedWriteableRegistry.Entry(PersistentTaskState.class, GEOIP_DOWNLOADER, GeoIpTaskState::new), + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new)); + } + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + SystemIndexDescriptor geoipDatabasesIndex = SystemIndexDescriptor.builder() + .setIndexPattern(DATABASES_INDEX) + .setDescription("GeoIP databases") + .setMappings(mappings()) + .setSettings(Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .build()) + .setOrigin("geoip") + .setVersionMetaKey("version") + .setPrimaryIndex(DATABASES_INDEX) + .build(); + return Collections.singleton(geoipDatabasesIndex); + } + + @Override + public String getFeatureName() { + return "geoip"; + } + + @Override + public String getFeatureDescription() { + return "Manages data related to GeoIP database downloader"; + } + + private static XContentBuilder mappings() { + try { + return jsonBuilder() + .startObject() + .startObject(SINGLE_MAPPING_NAME) + .startObject("_meta") + .field("version", Version.CURRENT) + .endObject() + .field("dynamic", "strict") + .startObject("properties") + .startObject("name") + .field("type", "keyword") + .endObject() + .startObject("chunk") + .field("type", "integer") + .endObject() + .startObject("data") + .field("type", "binary") + .endObject() + .endObject() + .endObject() + .endObject(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build mappings for " + DATABASES_INDEX, e); + } + } } diff --git a/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy b/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy index c961d7248a2bf..2f1e80e8e5578 100644 --- a/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy +++ b/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy @@ -15,4 +15,5 @@ grant { permission java.lang.RuntimePermission "accessDeclaredMembers"; // Also needed because of jackson-databind: permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.net.SocketPermission "*", "connect"; }; diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java new file mode 100644 index 0000000000000..3ab9386de57bb --- /dev/null +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -0,0 +1,370 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.node.Node; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.ENDPOINT_SETTING; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.MAX_CHUNK_SIZE; +import static org.elasticsearch.tasks.TaskId.EMPTY_TASK_ID; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GeoIpDownloaderTests extends ESTestCase { + + private HttpClient httpClient; + private ClusterService clusterService; + private ThreadPool threadPool; + private MockClient client; + private GeoIpDownloader geoIpDownloader; + + @Before + public void setup() { + httpClient = mock(HttpClient.class); + clusterService = mock(ClusterService.class); + threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build()); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, + org.elasticsearch.common.collect.Set.of(GeoIpDownloader.ENDPOINT_SETTING, GeoIpDownloader.POLL_INTERVAL_SETTING, + GeoIpDownloaderTaskExecutor.ENABLED_SETTING))); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build(); + when(clusterService.state()).thenReturn(state); + client = new MockClient(threadPool); + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetChunkEndOfStream() throws IOException { + byte[] chunk = geoIpDownloader.getChunk(new InputStream() { + @Override + public int read() { + return -1; + } + }); + assertArrayEquals(new byte[0], chunk); + chunk = geoIpDownloader.getChunk(new ByteArrayInputStream(new byte[0])); + assertArrayEquals(new byte[0], chunk); + } + + public void testGetChunkLessThanChunkSize() throws IOException { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{1, 2, 3, 4}); + byte[] chunk = 
geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[]{1, 2, 3, 4}, chunk); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[0], chunk); + + } + + public void testGetChunkExactlyChunkSize() throws IOException { + byte[] bigArray = new byte[MAX_CHUNK_SIZE]; + for (int i = 0; i < MAX_CHUNK_SIZE; i++) { + bigArray[i] = (byte) i; + } + ByteArrayInputStream is = new ByteArrayInputStream(bigArray); + byte[] chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(bigArray, chunk); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[0], chunk); + } + + public void testGetChunkMoreThanChunkSize() throws IOException { + byte[] bigArray = new byte[MAX_CHUNK_SIZE * 2]; + for (int i = 0; i < MAX_CHUNK_SIZE * 2; i++) { + bigArray[i] = (byte) i; + } + byte[] smallArray = new byte[MAX_CHUNK_SIZE]; + System.arraycopy(bigArray, 0, smallArray, 0, MAX_CHUNK_SIZE); + ByteArrayInputStream is = new ByteArrayInputStream(bigArray); + byte[] chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(smallArray, chunk); + System.arraycopy(bigArray, MAX_CHUNK_SIZE, smallArray, 0, MAX_CHUNK_SIZE); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(smallArray, chunk); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[0], chunk); + } + + public void testGetChunkRethrowsIOException() { + expectThrows(IOException.class, () -> geoIpDownloader.getChunk(new InputStream() { + @Override + public int read() throws IOException { + throw new IOException(); + } + })); + } + + public void testIndexChunksNoData() throws IOException { + assertEquals(0, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(new byte[0]), 0, "d41d8cd98f00b204e9800998ecf8427e")); + } + + public void testIndexChunksMd5Mismatch() { + IOException exception = expectThrows(IOException.class, () -> geoIpDownloader.indexChunks("test", + new ByteArrayInputStream(new byte[0]), 0, "123123")); + assertEquals("md5 checksum mismatch, expected [123123], actual [d41d8cd98f00b204e9800998ecf8427e]", exception.getMessage()); + } + + public void testIndexChunks() throws IOException { + byte[] bigArray = new byte[MAX_CHUNK_SIZE + 20]; + for (int i = 0; i < MAX_CHUNK_SIZE + 20; i++) { + bigArray[i] = (byte) i; + } + byte[][] chunksData = new byte[2][]; + chunksData[0] = new byte[MAX_CHUNK_SIZE]; + System.arraycopy(bigArray, 0, chunksData[0], 0, MAX_CHUNK_SIZE); + chunksData[1] = new byte[20]; + System.arraycopy(bigArray, MAX_CHUNK_SIZE, chunksData[1], 0, 20); + + AtomicInteger chunkIndex = new AtomicInteger(); + + client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener listener) -> { + int chunk = chunkIndex.getAndIncrement(); + assertEquals("test_" + (chunk + 15), request.id()); + assertEquals(XContentType.SMILE, request.getContentType()); + Map source = request.sourceAsMap(); + assertEquals("test", source.get("name")); + assertArrayEquals(chunksData[chunk], (byte[]) source.get("data")); + assertEquals(chunk + 15, source.get("chunk")); + listener.onResponse(mock(IndexResponse.class)); + }); + + assertEquals(17, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(bigArray), 15, "a67563dfa8f3cba8b8cff61eb989a749")); + + assertEquals(2, chunkIndex.get()); + } + + public void testProcessDatabaseNew() throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + when(httpClient.get("a.b/t1")).thenReturn(bais); + + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", 
EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void updateTaskState() { + assertEquals(0, state.get("test").getFirstChunk()); + assertEquals(10, state.get("test").getLastChunk()); + } + + @Override + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + assertSame(bais, is); + assertEquals(0, chunk); + return 11; + } + + @Override + protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) { + fail(); + } + + @Override + void deleteOldChunks(String name, int firstChunk) { + assertEquals("test", name); + assertEquals(0, firstChunk); + } + }; + + geoIpDownloader.setState(GeoIpTaskState.EMPTY); + geoIpDownloader.processDatabase(org.elasticsearch.common.collect.Map.of("name", "test.gz", "url", "a.b/t1", "md5_hash", "1")); + } + + public void testProcessDatabaseUpdate() throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + when(httpClient.get("a.b/t1")).thenReturn(bais); + + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void updateTaskState() { + assertEquals(9, state.get("test").getFirstChunk()); + assertEquals(10, state.get("test").getLastChunk()); + } + + @Override + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + assertSame(bais, is); + assertEquals(9, chunk); + return 11; + } + + @Override + protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) { + fail(); + } + + @Override + void deleteOldChunks(String name, int firstChunk) { + assertEquals("test", name); + assertEquals(9, firstChunk); + } + }; + + geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test", new GeoIpTaskState.Metadata(0, 5, 8, "0"))); + geoIpDownloader.processDatabase(org.elasticsearch.common.collect.Map.of("name", "test.gz", "url", "a.b/t1", "md5_hash", "1")); + } + + + public void testProcessDatabaseSame() throws IOException { + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1"); + GeoIpTaskState taskState = GeoIpTaskState.EMPTY.put("test", metadata); + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + when(httpClient.get("a.b/t1")).thenReturn(bais); + + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void updateTaskState() { + fail(); + } + + @Override + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + fail(); + return 0; + } + + @Override + protected void updateTimestamp(String name, GeoIpTaskState.Metadata newMetadata) { + assertEquals(metadata, newMetadata); + assertEquals("test", name); + } + + @Override + void deleteOldChunks(String name, int firstChunk) { + fail(); + } + }; + geoIpDownloader.setState(taskState); + geoIpDownloader.processDatabase(org.elasticsearch.common.collect.Map.of("name", "test.gz", "url", "a.b/t1", "md5_hash", "1")); + } + + @SuppressWarnings("unchecked") + public void testUpdateTaskState() { + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState state, ActionListener<PersistentTask<?>> listener) { + assertSame(GeoIpTaskState.EMPTY, state); + PersistentTask<?> task = mock(PersistentTask.class); + when(task.getState()).thenReturn(GeoIpTaskState.EMPTY); +
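// answer the listener with the mocked task so that the updateTaskState() call below completes without error +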
listener.onResponse(task); + } + }; + geoIpDownloader.setState(GeoIpTaskState.EMPTY); + geoIpDownloader.updateTaskState(); + } + + @SuppressWarnings("unchecked") + public void testUpdateTaskStateError() { + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState state, ActionListener<PersistentTask<?>> listener) { + assertSame(GeoIpTaskState.EMPTY, state); + PersistentTask<?> task = mock(PersistentTask.class); + when(task.getState()).thenReturn(GeoIpTaskState.EMPTY); + listener.onFailure(new IllegalStateException("test failure")); + } + }; + geoIpDownloader.setState(GeoIpTaskState.EMPTY); + IllegalStateException exception = expectThrows(IllegalStateException.class, geoIpDownloader::updateTaskState); + assertEquals("test failure", exception.getMessage()); + } + + public void testUpdateDatabases() throws IOException { + List<Map<String, Object>> maps = Arrays.asList(singletonMap("a", 1), singletonMap("a", 2)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), baos); + builder.startArray(); + builder.map(singletonMap("a", 1)); + builder.map(singletonMap("a", 2)); + builder.endArray(); + builder.close(); + when(httpClient.getBytes("a.b?key=11111111-1111-1111-1111-111111111111")).thenReturn(baos.toByteArray()); + Iterator<Map<String, Object>> it = maps.iterator(); + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, + Settings.builder().put(ENDPOINT_SETTING.getKey(), "a.b").build(), + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void processDatabase(Map<String, Object> databaseInfo) { + assertEquals(it.next(), databaseInfo); + } + }; + geoIpDownloader.updateDatabases(); + assertFalse(it.hasNext()); + } + + private static class MockClient extends NoOpClient { + + private final Map<ActionType<?>, BiConsumer<? extends ActionRequest, ? extends ActionListener<?>>> handlers = new HashMap<>(); + + private MockClient(ThreadPool threadPool) { + super(threadPool); + } + + public <Response extends ActionResponse, Request extends ActionRequest> void addHandler(ActionType<Response> action, + BiConsumer<Request, ActionListener<Response>> listener) { + handlers.put(action, listener); + } + + @SuppressWarnings("unchecked") + @Override + protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action, + Request request, + ActionListener<Response> listener) { + if (handlers.containsKey(action)) { + BiConsumer<ActionRequest, ActionListener<?>> biConsumer = + (BiConsumer<ActionRequest, ActionListener<?>>) handlers.get(action); + biConsumer.accept(request, listener); + } else { + throw new IllegalStateException("unexpected action called [" + action.name() + "]"); + } + } + } +} diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java new file mode 100644 index 0000000000000..dd5faa9d8fa33 --- /dev/null +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1.
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class GeoIpTaskStateSerializationTests extends AbstractSerializingTestCase<GeoIpTaskState> { + @Override + protected GeoIpTaskState doParseInstance(XContentParser parser) throws IOException { + return GeoIpTaskState.fromXContent(parser); + } + + @Override + protected Writeable.Reader<GeoIpTaskState> instanceReader() { + return GeoIpTaskState::new; + } + + @Override + protected GeoIpTaskState createTestInstance() { + GeoIpTaskState state = GeoIpTaskState.EMPTY; + int databaseCount = randomInt(20); + for (int i = 0; i < databaseCount; i++) { + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(), randomAlphaOfLength(32)); + state = state.put(randomAlphaOfLengthBetween(5, 10), metadata); + } + return state; + } +} diff --git a/settings.gradle b/settings.gradle index 978dfb5d19b28..8bddadf081782 100644 --- a/settings.gradle +++ b/settings.gradle @@ -80,6 +80,7 @@ List projects = [ 'test:fixtures:minio-fixture', 'test:fixtures:old-elasticsearch', 'test:fixtures:s3-fixture', + 'test:fixtures:geoip-fixture', 'test:logger-usage' ] diff --git a/test/fixtures/geoip-fixture/Dockerfile b/test/fixtures/geoip-fixture/Dockerfile new file mode 100644 index 0000000000000..bcd8013408818 --- /dev/null +++ b/test/fixtures/geoip-fixture/Dockerfile @@ -0,0 +1,15 @@ +FROM ubuntu:18.04 + +RUN apt-get update -qqy +RUN apt-get install -qqy openjdk-11-jre-headless + +ARG fixtureClass +ARG port + +ENV GEOIP_FIXTURE_CLASS=${fixtureClass} +ENV GEOIP_FIXTURE_PORT=${port} + +ENTRYPOINT exec java -classpath "/fixture/shared/*" \ + $GEOIP_FIXTURE_CLASS 0.0.0.0 "$GEOIP_FIXTURE_PORT" + +EXPOSE $port diff --git a/test/fixtures/geoip-fixture/build.gradle b/test/fixtures/geoip-fixture/build.gradle new file mode 100644 index 0000000000000..d8f85759cf7f8 --- /dev/null +++ b/test/fixtures/geoip-fixture/build.gradle @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +apply plugin: 'elasticsearch.java' +apply plugin: 'elasticsearch.test.fixtures' + +description = 'Fixture for GeoIPv2 service' +tasks.named("test").configure { enabled = false } + +dependencies { + api project(':server') +} + +tasks.named("preProcessFixture").configure { + dependsOn "jar", configurations.runtimeClasspath + doLast { + file("${testFixturesDir}/shared").mkdirs() + project.copy { + from jar + from configurations.runtimeClasspath + into "${testFixturesDir}/shared" + } + } +} diff --git a/test/fixtures/geoip-fixture/docker-compose.yml b/test/fixtures/geoip-fixture/docker-compose.yml new file mode 100644 index 0000000000000..60883da5ae612 --- /dev/null +++ b/test/fixtures/geoip-fixture/docker-compose.yml @@ -0,0 +1,13 @@ +version: '3' +services: + geoip-fixture: + build: + context: .
+ args: + fixtureClass: fixture.geoip.GeoIpHttpFixture + port: 80 + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" diff --git a/test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java b/test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java new file mode 100644 index 0000000000000..1eb4492634305 --- /dev/null +++ b/test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package fixture.geoip; + +import com.sun.net.httpserver.HttpServer; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +public class GeoIpHttpFixture { + + private final HttpServer server; + + GeoIpHttpFixture(final String[] args) throws Exception { + byte[] bytes = new byte[4096]; + int read; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (InputStream is = GeoIpHttpFixture.class.getResourceAsStream("/data.json")) { + while ((read = is.read(bytes)) != -1) { + byteArrayOutputStream.write(bytes, 0, read); + } + } + String rawData = new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8); + this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(args[0]), Integer.parseInt(args[1])), 0); + this.server.createContext("/", exchange -> { + String data = rawData.replace("endpoint", "http://" + exchange.getRequestHeaders().getFirst("Host")); + exchange.sendResponseHeaders(200, data.length()); + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(exchange.getResponseBody()))) { + writer.write(data); + } + }); + this.server.createContext("/db.mmdb.gz", exchange -> { + exchange.sendResponseHeaders(200, 0); + try (InputStream inputStream = GeoIpHttpFixture.class.getResourceAsStream("/GeoIP2-City-Test.mmdb.gz"); + OutputStream outputStream = exchange.getResponseBody()) { + int read2; + while ((read2 = inputStream.read(bytes)) != -1) { + outputStream.write(bytes, 0, read2); + } + } + }); + } + + final void start() throws Exception { + try { + server.start(); + // wait to be killed + Thread.sleep(Long.MAX_VALUE); + } finally { + server.stop(0); + } + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length < 2) { + throw new IllegalArgumentException("GeoIpHttpFixture expects 2 arguments [address, port]"); + } + final GeoIpHttpFixture fixture = new GeoIpHttpFixture(args); + fixture.start(); + } +} diff --git a/test/fixtures/geoip-fixture/src/main/resources/GeoIP2-City-Test.mmdb.gz b/test/fixtures/geoip-fixture/src/main/resources/GeoIP2-City-Test.mmdb.gz new file mode 100644 index 0000000000000000000000000000000000000000..38a8c7fc9c03e874c7ae806aa1e27c8dc9f48e84 GIT binary patch literal 10141 zcmZ{HXIK+m7p;IOMT($+ARUQFmtKR?r9=f0q$wc1NC_PR(n2p$=|n(5x`0UUUAmM= z550#PYDn(ze$RdGulwg@&YpeNUTf`hW+v>>WMuE&@Vvc7qW<1R^Z8>zWjhZaLH+mc z9zsq|RuECe$LX5HO!05a0kxPkW=aoEUlE%4B5UZ 
zeD}%Bby2#UYdN$QcRFn0TyP6G9sDI+MCG1}j0&mBgAOKmIqfT2sly|%ntp60GyDzQ zm3E1fS4lgxhIG*YKF#4AQA4t*3zvo0b8x8~+2m*ne4!oBEhlPQ=<1H-T`Eo-N<&xU zV0SDiI`o?)5kKzP2-1qeYi^&z-$Y`pN7O#GRPm@#a8Zd;EwYu(do*|ok2KaC2f*2t zaI}kx@QcPkvEecrn;lwKS{fyJjiUtA!aKN;O1yV$Wu)x>x;p2oQh!*=+N=z_1<{D5 z+0s+)8~gHt2h5B#Ob6tOen?wBN=4)Fu9dk=$WwXMU&N@a-br;y623z#tkgiZpM-jMb3|7h zbHMCHzw>0t3?497b*UokXH%xs@a)sO{Tm|=_nCa2G!F*vQx191>T7AY8dSWhU$b)T%}qf zDc%T;1g@NUqR(thpD~=;1jGLS!{HFK?*F?x@sryrk44>kss7fYTSW_)5?2XSx8EIf z;3$0nOn%Z9Z#5b_GkLdrHFoVLLJ?k)NdkLAk8pn+<9Po|28I-v0jV=N0^qS?zG0>b zpvPjFi#VKli>4fkgMVD%borrp6C_0678^G@R3`#tg9Nsy;c~si1OP3{o0a!U1(jJY ziRitY#HPp-KoJno1FR?_CBEWRba&BtcZ3|yqY?$I0BrJWvZ$2&ALBr@=>@0JiI2?* zLV~2aKh!p2J;?jo;*+v=Wh;g4%mFTdfVE$2CT)C@#7)$yFKYEk5DXMUWlM!OidPo+ zc#Tl}fEJ=85N3Fz{60`E>!TsjzIewoZt2hE22URB4ZR^LXIQ*P_Fr8n?G~q`(l!4t zTUtZPL03jY|M&C%Oy3E*b|q_yis-5{fR2l+mfnTG=6p12A3|ngQ!yYw zA@!60?M44Js|tLK*g$hL0?U{Y44W|L#4`e_6oXIiEV`)Gq?Tjvb%0Mtk`u5HhVCS+ zaNtq$^9^@6?5ct2`ACAuM@mJysL(L6F&HsUj$BJegwQd<^PCWwON?p(6X7#s9zLkS z)7v24;v*3nK|m5c=*HY27AYM9(8*8(9g1`osI5EngI#?w0x0-7B;&3(#UaE*>Ejvb zDNhU7)eOzB2gzW!wZcb6YX-pPlN`*P$$MBYb1w>aP+8~W_?&BlPUnJmZWqEa;BIe#z(d(aid^4q1R7AQ6$1o zFF|d=YOBZ;rH@W+AJ>0uX50#a<9avisVcJeh$2ERh_oIC{!q_>A#aJUxIP!UMQi>3S>@ zY2;0E>h*~3Nh=naq6G-NM$lD--vv_qxpkE(IBM_+nLOJq)8TN?!mAvcxS!cuhWBg6cGY68x@I4;j%{VBH4M=!nQzo)nk`>wgUM zLzDpFEX29vU;aG>uF&)MQh?5q@9o~ImOF%4T{VG?t{+^E0Cco8i!CGqN!}MmJqXf6 zNQ3r&D~!mx{`;M52?aKSlqRr*1!yi2K7jItVGZJNw51pb$pN8#Pju=Pk3k;_n||=V z7Xa*<0BjXp*3}UHUkP_5Ln3_SJSlJ78EB3hxCGzW?#0nJ@|ZmECGG+mc}~Kb0$bz; z*6!N?o8ojB?!NGA5ypY1V*>V;|CI{%o7=>A@)VqjS6mA0fQc>*W}Y;RyO4^6EljNu zeFlhx5mL2flhhNf@S3W0IK8@2+L9= zmn~qQBp9paD)PQI-39VMoDixt20Mren&=1nP->cy9rz~ogU_6RK_-;v5VD$hoxro% zOk2a(3ib(xT>=aHLFD+}XcFM)_y9zJKtmFcXr>0hxrNRr^BjX8oHDwx)Z!7SD>kjo z^4s^WYRCn5vD7HvyDF`kbOusSx=vWuA*%ccs}K=!?FSn@1q7hY*(wYH_z!P?0TCS_ ztvjh$Whtd4`X_wLtt*c=pDcVGJvTg2!L%se!B?2Aoj|^BYqG}3^Yp#%gQ`NRA+C$k|dFm<4w_Z zR4AAm5s;pt1!!<5-Xg;x4oMisaK}Tn0MW_2hD~Us%B2_4vs-|cEsc*bIvmT#mk{E&Zps|?B7|j6pfi&? 
zKpA6V%WFVuz>IkSfL8+Gv^j+baAv$n#h1<@P0C5H#6lOI>z zq%2?JY6YJywYV*kC%XWCEvE-Nppn-JCgEDENZegO362!7hYpMEvjZ09;QKlSfEa7< zJbJr2*ak3S_r@yx4Ajig%bUOs?7m)Qo=Hk8cFjG=@eKyzQxMIKt`-~u#tXEPh9v72 zzPI_h;{XnUyh?>=euNz$f~?UWjvg{3PehBSfaFM%TFa-Nnz-f*eqxq!fe6km2H zS>OwDHAM-ZbSQ9oe+X|#fhi910*#dA{Qx?IWGVq+^t`Ib7h=MgW^ld;Va(_oV6(?d zG_viaIlBDpYYP~fH(wXkoj>>>>=FWvgW2)#LBepQWk8zl%YHDFp6{3udXH}v$*dN2 zmLHzv_UaE!0~tYR)Rq8ceLev55m(qQ%>EbHK6UIt{sZa6w7&7Urhg zozwi)>-WRX@)-~SsN!>tETz949c0h^ONp0A8;LwoMIq* z(hshR0G90V0DWf-4g`X%DQe;AzNvtMDlpm~jr5iHN_PmkWaKL)j2T~{fWEbWv1;g% z|G-U@0Rz4O#OKHxXt2n0pOym?Lcx3@K*}5eY?(6RCQ5w4Jn*u|>A|jwfUgTI)Qw#P zb9_CMg5R0e3wR39#@bvpKLvpKN$`m*x_+9kcuPL*EFb?2P{K}nq;i?x0CEm*st-CjbxBmL#aQ99{EBInLNC@Kqp%mXd-!%b}26tlc-3sxbpsjHGY=pEmOQi&y7%`Cm1&4gLBInD2jOB$TJ*JH=ai{uwpx%f&F zERqU8sT!=t8rY2hGRpzhWgi{)hHemM$eco8lBa9o^2R8b#8q+;35oKhO+4i-Z4}Ha z3CN!Axk1=AzIy#!#06#5;nP(lJvp4lV9ss!Auvy6v}ziRCaL61D%R1I(AhhIH5Z!% zm`b2Y(LhxycbLc5SI?4m0ts5DjNfC=hjecqjbY~lu8g|?gn0(R0KU5s!J_N)&A$*p zLe{bAj^z-eGQK%#j#L(Pp835t?HX$M)!VRKL!J=liLtZ1EM&5XW)%RBlsG}-UXwd z!X!%rH4EpFS7%G}i8@MM&x!q$gzUcIx6XoSNvi|7sHh{?gUhENgO^N5MVTyHZ?Fk_ z{P!GD(3j17rtB^sD5AT<;{J?12oJl#{>bq|RP^6SZ--DnbPu0LQug1r-pJ?Bg-<>I zvf|c!Wsi?M95*-C*EbBGkEJo284jT6H=?$4wkJr$DlRH6DvhHsPRxq-+OC8yn~Q1} z1*VFpAy1W+Uw;|v9xd0;DTy#ofni6#k61Mf{&@Zpo(y5y<;k!Wm`Yhp{$gXc%&l1F zV&T*k6RoPJtNcTr*})NB{nKD&+}}aIV~5IhqO3u7jzP0py1h17qGa99d~_7=59vP7 zI?l|t%Z@kdoUV+2Z{C9$#Pnb+B@91GG*Ts={K9m_1Yf3O)k%A}L$-9iN|HT9hoN@hFMb`m#Yj2)NBbKx;a zjw^pW@gmzPtaJRQo{=jTU3!|BPPt+civE>FT+VGt^|>|2P*(-kXkXb0tcRl|_mRTC z*^_^{@-4&S?&hnLWYbpKo>bGpP9M;mD2mHzhJBs8y+_}|*MHe2&1c?u1~Hm-cM1O; z`-oYOj(Mg?q443vzx$CAdwzXw=Ly|O!(~GJXfbu9Ot#$`;kj4i;>m4{Z1zui@fvl= z>UB^m#a-FRg5Esaws#U3+Q|pSr1iRXYc(H>S<~z+>sTDzMFnK&Dhm;2S2Bf ze!;daKQl7U$UH?~SNQX5*D0p@-Itt?R%4O-hw6t!^&bvCE*Y@~EdCU2Q`jl;o$}h7 zAO4louN|PyWtx_W4CrO`-YtENZosjY+B+ze)e=Gdm?3$091UJJ4`qShQI_{KzEJxwkm- zUC)E@{Vy8*F?FA&eSD=hY9E%wZP8m^GizY!Uo;&V9kJ_gd2Rf7%kqVE+aD6vNd-q= zFB$#&Gxf`Rb+G})$3Ndi=|yUYfA()q3lO7=?w)$HxM5}Lz@^wIxZ8+Nzy_QBr5~iE zQv5MM3bQE*moIF5iR8`TDH5})HA<89cih_W4kd)=xu(al_4#OGR(jW(Weq|nLM55` zp%S*-VdgR-tKmq~cC{(OUtv(Q-rhc+Po_`wL0zo5>~7_&uJM*fxn1LT_R{%hylB{~pfOm9*T`nml@3rN)d3NReU#|hDkN*)a$p+B}b#So0E<$;}(rQ7b4 zXY#=}PX<<24~qn#fvhzCJ(o|M-p?&_?yl;vyOEt!oo(X^DR=Q{7jb7W%xL(1U(Bar zv|{5Re@=NpDxVAR{?g9S*EKh~k{sA^8ups}^NbGxj`lw)b?E?O;7PYYXe6>5v zT)rs?cuSd}3iv)IK6BVi$G9@btldG?sP`_W{R&xcRZm@GzJFMFoT;!gv^;nCbEs46 zG!KgH@_F=6+SsRd)uN}Bl>eMNCvw5bCUnhg7A-bWIX;@9mTlSS+ zqC~Q6((;zrg<~84)}fuzZ`jg5LgG)XhV=JX~U__tFz8Y40(vn*|y`Xw=W^cQG#?GysIeSSxZ8ydw zg^#t5>JqOz)GT*Bn(rka{h`yAZG#XEaz?$gOO0;M%&tB2XmTmPdkMvs#hqbheJeKY zk$&HgK{PASFX?c7W=-v=u7f&ZytEDSP*>)&M9m@v(~ky$sr#{E_GuZ0xsoe5^?$4v z3IRj70jR&%s{*;QS33TF4n>1%9yYuB!v8*$D=rajRHZ@g^>5~69glZiRCQliOCSRW zLkb5=-_<#^?XD3#ZIkwrq{Uqv=9>na)=u8VIQhmT4=ar|j#9N?nD{4=Y1}?VsiUf| z&7hNalh>eYqlIaqqklwWM)Nqz8qx0A3bNx9o1cFFoGQ@B+2L3XT%6(jspI}W6=ym%qYqj1-aeO{kr-ea9GCXeQpw*WFIgVd*ZZL)HMYfMZaSP~lpn85 zrR%O?4*mB`Di@3SMQ^XMHF1OW_=3%Jk;W<8PRcKH$=O^PEi@SSI}Q6&sy+-wT!HMr78!<lGx7%syEzMc)~s|AZ>;Thz3swWBfGstDu0PtRoC7?8jncXU|LuDXuNjT(tQ;O zxG#sy&Gp59Od;zcHkq|27(9+gjx|5>0zt`EDJLyj=rs~ zk3IL1U$CuySQeQ-DL6A~wev7QM4;p3z2|;xmz?43Gv}dVF~t{%385Dw)vJ^35aI3l zik9Nc{Z-#1c$TbUT%RH`Ky;0&uj1k#qnXH)gT2wMnj#mgFf#_dldSqIkqTx|mh%L+ z9GYb^wLn3>Vv5{3@D)qs9Eny@qpaodTF=i@kxCA%hH>}Aj<&9a7Q6u8_4WWSg$Qw^+*H+JXDlfR-0T_ zK~@zmmTatQv^C}}ji(liMvR-(4CMFCicE{4Z)Np5LnXEctDCn_8&&ob1;1v(Jo%ED z^_F-U$+j%ByaST@tHFQEmYW^FwGRpG%$KK{_vKG%P{s;l9>c23DMI3o?`@UQ}f zJYW5#1rJSfL)@%N2cNias0bzYu#0#9Jmp(@dq22h+O4W5ck3c6eM8T?SgJ_*p;VDG 
zo2cYu@#|rZZks32d2^%b4`Ew23szi<;tpWB&8JS%_&%XLV=0BH?u^5NR{VvqtO6%Vm^c#?(KRcYR>aG(J6y-BLGNWg}MCM_jbk0x%eMH>c zmNj(5itc;R;O5cuL^jUU>CmK~%)KU_pnP{@?ZgbN4Fi|GBDr?_+@9wA$YyvP^U}6) zEq|HV>%F~>5@xKQW0+0@`K%3yq{^Qz?*W#qDu;c!6scUsp;2lxPRWls=nH{ z7I>~0r1fd<#eKUa#cQ?u)7jiS<)ujXP_7@n*ddeMH@bx-Ph zPStOTI?o`m@T+ZNeJF3dkZFyy&eHYf|N3qC9Y-FN*W8Q=nzj@fT2K3kA~zN4pJhzF z?Wr}O(T`EN&}wScPs?g=lz92wE6iI_T8MQWU3KtdF5j>YRjn^CeDribRy}<5kF9{r z=bIiT{ZeAdfpznlsV}N6o-f(gZhCB{=e${V3o7s>aTsfykx#h0$5iCs+wu@&`RMP| zd0u~A{iyY)jLgBYe-1SIg55*I1t%OrM`qF@(Xa7B-`?<_d02abQ311=qG8R67>ZC3gMtY+rIYP<_1>&AuVAl=r1NoIcU;A-ErbqPxHSyfHJc z?10II1|-c#xpEQyN8z!lpuhO1Zoh~Y%R;d&Z5z`Fl>u%Y)x}%y#b&a#ji{SvcNTtI zejgI3X5SBgCFoq-qBN9dUa21KBgJ<%w1K%Bo)@cV_)? zu78g{sOL#dKm%5+DNXnWWlwwlzWpY!uO=B3W|F|CEv51%(<-XlchQdBcIR?VVINb^ zAi-WHmt}si(y`^>>C5lq|BW-C%%z4gcE>ZqSRbJk^M)J4Yj>M{^A4BM%TJE6R&@h! zv7D!`-}$?yrm76G^xxg4cRk2QpLx0!7F zc)~L=vjn0o*P9CpXlxrfT0>g|}`BFRml)r^~h~G;cr41yRmjo?Ah`+KjhYO-QNISxX9gqomHY z%w|(r0$vCexu>zrQaQS(IExmr9%hT$Ti6!Zt)2$c){QZtePH_^4vd{WPd*N6x_wu! zfgLW%&)^>jpI1`GkAng`NhIQ#pz@1|*XN_dKJycw3t1j&*^Bwn>1F;H%(EZ$x0;}4 zpUw0{#mNk+;*8WxVI5v&-1UZ?x6%`*#i>pd&W^J0`Inh5y`UK~y4=*P)K*BG`n9m( z+NTiP-mA|e**i9oA9I*4Gr5vh+oBWsr<_0a;;sJ61l>FnVL51uq}vnO@ARHlj5}LY z5njyL_z%DS_G?#{{XWBO8(4p+uiiwJ?)bU&-7X5e3>pybT~U`2k6kN>w_}&XJ~Tg7 zICiP5!Z;tlc{41}R~@MJ;a2Cxud3N-vbG{^3Q=}*sgH6^hz1UMWU^Xyby7E5papFH zOLr|(OENWbGq;)d?TUU`>|+w~SCGr{c_Sg&p2F8PNHMKPIf$cfx)PtTB{p8IM~~Bw z%IDM{OMQJjXDg2M>nL5d+Zr&~aZ~FuxF+oHePU9weWN*19dFmtLyF1Ps(A`Z)7bjy z`=)+KH9YXRZi~zEksC|gi+rL7xxA)lOrY^lz06k8+eg_(39k}=FYmQ!(5)Pt-|w?o zW>`)s)Uek3s~JwgJ+iUSvt{1*`I1^!5o@sMYVd zXohwKdx+M;NjB>wx31Kdd^GHS{M`{!)!B zkVf|K)VPP)k)__)8MXXy=kWla({_1{%%_+RD574`T;zOLgnFpUyyvFOl6!=9GT~l^ zX`v${;%mFV@$}|O%AwAqig8oYOXK7NLm$B-FU7z0-AMJdHaVI)=@f-m>eF0Qb$`GC z!7gVL0cP3Ji$v$LYK0aT}Ov@{ZVZ82KMKl(<>%<&EMJt@o-W3mSpWGxVSB+`#Ss(I1#PVcB!d% zUH$!6qoUXDUC3IX%h!|34-s)Em_}=v<#W#Jad14^wg1DYv2NB;_|d2WE6s{^Ps2mT z*un}(ibB~=(=Ozum%?cC@;yRUk`z%`?W^FD@sD2^5RF+rc4Zd|4gdUAYC23QNfEQV zH&=W7r62b8cVts%A?SsS_+x|x{{(RKfcfYD1>%EmHp~(h+aV=Pr;1r8$HE?8-J_f3 zuHUSXEM-B@L{mD6s<$!P zjhnrB;`+Cu2-Zi82o@4g!wjC`Ei6YPWxuJp50QVa-F1pS{g^L`aU!8Ebq^Vc--6(c z8ICYUbOKDyM*@L9gNv4L+iq7y+B!5Cy>7SB!K#iepayz(l<%xave4JQl+O4tlM`t2 z))c+dMU-}oQA~;l2wgb8}r^p8Ii7Fj94T$`9Gvh&WLn!=rH=q z$0!IPpxM>A`IgWGms%U`)@sE5DvHT)*Vh5&cdV*Z_EfpfK^3-!xXB1!2K~AjlaxHm z89vKeUtq^JS51NY2ESCN8OlUXH?p&Y$yW>7zx$alPV6viPP^_~Lb|P&0D2hgmp}T9K!vU{T#v6fw zZ4P>biA8?RNfeqa{a5v#zpmK+-i%xTzZh&7T02i)!~U`@SIogeSBodR+U=Q|oalYt z_S{@iYF6?)Y`NSR1zrWee*ztMdBx`BpZjGEh z`<<|wHb;+N6Khn*FOL2e%*uYLnV^oaG)WJAH1Jq}29(Z9_3c@)a-&0TNTlL;vwiCH z_OkAP*&#*a>hH#$BEgv%@eej!rl|o3Eg9Q$@3Oa>7A8x4xxLP^no9aES#D(qlexL+ zVmE^~b0-U;A*Ds)4B%ZsBu07ek=3kL+o{C!8Xj4`+jhFDbh3Puf}>__Fc9d-oPNhM zrES9T+UBJp6-U!IDPhJ6cQ2OL^CeOiH2r%pSNV;O=1eR?@J;O~Z#vAZLVE9+fuOiU z`;cN`jCNT}7qd`FeD(b%Cpw6GPKmRd{0N=qc$G?C2R(E&BNSg?oGg7{FTQGOYNW`= z>g4>x{ml|q=h;|*G1$}Zn|T@i`)J6g$pzI`VY}5rb|dWGome~1!f(UuqG!hyh7GN)WejFjQlCK?zJpIT WlSce9OI