Skip to content

Commit

Permalink
Merge pull request #4726 from ntisseyre/reindex_subset
Browse files Browse the repository at this point in the history
Reindex subset of vertices
  • Loading branch information
ntisseyre authored Jan 23, 2025
2 parents 7f6ffcf + e5859c6 commit 53a5332
Show file tree
Hide file tree
Showing 22 changed files with 541 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ Configuration options for the storage backend. Some options are applicable only
| storage.directory | Storage directory for those storage backends that require local storage. | String | (no default value) | LOCAL |
| storage.drop-on-clear | Whether to drop the graph database (true) or delete rows (false) when clearing storage. Note that some backends always drop the graph database when clearing storage. Also note that indices are always dropped when clearing storage. | Boolean | true | MASKABLE |
| storage.hostname | The hostname or comma-separated list of hostnames of storage backend servers. This is only applicable to some storage backends, such as cassandra and hbase. | String[] | 127.0.0.1 | LOCAL |
| storage.keys-size | The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request. | Integer | 100 | MASKABLE |
| storage.num-mutations-parallel-threshold | This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. This overhead is more efficiently offset in the context of larger transactions. | Integer | 100 | MASKABLE |
| storage.page-size | JanusGraph break requests that may return many results from distributed storage backends into a series of requests for small chunks/pages of results, where each chunk contains up to this many elements. | Integer | 100 | MASKABLE |
| storage.parallel-backend-ops | Whether JanusGraph should attempt to parallelize storage operations | Boolean | true | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy
}

protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception {
return executeScanJob(VertexJobConverter.convert(graph,job));
return executeScanJob(VertexJobConverter.convert(graph, job, null));
}

protected ScanMetrics executeScanJob(ScanJob job) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
import org.janusgraph.graphdb.vertices.CacheVertex;
import org.janusgraph.testutil.TestGraphConfigs;
import org.javatuples.Pair;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
Expand Down Expand Up @@ -1451,6 +1452,94 @@ public void testCompositeVsMixedIndexing() {
assertTrue(tx.traversal().V().has("intId2", 234).hasNext());
}

/**
 * Verifies that a {@link SchemaAction#REINDEX} restricted to an explicit list of vertex ids
 * only populates the index for those vertices.
 * <p>
 * The test creates cats and dogs, builds a cats-only composite index on "name" after the data
 * already exists, reindexes all cats but one, and then checks that the reindexed cats can be
 * found through the index while the excluded cat cannot.
 */
@Test
public void testSubsetReindex() throws Exception {

    clopen(option(FORCE_INDEX_USAGE), true);

    mgmt.makeVertexLabel("cat").make();
    mgmt.makeVertexLabel("dog").make();

    makeKey("id", Integer.class);
    makeKey("name", String.class);
    final PropertyKey typeKey = makeKey("type", String.class);

    String typeIndex = "searchByType";
    mgmt.buildIndex(typeIndex, Vertex.class)
        .addKey(typeKey)
        .buildCompositeIndex();
    mgmt.commit();

    //Cats
    int catsCount = 3;
    for (int i = 0; i < catsCount; i++) {
        Vertex v = tx.addVertex("cat");
        v.property("id", i);
        v.property("name", "cat_" + i);
        v.property("type", "cat");
    }

    //Dogs
    int dogsCount = 5;
    for (int i = 0; i < dogsCount; i++) {
        Vertex v = tx.addVertex("dog");
        v.property("id", i);
        v.property("name", "dog_" + i);
        v.property("type", "dog");
    }

    tx.commit();

    //Select a subset of vertices to index: every cat except the last one returned
    clopen(option(FORCE_INDEX_USAGE), true);
    List<Vertex> cats = tx.traversal().V().has("type", "cat").toList();
    assertEquals(catsCount, cats.size());
    String excludedCat = cats.get(cats.size() - 1).value("name");
    List<Pair<Object, String>> catsSubset = cats.subList(0, cats.size() - 1).stream()
        .map(kitty -> new Pair<Object, String>(kitty.id(), kitty.value("name")))
        .collect(Collectors.toList());

    List<Vertex> dogs = tx.traversal().V().has("type", "dog").toList();
    assertEquals(dogsCount, dogs.size());
    tx.rollback();

    //Create new Index
    graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback);
    mgmt = graph.openManagement();
    mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i));
    mgmt.commit();

    String catsNameIndex = "searchByName_CatsOnly";
    mgmt = graph.openManagement();
    mgmt.buildIndex(catsNameIndex, Vertex.class)
        .addKey(mgmt.getPropertyKey("name"))
        .indexOnly(mgmt.getVertexLabel("cat"))
        .buildCompositeIndex();
    mgmt.commit();

    //Make Index as REGISTERED
    mgmt = graph.openManagement();
    mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get();
    mgmt.commit();
    ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call();

    //Reindex a given subset
    List<Object> reIndexOnlyIds = catsSubset.stream().map(Pair::getValue0).collect(Collectors.toList());
    mgmt = graph.openManagement();
    mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX, reIndexOnlyIds).get();
    mgmt.commit();
    ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call();

    //Every reindexed cat must be found through the new index
    clopen(option(FORCE_INDEX_USAGE), true);
    catsSubset.forEach(kitty -> {
        List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", kitty.getValue1()).toList();
        assertEquals(1, catsByName.size());
    });

    //The cat left out of the reindex must not be in the index
    List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", excludedCat).toList();
    assertEquals(0, catsByName.size());
    tx.rollback();
}

@Test
public void testIndexInlineProperties() throws NoSuchMethodException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture;

import java.time.Duration;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -341,6 +342,17 @@ interface IndexBuilder {
*/
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads);

/**
 * Updates the provided index according to the given {@link SchemaAction}, considering only
 * the given subset of vertices.
 *
 * @param index the index to update
 * @param updateAction the schema action to apply to the index
 * @param vertexOnly list of vertex ids; only these vertices are considered for the index update
 * @return a future that completes when the index action is done
 */
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly);

/**
* If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions
* {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT;
Expand Down Expand Up @@ -69,6 +70,7 @@ public enum Deployment {
protected final int port;
protected final Duration connectionTimeoutMS;
protected final int pageSize;
protected final int keysSize;

protected final String username;
protected final String password;
Expand All @@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) {
else this.port = portDefault;
this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT);
this.pageSize = storageConfig.get(PAGE_SIZE);
this.keysSize = storageConfig.get(KEYS_SIZE);
this.times = storageConfig.get(TIMESTAMP_PROVIDER);

if (storageConfig.has(AUTH_USERNAME)) {
Expand Down Expand Up @@ -121,6 +124,15 @@ public int getPageSize() {
return pageSize;
}

/**
 * Returns the keys size configured for this storage backend, as read from the {@code KEYS_SIZE}
 * storage option when this manager was constructed. The keys size is used to determine
 * how many keys/partitions to request from storage within a single request.
 *
 * @return the configured keys size
 */
public int getKeysSize() {
return keysSize;
}

/*
* TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.keycolumnvalue;

import org.apache.commons.lang.NotImplementedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
Expand Down Expand Up @@ -181,6 +182,10 @@ default Map<SliceQuery, Map<StaticBuffer, EntryList>> getMultiSlices(MultiKeysQu
*/
void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException;

/**
 * Returns a {@link KeyIterator} for the explicitly given list of keys and the column-range of the given query.
 * <p>
 * This default implementation does not support the operation and always throws; stores that can
 * look up an explicit list of keys are expected to override it.
 *
 * @param keys the keys to retrieve
 * @param query the slice query to apply to each key
 * @param txh the transaction to run the retrieval in
 * @return never returns normally in this default implementation
 * @throws NotImplementedException always, unless overridden by the store implementation
 * @throws BackendException declared for overriding implementations
 */
default KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
    // Name the concrete store so the failure points at the backend lacking key-list support.
    throw new NotImplementedException(getClass().getName() + " does not support getKeys for an explicit list of keys");
}

/**
* Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range.
* Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
private final StoreTransaction storeTx;
private final List<SliceQuery> queries;
private final Predicate<StaticBuffer> keyFilter;
private final List<StaticBuffer> keysToScan;
private final Configuration graphConfiguration;
private final DataPuller[] pullThreads;
private final BlockingQueue<SliceResult>[] dataQueues;
Expand All @@ -72,6 +73,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
StoreTransaction storeTx,
List<SliceQuery> queries,
Predicate<StaticBuffer> keyFilter,
List<StaticBuffer> keysToScan,
BlockingQueue<Row> rowQueue,
Configuration graphConfiguration) throws BackendException {

Expand All @@ -80,6 +82,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
this.storeTx = storeTx;
this.queries = queries;
this.keyFilter = keyFilter;
this.keysToScan = keysToScan;
this.graphConfiguration = graphConfiguration;

this.dataQueues = new BlockingQueue[queries.size()];
Expand Down Expand Up @@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws
this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE));
dataQueues[pos] = queue;

DataPuller dp = new DataPuller(sq, queue,
KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter);
KeyIterator keyIterator;
if (keysToScan != null) {
keyIterator = store.getKeys(keysToScan, sq, stx);
} else {
keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx);
}

DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter);
pullThreads[pos] = dp;
dp.setName("data-puller-" + pos); // setting the name for thread dumps!
dp.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {}
*/
List<SliceQuery> getQueries();

/**
 * Returns the explicit list of keys this job should scan.
 * <p>
 * The default implementation returns {@code null}, which means no restriction:
 * all keys will be scanned.
 * NOTE(review): the behavior for an empty (non-null) list depends on the store's
 * key-list {@code getKeys} implementation; confirm before relying on it.
 *
 * @return the keys to scan, or {@code null} to scan all keys
 */
default List<StaticBuffer> getKeysToScan() {
return null;
}

/**
* A predicate that determines whether
* {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue<Row> processorQueue, List<Slice
job.getKeyFilter(), processorQueue);
} else {
return new MultiThreadsRowsCollector(store, storeFeatures, storeTx, queries,
job.getKeyFilter(), processorQueue, graphConfiguration);
job.getKeyFilter(), job.getKeysToScan(), processorQueue, graphConfiguration);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ public void acquireLock(final StaticBuffer key,
});
}

/**
 * Fetches a {@link KeyIterator} for the given keys from the wrapped backend store while recording
 * the call under the {@code M_GET_KEYS} metric. When the transaction configuration carries a
 * metrics group name, the returned iterator is additionally wrapped so that iteration is measured too.
 */
@Override
public KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
    return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
        final KeyIterator backendIterator = backend.getKeys(keys, query, txh);
        // No metrics group configured: hand back the backend iterator un-instrumented.
        if (!txh.getConfiguration().hasGroupName()) {
            return backendIterator;
        }
        return MetricInstrumentedIterator.of(
            backendIterator, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR);
    });
}

@Override
public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException {
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,10 @@ public boolean apply(@Nullable String s) {
"up to this many elements.",
ConfigOption.Type.MASKABLE, 100);

public static final ConfigOption<Integer> KEYS_SIZE = new ConfigOption<>(STORAGE_NS,"keys-size",
"The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request.",
ConfigOption.Type.MASKABLE, 100);

public static final ConfigOption<Boolean> DROP_ON_CLEAR = new ConfigOption<>(STORAGE_NS, "drop-on-clear",
"Whether to drop the graph database (true) or delete rows (false) when clearing storage. " +
"Note that some backends always drop the graph database when clearing storage. Also note that indices are " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,11 +914,20 @@ public JanusGraphIndex buildMixedIndex(String backingIndex) {
--------------- */
@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction) {
return updateIndex(index, updateAction, Runtime.getRuntime().availableProcessors());
return updateIndex(index, updateAction, null, Runtime.getRuntime().availableProcessors());
}

@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads) {
// No vertex subset is given here: null is forwarded as the vertexOnly argument.
return updateIndex(index, updateAction, null, numOfThreads);
}

@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly) {
// No thread count is given here: the number of available processors is used, as in the two-argument overload.
return updateIndex(index, updateAction, vertexOnly, Runtime.getRuntime().availableProcessors());
}

private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly, int numOfThreads) {
Preconditions.checkArgument(index != null, "Need to provide an index");
Preconditions.checkArgument(updateAction != null, "Need to provide update action");

Expand Down Expand Up @@ -967,7 +976,7 @@ public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int num
builder.setFinishJob(indexId.getIndexJobFinisher(graph, SchemaAction.ENABLE_INDEX));
builder.setJobId(indexId);
builder.setNumProcessingThreads(numOfThreads);
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName)));
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName), vertexOnly));
try {
future = builder.execute();
} catch (BackendException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public abstract class AbstractScanJob implements ScanJob {
protected final GraphProvider graph;
protected StandardJanusGraphTx tx;
private IDManager idManager;
protected IDManager idManager;

public AbstractScanJob(JanusGraph graph) {
this.graph = new GraphProvider();
Expand Down
Loading

0 comments on commit 53a5332

Please sign in to comment.