Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

download/upload multiple files with thread pool #26

Merged
merged 1 commit into from
Feb 26, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.SSLSocketFactory;
Expand Down Expand Up @@ -59,6 +64,7 @@
import org.eclipse.aether.transfer.TransferResource;
import org.eclipse.aether.util.ChecksumUtils;
import org.eclipse.aether.util.ConfigUtils;
import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,6 +116,9 @@ class AetherRepositoryConnector implements RepositoryConnector {

private final AetherClient aetherClient;

private final int maxThreads;
private Executor executor;

class FileSource implements RetryableSource {

private long bytesTransferred = 0;
Expand Down Expand Up @@ -187,6 +196,7 @@ public AetherRepositoryConnector(RemoteRepository repository, RepositorySystemSe
throw new NoRepositoryConnectorException(repository, e);
}

this.maxThreads = ConfigUtils.getInteger(session, 5, "aether.connector.basic.threads", "maven.artifact.threads");
this.aetherClient = newAetherClient(repository, session, sslSocketFactory);
}

Expand Down Expand Up @@ -275,6 +285,23 @@ private static OkHttpAetherClient newAetherClient(RemoteRepository repository, R
return new OkHttpAetherClient(config);
}

private Executor getExecutor(Collection<?> artifacts, Collection<?> metadatas) {
if (maxThreads <= 1) {
return DirectExecutor.INSTANCE;
}
int tasks = safe(artifacts).size() + safe(metadatas).size();
if (tasks <= 1) {
return DirectExecutor.INSTANCE;
}
if (executor == null) {
executor = new ThreadPoolExecutor(maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new WorkerThreadFactory( getClass().getSimpleName() + '-' + repository.getHost() + '-' )
);
}
return executor;
}

/**
* Download artifacts and metadata.
*
Expand All @@ -288,19 +315,20 @@ public void get(Collection<? extends ArtifactDownload> artifactDownloads, Collec
CountDownLatch latch = new CountDownLatch(artifactDownloads.size() + metadataDownloads.size());

Collection<GetTask<?>> tasks = new ArrayList<GetTask<?>>();
Executor executor = getExecutor(artifactDownloads, metadataDownloads);

for (MetadataDownload download : metadataDownloads) {
String resource = layout.getLocation(download.getMetadata(), false).getPath();
GetTask<?> task = new GetTask<MetadataTransfer>(resource, download.getFile(), download.getChecksumPolicy(), latch, download, METADATA);
tasks.add(task);
task.run();
executor.execute(task);
}

for (ArtifactDownload download : artifactDownloads) {
String resource = layout.getLocation(download.getArtifact(), false).getPath();
GetTask<?> task = new GetTask<ArtifactTransfer>(resource, download.isExistenceCheck() ? null : download.getFile(), download.getChecksumPolicy(), latch, download, ARTIFACT);
tasks.add(task);
task.run();
executor.execute(task);
}

await(latch);
Expand All @@ -323,19 +351,20 @@ public void put(Collection<? extends ArtifactUpload> artifactUploads, Collection
CountDownLatch latch = new CountDownLatch(artifactUploads.size() + metadataUploads.size());

Collection<PutTask<?>> tasks = new ArrayList<PutTask<?>>();
Executor executor = getExecutor(artifactUploads, metadataUploads);

for (ArtifactUpload upload : artifactUploads) {
String path = layout.getLocation(upload.getArtifact(), true).getPath();
PutTask<?> task = new PutTask<ArtifactTransfer>(path, upload.getFile(), latch, upload, ARTIFACT);
tasks.add(task);
task.run();
executor.execute(task);
}

for (MetadataUpload upload : metadataUploads) {
String path = layout.getLocation(upload.getMetadata(), true).getPath();
PutTask<?> task = new PutTask<MetadataTransfer>(path, upload.getFile(), latch, upload, METADATA);
tasks.add(task);
task.run();
executor.execute(task);
}

await(latch);
Expand Down Expand Up @@ -772,6 +801,9 @@ static interface ExceptionWrapper<T> {

public void close() {
// this client implementation is thread-safe
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
}

private <T> Collection<T> safe(Collection<T> items) {
Expand Down Expand Up @@ -874,4 +906,13 @@ public void countDown() {
}
}
}

private static class DirectExecutor implements Executor {
static final Executor INSTANCE = new DirectExecutor();

public void execute( Runnable command ) {
command.run();
}
}

}