Skip to content

Commit

Permalink
Saga: StateMachine ServiceTask supports asynchronous execution apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
long187 committed Oct 30, 2019
1 parent 23f00f2 commit d30ee80
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.seata.saga.engine.expression.seq.SequenceExpressionFactory;
import io.seata.saga.engine.expression.spel.SpringELExpressionFactory;
import io.seata.saga.engine.invoker.ServiceInvokerManager;
import io.seata.saga.engine.invoker.impl.SpringBeanServiceInvoker;
import io.seata.saga.engine.pcext.StateMachineProcessHandler;
import io.seata.saga.engine.pcext.StateMachineProcessRouter;
import io.seata.saga.engine.store.StateLangStore;
Expand Down Expand Up @@ -169,6 +170,11 @@ protected void init() throws Exception {

if (this.serviceInvokerManager == null) {
this.serviceInvokerManager = new ServiceInvokerManager();

SpringBeanServiceInvoker springBeanServiceInvoker = new SpringBeanServiceInvoker();
springBeanServiceInvoker.setApplicationContext(getApplicationContext());
springBeanServiceInvoker.setThreadPoolExecutor(threadPoolExecutor);
this.serviceInvokerManager.putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, springBeanServiceInvoker);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.seata.saga.engine.invoker;

import io.seata.saga.engine.invoker.impl.SpringBeanServiceInvoker;
import io.seata.saga.statelang.domain.DomainConstants;
import org.springframework.util.StringUtils;

Expand All @@ -31,10 +30,6 @@ public class ServiceInvokerManager {

private Map<String, ServiceInvoker> serviceInvokerMap = new ConcurrentHashMap<>();

public ServiceInvokerManager() {
serviceInvokerMap.put(DomainConstants.SERVICE_TYPE_SPRING_BEAN, new SpringBeanServiceInvoker());
}

public ServiceInvoker getServiceInvoker(String serviceType) {
if (StringUtils.isEmpty(serviceType)) {
serviceType = DomainConstants.SERVICE_TYPE_SPRING_BEAN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
* SpringBean Service Invoker
Expand All @@ -45,11 +46,36 @@ public class SpringBeanServiceInvoker implements ServiceInvoker, ApplicationCont
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBeanServiceInvoker.class);

private ApplicationContext applicationContext;
private ThreadPoolExecutor threadPoolExecutor;

@Override
public Object invoke(ServiceTaskState serviceTaskState, Object... input) {

ServiceTaskStateImpl state = (ServiceTaskStateImpl) serviceTaskState;
if(state.isAsync()){
if(threadPoolExecutor == null){
if(LOGGER.isWarnEnabled()){
LOGGER.warn("threadPoolExecutor is null, Service[{}.{}] cannot execute asynchronously, executing synchronously now. stateName: {}", state.getServiceName(), state.getServiceMethod(), state.getName());
}
return doInvoke(state, input);
}

if(LOGGER.isInfoEnabled()){
LOGGER.info("Submit Service[{}.{}] to asynchronously executing. stateName: {}", state.getServiceName(), state.getServiceMethod(), state.getName());
}
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
doInvoke(state, input);
}
});
return null;
}
else{
return doInvoke(state, input);
}
}

protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) {

Object bean = applicationContext.getBean(state.getServiceName());

Expand All @@ -68,6 +94,7 @@ public Object invoke(ServiceTaskState serviceTaskState, Object... input) {

if (method == null) {
throw new EngineExecutionException("No such method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass() + "]", FrameworkErrorCode.NoSuchMethod);

}

Object[] args = new Object[method.getParameterCount()];
Expand All @@ -91,6 +118,10 @@ public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}

protected Method findMethod(Class<?> clazz, String methodName, List<String> parameterTypes) {

if (parameterTypes == null || parameterTypes.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,59 +369,67 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn
Map<String, String> statusMatchList = state.getStatus();
if (statusMatchList != null && statusMatchList.size() > 0) {

StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

Map<Object, String> statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
synchronized (state) {
statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
statusEvaluators = new LinkedHashMap<>(statusMatchList.size());
for (String expressionStr : statusMatchList.keySet()) {

String statusVal = statusMatchList.get(expressionStr);
Evaluator evaluator = createEvaluator(stateMachineConfig.getEvaluatorFactoryManager(), expressionStr);
if (evaluator != null) {
statusEvaluators.put(evaluator, statusVal);
if(state.isAsync()){
if(LOGGER.isWarnEnabled()){
LOGGER.warn("Service[{}.{}] is execute asynchronously, null return value collected, so user defined Status Matching skipped. stateName: {}, branchId: {}", state.getServiceName(), state.getServiceMethod(), state.getName(), stateInstance.getId());
}
}
else{

StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

Map<Object, String> statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
synchronized (state) {
statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
statusEvaluators = new LinkedHashMap<>(statusMatchList.size());
for (String expressionStr : statusMatchList.keySet()) {

String statusVal = statusMatchList.get(expressionStr);
Evaluator evaluator = createEvaluator(stateMachineConfig.getEvaluatorFactoryManager(), expressionStr);
if (evaluator != null) {
statusEvaluators.put(evaluator, statusVal);
}
}
}
state.setStatusEvaluators(statusEvaluators);
}
state.setStatusEvaluators(statusEvaluators);
}
}

for (Object evaluatorObj : statusEvaluators.keySet()) {
Evaluator evaluator = (Evaluator) evaluatorObj;
String statusVal = statusEvaluators.get(evaluator);
if (evaluator.evaluate(context.getVariables())) {
stateInstance.setStatus(ExecutionStatus.valueOf(statusVal));
break;
for (Object evaluatorObj : statusEvaluators.keySet()) {
Evaluator evaluator = (Evaluator) evaluatorObj;
String statusVal = statusEvaluators.get(evaluator);
if (evaluator.evaluate(context.getVariables())) {
stateInstance.setStatus(ExecutionStatus.valueOf(statusVal));
break;
}
}
}

if (exp == null && (stateInstance.getStatus() == null || ExecutionStatus.RU.equals(stateInstance.getStatus()))) {
if (exp == null && (stateInstance.getStatus() == null || ExecutionStatus.RU.equals(stateInstance.getStatus()))) {

if (state.isForUpdate()) {
stateInstance.setStatus(ExecutionStatus.UN);
} else {
stateInstance.setStatus(ExecutionStatus.FA);
}
stateInstance.setGmtEnd(new Date());
if (state.isForUpdate()) {
stateInstance.setStatus(ExecutionStatus.UN);
} else {
stateInstance.setStatus(ExecutionStatus.FA);
}
stateInstance.setGmtEnd(new Date());

StateMachineInstance stateMachineInstance = stateInstance.getStateMachineInstance();
StateMachineInstance stateMachineInstance = stateInstance.getStateMachineInstance();

if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context);
}
if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context);
}

EngineExecutionException exception = new EngineExecutionException("State [" + state.getName() + "] execute finished, but cannot matching status, pls check its status manually",
FrameworkErrorCode.NoMatchedStatus);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("State[{}] execute finish with status[{}]", state.getName(), stateInstance.getStatus());
}
EngineUtils.failStateMachine(context, exception);
EngineExecutionException exception = new EngineExecutionException("State [" + state.getName() + "] execute finished, but cannot matching status, pls check its status manually",
FrameworkErrorCode.NoMatchedStatus);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("State[{}] execute finish with status[{}]", state.getName(), stateInstance.getStatus());
}
EngineUtils.failStateMachine(context, exception);

throw exception;
throw exception;
}
}
}

Expand Down Expand Up @@ -449,8 +457,8 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn
}
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("State[{}] finish with status[{}]", state.getName(), stateInstance.getStatus());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("State[{}] finish with status[{}]", state.getName(), stateInstance.getStatus());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ServiceTaskStateImpl extends AbstractTaskState implements ServiceTa
private List<Object> inputExpressions;
private Map<String, Object> outputExpressions;
private Map<Object, String> statusEvaluators;
private boolean isAsync;

public ServiceTaskStateImpl() {
setType(DomainConstants.STATE_TYPE_SERVICE_TASK);
Expand Down Expand Up @@ -107,4 +108,12 @@ public Map<Object, String> getStatusEvaluators() {
public void setStatusEvaluators(Map<Object, String> statusEvaluators) {
this.statusEvaluators = statusEvaluators;
}

public boolean isAsync() {
return isAsync;
}

public void setAsync(boolean async) {
isAsync = async;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public ServiceTaskState parse(Object node) {
serviceTaskState.setServiceMethod((String)nodeMap.get("ServiceMethod"));
serviceTaskState.setServiceType((String)nodeMap.get("ServiceType"));
serviceTaskState.setParameterTypes((List<String>)nodeMap.get("ParameterTypes"));
Object isAsync = nodeMap.get("IsAsync");
if(isAsync != null && Boolean.TRUE.equals(isAsync)){
serviceTaskState.setAsync(true);
}

return serviceTaskState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,32 @@ public void testStateMachineWithComplextParams() {
Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));
}

@Test
public void testSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, callback);

waittingForFinish(inst);

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void waittingForFinish(StateMachineInstance inst){
synchronized (lock){
if(ExecutionStatus.RU.equals(inst.getStatus())){
Expand Down
24 changes: 24 additions & 0 deletions test/src/test/java/io/seata/saga/engine/StateMachineTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,28 @@ public void testStateMachineWithComplextParams() {

Assertions.assertTrue(ExecutionStatus.SU.equals(instance.getStatus()));
}

@Test
public void testSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,30 @@ public void testReloadStateMachineInstance(){
System.out.println(instance);
}

@Test
public void testSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Test
public void testSimpleCatchesStateMachineAsync() throws Exception {

Expand Down Expand Up @@ -374,6 +398,31 @@ public void testCompensationAndSubStateMachineAsync() throws Exception {
Assertions.assertTrue(GlobalStatus.CommitRetrying.equals(globalTransaction.getStatus()));
}

@Test
public void testAsyncStartSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, callback);

waittingForFinish(inst);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void waittingForFinish(StateMachineInstance inst){
synchronized (lock){
if(ExecutionStatus.RU.equals(inst.getStatus())){
Expand Down
Loading

0 comments on commit d30ee80

Please sign in to comment.