
Merge pull request #863 from 00k/0.3
Sync PR #862 to the 0.3 branch
xupeilin committed Apr 22, 2016
2 parents da20a66 + e429b78 commit 0a7e8ed
Showing 6 changed files with 85 additions and 55 deletions.
6 changes: 6 additions & 0 deletions src/benchmark/mark_main.cc
@@ -14,6 +14,7 @@

#include "benchmark/mark.h"
#include "types.h"
#include "version.h"

DECLARE_string(flagfile);
DEFINE_string(tablename, "", "table_name");
@@ -487,6 +488,11 @@ void print_summary_proc(Adapter* adapter, double duration) {
int main(int argc, char** argv) {
::google::ParseCommandLineFlags(&argc, &argv, true);

if (argc > 1 && strcmp(argv[1], "version") == 0) {
PrintSystemVersion();
return 0;
}

tera::ErrorCode err;
tera::Client* client = tera::Client::NewClient("", "tera_mark");
if (NULL == client) {
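The hunk above makes `tera_mark version` print build information and exit before any flags are parsed or a client is created. A minimal standalone sketch of the pattern; PrintSystemVersion here is a stub standing in for the one declared in version.h:

    // Sketch of the "version" subcommand pattern added above.
    // PrintSystemVersion is stubbed; the real one comes from version.h.
    #include <cstdio>
    #include <cstring>

    static void PrintSystemVersion() {
        // Stub: the real function prints build version and compile info.
        std::printf("tera 0.3 (sketch)\n");
    }

    int main(int argc, char** argv) {
        // Handle "version" before flag parsing or client setup,
        // so it works even without a valid flag file.
        if (argc > 1 && std::strcmp(argv[1], "version") == 0) {
            PrintSystemVersion();
            return 0;
        }
        std::printf("normal startup path\n");
        return 0;
    }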
17 changes: 8 additions & 9 deletions src/common/thread_pool.h
@@ -10,6 +10,7 @@
#include <deque>
#include <map>
#include <queue>
#include <set>
#include <sstream>
#include <vector>
#include <boost/function.hpp>
@@ -27,7 +28,6 @@ class ThreadPool {
work_cv_(&mutex_),
stop_(false),
last_task_id_(0),
running_task_id_(0),
schedule_cost_sum_(0),
schedule_count_(0),
task_cost_sum_(0),
@@ -114,7 +114,7 @@ class ThreadPool {
while (1) {
{
MutexLock lock(&mutex_);
if (running_task_id_ != task_id) {
if (running_task_ids_.find(task_id) == running_task_ids_.end()) {
BGMap::iterator it = latest_.find(task_id);
if (it == latest_.end()) {
if (is_running != NULL) {
@@ -197,13 +197,13 @@ class ThreadPool {
schedule_count_++;
task = bg_item.task;
latest_.erase(it);
running_task_id_ = bg_item.id;
running_task_ids_.insert(bg_item.id);
mutex_.Unlock();
task(bg_item.id);
mutex_.Lock("ThreadProcRelock");
task_cost_sum_ += timer::get_micros() - now_time;
task_count_++;
mutex_.Lock("ThreadProcRelock");
running_task_id_ = 0;
running_task_ids_.erase(bg_item.id);
}
continue;
} else if (queue_.empty() && !stop_) {
@@ -222,10 +222,9 @@ class ThreadPool {
schedule_count_++;
mutex_.Unlock();
task(0);
int64_t finish_time = timer::get_micros();
task_cost_sum_ += finish_time - start_time;
task_count_++;
mutex_.Lock("ThreadProcRelock2");
task_cost_sum_ += timer::get_micros() - start_time;
task_count_++;
}
}
}
@@ -257,11 +256,11 @@ class ThreadPool {
CondVar work_cv_;
bool stop_;
std::vector<pthread_t> tids_;
std::set<int64_t> running_task_ids_;

BGQueue time_queue_;
BGMap latest_;
int64_t last_task_id_;
int64_t running_task_id_;

// for profiling
int64_t schedule_cost_sum_;
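This file replaces the single running_task_id_ slot with a std::set<int64_t>: a pool with several worker threads can be executing several delayed tasks at once, so one slot can only mark one of them, CancelTask could wrongly report a task as not running, and finishing one task (running_task_id_ = 0) would erase another thread's mark. A minimal sketch of the membership check, using std::mutex in place of tera's MutexLock:

    // Sketch of the cancellation check this file fixes: with multiple
    // workers, "is task X running?" needs a set, not a single id.
    #include <cstdint>
    #include <cstdio>
    #include <mutex>
    #include <set>

    class RunningTasks {
    public:
        void MarkRunning(int64_t id) {
            std::lock_guard<std::mutex> lock(mu_);
            running_.insert(id);
        }
        void MarkDone(int64_t id) {
            std::lock_guard<std::mutex> lock(mu_);
            running_.erase(id);
        }
        // Returns true if the task is currently executing on some worker.
        bool IsRunning(int64_t id) {
            std::lock_guard<std::mutex> lock(mu_);
            return running_.find(id) != running_.end();
        }
    private:
        std::mutex mu_;
        std::set<int64_t> running_;  // one entry per in-flight task
    };

    int main() {
        RunningTasks rt;
        rt.MarkRunning(1);
        rt.MarkRunning(2);  // a single int64_t slot would lose task 1 here
        std::printf("task 1 running: %d\n", rt.IsRunning(1));  // 1
        rt.MarkDone(1);
        std::printf("task 1 running: %d\n", rt.IsRunning(1));  // 0
        return 0;
    }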
2 changes: 1 addition & 1 deletion src/master/tablet_manager.cc
@@ -1401,7 +1401,7 @@ bool TabletManager::DumpMetaTable(const std::string& meta_tablet_addr,
mutation->set_value(packed_value);
}
// dump tablet record
int32_t request_size = 0;
uint64_t request_size = 0;
for (size_t i = 0; i < tablets.size(); i++) {
std::string packed_key;
std::string packed_value;
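request_size accumulates the packed key/value bytes of every tablet record, and a signed 32-bit counter overflows once the total passes ~2 GiB (signed overflow is undefined behavior in C++), hence the widening to uint64_t. A small illustration of the headroom difference; the 16 MiB record size is an arbitrary stand-in:

    // Sketch of why the counter was widened: a 32-bit value cannot hold
    // a multi-GiB running total, while uint64_t has ample headroom.
    #include <cstdint>
    #include <cstdio>

    int main() {
        const uint64_t record_size = 16u << 20;  // 16 MiB per packed record
        uint64_t total = 0;
        for (int i = 0; i < 200; ++i) {  // 200 * 16 MiB = 3.125 GiB
            total += record_size;
        }
        std::printf("uint64_t total: %llu bytes\n",
                    (unsigned long long)total);           // 3355443200
        std::printf("as int32_t:     %d\n", (int32_t)total);  // wrapped value
        return 0;
    }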
105 changes: 63 additions & 42 deletions src/sdk/table_impl.cc
@@ -70,6 +70,8 @@ TableImpl::TableImpl(const std::string& table_name,
_commit_size(FLAGS_tera_sdk_batch_size),
_write_commit_timeout(FLAGS_tera_sdk_write_send_interval),
_read_commit_timeout(FLAGS_tera_sdk_read_send_interval),
_mutation_batch_seq(0),
_reader_batch_seq(0),
_max_commit_pending_num(FLAGS_tera_sdk_max_mutation_pending_num),
_max_reader_pending_num(FLAGS_tera_sdk_max_reader_pending_num),
_meta_cond(&_meta_mutex),
@@ -586,48 +588,58 @@ void TableImpl::DistributeMutationsById(std::vector<int64_t>* mu_id_list) {
void TableImpl::PackMutations(const std::string& server_addr,
std::vector<RowMutationImpl*>& mu_list,
bool flush) {
if (flush) {
CommitMutations(server_addr, mu_list);
return;
}

MutexLock lock(&_mutation_batch_mutex);
TaskBatch* mutation_batch = NULL;
std::map<std::string, TaskBatch>::iterator it =
_mutation_batch_map.find(server_addr);
if (it == _mutation_batch_map.end()) {
mutation_batch = &_mutation_batch_map[server_addr];
mutation_batch->row_id_list = new std::vector<int64_t>;
ThreadPool::Task task =
boost::bind(&TableImpl::MutationBatchTimeout, this, server_addr);
int64_t timer_id = _thread_pool->DelayTask(_write_commit_timeout, task);
mutation_batch->timer_id = timer_id;
} else {
mutation_batch = &it->second;
}

for (size_t i = 0; i < mu_list.size(); ++i) {
// find existing batch or create a new batch
if (mutation_batch == NULL) {
std::map<std::string, TaskBatch>::iterator it = _mutation_batch_map.find(server_addr);
if (it != _mutation_batch_map.end()) {
mutation_batch = &it->second;
} else {
mutation_batch = &_mutation_batch_map[server_addr];
mutation_batch->sequence_num = _mutation_batch_seq++;
mutation_batch->row_id_list = new std::vector<int64_t>;
ThreadPool::Task task = boost::bind(&TableImpl::MutationBatchTimeout, this,
server_addr, mutation_batch->sequence_num);
int64_t timer_id = _thread_pool->DelayTask(_write_commit_timeout, task);
mutation_batch->timer_id = timer_id;
mutation_batch->byte_size = 0;
}
}

// put mutation into the batch
RowMutationImpl* row_mutation = mu_list[i];
mutation_batch->row_id_list->push_back(row_mutation->GetId());
mutation_batch->byte_size += row_mutation->Size();
row_mutation->DecRef();
}

if (mutation_batch->row_id_list->size() >= _commit_size) {
std::vector<int64_t>* mu_id_list = mutation_batch->row_id_list;
uint64_t timer_id = mutation_batch->timer_id;
_mutation_batch_mutex.Unlock();
if (_thread_pool->CancelTask(timer_id)) {
_mutation_batch_mutex.Lock();
// commit the batch if:
// 1) batch_byte_size >= max_rpc_byte_size
// for the *LAST* batch, commit it if:
// 2) any mutation is sync (flush == true)
// 3) batch_row_num >= min_batch_row_num
if (mutation_batch->byte_size >= kMaxRpcSize ||
(i == mu_list.size() - 1 &&
(flush || mutation_batch->row_id_list->size() >= _commit_size))) {
std::vector<int64_t>* mu_id_list = mutation_batch->row_id_list;
uint64_t timer_id = mutation_batch->timer_id;
const bool non_block_cancel = true;
bool is_running = false;
if (!_thread_pool->CancelTask(timer_id, non_block_cancel, &is_running)) {
CHECK(is_running); // this delay task must be waiting for _mutation_batch_mutex
}
_mutation_batch_map.erase(server_addr);
_mutation_batch_mutex.Unlock();
CommitMutationsById(server_addr, *mu_id_list);
delete mu_id_list;
mutation_batch = NULL;
_mutation_batch_mutex.Lock();
}
_mutation_batch_mutex.Lock();
}
}

void TableImpl::MutationBatchTimeout(std::string server_addr) {
void TableImpl::MutationBatchTimeout(std::string server_addr, uint64_t batch_seq) {
std::vector<int64_t>* mu_id_list = NULL;
{
MutexLock lock(&_mutation_batch_mutex);
@@ -637,6 +649,9 @@ void TableImpl::MutationBatchTimeout(std::string server_addr) {
return;
}
TaskBatch* mutation_batch = &it->second;
if (mutation_batch->sequence_num != batch_seq) {
return;
}
mu_id_list = mutation_batch->row_id_list;
_mutation_batch_map.erase(it);
}
@@ -683,6 +698,7 @@ void TableImpl::CommitMutations(const std::string& server_addr,
row_mutation->DecRef();
}

VLOG(20) << "commit " << mu_list.size() << " mutations to " << server_addr;
request->set_timestamp(common::timer::get_micros());
Closure<void, WriteTabletRequest*, WriteTabletResponse*, bool, int>* done =
NewClosure(this, &TableImpl::MutateCallBack, mu_id_list);
@@ -926,17 +942,17 @@ void TableImpl::PackReaders(const std::string& server_addr,
std::vector<RowReaderImpl*>& reader_list) {
MutexLock lock(&_reader_batch_mutex);
TaskBatch* reader_buffer = NULL;
std::map<std::string, TaskBatch>::iterator it =
_reader_batch_map.find(server_addr);
if (it == _reader_batch_map.end()) {
std::map<std::string, TaskBatch>::iterator it = _reader_batch_map.find(server_addr);
if (it != _reader_batch_map.end()) {
reader_buffer = &it->second;
} else {
reader_buffer = &_reader_batch_map[server_addr];
reader_buffer->sequence_num = _reader_batch_seq++;
reader_buffer->row_id_list = new std::vector<int64_t>;
ThreadPool::Task task =
boost::bind(&TableImpl::ReaderBatchTimeout, this, server_addr);
ThreadPool::Task task = boost::bind(&TableImpl::ReaderBatchTimeout, this,
server_addr, reader_buffer->sequence_num);
uint64_t timer_id = _thread_pool->DelayTask(_read_commit_timeout, task);
reader_buffer->timer_id = timer_id;
} else {
reader_buffer = &it->second;
}

for (size_t i = 0; i < reader_list.size(); ++i) {
@@ -948,19 +964,21 @@
if (reader_buffer->row_id_list->size() >= _commit_size) {
std::vector<int64_t>* reader_id_list = reader_buffer->row_id_list;
uint64_t timer_id = reader_buffer->timer_id;
_reader_batch_mutex.Unlock();
if (_thread_pool->CancelTask(timer_id)) {
_reader_batch_mutex.Lock();
_reader_batch_map.erase(server_addr);
_reader_batch_mutex.Unlock();
CommitReadersById(server_addr, *reader_id_list);
delete reader_id_list;
const bool non_block_cancel = true;
bool is_running = false;
if (!_thread_pool->CancelTask(timer_id, non_block_cancel, &is_running)) {
CHECK(is_running); // this delay task must be waiting for _reader_batch_map
}
_reader_batch_map.erase(server_addr);
_reader_batch_mutex.Unlock();
CommitReadersById(server_addr, *reader_id_list);
delete reader_id_list;
reader_buffer = NULL;
_reader_batch_mutex.Lock();
}
}

void TableImpl::ReaderBatchTimeout(std::string server_addr) {
void TableImpl::ReaderBatchTimeout(std::string server_addr, uint64_t batch_seq) {
std::vector<int64_t>* reader_id_list = NULL;
{
MutexLock lock(&_reader_batch_mutex);
@@ -970,6 +988,9 @@
return;
}
TaskBatch* reader_buffer = &it->second;
if (reader_buffer->sequence_num != batch_seq) {
return;
}
reader_id_list = reader_buffer->row_id_list;
_reader_batch_map.erase(it);
}
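The batching rework above tags each per-server TaskBatch with a sequence_num and passes it to the timeout callback, so a timer armed for an already-committed batch cannot flush the newer batch that replaced it under the same server address. A minimal sketch of that guard; TaskBatch is trimmed, and BatchTimeout is an illustrative stand-in for MutationBatchTimeout/ReaderBatchTimeout:

    // Sketch of the stale-timer guard: each batch gets a sequence number
    // at creation, and a firing timer only flushes the batch it was armed for.
    #include <cstdint>
    #include <cstdio>
    #include <map>
    #include <string>

    struct TaskBatch {
        uint64_t sequence_num;
        // ... timer_id, byte_size, row ids in the real struct
    };

    static std::map<std::string, TaskBatch> batch_map;
    static uint64_t batch_seq = 0;

    // Called when the per-server delay timer fires.
    void BatchTimeout(const std::string& server_addr, uint64_t armed_seq) {
        std::map<std::string, TaskBatch>::iterator it = batch_map.find(server_addr);
        if (it == batch_map.end()) {
            return;  // batch already committed, nothing to do
        }
        if (it->second.sequence_num != armed_seq) {
            // A newer batch replaced the one this timer was armed for;
            // flushing now would commit rows the timer does not own.
            return;
        }
        batch_map.erase(it);
        std::printf("flush batch %llu for %s\n",
                    (unsigned long long)armed_seq, server_addr.c_str());
    }

    int main() {
        batch_map["ts1"].sequence_num = batch_seq++;  // batch 0, timer armed
        batch_map.erase("ts1");                       // batch 0 committed early
        batch_map["ts1"].sequence_num = batch_seq++;  // batch 1 created
        BatchTimeout("ts1", 0);  // stale timer: guard rejects, no output
        BatchTimeout("ts1", 1);  // current timer: flushes batch 1
        return 0;
    }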
8 changes: 6 additions & 2 deletions src/sdk/table_impl.h
@@ -235,7 +235,7 @@ class TableImpl : public Table {
bool flush);

// the mutation batch is not full, but the max wait time has been reached
void MutationBatchTimeout(std::string server_addr);
void MutationBatchTimeout(std::string server_addr, uint64_t batch_seq);

// commit mutations to the tablet server (TS) via async RPC
void CommitMutationsById(const std::string& server_addr,
@@ -263,7 +263,7 @@ class TableImpl : public Table {
std::vector<RowReaderImpl*>& reader_list);

// the reader batch is not full, but the max wait time has been reached
void ReaderBatchTimeout(std::string server_addr);
void ReaderBatchTimeout(std::string server_addr, uint64_t batch_seq);

// commit readers to the tablet server (TS) via async RPC
void CommitReadersById(const std::string server_addr,
@@ -366,7 +366,7 @@ class TableImpl : public Table {
void operator=(const TableImpl&);

struct TaskBatch {
uint64_t sequence_num;
uint64_t timer_id;
uint64_t byte_size;
std::vector<int64_t>* row_id_list;
};

@@ -382,6 +384,8 @@ class TableImpl : public Table {
uint64_t _read_commit_timeout;
std::map<std::string, TaskBatch> _mutation_batch_map;
std::map<std::string, TaskBatch> _reader_batch_map;
uint64_t _mutation_batch_seq;
uint64_t _reader_batch_seq;
Counter _cur_commit_pending_counter;
Counter _cur_reader_pending_counter;
int64_t _max_commit_pending_num;
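The new byte_size field lets PackMutations flush a batch as soon as its payload reaches one RPC's worth of data (kMaxRpcSize) rather than only counting rows, while a sync mutation or a full row count still forces out the last batch, per the commit conditions commented in table_impl.cc. A sketch of that decision; ShouldFlush and kCommitSize are illustrative stand-ins:

    // Sketch of the batch-flush decision introduced in PackMutations:
    // flush mid-loop when the payload reaches the RPC cap, and flush the
    // last batch when a sync mutation forces it or enough rows accumulate.
    #include <cstdint>
    #include <cstdio>

    const uint64_t kMaxRpcSize = 16 << 20;  // 16MB, as in src/types.h
    const size_t kCommitSize = 128;         // stand-in for _commit_size

    bool ShouldFlush(uint64_t batch_bytes, size_t batch_rows,
                     bool is_last_mutation, bool flush) {
        if (batch_bytes >= kMaxRpcSize) {
            return true;  // batch already fills one RPC
        }
        // Only the *last* batch can be forced out by a sync mutation
        // or by simply having enough rows.
        return is_last_mutation && (flush || batch_rows >= kCommitSize);
    }

    int main() {
        std::printf("%d\n", ShouldFlush(17u << 20, 10, false, false));  // 1: over cap
        std::printf("%d\n", ShouldFlush(1u << 20, 10, true, true));     // 1: sync flush
        std::printf("%d\n", ShouldFlush(1u << 20, 10, false, false));   // 0: keep batching
        return 0;
    }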
2 changes: 1 addition & 1 deletion src/types.h
@@ -31,7 +31,7 @@ const std::string kSms = "[SMS] ";
const std::string kMail = "[MAIL] ";
const int64_t kLatestTs = INT64_MAX;
const int64_t kOldestTs = INT64_MIN;
const int32_t kMaxRpcSize = (16 << 20);
const uint64_t kMaxRpcSize = (16 << 20); // 16MB
const uint64_t kRowkeySize = (64 << 10); // 64KB
const uint64_t kQualifierSize = (64 << 10); // 64KB
const uint64_t kValueSize = (32 << 20); // 32MB
