Skip to content

Commit

Permalink
Rename functions and refactor tests
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <[email protected]>
  • Loading branch information
chishui committed Jun 27, 2024
1 parent 666403c commit 3a9145c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Abstract base class for batch processors.
*
Expand All @@ -37,7 +39,7 @@ protected AbstractBatchingProcessor(String tag, String description, int batchSiz
* @param ingestDocumentWrappers {@link List} of {@link IngestDocumentWrapper} to be processed.
* @param handler {@link Consumer} to be called with the results of the processing.
*/
public abstract void internalBatchExecute(
protected abstract void subBatchExecute(
List<IngestDocumentWrapper> ingestDocumentWrappers,
Consumer<List<IngestDocumentWrapper>> handler
);
Expand All @@ -49,15 +51,9 @@ public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Con
return;
}

// if batch size is 1, use default implementation in Processor to handle documents one at a time.
if (this.batchSize == 1) {
super.batchExecute(ingestDocumentWrappers, handler);
return;
}

// if batch size is larger than document size, send one batch
if (this.batchSize >= ingestDocumentWrappers.size()) {
internalBatchExecute(ingestDocumentWrappers, handler);
subBatchExecute(ingestDocumentWrappers, handler);
return;
}

Expand All @@ -67,7 +63,7 @@ public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Con
AtomicInteger counter = new AtomicInteger(size);
List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<>());
for (List<IngestDocumentWrapper> batch : batches) {
this.internalBatchExecute(batch, batchResults -> {
this.subBatchExecute(batch, batchResults -> {
allResults.addAll(batchResults);
if (counter.addAndGet(-batchResults.size()) == 0) {
handler.accept(allResults);
Expand Down Expand Up @@ -114,7 +110,15 @@ public AbstractBatchingProcessor create(
String description,
Map<String, Object> config
) throws Exception {
int batchSize = ConfigurationUtils.readIntProperty(processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE);
int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE);
if (batchSize < 1) {
throw newConfigurationException(
this.processorType,
tag,
BATCH_SIZE_FIELD,
BATCH_SIZE_FIELD + " must be a positive integer"
);
}
return newProcessor(tag, description, batchSize, config);
}

Expand All @@ -127,6 +131,11 @@ public AbstractBatchingProcessor create(
* @param config configuration of the processor
* @return a new batch processor instance
*/
protected abstract AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map<String, Object> config);
protected abstract AbstractBatchingProcessor newProcessor(
String tag,
String description,
int batchSize,
Map<String, Object> config
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,24 @@
package org.opensearch.ingest;

import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class AbstractBatchingProcessorTests extends OpenSearchTestCase {

private static final String DESCRIPTION = "description";
private static final String TAG = "tag";

@Before
public void setup() {}

public void testBatchExecute_emptyInput() throws Exception {
DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 3));
Consumer<List<IngestDocumentWrapper>> handler = (results) -> { assertTrue(results.isEmpty()); };
public void testBatchExecute_emptyInput() {
DummyProcessor processor = new DummyProcessor(3);
Consumer<List<IngestDocumentWrapper>> handler = (results) -> assertTrue(results.isEmpty());
processor.batchExecute(Collections.emptyList(), handler);
verify(processor, never()).internalBatchExecute(anyList(), any());
assertTrue(processor.getSubBatches().isEmpty());
}

public void testBatchExecute_singleBatchSize() throws Exception {
DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 3));
public void testBatchExecute_singleBatchSize() {
DummyProcessor processor = new DummyProcessor(3);
List<IngestDocumentWrapper> wrapperList = Arrays.asList(
IngestDocumentPreparer.createIngestDocumentWrapper(1),
IngestDocumentPreparer.createIngestDocumentWrapper(2),
Expand All @@ -51,11 +35,12 @@ public void testBatchExecute_singleBatchSize() throws Exception {
List<IngestDocumentWrapper> resultList = new ArrayList<>();
processor.batchExecute(wrapperList, resultList::addAll);
assertEquals(wrapperList, resultList);
verify(processor, times(1)).internalBatchExecute(anyList(), any());
assertEquals(1, processor.getSubBatches().size());
assertEquals(wrapperList, processor.getSubBatches().get(0));
}

public void testBatchExecute_multipleBatches() throws Exception {
DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 2));
public void testBatchExecute_multipleBatches() {
DummyProcessor processor = new DummyProcessor(2);
List<IngestDocumentWrapper> wrapperList = Arrays.asList(
IngestDocumentPreparer.createIngestDocumentWrapper(1),
IngestDocumentPreparer.createIngestDocumentWrapper(2),
Expand All @@ -66,59 +51,56 @@ public void testBatchExecute_multipleBatches() throws Exception {
List<IngestDocumentWrapper> resultList = new ArrayList<>();
processor.batchExecute(wrapperList, resultList::addAll);
assertEquals(wrapperList, resultList);
ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
verify(processor, times(3)).internalBatchExecute(argumentCaptor.capture(), any());
assertEquals(wrapperList.subList(0, 2), argumentCaptor.getAllValues().get(0));
assertEquals(wrapperList.subList(2, 4), argumentCaptor.getAllValues().get(1));
assertEquals(wrapperList.subList(4, 5), argumentCaptor.getAllValues().get(2));
assertEquals(3, processor.getSubBatches().size());
assertEquals(wrapperList.subList(0, 2), processor.getSubBatches().get(0));
assertEquals(wrapperList.subList(2, 4), processor.getSubBatches().get(1));
assertEquals(wrapperList.subList(4, 5), processor.getSubBatches().get(2));
}

public void testBatchExecute_randomBatches() throws Exception {
public void testBatchExecute_randomBatches() {
int batchSize = randomIntBetween(2, 32);
int docCount = randomIntBetween(2, 32);
DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, batchSize));
DummyProcessor processor = new DummyProcessor(batchSize);
List<IngestDocumentWrapper> wrapperList = new ArrayList<>();
for (int i = 0; i < docCount; ++i) {
wrapperList.add(IngestDocumentPreparer.createIngestDocumentWrapper(i));
}
List<IngestDocumentWrapper> resultList = new ArrayList<>();
processor.batchExecute(wrapperList, resultList::addAll);
assertEquals(wrapperList, resultList);
verify(processor, times(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1))).internalBatchExecute(
anyList(),
any()
);
assertEquals(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1), processor.getSubBatches().size());
}

public void testBatchExecute_defaultBatchSize() throws Exception {
DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 1));
public void testBatchExecute_defaultBatchSize() {
DummyProcessor processor = new DummyProcessor(1);
List<IngestDocumentWrapper> wrapperList = Arrays.asList(
IngestDocumentPreparer.createIngestDocumentWrapper(1),
IngestDocumentPreparer.createIngestDocumentWrapper(2),
IngestDocumentPreparer.createIngestDocumentWrapper(3)
);
List<IngestDocumentWrapper> resultList = new ArrayList<>();
processor.batchExecute(wrapperList, resultList::addAll);
for (int i = 0; i < wrapperList.size(); ++i) {
assertEquals(wrapperList.get(i).getSlot(), resultList.get(i).getSlot());
assertEquals(wrapperList.get(i).getIngestDocument(), resultList.get(i).getIngestDocument());
assertEquals(wrapperList.get(i).getException(), resultList.get(i).getException());
}
verify(processor, never()).internalBatchExecute(anyList(), any());
verify(processor, times(3)).execute(any(IngestDocument.class));
assertEquals(wrapperList, resultList);
assertEquals(3, processor.getSubBatches().size());
assertEquals(wrapperList.subList(0, 1), processor.getSubBatches().get(0));
assertEquals(wrapperList.subList(1, 2), processor.getSubBatches().get(1));
assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2));
}

static class DummyProcessor extends AbstractBatchingProcessor {
private List<List<IngestDocumentWrapper>> subBatches = new ArrayList<>();

public List<List<IngestDocumentWrapper>> getSubBatches() {
return subBatches;
}

protected DummyProcessor(String tag, String description, int batchSize) {
super(tag, description, batchSize);
protected DummyProcessor(int batchSize) {
super("tag", "description", batchSize);
}

@Override
public void internalBatchExecute(
List<IngestDocumentWrapper> ingestDocumentWrappers,
Consumer<List<IngestDocumentWrapper>> handler
) {
public void subBatchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
subBatches.add(ingestDocumentWrappers);
handler.accept(ingestDocumentWrappers);
}

Expand Down

0 comments on commit 3a9145c

Please sign in to comment.