diff --git a/src/main/java/io/takari/aether/connector/AetherRepositoryConnector.java b/src/main/java/io/takari/aether/connector/AetherRepositoryConnector.java index 92940ec..b8d983c 100644 --- a/src/main/java/io/takari/aether/connector/AetherRepositoryConnector.java +++ b/src/main/java/io/takari/aether/connector/AetherRepositoryConnector.java @@ -23,6 +23,11 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLSocketFactory; @@ -59,6 +64,7 @@ import org.eclipse.aether.transfer.TransferResource; import org.eclipse.aether.util.ChecksumUtils; import org.eclipse.aether.util.ConfigUtils; +import org.eclipse.aether.util.concurrency.WorkerThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,6 +116,9 @@ class AetherRepositoryConnector implements RepositoryConnector { private final AetherClient aetherClient; + private final int maxThreads; + private Executor executor; + class FileSource implements RetryableSource { private long bytesTransferred = 0; @@ -187,6 +196,7 @@ public AetherRepositoryConnector(RemoteRepository repository, RepositorySystemSe throw new NoRepositoryConnectorException(repository, e); } + this.maxThreads = ConfigUtils.getInteger(session, 5, "aether.connector.basic.threads", "maven.artifact.threads"); this.aetherClient = newAetherClient(repository, session, sslSocketFactory); } @@ -275,6 +285,23 @@ private static OkHttpAetherClient newAetherClient(RemoteRepository repository, R return new OkHttpAetherClient(config); } + private Executor getExecutor(Collection artifacts, Collection metadatas) { + if (maxThreads <= 1) { + return DirectExecutor.INSTANCE; + } + int tasks = safe(artifacts).size() + safe(metadatas).size(); + if (tasks <= 1) { + return DirectExecutor.INSTANCE; + } + if (executor == null) { + executor = new ThreadPoolExecutor(maxThreads, maxThreads, 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new WorkerThreadFactory( getClass().getSimpleName() + '-' + repository.getHost() + '-' ) + ); + } + return executor; + } + /** * Download artifacts and metadata. * @@ -288,19 +315,20 @@ public void get(Collection artifactDownloads, Collec CountDownLatch latch = new CountDownLatch(artifactDownloads.size() + metadataDownloads.size()); Collection> tasks = new ArrayList>(); + Executor executor = getExecutor(artifactDownloads, metadataDownloads); for (MetadataDownload download : metadataDownloads) { String resource = layout.getLocation(download.getMetadata(), false).getPath(); GetTask task = new GetTask(resource, download.getFile(), download.getChecksumPolicy(), latch, download, METADATA); tasks.add(task); - task.run(); + executor.execute(task); } for (ArtifactDownload download : artifactDownloads) { String resource = layout.getLocation(download.getArtifact(), false).getPath(); GetTask task = new GetTask(resource, download.isExistenceCheck() ? null : download.getFile(), download.getChecksumPolicy(), latch, download, ARTIFACT); tasks.add(task); - task.run(); + executor.execute(task); } await(latch); @@ -323,19 +351,20 @@ public void put(Collection artifactUploads, Collection CountDownLatch latch = new CountDownLatch(artifactUploads.size() + metadataUploads.size()); Collection> tasks = new ArrayList>(); + Executor executor = getExecutor(artifactUploads, metadataUploads); for (ArtifactUpload upload : artifactUploads) { String path = layout.getLocation(upload.getArtifact(), true).getPath(); PutTask task = new PutTask(path, upload.getFile(), latch, upload, ARTIFACT); tasks.add(task); - task.run(); + executor.execute(task); } for (MetadataUpload upload : metadataUploads) { String path = layout.getLocation(upload.getMetadata(), true).getPath(); PutTask task = new PutTask(path, upload.getFile(), latch, upload, METADATA); tasks.add(task); - task.run(); + executor.execute(task); } await(latch); @@ -772,6 +801,9 @@ static interface ExceptionWrapper { public void close() { // this client implementation is thread-safe + if (executor instanceof ExecutorService) { + ((ExecutorService) executor).shutdown(); + } } private Collection safe(Collection items) { @@ -874,4 +906,13 @@ public void countDown() { } } } + + private static class DirectExecutor implements Executor { + static final Executor INSTANCE = new DirectExecutor(); + + public void execute( Runnable command ) { + command.run(); + } + } + }