diff --git a/CHANGELOG.md b/CHANGELOG.md index 32287cd..1e134d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ _2016-09-30_ * Erased the response headers in the ResponseOnSingleTask if `task.getConfig().isSaveResponseToTask()` is false (default). * Minor refactored multiple places. * Fixed javadoc. +* Fixed the test cases with HttpWorker constructor. ```java diff --git a/src/main/java/io/parallec/core/ParallelTask.java b/src/main/java/io/parallec/core/ParallelTask.java index e5bf474..53a08b0 100644 --- a/src/main/java/io/parallec/core/ParallelTask.java +++ b/src/main/java/io/parallec/core/ParallelTask.java @@ -54,21 +54,28 @@ import com.ning.http.client.AsyncHttpClient; - /** - * The key class represents a onetime execution on multiple requests. It contains all the task - * and request metadata, target hosts, configs, and the responses. + * The key class represents a onetime execution on multiple requests. It + * contains all the task and request metadata, target hosts, configs, and the + * responses. * - * A ParallelTask is the returned object from the {@link ParallelTaskBuilder#execute} + * A ParallelTask is the returned object from the + * {@link ParallelTaskBuilder#execute} * * + *
  • The metadata on this whole task, including config, running state, + * progress, request count, task id. 
  • + *
  • The results of the task: parallelTaskResult,  which is a + * hashmap of each target host map with its response. There is also a received + * count
  • + *
  • Detailed request metadata on HTTP/SSH/PING/TCP.  The async http + * client used for this task ( you may replace it with your own )
  • + *
  • Target host list
  • + *
  • The user defined response handler
  • + *
  • A actorRef pointer to the command manager so that you may use it to + * cancel the whole task or those requests that match a sublist of target + * host lists.
  • + * * * * @author Yuanteng (Jeff) Pei @@ -121,10 +128,11 @@ public class ParallelTask { /** The aggregate result map. */ private final Map> aggregateResultMap = new ConcurrentHashMap>(); - /** The parallel task result: - * a hashmap to store the request parameters, host name, ResponseOnSingleTask. - * Note that by default, the response content is not saved into the ResponseOnSingleTask. - * Unless the user changes the config by calling {@link ParallelTaskBuilder#setSaveResponseToTask} + /** + * The parallel task result: a hashmap to store the request parameters, host + * name, ResponseOnSingleTask. Note that by default, the response content is + * not saved into the ResponseOnSingleTask. Unless the user changes the + * config by calling {@link ParallelTaskBuilder#setSaveResponseToTask} * * */ private Map parallelTaskResult = new ConcurrentHashMap(); @@ -143,10 +151,10 @@ public class ParallelTask { /** The UDP meta. */ private UdpMeta udpMeta; - + /** The ping meta. */ private PingMeta pingMeta; - + /** * The command manager. if private: getter/setter: openpojo unit test will * fail. @@ -178,43 +186,58 @@ public ParallelTask() { this.responsedNum = 0; this.requestNum = 0; this.state = ParallelTaskState.WAITING; - // use default config this.config = new ParallelTaskConfig(); } - /** * Instantiates a new parallel task. * - * @param requestProtocol the request protocol - * @param concurrency the concurrency - * @param httpMeta the http meta - * @param targetHostMeta the target host meta - * @param sshMeta the ssh meta - * @param tcpMeta the tcp meta - * @param udpMeta the udp meta - * @param pingMeta the ping meta - * @param handler the handler - * @param responseContext the response context - * @param replacementVarMapNodeSpecific the replacement var map node specific - * @param replacementVarMap the replacement var map - * @param requestReplacementType the request replacement type - * @param config the config - */ - public ParallelTask(RequestProtocol requestProtocol, int concurrency, HttpMeta httpMeta, TargetHostMeta targetHostMeta, - SshMeta sshMeta, TcpMeta tcpMeta, UdpMeta udpMeta, PingMeta pingMeta, ParallecResponseHandler handler, + * @param requestProtocol + * the request protocol + * @param concurrency + * the concurrency + * @param httpMeta + * the http meta + * @param targetHostMeta + * the target host meta + * @param sshMeta + * the ssh meta + * @param tcpMeta + * the tcp meta + * @param udpMeta + * the udp meta + * @param pingMeta + * the ping meta + * @param handler + * the handler + * @param responseContext + * the response context + * @param replacementVarMapNodeSpecific + * the replacement var map node specific + * @param replacementVarMap + * the replacement var map + * @param requestReplacementType + * the request replacement type + * @param config + * the config + */ + public ParallelTask(RequestProtocol requestProtocol, int concurrency, + HttpMeta httpMeta, TargetHostMeta targetHostMeta, SshMeta sshMeta, + TcpMeta tcpMeta, UdpMeta udpMeta, PingMeta pingMeta, + ParallecResponseHandler handler, Map responseContext, Map replacementVarMapNodeSpecific, Map replacementVarMap, - RequestReplacementType requestReplacementType, ParallelTaskConfig config + RequestReplacementType requestReplacementType, + ParallelTaskConfig config ) { this.requestProtocol = requestProtocol; this.concurrency = concurrency; - this.targetHostMeta =targetHostMeta; + this.targetHostMeta = targetHostMeta; // set taskid / requestNum must be after set target hosts meta; // as it is using the target hosts count this.taskId = this.generateTaskId(); @@ -245,7 +268,8 @@ public ParallelTask(RequestProtocol requestProtocol, int concurrency, HttpMeta h /** * Cancel on target hosts. * - * @param targetHosts the target hosts + * @param targetHosts + * the target hosts * @return true, if successful */ @SuppressWarnings("deprecation") @@ -258,10 +282,10 @@ public boolean cancelOnTargetHosts(List targetHosts) { switch (state) { case IN_PROGRESS: - if (executionManager != null && !executionManager.isTerminated()) { - executionManager.tell( - new CancelTaskOnHostRequest(targetHosts), - executionManager); + if (executionManager != null + && !executionManager.isTerminated()) { + executionManager.tell(new CancelTaskOnHostRequest( + targetHosts), executionManager); logger.info( "asked task to stop from running on target hosts with count {}...", targetHosts.size()); @@ -293,7 +317,8 @@ public boolean cancelOnTargetHosts(List targetHosts) { /** * Cancel. * - * @param sync the sync + * @param sync + * the sync * @return true, if successful */ @SuppressWarnings("deprecation") @@ -309,7 +334,8 @@ public boolean cancel(boolean sync) { success = true; break; case IN_PROGRESS: - if (executionManager != null && !executionManager.isTerminated()) { + if (executionManager != null + && !executionManager.isTerminated()) { executionManager.tell(ExecutionManagerMsgType.CANCEL, executionManager); @@ -323,9 +349,7 @@ public boolean cancel(boolean sync) { try { Thread.sleep(100L); } catch (InterruptedException e) { - logger.error( - "running task {} was interrupted {}", - this.taskId, e); + logger.error(" task {} interrupted ", this.taskId); } } logger.info("Task completed! Cancellation is completed."); @@ -408,13 +432,12 @@ public boolean validateWithFillDefault() } // check if ssh - if (this.requestProtocol== - RequestProtocol.SSH) { + if (this.requestProtocol == RequestProtocol.SSH) { // this will throw ParallelTaskInvalidException this.sshMeta.validation(); - - if (this.getConcurrency() > ParallecGlobalConfig.concurrencySshLimit){ + + if (this.getConcurrency() > ParallecGlobalConfig.concurrencySshLimit) { logger.info("SSH CONCURRENCY LIMIT is lower. Apply value as: " + ParallecGlobalConfig.concurrencySshLimit); this.setConcurrency(ParallecGlobalConfig.concurrencySshLimit); @@ -430,46 +453,42 @@ public boolean validateWithFillDefault() this.pingMeta = null; // remove udp object this.udpMeta = null; - }else if (this.requestProtocol== - RequestProtocol.PING) { + } else if (this.requestProtocol == RequestProtocol.PING) { if (this.httpMeta.isPollable()) throw new ParallelTaskInvalidException( "Not support pollable job with PING."); this.httpMeta.initValuesNa(); - + this.pingMeta.validation(); // remove ssh object this.sshMeta = null; // remove tcp object this.tcpMeta = null; // remove udp object - this.udpMeta = null; + this.udpMeta = null; - - //TCP - }else if (this.requestProtocol== - RequestProtocol.TCP) { + // TCP + } else if (this.requestProtocol == RequestProtocol.TCP) { if (this.httpMeta.isPollable()) throw new ParallelTaskInvalidException( "Not support pollable job with TCP."); this.httpMeta.initValuesNa(); - + this.tcpMeta.validation(); // remove ssh object this.sshMeta = null; // remove ping object - this.pingMeta = null; + this.pingMeta = null; // remove udp object - this.udpMeta = null; - //UDP - }else if (this.requestProtocol== - RequestProtocol.UDP) { + this.udpMeta = null; + // UDP + } else if (this.requestProtocol == RequestProtocol.UDP) { if (this.httpMeta.isPollable()) throw new ParallelTaskInvalidException( "Not support pollable job with UDP."); this.httpMeta.initValuesNa(); - + this.udpMeta.validation(); // remove tcp object this.tcpMeta = null; @@ -477,9 +496,9 @@ public boolean validateWithFillDefault() this.sshMeta = null; // remove ping object this.pingMeta = null; - //HTTP/HTTPS + // HTTP/HTTPS } else { - + this.httpMeta.validation(); // remove ssh object this.sshMeta = null; @@ -516,12 +535,12 @@ public Double getProgress() { if (state.equals(ParallelTaskState.IN_PROGRESS)) { if (requestNum != 0) { - return 100.0 * ((double) responsedNum / (double)requestNumActual); + return 100.0 * ((double) responsedNum / (double) requestNumActual); } else { return 0.0; } } - + if (state.equals(ParallelTaskState.WAITING)) { return 0.0; } @@ -532,7 +551,6 @@ public Double getProgress() { return 100.0; } - return 0.0; } @@ -634,10 +652,9 @@ public List getTaskErrorMetas() { return taskErrorMetas; } - - - - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see java.lang.Object#toString() */ @Override @@ -662,7 +679,6 @@ public String toString() { + requestProtocol + ", concurrency=" + concurrency + "]"; } - /** * Gets the ssh meta. * @@ -717,13 +733,14 @@ public void setTargetHostMeta(TargetHostMeta targetHostMeta) { */ public HttpMeta getHttpMeta() { return httpMeta; - + } /** * Sets the command meta. * - * @param httpMeta the new http meta + * @param httpMeta + * the new http meta */ public void setHttpMeta(HttpMeta httpMeta) { this.httpMeta = httpMeta; @@ -856,7 +873,7 @@ public Map getAggregateResultFullSummary() { return summaryMap; } - + /** * Gets the aggregated result human str. * @@ -865,8 +882,6 @@ public Map getAggregateResultFullSummary() { public String getAggregatedResultHumanStr() { return PcStringUtils.getAggregatedResultHuman(aggregateResultMap); } - - /** * Gets the aggregate result count summary. only list the counts for brief @@ -926,12 +941,13 @@ public TcpMeta getTcpMeta() { /** * Sets the tcp meta. * - * @param tcpMeta the new tcp meta + * @param tcpMeta + * the new tcp meta */ public void setTcpMeta(TcpMeta tcpMeta) { this.tcpMeta = tcpMeta; } - + /** * Gets the request protocol. * @@ -969,7 +985,7 @@ public int getConcurrency() { public void setConcurrency(int concurrency) { this.concurrency = concurrency; } - + /** * Gets the submit time. * @@ -1115,18 +1131,17 @@ public PingMeta getPingMeta() { /** * Sets the ping meta. * - * @param pingMeta the new ping meta + * @param pingMeta + * the new ping meta */ public void setPingMeta(PingMeta pingMeta) { this.pingMeta = pingMeta; } - public UdpMeta getUdpMeta() { return udpMeta; } - public void setUdpMeta(UdpMeta udpMeta) { this.udpMeta = udpMeta; } diff --git a/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java b/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java index 34a03bd..03b5bf2 100644 --- a/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java +++ b/src/main/java/io/parallec/core/resources/AsyncHttpClientFactoryEmbed.java @@ -156,7 +156,7 @@ public void state() { /** * class CustomTrustManager. */ - private static class CustomTrustManager implements X509TrustManager { + public static class CustomTrustManager implements X509TrustManager { /** * Gets the accepted issuers. diff --git a/src/test/java/io/parallec/core/actor/HttpWorkerTest.java b/src/test/java/io/parallec/core/actor/HttpWorkerTest.java index e5c5680..3e4863d 100644 --- a/src/test/java/io/parallec/core/actor/HttpWorkerTest.java +++ b/src/test/java/io/parallec/core/actor/HttpWorkerTest.java @@ -41,7 +41,7 @@ public static void shutdown() throws Exception { * fake a bad request */ @Test - public void testSshWorkerCreateRequestException() { + public void testHttpWorkerCreateRequestException() { ActorRef asyncWorker = null; try { // Start new job @@ -52,7 +52,7 @@ public void testSshWorkerCreateRequestException() { Props.create(HttpWorker.class, actorMaxOperationTimeoutSec, HttpClientStore.getInstance() .getCurrentDefaultClient(), urlComplete, - HttpMethod.GET, "", null)); + HttpMethod.GET, "", null,null)); ; final FiniteDuration duration = Duration.create(20, @@ -83,7 +83,7 @@ public void testHttpWorkerNormalCheckComplete() { Props.create(HttpWorker.class, actorMaxOperationTimeoutSec, HttpClientStore.getInstance() .getCurrentDefaultClient(), urlComplete, - HttpMethod.GET, "", null)); + HttpMethod.GET, "", null,null)); final FiniteDuration duration = Duration.create(20, TimeUnit.SECONDS); @@ -115,7 +115,7 @@ public void testHttpWorkerDupAndCancel() { Props.create(HttpWorker.class, actorMaxOperationTimeoutSec, HttpClientStore.getInstance() .getCurrentDefaultClient(), urlComplete, - HttpMethod.GET, "", null)); + HttpMethod.GET, "", null,null)); final FiniteDuration duration = Duration.create(20, TimeUnit.SECONDS); @@ -155,7 +155,7 @@ public void testHttpWorkerException() { Props.create(HttpWorker.class, actorMaxOperationTimeoutSec, HttpClientStore.getInstance() .getCurrentDefaultClient(), urlComplete, - HttpMethod.GET, "", null)); + HttpMethod.GET, "", null,null)); ; final FiniteDuration duration = Duration.create(20, @@ -188,7 +188,7 @@ public void testHttpWorkerTimeoutException() { Props.create(HttpWorker.class, actorMaxOperationTimeoutSec, HttpClientStore.getInstance() .getCurrentDefaultClient(), urlComplete, - HttpMethod.GET, "", null)); + HttpMethod.GET, "", null,null)); final FiniteDuration duration = Duration.create(20, TimeUnit.SECONDS); @@ -216,7 +216,7 @@ public void testHttpWorkerBadMsgType() { Props.create(HttpWorker.class, actorMaxOperationTimeoutSec, HttpClientStore.getInstance() .getCurrentDefaultClient(), urlComplete, - HttpMethod.GET, "", null)); + HttpMethod.GET, "", null,null)); final FiniteDuration duration = Duration.create(20, TimeUnit.SECONDS); diff --git a/src/test/java/io/parallec/core/main/http/ParallelClientBeanTest.java b/src/test/java/io/parallec/core/main/http/ParallelClientBeanTest.java index f6eb0f4..ed8dbee 100644 --- a/src/test/java/io/parallec/core/main/http/ParallelClientBeanTest.java +++ b/src/test/java/io/parallec/core/main/http/ParallelClientBeanTest.java @@ -30,6 +30,7 @@ public void parallecClient() { pc.setCustomClientFast(null); pc.setCustomClientSlow(null); pc.setHttpClientTypeCurrentDefault(HttpClientType.EMBED_FAST); + pc.getRunningJobCount(); } diff --git a/src/test/java/io/parallec/core/pojo/ParallecPojoStrTest.java b/src/test/java/io/parallec/core/pojo/ParallecPojoStrTest.java index cba60ad..2e38700 100644 --- a/src/test/java/io/parallec/core/pojo/ParallecPojoStrTest.java +++ b/src/test/java/io/parallec/core/pojo/ParallecPojoStrTest.java @@ -1,6 +1,7 @@ package io.parallec.core.pojo; import io.parallec.core.ParallelTaskBuilder; +import io.parallec.core.ResponseOnSingleTask; import io.parallec.core.TestBase; import io.parallec.core.actor.message.NodeReqResponse; import io.parallec.core.actor.poll.PollerData; @@ -12,6 +13,8 @@ import io.parallec.core.bean.udp.UdpMeta; import io.parallec.core.exception.HttpRequestCreateException; import io.parallec.core.exception.ParallelTaskInvalidException; +import io.parallec.core.monitor.MonitorProvider; +import io.parallec.core.resources.AsyncHttpClientFactoryEmbed.CustomTrustManager; import io.parallec.core.resources.HttpMethod; import io.parallec.core.resources.TcpUdpSshPingResourceStore; import io.parallec.core.task.TaskErrorMeta; @@ -47,75 +50,74 @@ public void testMetaValidationToString() { HttpMeta httpMeta = new HttpMeta(); logger.info(httpMeta.toString()); - + HttpMeta httpMeta2 = new HttpMeta(HttpMethod.GET, "", "", "", null); httpMeta2.toString(); - + TcpMeta tcpMeta = new TcpMeta("", 80, 1000, 5, null); tcpMeta.toString(); - - //empty and all pass validation: test validation + + // empty and all pass validation: test validation TcpMeta tcpMeta2 = new TcpMeta(); TcpMeta tcpMeta3 = new TcpMeta("", 80, 1000, 5, TcpUdpSshPingResourceStore.getInstance().getChannelFactory()); - - try{ - + + try { + tcpMeta2.validation(); - }catch(ParallelTaskInvalidException e){ + } catch (ParallelTaskInvalidException e) { logger.info("expected exception {}", e.getLocalizedMessage()); } tcpMeta2.setCommand(""); - try{ - + try { + tcpMeta2.validation(); - }catch(ParallelTaskInvalidException e){ + } catch (ParallelTaskInvalidException e) { logger.info("expected exception {}", e.getLocalizedMessage()); } - + tcpMeta3.validation(); - - SetAndCount sc = new SetAndCount( new HashSet()); + + SetAndCount sc = new SetAndCount(new HashSet()); sc.toString(); - - //udp meta validation + // udp meta validation UdpMeta udpMeta = new UdpMeta("", 80, 5, null); udpMeta.toString(); - - //empty and all pass validation: test validation + + // empty and all pass validation: test validation UdpMeta udpMeta2 = new UdpMeta(); - UdpMeta udpMeta3 = new UdpMeta("", 80, 5, - TcpUdpSshPingResourceStore.getInstance().getDatagramChannelFactory()); + UdpMeta udpMeta3 = new UdpMeta("", 80, 5, TcpUdpSshPingResourceStore + .getInstance().getDatagramChannelFactory()); + + // null command + try { - //null command - try{ - udpMeta2.validation(); - }catch(ParallelTaskInvalidException e){ + } catch (ParallelTaskInvalidException e) { logger.info("expected exception {}", e.getLocalizedMessage()); } - //now null port + // now null port udpMeta2.setCommand(""); - try{ - + try { + udpMeta2.validation(); - }catch(ParallelTaskInvalidException e){ + } catch (ParallelTaskInvalidException e) { logger.info("expected exception {}", e.getLocalizedMessage()); } - //now with null idle + // now with null idle udpMeta2.setUdpPort(40); - try{ + try { udpMeta2.validation(); - }catch(ParallelTaskInvalidException e){ + } catch (ParallelTaskInvalidException e) { logger.info("expected exception {}", e.getLocalizedMessage()); } - + udpMeta3.validation(); } @Test - public void testStaticFuncAndUtilsClass() { + public void testStaticFuncUtilsClassAndMisc() { new ParallecAppMin(); ParallecAppMin.main(null); @@ -130,21 +132,30 @@ public void testStaticFuncAndUtilsClass() { new PcFileNetworkIoUtils(); new PcNumberUtils(); new TaskErrorMeta(TaskErrorType.COMMAND_MANAGER_ERROR, "", null); - StrStrMap ssm = new StrStrMap(new HashMap()); + StrStrMap ssm = new StrStrMap(new HashMap()); ssm.addPair(null, ""); ssm.addPair("", null); ssm.addPair("k", "v"); + + // misc new HttpRequestCreateException("", new RuntimeException()); new ParallelTaskInvalidException("", new RuntimeException()); - + boolean removeDuplicate = false; - PcTargetHostsUtils - .getNodeListFromStringLineSeperateOrSpaceSeperate( - "a b", removeDuplicate); - + PcTargetHostsUtils.getNodeListFromStringLineSeperateOrSpaceSeperate( + "a b", removeDuplicate); + PcStringUtils.printStackTrace(null); - - + ResponseOnSingleTask task = new ResponseOnSingleTask(); + task.getHost(); + + logger.info("thread count {}", MonitorProvider.getInstance() + .getLiveThreadCount()); + + CustomTrustManager manager = new CustomTrustManager(); + manager.checkClientTrusted(null, null); + manager.checkServerTrusted(null, null); + manager.getAcceptedIssuers(); } }