From 97ebc17afba88390eea31c1079d88889a1b36503 Mon Sep 17 00:00:00 2001 From: 00k Date: Fri, 22 Apr 2016 12:15:26 +0800 Subject: [PATCH 1/4] bugfix of thread_pool --- src/common/thread_pool.h | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/common/thread_pool.h b/src/common/thread_pool.h index d249cf9ca..d7d410d15 100644 --- a/src/common/thread_pool.h +++ b/src/common/thread_pool.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -27,7 +28,6 @@ class ThreadPool { work_cv_(&mutex_), stop_(false), last_task_id_(0), - running_task_id_(0), schedule_cost_sum_(0), schedule_count_(0), task_cost_sum_(0), @@ -114,7 +114,7 @@ class ThreadPool { while (1) { { MutexLock lock(&mutex_); - if (running_task_id_ != task_id) { + if (running_task_ids_.find(task_id) == running_task_ids_.end()) { BGMap::iterator it = latest_.find(task_id); if (it == latest_.end()) { if (is_running != NULL) { @@ -197,13 +197,13 @@ class ThreadPool { schedule_count_++; task = bg_item.task; latest_.erase(it); - running_task_id_ = bg_item.id; + running_task_ids_.insert(bg_item.id); mutex_.Unlock(); task(bg_item.id); + mutex_.Lock("ThreadProcRelock"); task_cost_sum_ += timer::get_micros() - now_time; task_count_++; - mutex_.Lock("ThreadProcRelock"); - running_task_id_ = 0; + running_task_ids_.erase(bg_item.id); } continue; } else if (queue_.empty() && !stop_) { @@ -222,10 +222,9 @@ class ThreadPool { schedule_count_++; mutex_.Unlock(); task(0); - int64_t finish_time = timer::get_micros(); - task_cost_sum_ += finish_time - start_time; - task_count_++; mutex_.Lock("ThreadProcRelock2"); + task_cost_sum_ += timer::get_micros() - start_time; + task_count_++; } } } @@ -257,11 +256,11 @@ class ThreadPool { CondVar work_cv_; bool stop_; std::vector tids_; + std::set running_task_ids_; BGQueue time_queue_; BGMap latest_; int64_t last_task_id_; - int64_t running_task_id_; // for profiling int64_t schedule_cost_sum_; From a6f47b2edba78b94ef1edb441eb546be8bef8901 Mon Sep 17 00:00:00 2001 From: 00k Date: Tue, 5 Apr 2016 10:08:07 +0800 Subject: [PATCH 2/4] sdk r/w rpc pack upper limit --- src/sdk/table_impl.cc | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index 6b97140f8..92e36316b 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -595,35 +595,38 @@ void TableImpl::PackMutations(const std::string& server_addr, TaskBatch* mutation_batch = NULL; std::map::iterator it = _mutation_batch_map.find(server_addr); - if (it == _mutation_batch_map.end()) { - mutation_batch = &_mutation_batch_map[server_addr]; - mutation_batch->row_id_list = new std::vector; - ThreadPool::Task task = - boost::bind(&TableImpl::MutationBatchTimeout, this, server_addr); - int64_t timer_id = _thread_pool->DelayTask(_write_commit_timeout, task); - mutation_batch->timer_id = timer_id; - } else { + if (it != _mutation_batch_map.end()) { mutation_batch = &it->second; } for (size_t i = 0; i < mu_list.size(); ++i) { + if (mutation_batch == NULL) { + mutation_batch = &_mutation_batch_map[server_addr]; + mutation_batch->row_id_list = new std::vector; + ThreadPool::Task task = + boost::bind(&TableImpl::MutationBatchTimeout, this, server_addr); + int64_t timer_id = _thread_pool->DelayTask(_write_commit_timeout, task); + mutation_batch->timer_id = timer_id; + } + RowMutationImpl* row_mutation = mu_list[i]; mutation_batch->row_id_list->push_back(row_mutation->GetId()); row_mutation->DecRef(); - } - if (mutation_batch->row_id_list->size() >= _commit_size) { - std::vector* mu_id_list = mutation_batch->row_id_list; - uint64_t timer_id = mutation_batch->timer_id; - _mutation_batch_mutex.Unlock(); - if (_thread_pool->CancelTask(timer_id)) { - _mutation_batch_mutex.Lock(); - _mutation_batch_map.erase(server_addr); - _mutation_batch_mutex.Unlock(); - CommitMutationsById(server_addr, *mu_id_list); - delete mu_id_list; + if (mutation_batch->row_id_list->size() >= _commit_size) { + std::vector* mu_id_list = mutation_batch->row_id_list; + uint64_t timer_id = mutation_batch->timer_id; + const bool non_block_cancel = true; + bool is_running = false; + if (_thread_pool->CancelTask(timer_id, non_block_cancel, &is_running)) { + _mutation_batch_map.erase(server_addr); + CommitMutationsById(server_addr, *mu_id_list); + delete mu_id_list; + } else { + CHECK(is_running); + } + mutation_batch = true; } - _mutation_batch_mutex.Lock(); } } From 4833468dcb2785ec67b3187b19cdbab1284b5a3b Mon Sep 17 00:00:00 2001 From: 00k Date: Wed, 20 Apr 2016 19:20:44 +0800 Subject: [PATCH 3/4] set max rpc size as upper limit for sdk write --- src/master/tablet_manager.cc | 2 +- src/sdk/table_impl.cc | 100 +++++++++++++++++++++-------------- src/sdk/table_impl.h | 8 ++- src/types.h | 2 +- 4 files changed, 67 insertions(+), 45 deletions(-) diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 667066999..aa6adfe82 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -1401,7 +1401,7 @@ bool TabletManager::DumpMetaTable(const std::string& meta_tablet_addr, mutation->set_value(packed_value); } // dump tablet record - int32_t request_size = 0; + uint64_t request_size = 0; for (size_t i = 0; i < tablets.size(); i++) { std::string packed_key; std::string packed_value; diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index 92e36316b..c17d56fc1 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -70,6 +70,8 @@ TableImpl::TableImpl(const std::string& table_name, _commit_size(FLAGS_tera_sdk_batch_size), _write_commit_timeout(FLAGS_tera_sdk_write_send_interval), _read_commit_timeout(FLAGS_tera_sdk_read_send_interval), + _mutation_batch_seq(0), + _reader_batch_seq(0), _max_commit_pending_num(FLAGS_tera_sdk_max_mutation_pending_num), _max_reader_pending_num(FLAGS_tera_sdk_max_reader_pending_num), _meta_cond(&_meta_mutex), @@ -586,51 +588,58 @@ void TableImpl::DistributeMutationsById(std::vector* mu_id_list) { void TableImpl::PackMutations(const std::string& server_addr, std::vector& mu_list, bool flush) { - if (flush) { - CommitMutations(server_addr, mu_list); - return; - } - MutexLock lock(&_mutation_batch_mutex); TaskBatch* mutation_batch = NULL; - std::map::iterator it = - _mutation_batch_map.find(server_addr); - if (it != _mutation_batch_map.end()) { - mutation_batch = &it->second; - } - for (size_t i = 0; i < mu_list.size(); ++i) { + // find existing batch or create a new batch if (mutation_batch == NULL) { - mutation_batch = &_mutation_batch_map[server_addr]; - mutation_batch->row_id_list = new std::vector; - ThreadPool::Task task = - boost::bind(&TableImpl::MutationBatchTimeout, this, server_addr); - int64_t timer_id = _thread_pool->DelayTask(_write_commit_timeout, task); - mutation_batch->timer_id = timer_id; + std::map::iterator it = _mutation_batch_map.find(server_addr); + if (it != _mutation_batch_map.end()) { + mutation_batch = &it->second; + } else { + mutation_batch = &_mutation_batch_map[server_addr]; + mutation_batch->sequence_num = _mutation_batch_seq++; + mutation_batch->row_id_list = new std::vector; + ThreadPool::Task task = boost::bind(&TableImpl::MutationBatchTimeout, this, + server_addr, mutation_batch->sequence_num); + int64_t timer_id = _thread_pool->DelayTask(_write_commit_timeout, task); + mutation_batch->timer_id = timer_id; + mutation_batch->byte_size = 0; + } } + // put mutation into the batch RowMutationImpl* row_mutation = mu_list[i]; mutation_batch->row_id_list->push_back(row_mutation->GetId()); + mutation_batch->byte_size += row_mutation->Size(); row_mutation->DecRef(); - if (mutation_batch->row_id_list->size() >= _commit_size) { + // commit the batch if: + // 1) batch_byte_size >= max_rpc_byte_size + // for the *LAST* batch, commit it if: + // 2) any mutation is sync (flush == true) + // 3) batch_row_num >= min_batch_row_num + if (mutation_batch->byte_size >= kMaxRpcSize || + (i == mu_list.size() - 1 && + (flush || mutation_batch->row_id_list->size() >= _commit_size))) { std::vector* mu_id_list = mutation_batch->row_id_list; uint64_t timer_id = mutation_batch->timer_id; const bool non_block_cancel = true; bool is_running = false; - if (_thread_pool->CancelTask(timer_id, non_block_cancel, &is_running)) { - _mutation_batch_map.erase(server_addr); - CommitMutationsById(server_addr, *mu_id_list); - delete mu_id_list; - } else { - CHECK(is_running); + if (!_thread_pool->CancelTask(timer_id, non_block_cancel, &is_running)) { + CHECK(is_running); // this delay task must be waiting for _mutation_batch_mutex } - mutation_batch = true; + _mutation_batch_map.erase(server_addr); + _mutation_batch_mutex.Unlock(); + CommitMutationsById(server_addr, *mu_id_list); + delete mu_id_list; + mutation_batch = NULL; + _mutation_batch_mutex.Lock(); } } } -void TableImpl::MutationBatchTimeout(std::string server_addr) { +void TableImpl::MutationBatchTimeout(std::string server_addr, uint64_t batch_seq) { std::vector* mu_id_list = NULL; { MutexLock lock(&_mutation_batch_mutex); @@ -640,6 +649,9 @@ void TableImpl::MutationBatchTimeout(std::string server_addr) { return; } TaskBatch* mutation_batch = &it->second; + if (mutation_batch->sequence_num != batch_seq) { + return; + } mu_id_list = mutation_batch->row_id_list; _mutation_batch_map.erase(it); } @@ -686,6 +698,7 @@ void TableImpl::CommitMutations(const std::string& server_addr, row_mutation->DecRef(); } + VLOG(20) << "commit " << mu_list.size() << " mutations to " << server_addr; request->set_timestamp(common::timer::get_micros()); Closure* done = NewClosure(this, &TableImpl::MutateCallBack, mu_id_list); @@ -929,17 +942,17 @@ void TableImpl::PackReaders(const std::string& server_addr, std::vector& reader_list) { MutexLock lock(&_reader_batch_mutex); TaskBatch* reader_buffer = NULL; - std::map::iterator it = - _reader_batch_map.find(server_addr); - if (it == _reader_batch_map.end()) { + std::map::iterator it = _reader_batch_map.find(server_addr); + if (it != _reader_batch_map.end()) { + reader_buffer = &it->second; + } else { reader_buffer = &_reader_batch_map[server_addr]; + reader_buffer->sequence_num = _reader_batch_seq++; reader_buffer->row_id_list = new std::vector; - ThreadPool::Task task = - boost::bind(&TableImpl::ReaderBatchTimeout, this, server_addr); + ThreadPool::Task task = boost::bind(&TableImpl::ReaderBatchTimeout, this, + server_addr, reader_buffer->sequence_num); uint64_t timer_id = _thread_pool->DelayTask(_read_commit_timeout, task); reader_buffer->timer_id = timer_id; - } else { - reader_buffer = &it->second; } for (size_t i = 0; i < reader_list.size(); ++i) { @@ -951,19 +964,21 @@ void TableImpl::PackReaders(const std::string& server_addr, if (reader_buffer->row_id_list->size() >= _commit_size) { std::vector* reader_id_list = reader_buffer->row_id_list; uint64_t timer_id = reader_buffer->timer_id; - _reader_batch_mutex.Unlock(); - if (_thread_pool->CancelTask(timer_id)) { - _reader_batch_mutex.Lock(); - _reader_batch_map.erase(server_addr); - _reader_batch_mutex.Unlock(); - CommitReadersById(server_addr, *reader_id_list); - delete reader_id_list; + const bool non_block_cancel = true; + bool is_running = false; + if (!_thread_pool->CancelTask(timer_id, non_block_cancel, &is_running)) { + CHECK(is_running); // this delay task must be waiting for _reader_batch_map } + _reader_batch_map.erase(server_addr); + _reader_batch_mutex.Unlock(); + CommitReadersById(server_addr, *reader_id_list); + delete reader_id_list; + reader_buffer = NULL; _reader_batch_mutex.Lock(); } } -void TableImpl::ReaderBatchTimeout(std::string server_addr) { +void TableImpl::ReaderBatchTimeout(std::string server_addr, uint64_t batch_seq) { std::vector* reader_id_list = NULL; { MutexLock lock(&_reader_batch_mutex); @@ -973,6 +988,9 @@ void TableImpl::ReaderBatchTimeout(std::string server_addr) { return; } TaskBatch* reader_buffer = &it->second; + if (reader_buffer->sequence_num != batch_seq) { + return; + } reader_id_list = reader_buffer->row_id_list; _reader_batch_map.erase(it); } diff --git a/src/sdk/table_impl.h b/src/sdk/table_impl.h index 06fac3ecb..bc2aba20a 100644 --- a/src/sdk/table_impl.h +++ b/src/sdk/table_impl.h @@ -235,7 +235,7 @@ class TableImpl : public Table { bool flush); // mutation打包不满但到达最大等待时间 - void MutationBatchTimeout(std::string server_addr); + void MutationBatchTimeout(std::string server_addr, uint64_t batch_seq); // 通过异步RPC将mutation提交至TS void CommitMutationsById(const std::string& server_addr, @@ -263,7 +263,7 @@ class TableImpl : public Table { std::vector& reader_list); // reader打包不满但到达最大等待时间 - void ReaderBatchTimeout(std::string server_addr); + void ReaderBatchTimeout(std::string server_addr, uint64_t batch_seq); // 通过异步RPC将reader提交至TS void CommitReadersById(const std::string server_addr, @@ -366,7 +366,9 @@ class TableImpl : public Table { void operator=(const TableImpl&); struct TaskBatch { + uint64_t sequence_num; uint64_t timer_id; + uint64_t byte_size; std::vector* row_id_list; }; @@ -382,6 +384,8 @@ class TableImpl : public Table { uint64_t _read_commit_timeout; std::map _mutation_batch_map; std::map _reader_batch_map; + uint64_t _mutation_batch_seq; + uint64_t _reader_batch_seq; Counter _cur_commit_pending_counter; Counter _cur_reader_pending_counter; int64_t _max_commit_pending_num; diff --git a/src/types.h b/src/types.h index e7b88dc31..bfad100da 100644 --- a/src/types.h +++ b/src/types.h @@ -31,7 +31,7 @@ const std::string kSms = "[SMS] "; const std::string kMail = "[MAIL] "; const int64_t kLatestTs = INT64_MAX; const int64_t kOldestTs = INT64_MIN; -const int32_t kMaxRpcSize = (16 << 20); +const uint64_t kMaxRpcSize = (16 << 20); // 16MB const uint64_t kRowkeySize = (64 << 10); // 64KB const uint64_t kQualifierSize = (64 << 10); // 64KB const uint64_t kValueSize = (32 << 20); // 32MB From e429b78abbbbbc946df3d13ace83484f7641fb1a Mon Sep 17 00:00:00 2001 From: 00k Date: Tue, 19 Apr 2016 10:19:07 +0800 Subject: [PATCH 4/4] add version to tera_mark --- src/benchmark/mark_main.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/benchmark/mark_main.cc b/src/benchmark/mark_main.cc index 8d193b33f..59ba2eccf 100644 --- a/src/benchmark/mark_main.cc +++ b/src/benchmark/mark_main.cc @@ -14,6 +14,7 @@ #include "benchmark/mark.h" #include "types.h" +#include "version.h" DECLARE_string(flagfile); DEFINE_string(tablename, "", "table_name"); @@ -487,6 +488,11 @@ void print_summary_proc(Adapter* adapter, double duration) { int main(int argc, char** argv) { ::google::ParseCommandLineFlags(&argc, &argv, true); + if (argc > 1 && strcmp(argv[1], "version") == 0) { + PrintSystemVersion(); + return 0; + } + tera::ErrorCode err; tera::Client* client = tera::Client::NewClient("", "tera_mark"); if (NULL == client) {