Skip to content

Commit

Permalink
feat(AsyncBeanOperationExecutor): executor support specifying batch s…
Browse files Browse the repository at this point in the history
…izes to divide operations based on the same data source into multiple small tasks (GitHub #195)
  • Loading branch information
Createsequence committed Jan 28, 2024
1 parent 0cb988d commit 551474f
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import cn.crane4j.core.container.Container;
import cn.crane4j.core.container.ContainerManager;
import cn.crane4j.core.container.lifecycle.SmartOperationAware;
import cn.crane4j.core.exception.OperationExecuteException;
import cn.crane4j.core.executor.handler.DisassembleOperationHandler;
import cn.crane4j.core.parser.BeanOperations;
Expand All @@ -20,9 +19,11 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
*
Expand Down Expand Up @@ -58,7 +59,6 @@
*
* @author huangchengxing
* @see OperationAwareBeanOperationExecutor
* @see SmartOperationAware
* @see AsyncBeanOperationExecutor
* @see DisorderedBeanOperationExecutor
* @see OrderedBeanOperationExecutor
Expand Down Expand Up @@ -90,6 +90,17 @@ public abstract class AbstractBeanOperationExecutor implements BeanOperationExec
@Setter
public boolean enableExecuteNotActiveOperation = false;

/**
* <p>The number of targets to process in each batch when executing operations.<br />
* For example, with 1000 targets, a batch size of 100 and 3 operations per target,
* the targets are divided into 10 batches, producing 30 executions in total.<br />
* It may be useful when using an asynchronous executor to process a large number of targets.
*
* @since 2.5.0
*/
@Setter
private int batchSize = -1;

/**
* Complete operations on all objects in {@code targets} according to the specified {@link BeanOperations} and {@link Options}.
*
Expand Down Expand Up @@ -131,12 +142,12 @@ public void execute(Collection<?> targets, BeanOperations operations, Options op
// flattened objects are grouped according to assembly operations, then encapsulated as execution objects
beforeAssembleOperation(targetWithOperations);
List<AssembleExecution> executions = new ArrayList<>();
targetWithOperations.asMap().forEach((op, ts) -> op.getAssembleOperations()
.stream()
.filter(filter)
.map(p -> createAssembleExecution(op, p, ts, options))
.forEach(executions::add)
);
targetWithOperations.asMap().forEach((op, ts) -> {
List<AssembleExecution> executionsOfOp = combineExecutions(options, filter, op, ts);
if (CollectionUtils.isNotEmpty(executionsOfOp)) {
executions.addAll(executionsOfOp);
}
});

// complete assembly operation
TimerUtil.getExecutionTime(
Expand All @@ -147,6 +158,36 @@ public void execute(Collection<?> targets, BeanOperations operations, Options op
afterOperationsCompletion(targetWithOperations);
}

@NonNull
private List<AssembleExecution> combineExecutions(
    Options options, Predicate<? super KeyTriggerOperation> filter, BeanOperations beanOperations, Collection<Object> targets) {
    // Decide how to partition the targets before building executions.
    // NOTE(review): a batchSize of exactly 1 is treated the same as "no batching" — confirm this is intended.
    List<Collection<Object>> batches;
    if (batchSize > 1) {
        batches = CollectionUtils.split(targets, batchSize);
    } else {
        batches = Collections.singletonList(targets);
    }
    // Build the executions batch by batch and concatenate the results.
    List<AssembleExecution> combined = new ArrayList<>();
    for (Collection<Object> batch : batches) {
        combined.addAll(doCombineExecutions(options, filter, beanOperations, batch));
    }
    return combined;
}

/**
 * Build the {@link AssembleExecution} objects for a single batch of targets:
 * every assemble operation of the given {@link BeanOperations} that passes the
 * filter yields one execution covering the whole batch.
 *
 * @param options options for execution
 * @param filter filter
 * @param beanOperations bean operations
 * @param targets targets
 * @return {@link AssembleExecution} objects
 */
@NonNull
protected List<AssembleExecution> doCombineExecutions(
    Options options, Predicate<? super KeyTriggerOperation> filter, BeanOperations beanOperations, Collection<Object> targets) {
    List<AssembleExecution> executions = new ArrayList<>();
    beanOperations.getAssembleOperations()
        .stream()
        .filter(filter)
        .map(operation -> createAssembleExecution(beanOperations, operation, targets, options))
        .forEach(executions::add);
    return executions;
}

/**
* Create a {@link AssembleExecution}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class AsyncBeanOperationExecutor extends DisorderedBeanOperationExecutor
* @param executor thread pool used to perform operations
*/
public AsyncBeanOperationExecutor(
ContainerManager containerManager, Executor executor) {
ContainerManager containerManager, Executor executor) {
super(containerManager);
this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,55 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CollectionUtils {

/**
 * <p>Split the given collection into sub lists of at most {@code size} elements each.
 *
 * @param list collection to split, may be null or empty
 * @param size maximum size of each sub list, must be greater than 0
 * @param <T> element type
 * @return sub lists, or an empty list if the input is null or empty
 */
public static <T> List<Collection<T>> split(Collection<T> list, int size) {
    // delegate to the mapper-based overload with a pass-through mapper
    return split(list, size, subList -> subList);
}

/**
 * <p>Split the given collection into chunks of at most {@code size} elements,
 * converting every chunk into the result collection type through the given mapper.
 *
 * @param list collection to split, may be null or empty
 * @param size maximum size of each chunk, must be greater than 0
 * @param mapper mapper to convert each chunk into the result collection type
 * @param <T> element type
 * @param <C> collection type
 * @return chunks, or an empty list if the input is null or empty
 * @throws IllegalArgumentException if {@code size} is not greater than 0
 */
public static <T, C extends Collection<T>> List<C> split(
    Collection<T> list, int size, Function<List<T>, C> mapper) {
    if (size <= 0) {
        throw new IllegalArgumentException("size must be greater than 0");
    }
    if (Objects.isNull(list) || list.isEmpty()) {
        return Collections.emptyList();
    }
    int total = list.size();
    // fast path: the whole input fits exactly into a single chunk
    if (total == size) {
        return Collections.singletonList(mapper.apply(new ArrayList<>(list)));
    }
    List<C> chunks = new ArrayList<>();
    List<T> current = new ArrayList<>(size);
    for (T element : list) {
        current.add(element);
        if (current.size() == size) {
            // chunk is full: hand it to the mapper and start a fresh one
            chunks.add(mapper.apply(current));
            current = new ArrayList<>(size);
        }
    }
    // flush the trailing partial chunk, if any
    if (!current.isEmpty()) {
        chunks.add(mapper.apply(current));
    }
    return chunks;
}

/**
* <p>Get first not null element from the target.<br />
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,6 +40,7 @@ public void init() {
1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()
);
operationExecutor = new AsyncBeanOperationExecutor(configuration, executor);
operationExecutor.setBatchSize(5);

Map<Integer, Object> sources = new HashMap<>();
sources.put(1, new Source(1, "one"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* test for {@link CollectionUtils}
Expand All @@ -12,6 +19,24 @@
*/
public class CollectionUtilsTest {

@Test
public void split() {
    // a non-positive size is rejected
    Assert.assertThrows(IllegalArgumentException.class, () -> CollectionUtils.split(Collections.emptyList(), -1));
    // null or empty input yields an empty result
    Assert.assertEquals(Collections.emptyList(), CollectionUtils.split(null, 1));
    Assert.assertEquals(Collections.emptyList(), CollectionUtils.split(Collections.emptyList(), 1));

    List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
    // size 2 -> chunks of 2, 2 and 1 elements
    List<Collection<Integer>> chunks = CollectionUtils.split(source, 2);
    Assert.assertEquals(3, chunks.size());
    Assert.assertEquals(2, chunks.get(0).size());
    Assert.assertEquals(2, chunks.get(1).size());
    Assert.assertEquals(1, chunks.get(2).size());

    // size equal to the input size -> a single chunk equal to the input
    chunks = CollectionUtils.split(source, 5);
    Assert.assertEquals(1, chunks.size());
    Assert.assertEquals(source, chunks.get(0));
}

@Test
public void getFirstNotNull() {
// if target is iterator
Expand Down
3 changes: 2 additions & 1 deletion website/docs/.vuepress/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ const sidebarConfig = [
{ title: '注解处理器', path: "/advanced/operation_annotation_handler.md"},
{ title: '使用抽象方法填充', path: "/advanced/operator_interface.md"},
{ title: '反射工厂', path: "/advanced/reflection_factory.md"},
{ title: '类型转换', path: "/advanced/type_converter.md"}
{ title: '类型转换', path: "/advanced/type_converter.md"},
{ title: '异步填充', path: "/advanced/async_executor.md"},
]
},
{
Expand Down
156 changes: 156 additions & 0 deletions website/docs/advanced/async_executor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
## 异步填充

在 crane4j 中,所有的填充操作都通过操作执行器 `BeanOperationExecutor` 完成触发,因此,你可以通过改变执行器支持异步填充,Crane4j 默认已经提供这样的执行器

:::tip

关于执行器,请参照 "[基本概念](./../user_guide/basic_concept.md)" 一节中执行器部分内容。

:::

## 1.启用执行器

由于需要指定线程池,因此 Crane4j 默认并没有启用异步执行器,你需要自行启用并配置它。

### 1.1.在 Spring 环境

在 Spring 中,你可以通过配置文件重新配置执行器,它将会覆盖 Crane4j 中的默认配置:

~~~java
@Configuration
public class Crane4jConfig {

@Bean("AsyncBeanOperationExecutor")
public AsyncBeanOperationExecutor asyncBeanOperationExecutor(Crane4jGlobalConfiguration configuration) {
// 创建线程池
int processNum = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
processNum * 2, processNum * 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("crane4j-thread-pool-executor-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 创建异步操作执行器
AsyncBeanOperationExecutor operationExecutor = new AsyncBeanOperationExecutor(configuration, threadPoolExecutor);
// 指定每一批处理对象数
operationExecutor.setBatchSize(5);
return operationExecutor;
}
}
~~~

### 1.2.在非 Spring 环境

在非 Spring 环境中,你可以直接将其注册到 Crane4j 全局配置类中:

~~~java
// 创建线程池
int processNum = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
processNum * 2, processNum * 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("crane4j-thread-pool-executor-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 创建异步操作执行器
AsyncBeanOperationExecutor operationExecutor = new AsyncBeanOperationExecutor(configuration, threadPoolExecutor);
// 指定每一批处理对象数
operationExecutor.setBatchSize(5);

// 将其注册到全局配置类
SimpleCrane4jGlobalConfiguration configuration = SimpleCrane4jGlobalConfiguration.create();
configuration.getBeanOperationExecutorMap().put(operationExecutor.getClass().getSimpleName(), operationExecutor);
~~~

## 2.使用执行器

### 2.1.在自动填充时使用

在进行自动填充时,你可以在 `@AutoOperate` 注解中指定使用异步执行器:

~~~java
@AutoOperate(type = Foo.class, executor = "AsyncBeanOperationExecutor")
public List<Foo> getFoo(Integer type) {
// do nothing
}
~~~

### 2.2.在手动填充时使用

你也可以在手动填充时使用:

~~~java
List<Foo> foos = fooService.list();
OperateTemplate template = SpringUtil.getBean(OperateTemplate.class);
AsyncBeanOperationExecutor operationExecutor = SpringUtil.getBean(AsyncBeanOperationExecutor.class);
OperateTemplate.execute(foos, operationExecutor, op -> true);
~~~

### 2.3.在操作者接口中使用

在操作者接口中,你可以在 `@Operator` 注解中指定要使用异步执行器:

~~~java
@Operator(executor = "AsyncBeanOperationExecutor") // 使用异步执行器
private interface OperatorInterface {
@Assemble(key = "id", container = "test", props = @Mapping(ref = "name"))
void operate(Collection<Map<String, Object>> targets);
}
~~~

## 3.批量大小

异步执行器执行的时候,将会先将操作按数据源容器分组,然后在前者的基础上,再根据指定的批量大小将其拆分为多个小任务。你可以通过 `setBatchSize` 调整执行器每次向线程池提交任务时,单个任务中处理对象的数量。

比如:

~~~java
// 创建异步操作执行器
AsyncBeanOperationExecutor operationExecutor = new AsyncBeanOperationExecutor(configuration, threadPoolExecutor);
// 指定批量大小
operationExecutor.setBatchSize(5);
~~~

举个例子:

假设我们现在有 10 个待填充 Bean 对象,而每个 Bean 又各自通过 `nsetedBean` 嵌套一个 Bean 对象,即总共有 20 个 Bean。

现在,我们在 Bean 中分别基于 `id`、`key` 和 `code` 声明了三个装配操作,具体配置如下:

~~~java
@Data
private static class Bean {

@Assemble(container = "container1", props = @Mapping("name"))
private Integer id;
private String name;

@Assemble(container = "container2", props = @Mapping("value"))
private Integer key;
private String value;

@Assemble(container = "container2", props = @Mapping("val"))
private Integer code;
private String val;

@Disassemble(type = Bean.class)
private Bean nsetedBean;
}
~~~

那么,当我们填充这 10 个 Bean 时,实际上总共需要完成 10 * (1 + 1) * 3 共 60 组操作。

为了保证尽可能减少查库次数,因此默认情况下,执行器会将 60 次填充按对应的数据源容器打包成两个任务提交给线程池完成:

- 查询 `container1`,然后完成全部基于 `container1` 的 20 组操作;
- 查询 `container2`,然后完成全部基于 `container2` 的 40 组操作;

上述这个逻辑的问题在于,当需要填充的对象越来越多,且需要映射的字段也越来越多时,反射读写字段消耗的时间也会越来越多,甚至可能会超过查库或 RPC 调用所消耗的时间。

此时,为了提高效率,你可以指定批量大小,将每一组操作再拆分为更细粒度的任务。比如,如果你指定批量大小为 20,那么基于 `container2` 的任务就会被拆成两份,此时实际提交到线程池中的任务就是三个:

- 查询 `container1`,然后完成基于 `container1` 的 20 组操作;
- 查询 `container2`,然后完成基于 `container2` 的 20 组操作;
- 查询 `container2`,然后完成基于 `container2` 的 20 组操作;
Loading

0 comments on commit 551474f

Please sign in to comment.