Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement text chunking processor with fixed token length and delimiter algorithm #607

Merged
Merged
Show file tree
Hide file tree
Changes from 176 commits
Commits
Show all changes
189 commits
Select commit Hold shift + click to select a range
cbc5423
implement chunking processor and fixed token length
yuye-aws Feb 18, 2024
3e2d365
initialize node client for document chunking processor
yuye-aws Feb 18, 2024
89584a9
initialize document chunking processor with analysis registry
yuye-aws Feb 22, 2024
596fbf7
chunker factory create with analysis registry
yuye-aws Feb 22, 2024
636f907
implement tokenizer in fixed token length algorithm with analysis reg…
yuye-aws Feb 22, 2024
2ffd6b0
add max token count parsing logic
yuye-aws Feb 22, 2024
2195353
bug fix for non-existing index
yuye-aws Feb 22, 2024
bdd418e
change error log
yuye-aws Feb 22, 2024
458420b
implement evenly chunk
yuye-aws Feb 26, 2024
02420d7
unit tests for chunker factory
yuye-aws Feb 26, 2024
f8f60a1
unit tests for chunker factory
yuye-aws Feb 26, 2024
ff0587c
add error message for chunker factory tests
yuye-aws Feb 26, 2024
afc3189
resolve comments
yuye-aws Feb 26, 2024
159e426
Revert "implement evenly chunk"
yuye-aws Feb 26, 2024
2405952
add default value logic back
yuye-aws Feb 26, 2024
b930222
implement unit test for fixed token length chunker
yuye-aws Feb 26, 2024
ecb8297
add test cases in unit test for fixed token length chunker
yuye-aws Feb 26, 2024
d6d31fa
support map type as an input
yuye-aws Feb 26, 2024
fafae93
support map type as an input
yuye-aws Feb 26, 2024
d23c1fb
bug fix for map type
yuye-aws Feb 26, 2024
39c6162
bug fix for map type
yuye-aws Feb 26, 2024
5714d1e
bug fix for map type in document chunking processor
yuye-aws Feb 26, 2024
2f23c30
remove system out println
yuye-aws Feb 26, 2024
41cff0c
add delimiter chunker
xinyual Feb 26, 2024
b0fda97
add UT for delimiter chunker
xinyual Feb 26, 2024
b16e7c4
add delimiter chunker processor
xinyual Feb 26, 2024
11e6a4b
add more UTs
xinyual Feb 27, 2024
81000f3
add more UTs
xinyual Feb 27, 2024
f3b468f
basic unit tests for document chunking processor
yuye-aws Feb 27, 2024
eea6fc8
fix tests for getProcessors in neural search
yuye-aws Feb 27, 2024
ec6bf49
add unit tests with string, map and nested map type for document chun…
yuye-aws Feb 27, 2024
3ae94e4
add unit tests for parameter valdiation in document chunking processor
yuye-aws Feb 27, 2024
c8dc66c
add back deleted xml file
yuye-aws Feb 27, 2024
1e1ce1b
restore xml file
yuye-aws Feb 27, 2024
b425122
integration tests for document chunking processor
yuye-aws Feb 29, 2024
31bf921
add back Run_Neural_Search.xml
yuye-aws Feb 29, 2024
11d8f53
restore Run_Neural_Search.xml
yuye-aws Feb 29, 2024
0662278
add changelog
yuye-aws Feb 29, 2024
5e75e04
update integration test for cascade processor
yuye-aws Feb 29, 2024
962ed32
add max chunk limit
xinyual Mar 1, 2024
9487de5
remove useless and apply spotless
xinyual Mar 1, 2024
04043ca
update error message
yuye-aws Mar 1, 2024
08bf2d1
change field UT
xinyual Mar 1, 2024
c7cc59f
remove useless and apply spotless
xinyual Mar 1, 2024
0721f7a
change logic of max chunk number
xinyual Mar 1, 2024
d2bc576
add max chunk limit into fixed token length algorithm
yuye-aws Mar 1, 2024
120fae8
Support list<list<string>> type in embedding and extract validation l…
zane-neo Mar 1, 2024
0af3024
fix unit tests for inference processor
yuye-aws Mar 1, 2024
e69bbe1
implement unit tests for unit tests with max_chunk_limit in fixed tok…
yuye-aws Mar 1, 2024
f21f40f
constructor for inference processor
yuye-aws Mar 4, 2024
4babd4d
use inference processor
xinyual Mar 1, 2024
24f4980
draft code for extending inference processor with document chunking p…
yuye-aws Mar 5, 2024
0b4036a
api refactor for document chunking processor
yuye-aws Mar 5, 2024
9ff6645
remove nested list key for chunking processor
yuye-aws Mar 5, 2024
0e464fe
remove unused function
yuye-aws Mar 5, 2024
d6b68ed
remove processor validator
yuye-aws Mar 6, 2024
a7a9260
remove processor validator
yuye-aws Mar 6, 2024
39e8df5
Revert InferenceProcessor.java
yuye-aws Mar 6, 2024
2ee1923
revert changes in text embedding and sparse encoding processor
yuye-aws Mar 6, 2024
ca534ab
implement chunk with map in document chunking processor
yuye-aws Mar 7, 2024
eedd58d
add default delimiter value
Mar 7, 2024
b9bf3ef
implement max chunk logic in document chunking processor
yuye-aws Mar 7, 2024
2ac2f60
add initial value for max chunk limit in document chunking processor
yuye-aws Mar 7, 2024
6067044
bug fix in chunking processor: allow 0 max_chunk_limit
yuye-aws Mar 7, 2024
98d1ab3
implement overlap rate with big decimal
yuye-aws Mar 7, 2024
79a637c
update max chunk limit in delimiter
yuye-aws Mar 7, 2024
6da6395
update parameter setting for fixed token length algorithm
yuye-aws Mar 7, 2024
105d4a0
update max chunk limit implementation in chunking processor
yuye-aws Mar 7, 2024
cd4eda7
fix unit tests for fixed token length algorithm
yuye-aws Mar 7, 2024
ceaa7d2
spotless apply for document chunking processor
yuye-aws Mar 7, 2024
715c145
initialize current chunk count
yuye-aws Mar 7, 2024
75663e1
parameter validation for max chunk limit
yuye-aws Mar 7, 2024
2e5dc00
fix integration tests
yuye-aws Mar 7, 2024
d711390
fix current UT
xinyual Mar 7, 2024
98124ee
change delimiter UT
xinyual Mar 7, 2024
353e88e
remove delimiter useless code
xinyual Mar 7, 2024
de554e6
add more UT
xinyual Mar 7, 2024
2453a79
add UT for list inside map
xinyual Mar 7, 2024
5f00107
add UT for list inside map
xinyual Mar 7, 2024
fc94955
update unit tests for chunking processor
yuye-aws Mar 8, 2024
388fd43
add more unit tests for chunking processor
yuye-aws Mar 8, 2024
bb35c79
resolve code review comments
yuye-aws Mar 8, 2024
b4d5fda
add java doc
yuye-aws Mar 8, 2024
453dd35
update java doc
yuye-aws Mar 8, 2024
8c8fbaf
update java doc
yuye-aws Mar 8, 2024
b588983
fix import order
yuye-aws Mar 8, 2024
23dd769
update java doc
yuye-aws Mar 8, 2024
3ad78da
fix java doc error
yuye-aws Mar 8, 2024
abb9bde
fix update ut for fixed token length chunker
yuye-aws Mar 8, 2024
82aa219
resolve code review comments
yuye-aws Mar 8, 2024
3158e28
resolve code review comments
yuye-aws Mar 8, 2024
38d6e60
resolve code review comments
yuye-aws Mar 8, 2024
584bc59
resolve code review comments
yuye-aws Mar 8, 2024
cbea5df
implement chunk count wrapper for max chunk limit
yuye-aws Mar 8, 2024
c3c8ff2
rename variable end to nextDelimiterPosition
yuye-aws Mar 8, 2024
da055e7
adjust method place
yuye-aws Mar 8, 2024
d32840c
update java doc for fixed token length algorithm
yuye-aws Mar 8, 2024
830f665
reanme interface name and fixed token length algorithm name
yuye-aws Mar 8, 2024
1275bd6
update fixed token length algorithm configuration for integration tests
yuye-aws Mar 8, 2024
4e2f5d4
make delimiter member variables static
yuye-aws Mar 8, 2024
5c20b9b
remove redundant set field value in execute method
yuye-aws Mar 8, 2024
addd37e
resolve code review comments
yuye-aws Mar 8, 2024
7469153
add integration tests with more tokenizers
yuye-aws Mar 10, 2024
ad00b88
bug fix: unit test failure due to invalid tokenizer
yuye-aws Mar 10, 2024
d4673d4
bug fix: token concatenation in fixed token length algorithm
yuye-aws Mar 10, 2024
7a589c6
update chunker interface
yuye-aws Mar 11, 2024
e1f6c79
track chunkCount within function
yuye-aws Mar 11, 2024
bb372e6
bug fix: allow white space as the delimiter
yuye-aws Mar 11, 2024
2538ab3
fix fixed length chunker
xinyual Mar 11, 2024
9c9172d
fix delimiter chunker
xinyual Mar 11, 2024
d05b246
fix chunker factory
xinyual Mar 11, 2024
04fc7d3
fix UTs
xinyual Mar 11, 2024
7fe93c0
fix UT and chunker factory
xinyual Mar 11, 2024
cefb0a6
move analysis_registry to non-runtime parameters
xinyual Mar 11, 2024
16038af
fix Uts
xinyual Mar 11, 2024
d1d88dc
avoid java doc change
xinyual Mar 11, 2024
eb439bd
move validate to commonUtlis
xinyual Mar 11, 2024
bc7f70c
remove useless function
xinyual Mar 11, 2024
bb941cd
change java doc
xinyual Mar 11, 2024
77d4101
fix Document process ut
xinyual Mar 11, 2024
92f587f
fixed token length: re-implement with start and end offset
yuye-aws Mar 11, 2024
94b1967
update exception message
yuye-aws Mar 12, 2024
98944d1
fix document chunking processor IT
yuye-aws Mar 12, 2024
8799fd0
bug fix: adjust start, end content position in fixed token length alg…
yuye-aws Mar 12, 2024
5cda870
update changelog for 2.x release
yuye-aws Mar 12, 2024
c942b17
rename processor
yuye-aws Mar 12, 2024
6461b32
update default delimiter to be \n\n
yuye-aws Mar 12, 2024
2a0a879
remove change log in 3.0 unreleased
yuye-aws Mar 12, 2024
fbb4edb
fix IT failure due to chunking processor rename
yuye-aws Mar 12, 2024
050f163
update javadoc for text chunking processor factory
yuye-aws Mar 12, 2024
e61f295
adjust functions in chunker interface
yuye-aws Mar 12, 2024
4f87008
move algorithm name definition to concrete chunker class
yuye-aws Mar 12, 2024
c651b3e
update string formatted message for text chunking processor
yuye-aws Mar 13, 2024
0f45782
update string formatted message for chunker factory
yuye-aws Mar 13, 2024
3d1b792
update string formatted message for chunker parameter validator
yuye-aws Mar 13, 2024
5600b36
update java doc for delimiter algorithm
yuye-aws Mar 13, 2024
3d962ca
support range double in chunker parameter validator
yuye-aws Mar 13, 2024
42de900
update string formatted message for fixed token length algorithm
yuye-aws Mar 13, 2024
6d4fe8c
update sneaky throw with text chunking processor it
yuye-aws Mar 13, 2024
e666f17
add word tokenizer restriction for fixed token length algorithm
yuye-aws Mar 13, 2024
958cc3b
update error message for multiple algorithms in text chunking processor
yuye-aws Mar 13, 2024
183e928
add comment in text chunking processor
yuye-aws Mar 13, 2024
09fccc1
validate max chunk limit with util parameter class
yuye-aws Mar 13, 2024
8ad1e51
update comments
yuye-aws Mar 13, 2024
489fe7b
update comments
yuye-aws Mar 13, 2024
d67880e
update java doc
yuye-aws Mar 13, 2024
666e7b9
update java doc
yuye-aws Mar 13, 2024
9161c93
make parameter final
yuye-aws Mar 14, 2024
0f9c140
implement a map from chunker name to constuctor function in chunker f…
yuye-aws Mar 14, 2024
a574980
bug fix in chunker factory
yuye-aws Mar 14, 2024
87679ad
remove get all chunkers in chunker factory
yuye-aws Mar 14, 2024
08dcd19
remove type check for parameter check for max token count
yuye-aws Mar 14, 2024
f16882d
remove type check for parameter check for analysis registry
yuye-aws Mar 14, 2024
a969a60
implement parser and validator
yuye-aws Mar 14, 2024
34348b3
update comment
yuye-aws Mar 14, 2024
4153988
provide fixed token length as the default algorithm
yuye-aws Mar 14, 2024
06ca1c7
adjust exception message
yuye-aws Mar 14, 2024
3cf671d
adjust exception message
yuye-aws Mar 14, 2024
5fe5eef
use object nonnull and require nonnull
yuye-aws Mar 15, 2024
f3decb4
apply final to ingest document and chunk count
yuye-aws Mar 15, 2024
3b8a3af
merge parameter validator into the parser
yuye-aws Mar 15, 2024
89c465c
assign positive default value for max chunk limit
yuye-aws Mar 15, 2024
e7dffe0
validate supported chunker algorithm in text chunking processor
yuye-aws Mar 15, 2024
463de71
update parameter setting of max chunk limit
yuye-aws Mar 15, 2024
0a04012
add unit test with non list of string
yuye-aws Mar 15, 2024
a524954
add unit test with null input
yuye-aws Mar 15, 2024
10f6568
add unit test for tokenization excpetion in fixed token length algorithm
yuye-aws Mar 15, 2024
3f41f37
tune method name in text chunking processor unit test
yuye-aws Mar 15, 2024
e4bdabc
tune method name in delimiter algorithm unit test
yuye-aws Mar 15, 2024
9e37171
add unit test for overlap rate too small in fixed token length algorithm
yuye-aws Mar 15, 2024
18ba1b1
tune method modifier for all classes
yuye-aws Mar 15, 2024
2ce9840
tune code
yuye-aws Mar 15, 2024
2aea7a5
tune code
yuye-aws Mar 15, 2024
63bbae9
tune exception type in parameter parser
yuye-aws Mar 15, 2024
aaee028
tune comment
yuye-aws Mar 15, 2024
ab2a151
tune comment
yuye-aws Mar 15, 2024
1eb12aa
include max chunk limit in both algorithms
yuye-aws Mar 15, 2024
40991a3
tune comment
yuye-aws Mar 15, 2024
ea4bbb8
allow 0 for max chunk limit
yuye-aws Mar 15, 2024
f0dfb57
update runtime max chunk limit in text chunking processor
yuye-aws Mar 15, 2024
cb4b39b
tune code for chunker
yuye-aws Mar 15, 2024
98dd886
implement test for multiple field max chunk limit exceed
yuye-aws Mar 15, 2024
d245a04
tune methods name in text chunking proceesor unit tests
yuye-aws Mar 15, 2024
ad7ba25
add unit tests for both algorithms with max chunk limit
yuye-aws Mar 15, 2024
9702168
optimize code
yuye-aws Mar 15, 2024
3d8c030
extract max chunk limit check to util class
yuye-aws Mar 17, 2024
9931fae
resolve code review comments
yuye-aws Mar 18, 2024
fb6a961
fix unit tests
yuye-aws Mar 18, 2024
68fef4f
bug fix: only update runtime max chunk limit when enabled
yuye-aws Mar 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x](https://github.com/opensearch-project/neural-search/compare/2.12...2.x)
### Features
- Implement document chunking processor with fixed token length and delimiter algorithm ([#607](https://github.com/opensearch-project/neural-search/pull/607/))
- Enabled support for applying default modelId in neural sparse query ([#614](https://github.com/opensearch-project/neural-search/pull/614)
### Enhancements
- Adding aggregations in hybrid query ([#630](https://github.com/opensearch-project/neural-search/pull/630))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow;
import org.opensearch.neuralsearch.processor.SparseEncodingProcessor;
import org.opensearch.neuralsearch.processor.TextEmbeddingProcessor;
import org.opensearch.neuralsearch.processor.TextChunkingProcessor;
import org.opensearch.neuralsearch.processor.TextImageEmbeddingProcessor;
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationFactory;
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner;
import org.opensearch.neuralsearch.processor.factory.TextChunkingProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.NormalizationProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.RerankProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.SparseEncodingProcessorFactory;
Expand Down Expand Up @@ -114,14 +116,21 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
SparseEncodingProcessor.TYPE,
new SparseEncodingProcessorFactory(clientAccessor, parameters.env),
TextImageEmbeddingProcessor.TYPE,
new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService())
new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService()),
TextChunkingProcessor.TYPE,
new TextChunkingProcessorFactory(
parameters.env,
parameters.ingestService.getClusterService(),
parameters.indicesService,
parameters.analysisRegistry
)
);
}

@Override
public Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
// we're using "is_disabled" flag as there are no proper implementation of FeatureFlags.isDisabled(). Both
// cases when flag is not set or it is "false" are interpretted in the same way. In such case core is reading
// cases when flag is not set, or it is "false" are interpreted in the same way. In such case core is reading
// the actual value from settings.
if (FeatureFlags.isEnabled(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED.getKey())) {
log.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.processor;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Locale;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import com.google.common.annotations.VisibleForTesting;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.neuralsearch.processor.chunker.Chunker;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.neuralsearch.processor.chunker.ChunkerFactory;
import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;

import static org.opensearch.neuralsearch.processor.chunker.ChunkerParameterParser.parseIntegerParameter;

/**
* This processor is used for text chunking.
* The text chunking results could be fed to downstream embedding processor.
* The processor needs two fields: algorithm and field_map,
* where algorithm defines chunking algorithm and parameters,
* and field_map specifies which fields needs chunking and the corresponding keys for the chunking results.
*/
public final class TextChunkingProcessor extends AbstractProcessor {

public static final String TYPE = "text_chunking";
public static final String FIELD_MAP_FIELD = "field_map";
public static final String ALGORITHM_FIELD = "algorithm";
@VisibleForTesting
static final String MAX_CHUNK_LIMIT_FIELD = "max_chunk_limit";

private static final int DEFAULT_MAX_CHUNK_LIMIT = 100;
private static final int DISABLED_MAX_CHUNK_LIMIT = -1;
private static final String DEFAULT_ALGORITHM = FixedTokenLengthChunker.ALGORITHM_NAME;

private int maxChunkLimit;
model-collapse marked this conversation as resolved.
Show resolved Hide resolved
private Chunker chunker;
model-collapse marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, Object> fieldMap;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final AnalysisRegistry analysisRegistry;
private final Environment environment;

public TextChunkingProcessor(
final String tag,
final String description,
final Map<String, Object> fieldMap,
final Map<String, Object> algorithmMap,
final Environment environment,
final ClusterService clusterService,
final IndicesService indicesService,
final AnalysisRegistry analysisRegistry
) {
super(tag, description);
this.fieldMap = fieldMap;
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.analysisRegistry = analysisRegistry;
parseAlgorithmMap(algorithmMap);
}

public String getType() {
return TYPE;
}

@SuppressWarnings("unchecked")
private void parseAlgorithmMap(final Map<String, Object> algorithmMap) {
if (algorithmMap.size() > 1) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unable to create %s processor as [%s] contains multiple algorithms", TYPE, ALGORITHM_FIELD)
);
}

String algorithmKey;
Object algorithmValue;
if (algorithmMap.isEmpty()) {
algorithmKey = DEFAULT_ALGORITHM;
algorithmValue = new HashMap<>();
} else {
Entry<String, Object> algorithmEntry = algorithmMap.entrySet().iterator().next();
algorithmKey = algorithmEntry.getKey();
algorithmValue = algorithmEntry.getValue();
if (!(algorithmValue instanceof Map)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Unable to create %s processor as parameters for [%s] algorithm must be an object",
TYPE,
algorithmKey
)
);
}
}

if (!ChunkerFactory.CHUNKER_ALGORITHMS.contains(algorithmKey)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Chunking algorithm [%s] is not supported. Supported chunking algorithms are %s",
algorithmKey,
ChunkerFactory.CHUNKER_ALGORITHMS
)
);
}
Map<String, Object> chunkerParameters = (Map<String, Object>) algorithmValue;
if (algorithmKey.equals(FixedTokenLengthChunker.ALGORITHM_NAME)) {
// fixed token length algorithm needs analysis registry for tokenization
chunkerParameters.put(FixedTokenLengthChunker.ANALYSIS_REGISTRY_FIELD, analysisRegistry);
}
this.chunker = ChunkerFactory.create(algorithmKey, chunkerParameters);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add check on algorithmKey here instead of check/get null constructor in the ChunkerFactory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already updated.

this.maxChunkLimit = parseIntegerParameter(chunkerParameters, MAX_CHUNK_LIMIT_FIELD, DEFAULT_MAX_CHUNK_LIMIT);
if (maxChunkLimit <= 0 && maxChunkLimit != DISABLED_MAX_CHUNK_LIMIT) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Parameter [%s] must be positive or %s to disable this parameter",
MAX_CHUNK_LIMIT_FIELD,
DISABLED_MAX_CHUNK_LIMIT
)
);
}
}

@SuppressWarnings("unchecked")
private boolean isListOfString(final Object value) {
// an empty list is also List<String>
if (!(value instanceof List)) {
return false;
}
for (Object element : (List<Object>) value) {
if (!(element instanceof String)) {
return false;
}
}
return true;
}

private int getMaxTokenCount(final Map<String, Object> sourceAndMetadataMap) {
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName);
int maxTokenCount;
if (Objects.nonNull(indexMetadata)) {
// if the index is specified in the metadata, read maxTokenCount from the index setting
IndexService indexService = indicesService.indexServiceSafe(indexMetadata.getIndex());
maxTokenCount = indexService.getIndexSettings().getMaxTokenCount();
} else {

Check warning on line 164 in src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java#L162-L164

Added lines #L162 - L164 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this else is not needed, we can just do assignment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not getting your point. This else here can make the processor more efficiently. When executing on a document, the processor either get the environment setting or the index setting.

Copy link
Member

@martin-gaievski martin-gaievski Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it can be something like:

 if (Objects.isNull(indexMetadata)) {
    return = IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings());
}
// if the index is specified in the metadata, read maxTokenCount from the index setting
IndexService indexService = indicesService.indexServiceSafe(indexMetadata.getIndex());
return indexService.getIndexSettings().getMaxTokenCount();

not a blocker though, but if you're going to push more commits please address this one too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the sample code. This part will be updated according to your comment.

maxTokenCount = IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings());
}
return maxTokenCount;
}

/**
* This method will be invoked by PipelineService to perform chunking and then write back chunking results to the document.
* @param ingestDocument {@link IngestDocument} which is the document passed to processor.
*/
@Override
public IngestDocument execute(final IngestDocument ingestDocument) {
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
validateFieldsValue(sourceAndMetadataMap);
// fixed token length algorithm needs runtime parameter max_token_count for tokenization
Map<String, Object> runtimeParameters = new HashMap<>();
if (chunker instanceof FixedTokenLengthChunker) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check of chunker type damages the abstraction you've built with the CunckerFactory. Why can't you set this parameters in all cases and then decide on how to use it later, in a chunker implementation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid there would be some parameter conflict from other chunking algorithms. I need to recover this check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I guess for say delimiter checking algo it may erase correct value of max token count. can we pass source and map data to a factory so this if became part of the factory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, the factory should be an independent component, which is only responsible for creating an instance of chunking algorithm and return the instance to the text chunking processor. That's why we are validating parameters in the text chunking processor,

int maxTokenCount = getMaxTokenCount(sourceAndMetadataMap);
runtimeParameters.put(FixedTokenLengthChunker.MAX_TOKEN_COUNT_FIELD, maxTokenCount);
}
chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters, 0);
return ingestDocument;
}

private void validateFieldsValue(final Map<String, Object> sourceAndMetadataMap) {
for (Map.Entry<String, Object> embeddingFieldsEntry : fieldMap.entrySet()) {
Object sourceValue = sourceAndMetadataMap.get(embeddingFieldsEntry.getKey());
if (Objects.nonNull(sourceValue)) {
String sourceKey = embeddingFieldsEntry.getKey();
if (sourceValue instanceof List || sourceValue instanceof Map) {
validateNestedTypeValue(sourceKey, sourceValue, 1);
} else if (!(sourceValue instanceof String)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "field [%s] is neither string nor nested type, cannot process it", sourceKey)
);
}
}
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void validateNestedTypeValue(final String sourceKey, final Object sourceValue, final int maxDepth) {
if (maxDepth > MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(environment.settings())) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "map type field [%s] reached max depth limit, cannot process it", sourceKey)
);
} else if (sourceValue instanceof List) {
validateListTypeValue(sourceKey, sourceValue, maxDepth);
} else if (sourceValue instanceof Map) {
((Map) sourceValue).values()
.stream()
.filter(Objects::nonNull)
.forEach(x -> validateNestedTypeValue(sourceKey, x, maxDepth + 1));
} else if (!(sourceValue instanceof String)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "map type field [%s] has non-string type, cannot process it", sourceKey)
);
}
}

@SuppressWarnings({ "rawtypes" })
private void validateListTypeValue(final String sourceKey, final Object sourceValue, final int maxDepth) {
for (Object value : (List) sourceValue) {
if (value instanceof Map) {
validateNestedTypeValue(sourceKey, value, maxDepth + 1);
} else if (value == null) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "list type field [%s] has null, cannot process it", sourceKey)
);
} else if (!(value instanceof String)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "list type field [%s] has non-string value, cannot process it", sourceKey)
);
}
}
}

@SuppressWarnings("unchecked")
private int chunkMapType(
Map<String, Object> sourceAndMetadataMap,
final Map<String, Object> fieldMap,
final Map<String, Object> runtimeParameters,
final int chunkCount
) {
int updatedChunkCount = chunkCount;
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
if (targetKey instanceof Map) {
// call this method recursively when target key is a map
Object sourceObject = sourceAndMetadataMap.get(originalKey);
if (sourceObject instanceof List) {
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) source,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
}
}
} else if (sourceObject instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) sourceObject,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
}
} else {
// chunk the object when target key is of leaf type (null, string and list of string)
Object chunkObject = sourceAndMetadataMap.get(originalKey);
List<String> chunkedResult = new ArrayList<>();
updatedChunkCount = chunkLeafType(chunkObject, chunkedResult, runtimeParameters, updatedChunkCount);
sourceAndMetadataMap.put(String.valueOf(targetKey), chunkedResult);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sourceAndMetadataMap contains some metadata fields such as _index, _routing and _id, if the targetKey equals the name of the metadata field, may cause accident.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simple solution is to prohibiting targetKey starting with "_".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check the behavior of other ingestion processors.

}
}
return updatedChunkCount;
}

private int chunkString(final String content, List<String> result, final Map<String, Object> runTimeParameters, final int chunkCount) {
// chunk the content, return the updated chunkCount and add chunk passages into result
int updatedChunkCount = chunkCount;
List<String> contentResult = chunker.chunk(content, runTimeParameters);
updatedChunkCount += contentResult.size();
if (updatedChunkCount > maxChunkLimit && maxChunkLimit != DISABLED_MAX_CHUNK_LIMIT) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"The number of chunks produced by %s processor has exceeded the allowed maximum of [%s]. This limit can be set by changing the [%s] parameter.",
TYPE,
maxChunkLimit,
MAX_CHUNK_LIMIT_FIELD
)
);
}
result.addAll(contentResult);
return updatedChunkCount;
}

private int chunkList(
final List<String> contentList,
List<String> result,
final Map<String, Object> runTimeParameters,
final int chunkCount
) {
// flatten original output format from List<List<String>> to List<String>
int updatedChunkCount = chunkCount;
for (String content : contentList) {
updatedChunkCount = chunkString(content, result, runTimeParameters, updatedChunkCount);
}
return updatedChunkCount;
}

@SuppressWarnings("unchecked")
private int chunkLeafType(final Object value, List<String> result, final Map<String, Object> runTimeParameters, final int chunkCount) {
// leaf type means null, String or List<String>
// the result should be an empty list when the input is null
int updatedChunkCount = chunkCount;
if (value instanceof String) {
updatedChunkCount = chunkString(value.toString(), result, runTimeParameters, updatedChunkCount);
} else if (isListOfString(value)) {
updatedChunkCount = chunkList((List<String>) value, result, runTimeParameters, updatedChunkCount);
}
return updatedChunkCount;
}
}
Loading
Loading