Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Take the node id into account when creating geoip tmp dir. #70489

Merged
merged 1 commit into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Set;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -40,6 +41,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -143,7 +145,6 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/69972")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
// setup:
Expand Down Expand Up @@ -228,12 +229,24 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
final DiscoveryNodes nodes = clusterService().state().nodes();
final java.util.Set<String> ids = StreamSupport.stream(nodes.getDataNodes().values().spliterator(), false)
.map(c -> c.value.getId())
.collect(Collectors.toSet());
final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getDataNodeInstances(Environment.class).spliterator(), false)
.map(env -> {
Path geoipTmpDir = env.tmpFile().resolve("geoip-databases");
assertThat(Files.exists(geoipTmpDir), is(true));
return geoipTmpDir;
}).collect(Collectors.toList());
}).flatMap(path -> {
try {
return Files.list(path);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).filter(path -> ids.contains(path.getFileName().toString()))
.collect(Collectors.toList());
assertThat(geoipTmpDirs.size(), equalTo(internalCluster().numDataNodes()));
assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private static DatabaseRegistry createRegistry(Path geoIpModulesDir, Path geoIpC
LocalDatabases localDatabases = new LocalDatabases(geoIpModulesDir, geoIpConfigDir, cache);
DatabaseRegistry databaseRegistry =
new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
databaseRegistry.initialize(mock(ResourceWatcherService.class), mock(IngestService.class));
databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ final class DatabaseRegistry implements Closeable {

private final Client client;
private final GeoIpCache cache;
private final Path geoipTmpDirectory;
private final Path geoipTmpBaseDirectory;
private Path geoipTmpDirectory;
private final LocalDatabases localDatabases;
private final Consumer<Runnable> genericExecutor;

Expand All @@ -100,13 +101,14 @@ final class DatabaseRegistry implements Closeable {
Consumer<Runnable> genericExecutor) {
this.client = client;
this.cache = cache;
this.geoipTmpDirectory = tmpDir.resolve("geoip-databases");
this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
this.localDatabases = localDatabases;
this.genericExecutor = genericExecutor;
}

public void initialize(ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
localDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
Expand Down Expand Up @@ -138,7 +140,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
}
});
if (Files.exists(geoipTmpDirectory) == false) {
Files.createDirectory(geoipTmpDirectory);
Files.createDirectories(geoipTmpDirectory);
}
LOGGER.info("initialized database registry, using geoip-databases directory [{}]", geoipTmpDirectory);
ingestService.addIngestClusterStateListener(this::checkDatabases);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public Collection<Object> createComponents(Client client,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
try {
databaseRegistry.get().initialize(resourceWatcherService, ingestService.get());
String nodeId = nodeEnvironment.nodeId();
databaseRegistry.get().initialize(nodeId, resourceWatcherService, ingestService.get());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setup() throws IOException {
LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache);
geoIpTmpDir = createTempDir();
databaseRegistry = new DatabaseRegistry(geoIpTmpDir, client, cache, localDatabases, Runnable::run);
databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class));
databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
}

@After
Expand Down Expand Up @@ -142,7 +142,7 @@ public void testCheckDatabases() throws Exception {
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), notNullValue());
verify(client, times(10)).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), hasSize(1));
}
}
Expand All @@ -166,7 +166,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand All @@ -188,7 +188,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand All @@ -209,7 +209,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testLoadingCustomDatabase() throws IOException {
Client client = mock(Client.class);
GeoIpCache cache = new GeoIpCache(1000);
DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class));
databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
assertNull(lazyLoader.databaseReader.get());
Expand Down