Skip to content

Commit

Permalink
Merge pull request #59 from pcnfernando/dev
Browse files Browse the repository at this point in the history
Support concurrent publishing at Email Sink
  • Loading branch information
mohanvive authored Oct 22, 2019
2 parents ad3b46d + 0632e9f commit 81a4257
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 14 deletions.
4 changes: 4 additions & 0 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
</dependency>
<dependency>
<groupId>commons-pool.wso2</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.siddhi.extension.io.email.sink;

import com.sun.mail.smtp.SMTPSendFailedException;
import com.sun.mail.util.MailConnectException;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
Expand All @@ -36,8 +37,10 @@
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.email.sink.transport.EmailClientConnectionPoolManager;
import io.siddhi.extension.io.email.util.EmailConstants;
import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.log4j.Logger;
import org.wso2.transport.email.connector.factory.EmailConnectorFactoryImpl;
import org.wso2.transport.email.contract.EmailClientConnector;
Expand Down Expand Up @@ -149,7 +152,13 @@
type = DataType.STRING,
optional = true,
dynamic = true,
defaultValue = "None")
defaultValue = "None"),
@Parameter(
name = "connection.pool.size",
description = "Number of concurrent Email client connections.",
type = DataType.INT,
optional = true,
defaultValue = "1")
},
examples = {
@Example(syntax = "@sink(type='email', @map(type ='json'), "
Expand Down Expand Up @@ -424,13 +433,11 @@
"scenarios, 'sendmail' responds slowly after many 'NOOP' commands. This is" +
" avoided by using 'RSET' instead.",
defaultValue = "false",
possibleParameters = "true or false"),

possibleParameters = "true or false")
}
)
public class EmailSink extends Sink {
private static final Logger log = Logger.getLogger(EmailSink.class);
private EmailClientConnector emailClientConnector;
private Option optionSubject;
private Option optionTo;
private Option optionCc;
Expand Down Expand Up @@ -478,10 +485,9 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
*/
@Override
public void connect() throws ConnectionUnavailableException {
EmailConnectorFactory emailConnectorFactory = new EmailConnectorFactoryImpl();
try {
emailClientConnector = emailConnectorFactory.createEmailClientConnector();
emailClientConnector.init(initProperties);
EmailConnectorFactory emailConnectorFactory = new EmailConnectorFactoryImpl();
EmailClientConnectionPoolManager.initializeConnectionPool(emailConnectorFactory, initProperties);
} catch (EmailConnectorException e) {
if (e.getCause() instanceof MailConnectException) {
if (e.getCause().getCause() instanceof ConnectException) {
Expand Down Expand Up @@ -542,22 +548,44 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state)
emailBaseMessage = new EmailTextMessage(payload.toString());
}
emailBaseMessage.setHeaders(emailProperties);
try {
emailClientConnector.send(emailBaseMessage);
} catch (EmailConnectorException e) {
GenericKeyedObjectPool objectPool = EmailClientConnectionPoolManager.getConnectionPool();
if (objectPool != null) {
EmailClientConnector connection = null;
try {
connection = (EmailClientConnector)
objectPool.borrowObject(EmailConstants.EMAIL_CLIENT_CONNECTION_POOL_ID);
if (connection != null) {
connection.send(emailBaseMessage);
}
} catch (Exception e) {
//calling super class logs the exception and retry
if (e.getCause() instanceof MailConnectException) {
if (e.getCause().getCause() instanceof ConnectException) {
throw new ConnectionUnavailableException("Error is encountered while connecting the smtp"
throw new ConnectionUnavailableException("Error is encountered while connecting the smtp"
+ " server by the email ClientConnector.", e);
} else {
throw new RuntimeException("Error is encountered while sending the message by the email"
+ " ClientConnector with properties: " + emailProperties.toString() , e);
+ " ClientConnector with properties: " + emailProperties.toString(), e);
}
} else if (e.getCause() instanceof SMTPSendFailedException) {
throw new ConnectionUnavailableException("Error encountered while connecting " +
"to the mail server by the email client connector.", e);
} else {
throw new RuntimeException("Error is encountered while sending the message by the email"
+ " ClientConnector with properties: " + emailProperties.toString() , e);
+ " ClientConnector with properties: " + emailProperties.toString(), e);
}
} finally {
if (connection != null) {
try {
objectPool.returnObject(EmailConstants.EMAIL_CLIENT_CONNECTION_POOL_ID, connection);
} catch (Exception e) {
log.error("Error in returning the email client connection object to the pool. " +
e.getMessage(), e);
}
}
}
} else {
log.error("Error in obtaining connection pool to publish emails to the server.");
}
}

Expand Down Expand Up @@ -706,13 +734,22 @@ private void validateAndGetRequiredParameters() {
attachments = Arrays.asList(attachmentOption.getValue().split(EmailConstants.COMMA_SEPERATOR));
}
}
String connectionPoolSize = optionHolder.validateAndGetStaticValue(EmailConstants.PUBLISHER_POOL_SIZE,
configReader.readConfig(EmailConstants.PUBLISHER_POOL_SIZE, "1"));
try {
this.initProperties.put(EmailConstants.PUBLISHER_POOL_SIZE, connectionPoolSize);
} catch (NumberFormatException e) {
throw new SiddhiAppCreationException(EmailConstants.PUBLISHER_POOL_SIZE
+ " parameter only excepts an Integer value.", e);
}
}

/**
* Called after all publishing is done, or when {@link ConnectionUnavailableException} is thrown
* Implementation of this method should contain the steps needed to disconnect from the sink.
*/
@Override public void disconnect() {
EmailClientConnectionPoolManager.uninitializeConnectionPool();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.email.sink.transport;

import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
import org.wso2.transport.email.contract.EmailClientConnector;
import org.wso2.transport.email.contract.EmailConnectorFactory;
import org.wso2.transport.email.exception.EmailConnectorException;

import java.util.Map;

/**
* The abstract class that needs to be implemented when supporting a new non-secure transport
* to mainly create, validate and terminate the client to the endpoint.
*/
public class EmailClientConnectionPoolFactory extends BaseKeyedPoolableObjectFactory {
private EmailClientConnector emailClientConnector;

public EmailClientConnectionPoolFactory(EmailConnectorFactory emailConnectorFactory,
Map<String, String> clientProperties) throws EmailConnectorException {
emailClientConnector = emailConnectorFactory.createEmailClientConnector();
emailClientConnector.init(clientProperties);
}

@Override
public Object makeObject(Object key) throws EmailConnectorException {
if (!emailClientConnector.isConnected()) {
emailClientConnector.connect();
}
return emailClientConnector;
}

@Override
public boolean validateObject(Object key, Object obj) {
return obj != null && ((EmailClientConnector) obj).isConnected();
}

public void destroyObject(Object key, Object obj) {
if (obj != null) {
((EmailClientConnector) obj).disconnect();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.email.sink.transport;

import io.siddhi.extension.io.email.util.EmailConstants;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.wso2.transport.email.contract.EmailConnectorFactory;
import org.wso2.transport.email.exception.EmailConnectorException;

import java.util.Map;

/**
* This class is used hold the secure/non-secure connections for an Agent.
*/

public class EmailClientConnectionPoolManager {
private static GenericKeyedObjectPool connectionPool;

public static synchronized void initializeConnectionPool(EmailConnectorFactory emailConnectorFactory,
Map<String, String> clientProperties) throws EmailConnectorException {
EmailClientConnectionPoolFactory emailClientConnectionPoolFactory
= new EmailClientConnectionPoolFactory(emailConnectorFactory, clientProperties);
if (connectionPool == null) {
int poolSize = Integer.parseInt(clientProperties.get(EmailConstants.PUBLISHER_POOL_SIZE));
connectionPool = new GenericKeyedObjectPool();
connectionPool.setFactory(emailClientConnectionPoolFactory);
connectionPool.setMaxTotal(poolSize);
connectionPool.setMaxActive(poolSize);
connectionPool.setTestOnBorrow(true);
connectionPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK);
}
}

public static GenericKeyedObjectPool getConnectionPool() {
return connectionPool;
}

public static void uninitializeConnectionPool() {
connectionPool = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class EmailConstants {
public static final String BCC = "bcc";
public static final String CC = "cc";
public static final String ATTACHMENTS = "attachments";
public static final String PUBLISHER_POOL_SIZE = "connection.pool.size";
public static final String EMAIL_CLIENT_CONNECTION_POOL_ID = "email_client_connection_pool";

/**
* Default values for the email sink configurations.
Expand Down
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
<artifactId>transport-email</artifactId>
<version>${transport.email.version}</version>
</dependency>
<dependency>
<groupId>commons-pool.wso2</groupId>
<artifactId>commons-pool</artifactId>
<version>${commons.pool.version}</version>
</dependency>
<dependency>
<groupId>io.siddhi.extension.map.xml</groupId>
<artifactId>siddhi-map-xml</artifactId>
Expand Down Expand Up @@ -134,14 +139,15 @@
<log4j.version>1.2.17.wso2v1</log4j.version>
<testng.version>6.8</testng.version>
<jacoco.maven.version>0.7.9</jacoco.maven.version>
<transport.email.version>6.0.49</transport.email.version>
<transport.email.version>6.1.0</transport.email.version>
<com.icegreen.version>1.5.5</com.icegreen.version>
<com.sun.mail.version>1.5.6</com.sun.mail.version>
<xml.mapper.version>5.0.2</xml.mapper.version>
<text.mapper.version>2.0.1</text.mapper.version>
<json.mapper.version>5.0.2</json.mapper.version>
<carbon.messaging.version>3.0.1</carbon.messaging.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<commons.pool.version>1.5.6.wso2v1</commons.pool.version>
</properties>

</project>

0 comments on commit 81a4257

Please sign in to comment.