Skip to content

Commit

Permalink
Add TmpStorageProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
rohangarg committed Jan 6, 2025
1 parent d5eb94d commit 1a65314
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.local;

import com.google.inject.Provider;
import org.apache.druid.java.util.common.FileUtils;

import java.io.File;

/**
* LocalTmpStorage is a provider for temporary directories. A default implementation is binded in all services except
* Peon. For peons, a custom implementation is binded in CliPeon which uses the working directory of the peon to create
* a temporary storage. This interface will be guice injectable in all services.
* The cleaning up of the temporary files/directories created in this storage is handled by the caller.
*/
public interface LocalTmpStorage
{
/**
* Get a temporary directory.
*
* @return a temporary directory
*/
File getTmpDir();

class DefaultLocalTmpStorageProvider implements Provider<LocalTmpStorage>
{
private final String prefix;

public DefaultLocalTmpStorageProvider(String prefix)
{
this.prefix = prefix;
}

@Override
public LocalTmpStorage get()
{
File tmpStorage = FileUtils.createTempDir(prefix);
return () -> tmpStorage;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.local;

import org.apache.druid.java.util.common.FileUtils;
import org.junit.Assert;
import org.junit.Test;

import java.nio.file.Paths;

public class LocalTmpStorageTest
{
@Test
public void testDefaultLocalTmpStorage()
{
LocalTmpStorage localTmpStorage = new LocalTmpStorage.DefaultLocalTmpStorageProvider("test").get();
String prefixPath =
Paths.get(FileUtils.getTempDir().toAbsolutePath().toString(), "test").toAbsolutePath().toString();
if (!localTmpStorage.getTmpDir().getAbsolutePath().startsWith(prefixPath)) {
Assert.fail("Temporary directory path does not start with the expected prefix");
}
}
}
5 changes: 5 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.apache.druid.timeline.PruneLoadSpec;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -182,6 +183,10 @@ protected List<? extends Module> getModules()

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("broker"))
.in(LazySingleton.class);
},
new LookupModule(),
new SqlModule()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.eclipse.jetty.server.Server;
import org.joda.time.Duration;

Expand Down Expand Up @@ -293,6 +294,10 @@ public void configure(Binder binder)
binder.bind(CoordinatorCustomDutyGroups.class)
.toProvider(new CoordinatorCustomDutyGroupsProvider())
.in(LazySingleton.class);

binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("coordinator"))
.in(LazySingleton.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -140,6 +141,10 @@ protected List<? extends Module> getModules()

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("historical"))
.in(LazySingleton.class);
},
new LookupModule()
);
Expand Down
4 changes: 4 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.eclipse.jetty.server.Server;

import java.util.List;
Expand Down Expand Up @@ -196,6 +197,9 @@ public void configure(Binder binder)

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("indexer"))
.in(LazySingleton.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -184,6 +185,10 @@ public void configure(Binder binder)
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

configureIntermediaryData(binder);

binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("middle-manager"))
.in(LazySingleton.class);
}

private void configureIntermediaryData(Binder binder)
Expand Down
5 changes: 5 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.druid.server.security.AuthenticationUtils;
import org.apache.druid.server.security.Authenticator;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;
import org.eclipse.jetty.rewrite.handler.RewriteHandler;
Expand Down Expand Up @@ -317,6 +318,10 @@ public void configure(Binder binder)

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("overlord"))
.in(LazySingleton.class);
}

private void configureTaskStorage(Binder binder)
Expand Down
16 changes: 16 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliPeon.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.apache.druid.tasklogs.TaskPayloadManager;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -355,6 +356,21 @@ public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task t
{
return task.getBroadcastDatasourceLoadingSpec();
}

@Provides
@LazySingleton
public LocalTmpStorage getLocalTmpStorage()
{
File tmpDir = new File(taskDirPath, "tmp");
try {
org.apache.druid.java.util.common.FileUtils.mkdirp(tmpDir);
}
catch (IOException e) {
log.error("Failed to create tmp directory for the task");
throw new RuntimeException(e);
}
return () -> tmpDir;
}
},
new QueryablePeonModule(),
new IndexingServiceInputSourceModule(),
Expand Down
5 changes: 5 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.server.router.TieredBrokerHostSelector;
import org.apache.druid.server.router.TieredBrokerSelectorStrategiesProvider;
import org.apache.druid.server.router.TieredBrokerSelectorStrategy;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.eclipse.jetty.server.Server;

import java.util.List;
Expand Down Expand Up @@ -126,6 +127,10 @@ protected List<? extends Module> getModules()

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorage.class)
.toProvider(new LocalTmpStorage.DefaultLocalTmpStorageProvider("router"))
.in(LazySingleton.class);
},
new LookupSerdeModule()
);
Expand Down
15 changes: 15 additions & 0 deletions services/src/test/java/org/apache/druid/cli/CliPeonTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.storage.local.LocalTmpStorage;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
Expand Down Expand Up @@ -143,6 +144,20 @@ public void testCliPeonHeartbeatDimensions() throws IOException
);
}

@Test
public void testCliPeonLocalTmpStorage() throws IOException
{
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), "httpRemote");
final Injector injector = GuiceInjectors.makeStartupInjector();
injector.injectMembers(runnable);
Injector secondaryInjector = runnable.makeInjector();
Assert.assertNotNull(secondaryInjector);
LocalTmpStorage localTmpStorage = secondaryInjector.getInstance(LocalTmpStorage.class);
Assert.assertEquals(new File(file.getParent(), "/tmp").getAbsolutePath(), localTmpStorage.getTmpDir().getAbsolutePath());
}

private static class FakeCliPeon extends CliPeon
{
List<String> taskAndStatusFile = new ArrayList<>();
Expand Down

0 comments on commit 1a65314

Please sign in to comment.