Skip to content

Commit

Permalink
Review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander.A,Utkin committed Dec 16, 2024
1 parent 7777016 commit 70ef6a7
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 18 deletions.
14 changes: 7 additions & 7 deletions pyreindexer/lib/src/rawpyreindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,22 @@ PyObject* queryResultsWrapperIterate(uintptr_t qresWrapperAddr) {
static PyObject* Init(PyObject* self, PyObject* args) {
ReindexerConfig cfg;
char* clientName = nullptr;
int connectTimeout = 0;
int requestTimeout = 0;
int netTimeout = 0;
unsigned enableCompression = 0;
unsigned startSpecialThread = 0;
unsigned syncRxCoroCount = 0;
unsigned maxReplUpdatesSize = 0;
if (!PyArg_ParseTuple(args, "iiiIIsIif", &cfg.fetchAmount, &connectTimeout, &requestTimeout,
&enableCompression, &startSpecialThread, &clientName, &maxReplUpdatesSize,
&cfg.allocatorCacheLimit, &cfg.allocatorCachePart)) {
if (!PyArg_ParseTuple(args, "iiiIIsIIif", &cfg.fetchAmount, &cfg.reconnectAttempts, &netTimeout,
&enableCompression, &startSpecialThread, &clientName, &syncRxCoroCount,
&maxReplUpdatesSize, &cfg.allocatorCacheLimit, &cfg.allocatorCachePart)) {
return nullptr;
}

cfg.connectTimeout = std::chrono::seconds(connectTimeout);
cfg.requestTimeout = std::chrono::seconds(requestTimeout);
cfg.netTimeout = std::chrono::milliseconds(netTimeout);
cfg.enableCompression = (enableCompression != 0);
cfg.requestDedicatedThread = (startSpecialThread != 0);
cfg.appName = clientName;
cfg.syncRxCoroCount = syncRxCoroCount;
cfg.maxReplUpdatesSize = maxReplUpdatesSize;

uintptr_t rx = initReindexer(cfg);
Expand Down
17 changes: 15 additions & 2 deletions pyreindexer/lib/src/reindexerinterface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ class GenericCommand : public ICommand {
std::atomic_bool executed_{false};
};

namespace {
reindexer::client::ReindexerConfig makeClientConfig(const ReindexerConfig& cfg) {
reindexer::client::ReindexerConfig config;
config.FetchAmount = cfg.fetchAmount;
config.ReconnectAttempts = cfg.reconnectAttempts;
// config.NetTimeout = cfg.netTimeout; // ToDo after migrate on v.4
config.EnableCompression = cfg.enableCompression;
config.RequestDedicatedThread = cfg.requestDedicatedThread;
config.AppName = cfg.appName;
//config.SyncRxCoroCount = cfg.syncRxCoroCount; // ToDo after migrate on v.4
return config;
}
} // namespace

template <>
ReindexerInterface<reindexer::Reindexer>::ReindexerInterface(const ReindexerConfig& cfg)
: db_(reindexer::ReindexerConfig().WithUpdatesSize(cfg.maxReplUpdatesSize)
Expand All @@ -46,8 +60,7 @@ ReindexerInterface<reindexer::Reindexer>::ReindexerInterface(const ReindexerConf

template <>
ReindexerInterface<reindexer::client::CoroReindexer>::ReindexerInterface(const ReindexerConfig& cfg)
: db_(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount, 0, cfg.connectTimeout, cfg.requestTimeout,
cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName)) {
: db_(makeClientConfig(cfg)) {
std::atomic_bool running{false};
executionThr_ = std::thread([this, &running] {
cmdAsync_.set(loop_);
Expand Down
5 changes: 3 additions & 2 deletions pyreindexer/lib/src/reindexerinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class ICommand;

struct ReindexerConfig {
int fetchAmount{1000};
std::chrono::seconds connectTimeout{0};
std::chrono::seconds requestTimeout{0};
int reconnectAttempts{0};
std::chrono::milliseconds netTimeout{0};
bool enableCompression{false};
bool requestDedicatedThread{false};
std::string appName;
unsigned int syncRxCoroCount{10};

size_t maxReplUpdatesSize{1024 * 1024 * 1024};
int64_t allocatorCacheLimit{-1};
Expand Down
16 changes: 9 additions & 7 deletions pyreindexer/rx_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ class RxConnector(RaiserMixin):
def __init__(self, dsn: str, *,
# cproto options
fetch_amount: int = 1000,
connect_timeout: int = 0,
request_timeout: int = 0,
reconnect_attempts: int = 0,
net_timeout: int = 0,
enable_compression: bool = False,
start_special_thread: bool = False,
client_name: str = 'pyreindexer',
sync_rxcoro_count: int = 10,
# builtin options
max_replication_updates_size: int = 1024 * 1024 * 1024,
allocator_cache_limit: int = -1,
Expand All @@ -41,12 +42,13 @@ def __init__(self, dsn: str, *,
cproto options:
fetch_amount (int): The number of items that will be fetched by one operation
connect_timeout (int): Connection and database login timeout value [seconds]
request_timeout (int): Request execution timeout value [seconds]
reconnect_attempts (int): Number of reconnection attempts when connection is lost
net_timeout (int): Connection and database login timeout value [milliseconds]
enable_compression (bool): Flag enable/disable traffic compression
start_special_thread (bool): Determines whether to request a special thread of execution
on the server for this connection
client_name (string): Proper name of the application (as a client for Reindexer-server)
sync_rxcoro_count (int): Client concurrency per connection
built-in options:
max_replication_updates_size (int): Max pended replication updates size in bytes
Expand All @@ -59,9 +61,9 @@ def __init__(self, dsn: str, *,
self.err_msg = ''
self.rx = 0
self._api_import(dsn)
self.rx = self.api.init(fetch_amount, connect_timeout, request_timeout, enable_compression,
start_special_thread, client_name, max_replication_updates_size,
allocator_cache_limit, allocator_cache_part)
self.rx = self.api.init(fetch_amount, reconnect_attempts, net_timeout, enable_compression,
start_special_thread, client_name, sync_rxcoro_count,
max_replication_updates_size, allocator_cache_limit, allocator_cache_part)
self._api_connect(dsn)

def __del__(self):
Expand Down

0 comments on commit 70ef6a7

Please sign in to comment.