[refactor] Refactor LookupJoin: add logs for troubleshooting and polish DynamicPartitionLoader (apache#4828)
yuzelin authored Jan 7, 2025
1 parent 5fcdbbe commit 4620f30
Showing 5 changed files with 138 additions and 40 deletions.
@@ -23,8 +23,15 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

@@ -33,15 +40,20 @@
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Dynamic partition for lookup. */
public class DynamicPartitionLoader implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);

private static final long serialVersionUID = 1L;

private static final String MAX_PT = "max_pt()";
@@ -51,6 +63,7 @@ public class DynamicPartitionLoader implements Serializable {
private final Table table;
private final Duration refreshInterval;
private final int maxPartitionNum;
private final RowDataToObjectArrayConverter partitionConverter;

private Comparator<InternalRow> comparator;

@@ -61,6 +74,8 @@ private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
this.table = table;
this.refreshInterval = refreshInterval;
this.maxPartitionNum = maxPartitionNum;
this.partitionConverter =
new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
}

public void open() {
@@ -81,30 +96,85 @@ public List<BinaryRow> partitions() {
return partitions;
}

public Predicate createSpecificPartFilter() {
Predicate partFilter = null;
for (BinaryRow partition : partitions) {
if (partFilter == null) {
partFilter = createSinglePartFilter(partition);
} else {
partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
}
}
return partFilter;
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec = partitionConverter.convert(partition);
Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
for (int i = 0; i < partitionSpec.length; i++) {
partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
}

// create the partition predicate based on rowType instead of partitionType
return createPartitionPredicate(rowType, partitionMap);
}
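
For illustration, here is a minimal, self-contained sketch (not part of this commit) of the OR-of-conjunctions predicate shape that createSpecificPartFilter assembles, for a hypothetical table partitioned by (dt, hh). PredicateBuilder, BinaryString, RowType, and DataTypes are existing Paimon classes; the table layout and partition values are assumptions.

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class SpecificPartFilterSketch {

    public static void main(String[] args) {
        // Full row type of a hypothetical table: partition keys (dt, hh) plus a value column.
        // The predicate is built against this full row type rather than the projected
        // partition type, matching the comment in createSinglePartFilter above.
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.STRING(), DataTypes.INT(), DataTypes.BIGINT()},
                        new String[] {"dt", "hh", "v"});
        PredicateBuilder builder = new PredicateBuilder(rowType);

        // One conjunction per partition, e.g. dt = '2025-01-07' AND hh = 10.
        Predicate part1 =
                PredicateBuilder.and(
                        builder.equal(0, BinaryString.fromString("2025-01-07")),
                        builder.equal(1, 10));
        Predicate part2 =
                PredicateBuilder.and(
                        builder.equal(0, BinaryString.fromString("2025-01-07")),
                        builder.equal(1, 11));

        // createSpecificPartFilter OR-combines the per-partition conjunctions.
        Predicate specificPartFilter = PredicateBuilder.or(part1, part2);
        System.out.println(specificPartFilter);
    }
}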

/** @return true if the partitions changed. */
public boolean checkRefresh() {
if (lastRefresh != null
&& !lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now())) {
return false;
}

LOG.info(
"DynamicPartitionLoader(maxPartitionNum={},table={}) refreshing partitions; refresh interval is {} second(s).",
maxPartitionNum,
table.name(),
refreshInterval.toMillis() / 1000);

List<BinaryRow> newPartitions = getMaxPartitions();
lastRefresh = LocalDateTime.now();

if (newPartitions.size() != partitions.size()) {
partitions = newPartitions;
logNewPartitions();
return true;
} else {
for (int i = 0; i < newPartitions.size(); i++) {
if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
partitions = newPartitions;
logNewPartitions();
return true;
}
}
LOG.info(
"DynamicPartitionLoader(maxPartitionNum={},table={}) didn't find new partitions.",
maxPartitionNum,
table.name());
return false;
}
}
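
As a small worked example of the guard at the top of checkRefresh() (interval and timestamps hypothetical): with a 10-minute interval, a refresh becomes due once lastRefresh.plus(refreshInterval) falls before the current time.

import java.time.Duration;
import java.time.LocalDateTime;

public class RefreshDueSketch {

    public static void main(String[] args) {
        Duration refreshInterval = Duration.ofMinutes(10); // hypothetical interval
        LocalDateTime lastRefresh = LocalDateTime.now().minusMinutes(11);

        // Mirrors the guard in checkRefresh(): return early while the interval
        // has not yet elapsed since the last refresh.
        boolean skip = !lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now());
        System.out.println(skip); // false: 11 minutes exceed the 10-minute interval
    }
}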

private void logNewPartitions() {
String partitionsStr =
partitions.stream()
.map(
partition ->
InternalRowPartitionComputer.partToSimpleString(
table.rowType().project(table.partitionKeys()),
partition,
"-",
200))
.collect(Collectors.joining(","));
LOG.info(
"DynamicPartitionLoader(maxPartitionNum={},table={}) finds new partitions: {}.",
maxPartitionNum,
table.name(),
partitionsStr);
}
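
For reference, a minimal sketch (not from this commit) of the formatting call used in logNewPartitions(), building a hypothetical (dt, hh) partition row with Paimon's BinaryRowWriter and printing it with the same delimiter and length cap as above:

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;

public class PartitionToStringSketch {

    public static void main(String[] args) {
        RowType partitionType =
                RowType.of(
                        new DataType[] {DataTypes.STRING(), DataTypes.INT()},
                        new String[] {"dt", "hh"});

        // Build one partition row: dt=2025-01-07, hh=10.
        BinaryRow partition = new BinaryRow(2);
        BinaryRowWriter writer = new BinaryRowWriter(partition);
        writer.writeString(0, BinaryString.fromString("2025-01-07"));
        writer.writeInt(1, 10);
        writer.complete();

        // Same call as logNewPartitions(): "-" as delimiter, 200 as the max length.
        System.out.println(
                InternalRowPartitionComputer.partToSimpleString(
                        partitionType, partition, "-", 200));
    }
}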

private List<BinaryRow> getMaxPartitions() {
List<BinaryRow> newPartitions =
table.newReadBuilder().newScan().listPartitions().stream()
@@ -126,6 +196,11 @@ public static DynamicPartitionLoader of(Table table) {
return null;
}

checkArgument(
!table.partitionKeys().isEmpty(),
"{} is not supported for non-partitioned table.",
LOOKUP_DYNAMIC_PARTITION);

int maxPartitionNum;
switch (dynamicPartition.toLowerCase()) {
case MAX_PT:
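
For context, a hedged sketch of how this loader is switched on through table options. The key "lookup.dynamic-partition" is the one behind FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION and "max_pt()" matches the MAX_PT constant above; the refresh-interval key shown is an assumption, and the loader is only created when the option is set (see the early return above).

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.options.Options;

public class DynamicPartitionOptionSketch {

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Key behind FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION; the value
        // "max_pt()" keeps only the latest partition.
        conf.put("lookup.dynamic-partition", "max_pt()");
        // Assumed companion option feeding DynamicPartitionLoader's refreshInterval.
        conf.put("lookup.dynamic-partition.refresh-interval", "1 h");

        Options options = new Options(conf);
        System.out.println(options.toMap());
    }
}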
@@ -28,14 +28,12 @@
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;

@@ -58,10 +56,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -74,7 +70,6 @@
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;

/** A lookup {@link TableFunction} for file store. */
@@ -168,12 +163,15 @@ private void open() throws Exception {
int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
FileStoreTable storeTable = (FileStoreTable) table;

LOG.info("Creating lookup table for {}.", table.name());
if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
if (isRemoteServiceAvailable(storeTable)) {
this.lookupTable =
PrimaryKeyPartialLookupTable.createRemoteTable(
storeTable, projection, joinKeys);
LOG.info(
"Remote service is available. Created PrimaryKeyPartialLookupTable with remote service.");
} else {
try {
this.lookupTable =
@@ -183,7 +181,13 @@ private void open() throws Exception {
path,
joinKeys,
getRequireCachedBucketIds());
LOG.info(
"Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
} catch (UnsupportedOperationException ignore) {
LOG.info(
"Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor "
+ "because bucket mode isn't {}. Will create FullCacheLookupTable.",
BucketMode.HASH_FIXED);
}
}
}
@@ -199,14 +203,15 @@ private void open() throws Exception {
joinKeys,
getRequireCachedBucketIds());
this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
LOG.info("Created {}.", lookupTable.getClass().getSimpleName());
}

if (partitionLoader != null) {
partitionLoader.open();
partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
}
}

@@ -267,33 +272,6 @@ private List<RowData> lookupInternal(InternalRow key) throws IOException {
return rows;
}

private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
Predicate partFilter = null;
for (BinaryRow partition : partitions) {
if (partFilter == null) {
partFilter = createSinglePartFilter(partition);
} else {
partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
}
}
return partFilter;
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec =
new RowDataToObjectArrayConverter(rowType.project(partitionKeys))
.convert(partition);
Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
for (int i = 0; i < partitionSpec.length; i++) {
partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
}

// create the partition predicate based on rowType instead of partitionType
return createPartitionPredicate(rowType, partitionMap);
}

private void reopen() {
try {
close();
@@ -321,7 +299,7 @@ void tryRefresh() throws Exception {

if (partitionChanged) {
// reopen with latest partition
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
@@ -30,6 +30,9 @@
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static org.apache.paimon.CoreOptions.StartupMode;
@@ -41,6 +44,8 @@
*/
public class LookupDataTableScan extends DataTableStreamScan {

private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);

private final StartupMode startupMode;
private final LookupStreamScanMode lookupScanMode;

@@ -69,6 +74,7 @@ protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
if (plan != null) {
return plan;
}
LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
throw new ReopenException();
}

@@ -26,6 +26,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -36,6 +37,9 @@

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
@@ -50,6 +54,8 @@
/** A streaming reader to load data into {@link LookupTable}. */
public class LookupStreamingReader {

private static final Logger LOG = LoggerFactory.getLogger(LookupStreamingReader.class);

private final LookupFileStoreTable table;
private final int[] projection;
@Nullable private final Filter<InternalRow> cacheRowFilter;
@@ -103,6 +109,7 @@ public LookupStreamingReader(

public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
List<Split> splits = scan.plan().splits();
log(splits);
CoreOptions options = CoreOptions.fromMap(table.options());
FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
split -> readBuilder.newRead().createReader(split);
@@ -136,6 +143,19 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
return reader;
}

private void log(List<Split> splits) {
if (splits.isEmpty()) {
LOG.info("LookupStreamingReader didn't get splits from {}.", table.name());
return;
}

DataSplit dataSplit = (DataSplit) splits.get(0);
LOG.info(
"LookupStreamingReader get splits from {} with snapshotId {}.",
table.name(),
dataSplit.snapshotId());
}

@Nullable
public Long nextSnapshotId() {
return scan.checkpoint();