Skip to content

Commit

Permalink
Use file path as cache key (#1852) (#1862)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored May 7, 2021
1 parent 294c37e commit 5e3efcb
Show file tree
Hide file tree
Showing 15 changed files with 12 additions and 85 deletions.
3 changes: 0 additions & 3 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ struct DMContext : private boost::noncopyable

StoragePathPool & path_pool;
StoragePool & storage_pool;
const UInt64 hash_salt;

// gc safe-point, maybe update.
DB::Timestamp min_version;
Expand Down Expand Up @@ -75,7 +74,6 @@ struct DMContext : private boost::noncopyable
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
StoragePool & storage_pool_,
const UInt64 hash_salt_,
const DB::Timestamp min_version_,
const NotCompress & not_compress_,
bool is_common_handle_,
Expand All @@ -85,7 +83,6 @@ struct DMContext : private boost::noncopyable
metrics(db_context.getTiFlashMetrics()),
path_pool(path_pool_),
storage_pool(storage_pool_),
hash_salt(hash_salt_),
min_version(min_version_),
not_compress(not_compress_),
is_common_handle(is_common_handle_),
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ DeltaPackFile::DeltaPackFile(const DMContext & context, const DMFilePtr & file_,
void DeltaPackFile::calculateStat(const DMContext & context)
{
auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = context.hash_salt;

auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, segment_range, EMPTY_FILTER, {}, context.db_context.getFileProvider());
= DMFilePackFilter::loadFrom(file, index_cache, segment_range, EMPTY_FILTER, {}, context.db_context.getFileProvider());

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down Expand Up @@ -70,7 +69,6 @@ void DPFileReader::initStream()
file_stream = std::make_shared<DMFileBlockInputStream>(context.db_context,
/*max_version*/ MAX_UINT64,
/*clean_read*/ false,
context.hash_salt,
pack.getFile(),
*col_defs,
pack.segment_range,
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
original_table_handle_define(handle),
background_pool(db_context.getBackgroundPool()),
blockable_background_pool(db_context.getBlockableBackgroundPool()),
hash_salt(++DELTA_MERGE_STORE_HASH_SALT),
log(&Logger::get("DeltaMergeStore[" + db_name + "." + table_name + "]"))
{
LOG_INFO(log, "Restore DeltaMerge Store start [" << db_name << "." << table_name << "]");
Expand Down Expand Up @@ -353,7 +352,6 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
auto * ctx = new DMContext(db_context.getGlobalContext(),
path_pool,
storage_pool,
hash_salt,
latest_gc_safe_point,
settings.not_compress_columns,
is_common_handle,
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class DMFileBlockInputStream : public SkippableBlockInputStream
DMFileBlockInputStream(const Context & context,
UInt64 max_read_version,
bool enable_clean_read,
UInt64 hash_salt,
const DMFilePtr & dmfile,
const ColumnDefines & read_columns,
const RowKeyRange & rowkey_range,
Expand All @@ -33,7 +32,6 @@ class DMFileBlockInputStream : public SkippableBlockInputStream
filter,
read_packs,
// caches
hash_salt,
context.getGlobalContext().getMarkCache(),
context.getGlobalContext().getMinMaxIndexCache(),
context.getSettingsRef().dt_enable_stable_column_cache,
Expand Down
12 changes: 3 additions & 9 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ class DMFilePackFilter
public:
static DMFilePackFilter loadFrom(const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
UInt64 hash_salt,
const RowKeyRange & rowkey_range,
const RSOperatorPtr & filter,
const IdSetPtr & read_packs,
const FileProviderPtr & file_provider)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, rowkey_range, filter, read_packs, file_provider);
auto pack_filter = DMFilePackFilter(dmfile, index_cache, rowkey_range, filter, read_packs, file_provider);
pack_filter.init();
return pack_filter;
}
Expand Down Expand Up @@ -84,14 +83,12 @@ class DMFilePackFilter
private:
DMFilePackFilter(const DMFilePtr & dmfile_,
const MinMaxIndexCachePtr & index_cache_,
UInt64 hash_salt_,
const RowKeyRange & rowkey_range_, // filter by handle range
const RSOperatorPtr & filter_, // filter by push down where clause
const IdSetPtr & read_packs_, // filter by pack index
const FileProviderPtr & file_provider_)
: dmfile(dmfile_),
index_cache(index_cache_),
hash_salt(hash_salt_),
rowkey_range(rowkey_range_),
filter(filter_),
read_packs(read_packs_),
Expand Down Expand Up @@ -184,7 +181,6 @@ class DMFilePackFilter
const DMFilePtr & dmfile,
const FileProviderPtr & file_provider,
const MinMaxIndexCachePtr & index_cache,
UInt64 hash_salt,
ColId col_id)
{
auto & type = dmfile->getColumnStat(col_id).type;
Expand All @@ -202,8 +198,7 @@ class DMFilePackFilter
MinMaxIndexPtr minmax_index;
if (index_cache)
{
auto key = MinMaxIndexCache::hash(dmfile->colIndexCacheKey(file_name_base), hash_salt);
minmax_index = index_cache->getOrSet(key, load);
minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load);
}
else
{
Expand All @@ -220,13 +215,12 @@ class DMFilePackFilter
if (!dmfile->isColIndexExist(col_id))
return;

loadIndex(param.indexes, dmfile, file_provider, index_cache, hash_salt, col_id);
loadIndex(param.indexes, dmfile, file_provider, index_cache, col_id);
}

private:
DMFilePtr dmfile;
MinMaxIndexCachePtr index_cache;
UInt64 hash_salt;
RowKeyRange rowkey_range;
RSOperatorPtr filter;
IdSetPtr read_packs;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ DMFileReader::Stream::Stream(DMFileReader & reader, //
};
if (reader.mark_cache)
marks
= reader.mark_cache->getOrSet(MarkCache::hash(reader.dmfile->colMarkCacheKey(file_name_base), reader.hash_salt), mark_load);
= reader.mark_cache->getOrSet(reader.dmfile->colMarkCacheKey(file_name_base), mark_load);
else
marks = mark_load();
}
Expand Down Expand Up @@ -146,7 +146,6 @@ DMFileReader::DMFileReader(const DMFilePtr & dmfile_,
const RSOperatorPtr & filter_,
const IdSetPtr & read_packs_,
// caches
UInt64 hash_salt_,
const MarkCachePtr & mark_cache_,
const MinMaxIndexCachePtr & index_cache_,
bool enable_column_cache_,
Expand All @@ -159,12 +158,11 @@ DMFileReader::DMFileReader(const DMFilePtr & dmfile_,
read_columns(read_columns_),
enable_clean_read(enable_clean_read_),
max_read_version(max_read_version_),
pack_filter(dmfile_, index_cache_, hash_salt_, rowkey_range_, filter_, read_packs_, file_provider_),
pack_filter(dmfile_, index_cache_, rowkey_range_, filter_, read_packs_, file_provider_),
handle_res(pack_filter.getHandleRes()),
use_packs(pack_filter.getUsePacks()),
is_common_handle(rowkey_range_.is_common_handle),
skip_packs_by_column(read_columns.size(), 0),
hash_salt(hash_salt_),
mark_cache(mark_cache_),
enable_column_cache(enable_column_cache_ && column_cache_),
column_cache(column_cache_),
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class DMFileReader
const RSOperatorPtr & filter_,
const IdSetPtr & read_packs_, // filter by pack index
// caches
UInt64 hash_salt_,
const MarkCachePtr & mark_cache_,
const MinMaxIndexCachePtr & index_cache_,
bool enable_column_cache_,
Expand Down Expand Up @@ -112,7 +111,6 @@ class DMFileReader
std::vector<size_t> skip_packs_by_column;

/// Caches
const UInt64 hash_salt;
MarkCachePtr mark_cache;
const bool enable_column_cache;
ColumnCachePtr column_cache;
Expand Down
18 changes: 2 additions & 16 deletions dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,14 @@ struct MinMaxIndexWeightFunction
};


class MinMaxIndexCache : public LRUCache<UInt128, MinMaxIndex, UInt128TrivialHash, MinMaxIndexWeightFunction>
class MinMaxIndexCache : public LRUCache<String, MinMaxIndex, std::hash<String>, MinMaxIndexWeightFunction>
{
private:
using Base = LRUCache<UInt128, MinMaxIndex, UInt128TrivialHash, MinMaxIndexWeightFunction>;
using Base = LRUCache<String, MinMaxIndex, std::hash<String>, MinMaxIndexWeightFunction>;

public:
MinMaxIndexCache(size_t max_size_in_bytes, const Delay & expiration_delay) : Base(max_size_in_bytes, expiration_delay) {}

static UInt128 hash(const String & path_to_file, UInt64 salt = 0)
{
UInt128 key;

SipHash hash;
hash.update(path_to_file.data(), path_to_file.size() + 1);
hash.get128(key.low, key.high);

key.low ^= salt;
key.high ^= salt;

return key;
}

template <typename LoadFunc>
MappedPtr getOrSet(const Key & key, LoadFunc && load)
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,6 @@ std::optional<RowKeyValue> Segment::getSplitPointFast(DMContext & dm_context, co
DMFileBlockInputStream stream(dm_context.db_context,
MAX_UINT64,
false,
dm_context.hash_salt,
read_file,
{getExtraHandleColumnDefine(is_common_handle)},
RowKeyRange::newAll(is_common_handle, rowkey_column_size),
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
else
{
auto index_cache = dm_context->db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = dm_context->hash_salt;
for (auto & file : files_)
{
auto pack_filter = DMFilePackFilter::loadFrom(
file, index_cache, hash_salt, range, EMPTY_FILTER, {}, dm_context->db_context.getFileProvider());
file, index_cache, range, EMPTY_FILTER, {}, dm_context->db_context.getFileProvider());
auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes();
rows += file_valid_rows;
bytes += file_valid_bytes;
Expand Down Expand Up @@ -193,7 +192,6 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream(const DM
context.db_context,
max_data_version,
enable_clean_read,
context.hash_salt,
stable->files[i],
read_columns,
rowkey_range,
Expand All @@ -216,7 +214,6 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
{
auto filter = DMFilePackFilter::loadFrom(f,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
context.hash_salt,
range,
RSOperatorPtr{},
IdSetPtr{},
Expand Down
Loading

0 comments on commit 5e3efcb

Please sign in to comment.