Skip to content

Commit

Permalink
Merge pull request ecmwf#24 from ecmwf/feature/archive-callback
Browse files Browse the repository at this point in the history
Feature/archive callback
  • Loading branch information
simondsmart authored Jul 2, 2024
2 parents c029792 + e4743c2 commit bd6cef6
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ list( APPEND fdb5_srcs
api/helpers/PurgeIterator.h
api/helpers/StatsIterator.cc
api/helpers/StatsIterator.h
api/helpers/ArchiveCallback.h

api/local/QueryVisitor.h
api/local/QueueStringLogTarget.h
Expand Down
4 changes: 4 additions & 0 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const {
return internal_->enabled(controlIdentifier);
}

void FDB::registerCallback(ArchiveCallback callback) {
internal_->registerCallback(callback);
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5
6 changes: 6 additions & 0 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "fdb5/api/helpers/WipeIterator.h"
#include "fdb5/api/helpers/MoveIterator.h"
#include "fdb5/config/Config.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -74,7 +75,9 @@ class FDB {
void archive(const void* data, size_t length);
// warning: not high-perf API - makes sure that all the requested fields are archived and there are no data exceeding the request
void archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& handle);

// disclaimer: this is a low-level API. The provided key and the corresponding data are not checked for consistency
// Optional callback function is called upon receiving field location from the store.
void archive(const Key& key, const void* data, size_t length);

/// Flushes all buffers and closes all data handles into a consistent DB state
Expand Down Expand Up @@ -116,6 +119,8 @@ class FDB {

bool dirty() const;

void registerCallback(ArchiveCallback callback);

// -------------- API management ----------------------------

/// ID used for hashing in the Rendezvous hash. Should be unique.
Expand Down Expand Up @@ -149,6 +154,7 @@ class FDB {
bool reportStats_;

FDBStats stats_;

};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
7 changes: 6 additions & 1 deletion src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "fdb5/api/helpers/PurgeIterator.h"
#include "fdb5/api/helpers/StatsIterator.h"
#include "fdb5/api/helpers/StatusIterator.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -65,7 +66,7 @@ class FDBBase : private eckit::NonCopyable {
// -------------- Primary API functions ----------------------------

virtual void archive(const Key& key, const void* data, size_t length) = 0;

virtual void flush() = 0;

virtual ListIterator inspect(const metkit::mars::MarsRequest& request) = 0;
Expand All @@ -90,6 +91,8 @@ class FDBBase : private eckit::NonCopyable {

virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; }

void registerCallback(ArchiveCallback callback) {callback_ = callback;}

// -------------- API management ----------------------------

/// ID used for hashing in the Rendezvous hash. Should be unique amongst those used
Expand Down Expand Up @@ -125,6 +128,8 @@ class FDBBase : private eckit::NonCopyable {
ControlIdentifiers controlIdentifiers_;

bool disabled_;

ArchiveCallback callback_ = CALLBACK_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/api/LocalFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ using namespace eckit;


namespace fdb5 {

void LocalFDB::archive(const Key& key, const void* data, size_t length) {

if (!archiver_) {
LOG_DEBUG_LIB(LibFdb5) << *this << ": Constructing new archiver" << std::endl;
archiver_.reset(new Archiver(config_));
archiver_.reset(new Archiver(config_, callback_));
}

archiver_->archive(key, data, length);
Expand Down
27 changes: 27 additions & 0 deletions src/fdb5/api/helpers/ArchiveCallback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

/*
* This software was developed as part of the EC H2020 funded project NextGenIO
* (Project ID: 671951) www.nextgenio.eu
*/

#pragma once

#include "fdb5/database/Key.h"
#include "fdb5/database/FieldLocation.h"

namespace fdb5 {

using ArchiveCallback = std::function<void(const Key&, const FieldLocation&)>;

static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {};

} // namespace fdb5
8 changes: 4 additions & 4 deletions src/fdb5/database/ArchiveVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

namespace fdb5 {

ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size) :
ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size, const ArchiveCallback& callback) :
BaseArchiveVisitor(owner, field),
data_(data),
size_(size) {
size_(size),
callback_(callback){
}

bool ArchiveVisitor::selectDatum(const Key &key, const Key &full) {
Expand All @@ -29,8 +30,7 @@ bool ArchiveVisitor::selectDatum(const Key &key, const Key &full) {

ASSERT(current());

current()->archive(key, data_, size_);

current()->archive(key, data_, size_, field_, callback_);

return true;
}
Expand Down
4 changes: 3 additions & 1 deletion src/fdb5/database/ArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ArchiveVisitor : public BaseArchiveVisitor {

public: // methods

ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size);
ArchiveVisitor(Archiver &owner, const Key &field, const void *data, size_t size, const ArchiveCallback& callback = CALLBACK_NOOP);

protected: // methods

Expand All @@ -43,6 +43,8 @@ class ArchiveVisitor : public BaseArchiveVisitor {
const void *data_;
size_t size_;

const ArchiveCallback& callback_;

};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
7 changes: 4 additions & 3 deletions src/fdb5/database/Archiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ namespace fdb5 {
//----------------------------------------------------------------------------------------------------------------------


Archiver::Archiver(const Config& dbConfig) :
Archiver::Archiver(const Config& dbConfig, const ArchiveCallback& callback) :
dbConfig_(dbConfig),
current_(nullptr) {
current_(nullptr),
callback_(callback) {
}

Archiver::~Archiver() {
Expand All @@ -37,7 +38,7 @@ Archiver::~Archiver() {
}

void Archiver::archive(const Key &key, const void* data, size_t len) {
ArchiveVisitor visitor(*this, key, data, len);
ArchiveVisitor visitor(*this, key, data, len, callback_);
archive(key, visitor);
}

Expand Down
4 changes: 3 additions & 1 deletion src/fdb5/database/Archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Archiver : public eckit::NonCopyable {

public: // methods

Archiver(const Config& dbConfig = Config().expandConfig());
Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_NOOP);

virtual ~Archiver();

Expand Down Expand Up @@ -77,6 +77,8 @@ class Archiver : public eckit::NonCopyable {
std::vector<Key> prev_;

DB* current_;

const ArchiveCallback& callback_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/fdb5/database/BaseArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class BaseArchiveVisitor : public WriteVisitor {

fdb5::DB* current() const;

protected: // members

const Key& field_;

private: // members

Archiver &owner_;

const Key &field_;

bool checkMissingKeysOnWrite_;
};

Expand Down
9 changes: 7 additions & 2 deletions src/fdb5/database/DB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "fdb5/database/DB.h"
#include "fdb5/database/Field.h"
#include "fdb5/toc/TocEngine.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

using eckit::Log;

Expand Down Expand Up @@ -103,13 +104,17 @@ eckit::DataHandle *DB::retrieve(const Key& key) {
return nullptr;
}

void DB::archive(const Key& key, const void* data, eckit::Length length) {
void DB::archive(const Key& key, const void* data, eckit::Length length, const Key& field, const ArchiveCallback& callback) {

CatalogueWriter* cat = dynamic_cast<CatalogueWriter*>(catalogue_.get());
ASSERT(cat);

const Index& idx = cat->currentIndex();
cat->archive(key, store().archive(idx.key(), data, length));
std::unique_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

callback(field, *location);

cat->archive(key, std::move(location));
}

bool DB::open() {
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/database/DB.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "fdb5/database/EntryVisitMechanism.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/Store.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

namespace eckit {
class DataHandle;
Expand Down Expand Up @@ -56,7 +57,7 @@ class DB final : public eckit::OwnedLock {
bool axis(const std::string &keyword, eckit::StringSet &s) const;
bool inspect(const Key& key, Field& field);
eckit::DataHandle *retrieve(const Key &key);
void archive(const Key &key, const void *data, eckit::Length length);
void archive(const Key &key, const void *data, eckit::Length length, const Key &field, const ArchiveCallback& callback = CALLBACK_NOOP);

bool open();
void flush();
Expand Down
1 change: 1 addition & 0 deletions tests/fdb/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ list( APPEND api_tests
select
dist
fdb_c
archive_callback
)

foreach( _test ${api_tests} )
Expand Down
73 changes: 73 additions & 0 deletions tests/fdb/api/test_archive_callback.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include "eckit/testing/Test.h"
#include "fdb5/api/FDB.h"

namespace fdb5::test {

//----------------------------------------------------------------------------------------------------------------------
CASE("Archive callback") {
FDB fdb;

std::string data_str = "Raining cats and dogs";
const void* data = static_cast<const void *>(data_str.c_str());
size_t length = data_str.size();

Key key;
key.set("class","od");
key.set("expver","xxxx");
key.set("type","fc");
key.set("stream","oper");
key.set("date","20101010");
key.set("time","0000");
key.set("domain","g");
key.set("levtype","sfc");
key.set("param","130");

std::map<fdb5::Key, eckit::URI> map;
std::vector<Key> keys;

fdb.registerCallback([&map] (const fdb5::Key& key, const fdb5::FieldLocation& location) {
map[key] = location.fullUri();
});

key.set("step","1");
keys.push_back(key);
fdb.archive(key, data, length);

key.set("date","20111213");
keys.push_back(key);
fdb.archive(key, data, length);

key.set("type","pf");
keys.push_back(key);
fdb.archive(key, data, length);

fdb.flush();

EXPECT(map.size() == 3);

// for (const auto& [key, uri] : map) {
// std::cout << key << " -> " << uri << std::endl;
// }

for (const auto& [key, uri] : map) {
bool found = false;
for (const auto& originalKey : keys) {
if (key == originalKey){
found = true;
break;
}
}
EXPECT(found);
}

}
//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::test

int main(int argc, char** argv) {

eckit::Log::info() << ::getenv("FDB_HOME") << std::endl;

return ::eckit::testing::run_tests(argc, argv);
}

0 comments on commit bd6cef6

Please sign in to comment.