Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
change impl of get/put to benefit from multi-thread
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Zhang <[email protected]>
  • Loading branch information
Eskibear authored and fbricon committed Feb 26, 2020
1 parent 2040c61 commit 32ce3f8
Showing 1 changed file with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.SSLSocketFactory;
Expand Down Expand Up @@ -59,6 +64,7 @@
import org.eclipse.aether.transfer.TransferResource;
import org.eclipse.aether.util.ChecksumUtils;
import org.eclipse.aether.util.ConfigUtils;
import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,6 +116,9 @@ class AetherRepositoryConnector implements RepositoryConnector {

private final AetherClient aetherClient;

private final int maxThreads;
private Executor executor;

class FileSource implements RetryableSource {

private long bytesTransferred = 0;
Expand Down Expand Up @@ -187,6 +196,7 @@ public AetherRepositoryConnector(RemoteRepository repository, RepositorySystemSe
throw new NoRepositoryConnectorException(repository, e);
}

this.maxThreads = ConfigUtils.getInteger(session, 5, "aether.connector.basic.threads", "maven.artifact.threads");
this.aetherClient = newAetherClient(repository, session, sslSocketFactory);
}

Expand Down Expand Up @@ -275,6 +285,23 @@ private static OkHttpAetherClient newAetherClient(RemoteRepository repository, R
return new OkHttpAetherClient(config);
}

private Executor getExecutor(Collection<?> artifacts, Collection<?> metadatas) {
if (maxThreads <= 1) {
return DirectExecutor.INSTANCE;
}
int tasks = safe(artifacts).size() + safe(metadatas).size();
if (tasks <= 1) {
return DirectExecutor.INSTANCE;
}
if (executor == null) {
executor = new ThreadPoolExecutor(maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new WorkerThreadFactory( getClass().getSimpleName() + '-' + repository.getHost() + '-' )
);
}
return executor;
}

/**
* Download artifacts and metadata.
*
Expand All @@ -288,19 +315,20 @@ public void get(Collection<? extends ArtifactDownload> artifactDownloads, Collec
CountDownLatch latch = new CountDownLatch(artifactDownloads.size() + metadataDownloads.size());

Collection<GetTask<?>> tasks = new ArrayList<GetTask<?>>();
Executor executor = getExecutor(artifactDownloads, metadataDownloads);

for (MetadataDownload download : metadataDownloads) {
String resource = layout.getLocation(download.getMetadata(), false).getPath();
GetTask<?> task = new GetTask<MetadataTransfer>(resource, download.getFile(), download.getChecksumPolicy(), latch, download, METADATA);
tasks.add(task);
task.run();
executor.execute(task);
}

for (ArtifactDownload download : artifactDownloads) {
String resource = layout.getLocation(download.getArtifact(), false).getPath();
GetTask<?> task = new GetTask<ArtifactTransfer>(resource, download.isExistenceCheck() ? null : download.getFile(), download.getChecksumPolicy(), latch, download, ARTIFACT);
tasks.add(task);
task.run();
executor.execute(task);
}

await(latch);
Expand All @@ -323,19 +351,20 @@ public void put(Collection<? extends ArtifactUpload> artifactUploads, Collection
CountDownLatch latch = new CountDownLatch(artifactUploads.size() + metadataUploads.size());

Collection<PutTask<?>> tasks = new ArrayList<PutTask<?>>();
Executor executor = getExecutor(artifactUploads, metadataUploads);

for (ArtifactUpload upload : artifactUploads) {
String path = layout.getLocation(upload.getArtifact(), true).getPath();
PutTask<?> task = new PutTask<ArtifactTransfer>(path, upload.getFile(), latch, upload, ARTIFACT);
tasks.add(task);
task.run();
executor.execute(task);
}

for (MetadataUpload upload : metadataUploads) {
String path = layout.getLocation(upload.getMetadata(), true).getPath();
PutTask<?> task = new PutTask<MetadataTransfer>(path, upload.getFile(), latch, upload, METADATA);
tasks.add(task);
task.run();
executor.execute(task);
}

await(latch);
Expand Down Expand Up @@ -772,6 +801,9 @@ static interface ExceptionWrapper<T> {

public void close() {
// this client implementation is thread-safe
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
}

private <T> Collection<T> safe(Collection<T> items) {
Expand Down Expand Up @@ -874,4 +906,13 @@ public void countDown() {
}
}
}

private static class DirectExecutor implements Executor {
static final Executor INSTANCE = new DirectExecutor();

public void execute( Runnable command ) {
command.run();
}
}

}

0 comments on commit 32ce3f8

Please sign in to comment.