Skip to content

Commit

Permalink
automatic bundle partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
moonshadow565 committed Oct 13, 2022
1 parent 6398838 commit 5ffd938
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 58 deletions.
5 changes: 5 additions & 0 deletions lib/rlib/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
}()

namespace rlib {
static std::size_t KiB = 1024;
static std::size_t MiB = KiB * 1024;
static std::size_t GiB = MiB * 1024;
static std::size_t TiB = GiB * 1024;

namespace fs = std::filesystem;

[[noreturn]] extern void throw_error(char const* from, char const* msg);
Expand Down
135 changes: 105 additions & 30 deletions lib/rlib/rcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,82 @@

using namespace rlib;

static constexpr auto rcache_file_flags(RCache::Options const& options) -> IO::Flags {
return (options.readonly ? IO::READ : IO::WRITE) | IO::NO_INTERUPT | IO::NO_OVERGROW;
static constexpr auto rcache_file_flags(bool readonly) -> IO::Flags {
return (readonly ? IO::READ : IO::WRITE) | IO::NO_INTERUPT | IO::NO_OVERGROW;
}

RCache::RCache(Options const& options) : file_(options.path, rcache_file_flags(options)), options_(options) {
auto file_size = file_.size();
if (file_size == 0 && !options.readonly) {
flush();
return;
static auto rcache_file_path(fs::path base, std::size_t index) -> fs::path {
if (!index) return base;
return std::move(base.replace_extension(fmt::format(".{:05d}.bundle", index)));
}

RCache::RCache(Options const& options) : options_(options) {
if (!options_.readonly) {
options_.flush_size = std::max(32 * MiB, options_.flush_size);
options_.max_size = std::max(options_.flush_size * 2, options_.max_size) - options_.flush_size;
}
for (fs::path path = options_.path;;) {
auto const index = files_.size();
auto next_path = rcache_file_path(options_.path, index + 1);
auto const next_exists = fs::exists(next_path);
auto const flags = rcache_file_flags(options_.readonly || next_exists);

auto file = std::make_unique<IO::File>(path, flags);
auto const is_empty = file->size() == 0;
auto bundle = !is_empty ? RBUN::read(*file) : RBUN{};
files_.push_back(std::move(file));
for (auto& chunk : bundle.lookup) {
chunk.second.bundleId = (BundleID)index;
}
lookup_.merge(std::move(bundle.lookup));

if (flags & IO::WRITE) {
writer_ = {
.toc_offset = bundle.toc_offset,
.end_offset = bundle.toc_offset + sizeof(RBUN::Footer),
.chunks = std::move(bundle.chunks),
};
writer_.end_offset += sizeof(RChunk) * writer_.chunks.size();
writer_.buffer.reserve(options_.flush_size * 2);
can_write_ = true;
if (is_empty) {
this->flush();
} else {
this->check_space(options_.flush_size);
}
}

if (!next_exists) {
break;
}

path = std::move(next_path);
}
bundle_ = RBUN::read(file_);
}

RCache::~RCache() { this->flush(); }

auto RCache::add(RChunk const& chunk, std::span<char const> data) -> bool {
rlib_assert(chunk.compressed_size == data.size());
if (!can_write() || bundle_.lookup.contains(chunk.chunkId)) {
if (!can_write() || lookup_.contains(chunk.chunkId)) {
return false;
}
if (chunk.chunkId == ChunkID::None) {
return false;
}
bundle_.chunks.push_back(chunk);
bundle_.lookup[chunk.chunkId] = {chunk, BundleID::None, buffer_.size() + bundle_.toc_offset};
buffer_.insert(buffer_.end(), data.begin(), data.end());
if (buffer_.size() > options_.flush_size) {

// check if we hit chunk limit
auto const extra_data = sizeof(RChunk) + data.size();
this->check_space(extra_data);

writer_.chunks.push_back(chunk);
lookup_[chunk.chunkId] = {chunk, BundleID::None, writer_.buffer.size() + writer_.toc_offset};
writer_.buffer.insert(writer_.buffer.end(), data.begin(), data.end());
if (writer_.buffer.size() > options_.flush_size) {
this->flush();
}
writer_.end_offset += extra_data;

return true;
}

Expand All @@ -58,11 +105,11 @@ auto RCache::add_uncompressed(std::span<char const> src, int level) -> RChunk::S
return chunk;
}

auto RCache::contains(ChunkID chunkId) const noexcept -> bool { return bundle_.lookup.contains(chunkId); }
auto RCache::contains(ChunkID chunkId) const noexcept -> bool { return lookup_.contains(chunkId); }

auto RCache::find(ChunkID chunkId) const noexcept -> RChunk::Src {
auto i = bundle_.lookup.find(chunkId);
if (i == bundle_.lookup.end()) {
auto i = lookup_.find(chunkId);
if (i == lookup_.end()) {
return {};
}
return i->second;
Expand All @@ -82,6 +129,7 @@ auto RCache::uncache(std::vector<RChunk::Dst> chunks, RChunk::Dst::data_cb on_da
rlib_assert(c.uncompressed_size == chunk.uncompressed_size);
chunk.compressed_offset = c.compressed_offset;
chunk.compressed_size = c.compressed_size;
chunk.bundleId = c.bundleId;
found.push_back(chunk);
return true;
});
Expand All @@ -95,11 +143,12 @@ auto RCache::uncache(std::vector<RChunk::Dst> chunks, RChunk::Dst::data_cb on_da
on_data(chunk, dst);
continue;
}
auto src = std::span(buffer_);
if (chunk.compressed_offset > bundle_.toc_offset) {
src = src.subspan(chunk.compressed_offset - bundle_.toc_offset, chunk.compressed_size);
auto src = std::span<char const>{};
auto const& file = files_.at((std::size_t)chunk.bundleId);
if (can_write() && &file == &files_.back() && chunk.compressed_offset > writer_.toc_offset) {
src = src.subspan(chunk.compressed_offset - writer_.toc_offset, chunk.compressed_size);
} else {
src = file_.copy(chunk.compressed_offset, chunk.compressed_size);
src = file->copy(chunk.compressed_offset, chunk.compressed_size);
}
dst = zstd_decompress(src, chunk.uncompressed_size);
on_data(chunk, dst);
Expand All @@ -108,23 +157,49 @@ auto RCache::uncache(std::vector<RChunk::Dst> chunks, RChunk::Dst::data_cb on_da
return std::move(chunks);
}

auto RCache::check_space(std::size_t extra) -> bool {
// ensure we can allways at least write one file
if (writer_.end_offset <= sizeof(RBUN::Footer)) {
return false;
}
// still have space
if (writer_.end_offset + extra < options_.max_size) {
return false;
}
this->flush(); // flush anything that we have atm
auto const index = files_.size();
auto const path = rcache_file_path(options_.path, index);
auto const flags = rcache_file_flags(false);
auto file = std::make_unique<IO::File>(path, flags);
file->resize(0, 0);
files_.push_back(std::move(file));
writer_.toc_offset = 0;
writer_.end_offset = sizeof(RBUN::Footer);
writer_.chunks.clear();
writer_.buffer.clear();
this->flush();
return true;
}

auto RCache::flush() -> bool {
// Dont reflush when there is nothing to flush.
if (!can_write() || (buffer_.empty() && bundle_.toc_offset != 0)) {
if (!can_write() || (writer_.buffer.empty() && writer_.toc_offset != 0)) {
return false;
}
auto toc_size = sizeof(RChunk) * bundle_.chunks.size();
auto toc_size = sizeof(RChunk) * writer_.chunks.size();
RBUN::Footer footer = {
.checksum = std::bit_cast<std::array<char, 8>>(XXH64((char const*)bundle_.chunks.data(), toc_size, 0)),
.entry_count = (std::uint32_t)bundle_.chunks.size(),
.checksum = std::bit_cast<std::array<char, 8>>(XXH64((char const*)writer_.chunks.data(), toc_size, 0)),
.entry_count = (std::uint32_t)writer_.chunks.size(),
.version = RBUN::Footer::VERSION,
.magic = {'R', 'B', 'U', 'N'},
};
auto new_toc_offset = bundle_.toc_offset + buffer_.size();
buffer_.insert(buffer_.end(), (char const*)bundle_.chunks.data(), (char const*)bundle_.chunks.data() + toc_size);
buffer_.insert(buffer_.end(), (char const*)&footer, (char const*)&footer + sizeof(footer));
rlib_assert(file_.write(bundle_.toc_offset, buffer_));
buffer_.clear();
bundle_.toc_offset = new_toc_offset;
auto new_toc_offset = writer_.toc_offset + writer_.buffer.size();
writer_.buffer.insert(writer_.buffer.end(),
(char const*)writer_.chunks.data(),
(char const*)writer_.chunks.data() + toc_size);
writer_.buffer.insert(writer_.buffer.end(), (char const*)&footer, (char const*)&footer + sizeof(footer));
rlib_assert(files_.back()->write(writer_.toc_offset, writer_.buffer));
writer_.buffer.clear();
writer_.toc_offset = new_toc_offset;
return true;
}
23 changes: 17 additions & 6 deletions lib/rlib/rcache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace rlib {
struct Options {
std::string path;
bool readonly;
std::uint32_t flush_size;
std::size_t flush_size;
std::size_t max_size;
};

RCache(Options const& options);
Expand All @@ -30,11 +31,21 @@ namespace rlib {

auto flush() -> bool;

auto can_write() const noexcept -> bool { return file_.fd() && !options_.readonly; }
auto can_write() const noexcept -> bool { return can_write_; }

private:
IO::File file_;
Options options_;
std::vector<char> buffer_;
RBUN bundle_;
struct Writer {
std::size_t toc_offset;
std::size_t end_offset;
std::vector<RChunk> chunks;
std::vector<char> buffer;
};
bool can_write_ = {};
Options options_ = {};
Writer writer_ = {};
std::vector<std::unique_ptr<IO::File>> files_;
std::unordered_map<ChunkID, RChunk::Src> lookup_ = {};

auto check_space(std::size_t extra) -> bool;
};
}
25 changes: 16 additions & 9 deletions src/rbun_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ using namespace rlib;

struct Main {
struct CLI {
std::string output = {};
RCache::Options output = {};
std::vector<std::string> inputs = {};
bool no_hash = {};
bool no_extract = {};
bool no_progress = {};
std::uint32_t buffer = {};
} cli = {};

auto parse_args(int argc, char** argv) -> void {
Expand All @@ -32,22 +31,30 @@ struct Main {
.help("Do not print progress to cerr.")
.default_value(false)
.implicit_value(true);

program.add_argument("--buffer")
.help("Size for buffer before flush to disk in killobytes [64, 1048576]")
.default_value(std::uint32_t{32 * 1024 * 1024u})
.help("Size for buffer before flush to disk in megabytes [1, 1048576]")
.default_value(std::uint32_t{32})
.action([](std::string const& value) -> std::uint32_t {
return std::clamp((std::uint32_t)std::stoul(value), 1u, 1024u * 1024);
});
program.add_argument("--limit")
.help("Size for bundle limit in gigabytes [0, 4096]")
.default_value(std::uint32_t{4096})
.action([](std::string const& value) -> std::uint32_t {
return std::clamp((std::uint32_t)std::stoul(value), 64u, 1024u * 1024) * 1024u;
return std::clamp((std::uint32_t)std::stoul(value), 0u, 4096u);
});

program.parse_args(argc, argv);

cli.output = program.get<std::string>("output");
cli.output = {
.path = program.get<std::string>("output"),
.flush_size = program.get<std::uint32_t>("--buffer") * MiB,
.max_size = program.get<std::uint32_t>("--limit") * GiB,
};
cli.inputs = program.get<std::vector<std::string>>("input");
cli.no_hash = program.get<bool>("--no-extract");
cli.no_extract = program.get<bool>("--no-hash");
cli.no_progress = program.get<bool>("--no-progress");
cli.buffer = program.get<std::uint32_t>("--buffer");
}

auto run() {
Expand All @@ -57,7 +64,7 @@ struct Main {
return;
}
std::cerr << "Processing output bundle ... " << std::endl;
auto output = RCache(RCache::Options{.path = cli.output, .readonly = false, .flush_size = cli.buffer});
auto output = RCache(cli.output);
std::cerr << "Processing input bundles ... " << std::endl;
for (std::uint32_t index = paths.size(); auto const& path : paths) {
add_bundle(path, output, index--);
Expand Down
16 changes: 11 additions & 5 deletions src/rman_dl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@ struct Main {
.default_value(false)
.implicit_value(true);
program.add_argument("--cache-buffer")
.help("Size for cache buffer in killobytes [64, 1048576]")
.default_value(std::uint32_t{32 * 1024 * 1024})
.scan<'u', std::uint32_t>()
.help("Size for cache buffer in megabytes [1, 1048576]")
.default_value(std::uint32_t{32})
.action([](std::string const& value) -> std::uint32_t {
return std::clamp((std::uint32_t)std::stoul(value), 1u, 1024u * 1024);
});
program.add_argument("--cache-limit")
.help("Size for cache bundle limit in gigabytes [0, 4096]")
.default_value(std::uint32_t{4})
.action([](std::string const& value) -> std::uint32_t {
return std::clamp((std::uint32_t)std::stoul(value), 64u, 1024u * 1024) * 1024u;
return std::clamp((std::uint32_t)std::stoul(value), 0u, 4096u);
});

// CDN options
Expand Down Expand Up @@ -117,7 +122,8 @@ struct Main {
cli.cache = {
.path = program.get<std::string>("--cache"),
.readonly = program.get<bool>("--cache-readonly"),
.flush_size = program.get<std::uint32_t>("--cache-buffer"),
.flush_size = program.get<std::uint32_t>("--cache-buffer") * MiB,
.max_size = program.get<std::uint32_t>("--cache-limit") * GiB,
};

cli.cdn = {
Expand Down
Loading

0 comments on commit 5ffd938

Please sign in to comment.