Skip to content

Commit

Permalink
cr
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Apr 5, 2024
1 parent ef6a9ee commit a353b11
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -305,10 +306,12 @@ public void onConsumingToOnline(String segmentNameStr) {
@Override
public List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegments,
Map<String, String> queryOptions) {
List<SegmentContext> segmentContexts = new ArrayList<>();
selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
return _tableUpsertMetadataManager.getSegmentContexts(selectedSegments);
_tableUpsertMetadataManager.setSegmentContexts(segmentContexts);
}
return super.getSegmentContexts(selectedSegments, queryOptions);
return segmentContexts;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testConsistentSnapshot()
// Result should be invariant - always exactly 3 docs
for (int i = 0; i < 10_000; i++) {
SegmentContext segmentContext = new SegmentContext(segment);
segmentContext.setQueryableDocIdsSnapshot(SegmentContext.getQueryableDocIdsSnapshotFromSegment(segment));
segmentContext.setQueryableDocIdsSnapshot(TestUtils.getQueryableDocIdsSnapshotFromSegment(segment));
assertEquals(getNumberOfFilteredDocs(segmentContext, queryContext), 3);
}

Expand Down
43 changes: 43 additions & 0 deletions pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.plan;

import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;


public class TestUtils {
private TestUtils() {
}

public static MutableRoaringBitmap getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
MutableRoaringBitmap queryableDocIdsSnapshot = null;
ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
if (queryableDocIds != null) {
queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
} else {
ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
if (validDocIds != null) {
queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
}
}
return queryableDocIdsSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.core.operator.query.GroupByOperator;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
import org.apache.pinot.core.plan.TestUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
Expand Down Expand Up @@ -158,8 +159,7 @@ public void testPlanMaker(String query, Class<? extends Operator<?>> operatorCla
assertTrue(operatorClass.isInstance(operator));

SegmentContext segmentContext = new SegmentContext(_upsertIndexSegment);
segmentContext.setQueryableDocIdsSnapshot(
SegmentContext.getQueryableDocIdsSnapshotFromSegment(_upsertIndexSegment));
segmentContext.setQueryableDocIdsSnapshot(TestUtils.getQueryableDocIdsSnapshotFromSegment(_upsertIndexSegment));
Operator<?> upsertOperator = PLAN_MAKER.makeSegmentPlanNode(segmentContext, queryContext).run();
assertTrue(upsertOperatorClass.isInstance(upsertOperator));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,16 +1080,29 @@ protected void removeDocId(IndexSegment segment, int docId) {
* Use the segmentContexts to collect the contexts for selected segments. Reuse the segmentContext object if
* present, to avoid overwriting the contexts specified at the others places.
*/
public void getSegmentContexts(List<IndexSegment> selectedSegments, List<SegmentContext> segmentContexts) {
for (IndexSegment segment : selectedSegments) {
public void setSegmentContexts(List<SegmentContext> segmentContexts) {
for (SegmentContext segmentContext : segmentContexts) {
IndexSegment segment = segmentContext.getIndexSegment();
if (_trackedSegments.contains(segment)) {
SegmentContext segmentContext = new SegmentContext(segment);
segmentContext.setQueryableDocIdsSnapshot(SegmentContext.getQueryableDocIdsSnapshotFromSegment(segment));
segmentContexts.add(segmentContext);
segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment));
}
}
}

private static MutableRoaringBitmap getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
MutableRoaringBitmap queryableDocIdsSnapshot = null;
ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
if (queryableDocIds != null) {
queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
} else {
ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
if (validDocIds != null) {
queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
}
}
return queryableDocIdsSnapshot;
}

protected void doClose()
throws IOException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@

import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
Expand Down Expand Up @@ -105,11 +102,4 @@ protected void initCustomVariables() {
public UpsertConfig.Mode getUpsertMode() {
return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
}

@Override
public List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegments) {
List<SegmentContext> segmentContexts = new ArrayList<>();
selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
return segmentContexts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
package org.apache.pinot.segment.local.upsert;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;


Expand Down Expand Up @@ -60,10 +58,9 @@ public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
}

@Override
public List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegments) {
List<SegmentContext> segmentContexts = new ArrayList<>(selectedSegments.size());
_partitionMetadataManagerMap.forEach((k, v) -> v.getSegmentContexts(selectedSegments, segmentContexts));
return segmentContexts;
public void setSegmentContexts(List<SegmentContext> segmentContexts) {
_partitionMetadataManagerMap.forEach(
(partitionID, upsertMetadataManager) -> upsertMetadataManager.setSegmentContexts(segmentContexts));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
Expand Down Expand Up @@ -54,5 +53,6 @@ public interface TableUpsertMetadataManager extends Closeable {
*/
Map<Integer, Long> getPartitionToPrimaryKeyCount();

List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegments);
default void setSegmentContexts(List<SegmentContext> segmentContexts) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.spi;

import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;


Expand All @@ -41,18 +40,4 @@ public MutableRoaringBitmap getQueryableDocIdsSnapshot() {
public void setQueryableDocIdsSnapshot(MutableRoaringBitmap queryableDocIdsSnapshot) {
_queryableDocIdsSnapshot = queryableDocIdsSnapshot;
}

public static MutableRoaringBitmap getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
MutableRoaringBitmap queryableDocIdsSnapshot = null;
ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
if (queryableDocIds != null) {
queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
} else {
ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
if (validDocIds != null) {
queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
}
}
return queryableDocIdsSnapshot;
}
}

0 comments on commit a353b11

Please sign in to comment.