diff --git a/component/pom.xml b/component/pom.xml
index 0ec66b8..9ab3d0a 100644
--- a/component/pom.xml
+++ b/component/pom.xml
@@ -60,6 +60,10 @@
com.sun.mail
javax.mail
+
+ commons-pool.wso2
+ commons-pool
+
org.testng
testng
diff --git a/component/src/main/java/io/siddhi/extension/io/email/sink/EmailSink.java b/component/src/main/java/io/siddhi/extension/io/email/sink/EmailSink.java
index d5f9120..ee95082 100644
--- a/component/src/main/java/io/siddhi/extension/io/email/sink/EmailSink.java
+++ b/component/src/main/java/io/siddhi/extension/io/email/sink/EmailSink.java
@@ -19,6 +19,7 @@
package io.siddhi.extension.io.email.sink;
+import com.sun.mail.smtp.SMTPSendFailedException;
import com.sun.mail.util.MailConnectException;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
@@ -36,8 +37,10 @@
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
+import io.siddhi.extension.io.email.sink.transport.EmailClientConnectionPoolManager;
import io.siddhi.extension.io.email.util.EmailConstants;
import io.siddhi.query.api.definition.StreamDefinition;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.log4j.Logger;
import org.wso2.transport.email.connector.factory.EmailConnectorFactoryImpl;
import org.wso2.transport.email.contract.EmailClientConnector;
@@ -149,7 +152,13 @@
type = DataType.STRING,
optional = true,
dynamic = true,
- defaultValue = "None")
+ defaultValue = "None"),
+ @Parameter(
+ name = "connection.pool.size",
+ description = "Number of concurrent Email client connections.",
+ type = DataType.INT,
+ optional = true,
+ defaultValue = "1")
},
examples = {
@Example(syntax = "@sink(type='email', @map(type ='json'), "
@@ -424,13 +433,11 @@
"scenarios, 'sendmail' responds slowly after many 'NOOP' commands. This is" +
" avoided by using 'RSET' instead.",
defaultValue = "false",
- possibleParameters = "true or false"),
-
+ possibleParameters = "true or false")
}
)
public class EmailSink extends Sink {
private static final Logger log = Logger.getLogger(EmailSink.class);
- private EmailClientConnector emailClientConnector;
private Option optionSubject;
private Option optionTo;
private Option optionCc;
@@ -478,10 +485,9 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
*/
@Override
public void connect() throws ConnectionUnavailableException {
- EmailConnectorFactory emailConnectorFactory = new EmailConnectorFactoryImpl();
try {
- emailClientConnector = emailConnectorFactory.createEmailClientConnector();
- emailClientConnector.init(initProperties);
+ EmailConnectorFactory emailConnectorFactory = new EmailConnectorFactoryImpl();
+ EmailClientConnectionPoolManager.initializeConnectionPool(emailConnectorFactory, initProperties);
} catch (EmailConnectorException e) {
if (e.getCause() instanceof MailConnectException) {
if (e.getCause().getCause() instanceof ConnectException) {
@@ -542,22 +548,44 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state)
emailBaseMessage = new EmailTextMessage(payload.toString());
}
emailBaseMessage.setHeaders(emailProperties);
- try {
- emailClientConnector.send(emailBaseMessage);
- } catch (EmailConnectorException e) {
+ GenericKeyedObjectPool objectPool = EmailClientConnectionPoolManager.getConnectionPool();
+ if (objectPool != null) {
+ EmailClientConnector connection = null;
+ try {
+ connection = (EmailClientConnector)
+ objectPool.borrowObject(EmailConstants.EMAIL_CLIENT_CONNECTION_POOL_ID);
+ if (connection != null) {
+ connection.send(emailBaseMessage);
+ }
+ } catch (Exception e) {
//calling super class logs the exception and retry
if (e.getCause() instanceof MailConnectException) {
if (e.getCause().getCause() instanceof ConnectException) {
- throw new ConnectionUnavailableException("Error is encountered while connecting the smtp"
+ throw new ConnectionUnavailableException("Error is encountered while connecting the smtp"
+ " server by the email ClientConnector.", e);
} else {
throw new RuntimeException("Error is encountered while sending the message by the email"
- + " ClientConnector with properties: " + emailProperties.toString() , e);
+ + " ClientConnector with properties: " + emailProperties.toString(), e);
}
+ } else if (e.getCause() instanceof SMTPSendFailedException) {
+ throw new ConnectionUnavailableException("Error encountered while connecting " +
+ "to the mail server by the email client connector.", e);
} else {
throw new RuntimeException("Error is encountered while sending the message by the email"
- + " ClientConnector with properties: " + emailProperties.toString() , e);
+ + " ClientConnector with properties: " + emailProperties.toString(), e);
}
+ } finally {
+ if (connection != null) {
+ try {
+ objectPool.returnObject(EmailConstants.EMAIL_CLIENT_CONNECTION_POOL_ID, connection);
+ } catch (Exception e) {
+ log.error("Error in returning the email client connection object to the pool. " +
+ e.getMessage(), e);
+ }
+ }
+ }
+ } else {
+ log.error("Error in obtaining connection pool to publish emails to the server.");
}
}
@@ -706,6 +734,14 @@ private void validateAndGetRequiredParameters() {
attachments = Arrays.asList(attachmentOption.getValue().split(EmailConstants.COMMA_SEPERATOR));
}
}
+ String connectionPoolSize = optionHolder.validateAndGetStaticValue(EmailConstants.PUBLISHER_POOL_SIZE,
+ configReader.readConfig(EmailConstants.PUBLISHER_POOL_SIZE, "1"));
+ try {
+ this.initProperties.put(EmailConstants.PUBLISHER_POOL_SIZE, connectionPoolSize);
+ } catch (NumberFormatException e) {
+ throw new SiddhiAppCreationException(EmailConstants.PUBLISHER_POOL_SIZE
+ + " parameter only excepts an Integer value.", e);
+ }
}
/**
@@ -713,6 +749,7 @@ private void validateAndGetRequiredParameters() {
* Implementation of this method should contain the steps needed to disconnect from the sink.
*/
@Override public void disconnect() {
+ EmailClientConnectionPoolManager.uninitializeConnectionPool();
}
/**
diff --git a/component/src/main/java/io/siddhi/extension/io/email/sink/transport/EmailClientConnectionPoolFactory.java b/component/src/main/java/io/siddhi/extension/io/email/sink/transport/EmailClientConnectionPoolFactory.java
new file mode 100644
index 0000000..4a0fe68
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/email/sink/transport/EmailClientConnectionPoolFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.email.sink.transport;
+
+import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.wso2.transport.email.contract.EmailClientConnector;
+import org.wso2.transport.email.contract.EmailConnectorFactory;
+import org.wso2.transport.email.exception.EmailConnectorException;
+
+import java.util.Map;
+
+/**
+ * The abstract class that needs to be implemented when supporting a new non-secure transport
+ * to mainly create, validate and terminate the client to the endpoint.
+ */
+public class EmailClientConnectionPoolFactory extends BaseKeyedPoolableObjectFactory {
+ private EmailClientConnector emailClientConnector;
+
+ public EmailClientConnectionPoolFactory(EmailConnectorFactory emailConnectorFactory,
+ Map clientProperties) throws EmailConnectorException {
+ emailClientConnector = emailConnectorFactory.createEmailClientConnector();
+ emailClientConnector.init(clientProperties);
+ }
+
+ @Override
+ public Object makeObject(Object key) throws EmailConnectorException {
+ if (!emailClientConnector.isConnected()) {
+ emailClientConnector.connect();
+ }
+ return emailClientConnector;
+ }
+
+ @Override
+ public boolean validateObject(Object key, Object obj) {
+ return obj != null && ((EmailClientConnector) obj).isConnected();
+ }
+
+ public void destroyObject(Object key, Object obj) {
+ if (obj != null) {
+ ((EmailClientConnector) obj).disconnect();
+ }
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/email/sink/transport/EmailClientConnectionPoolManager.java b/component/src/main/java/io/siddhi/extension/io/email/sink/transport/EmailClientConnectionPoolManager.java
new file mode 100644
index 0000000..036a079
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/email/sink/transport/EmailClientConnectionPoolManager.java
@@ -0,0 +1,57 @@
+/*
+* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.siddhi.extension.io.email.sink.transport;
+
+import io.siddhi.extension.io.email.util.EmailConstants;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.wso2.transport.email.contract.EmailConnectorFactory;
+import org.wso2.transport.email.exception.EmailConnectorException;
+
+import java.util.Map;
+
+/**
+ * This class is used hold the secure/non-secure connections for an Agent.
+ */
+
+public class EmailClientConnectionPoolManager {
+ private static GenericKeyedObjectPool connectionPool;
+
+ public static synchronized void initializeConnectionPool(EmailConnectorFactory emailConnectorFactory,
+ Map clientProperties) throws EmailConnectorException {
+ EmailClientConnectionPoolFactory emailClientConnectionPoolFactory
+ = new EmailClientConnectionPoolFactory(emailConnectorFactory, clientProperties);
+ if (connectionPool == null) {
+ int poolSize = Integer.parseInt(clientProperties.get(EmailConstants.PUBLISHER_POOL_SIZE));
+ connectionPool = new GenericKeyedObjectPool();
+ connectionPool.setFactory(emailClientConnectionPoolFactory);
+ connectionPool.setMaxTotal(poolSize);
+ connectionPool.setMaxActive(poolSize);
+ connectionPool.setTestOnBorrow(true);
+ connectionPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK);
+ }
+ }
+
+ public static GenericKeyedObjectPool getConnectionPool() {
+ return connectionPool;
+ }
+
+ public static void uninitializeConnectionPool() {
+ connectionPool = null;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/email/util/EmailConstants.java b/component/src/main/java/io/siddhi/extension/io/email/util/EmailConstants.java
index 9875a74..441bf67 100644
--- a/component/src/main/java/io/siddhi/extension/io/email/util/EmailConstants.java
+++ b/component/src/main/java/io/siddhi/extension/io/email/util/EmailConstants.java
@@ -41,7 +41,8 @@ public class EmailConstants {
public static final String BCC = "bcc";
public static final String CC = "cc";
public static final String ATTACHMENTS = "attachments";
- public static final String MAIL_PUBLISHER_POOL_SIZE = "pool.size";
+ public static final String PUBLISHER_POOL_SIZE = "connection.pool.size";
+ public static final String EMAIL_CLIENT_CONNECTION_POOL_ID = "email_client_connection_pool";
/**
* Default values for the email sink configurations.
diff --git a/pom.xml b/pom.xml
index a42a739..04029a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,11 @@
transport-email
${transport.email.version}
+
+ commons-pool.wso2
+ commons-pool
+ ${commons.pool.version}
+
io.siddhi.extension.map.xml
siddhi-map-xml
@@ -134,7 +139,7 @@
1.2.17.wso2v1
6.8
0.7.9
- 6.0.49
+ 6.1.0
1.5.5
1.5.6
5.0.2
@@ -142,6 +147,7 @@
5.0.2
3.0.1
UTF-8
+ 1.5.6.wso2v1