Skip to content

Commit

Permalink
Fixed file deletion bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lte678 committed Jan 5, 2024
1 parent ade77a5 commit ceddc14
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/MergeAlgorithms.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace fmerge {
friend std::ostream& operator<<(std::ostream& os, const FileOperation& fop);
};

typedef std::map<std::string, std::vector<FileOperation>, std::less<std::string>> SortedOperationSet;
typedef std::map<std::string, std::vector<FileOperation>, std::greater<std::string>> SortedOperationSet;


enum class ConflictResolution {
Expand Down
66 changes: 61 additions & 5 deletions src/Syncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
namespace fmerge {
Syncer::Syncer(SortedOperationSet &operations, std::string _base_path, Connection &_peer_conn) : Syncer(operations, _base_path, _peer_conn, nullptr) {}

Syncer::Syncer(SortedOperationSet &operations, std::string _base_path, Connection &_peer_conn, CompletionCallback _status_callback) : queued_operations(operations), peer_conn(_peer_conn) {
queued_operations = operations;
Syncer::Syncer(SortedOperationSet &operations, std::string _base_path, Connection &_peer_conn, CompletionCallback _status_callback) : peer_conn(_peer_conn) {
auto[q1, q2] = split_operations(operations);
queued_parallel_operations = std::move(q1);
queued_sequential_operations = std::move(q2);
completion_callback = _status_callback;
base_path = _base_path;
}
Expand All @@ -24,6 +26,10 @@ namespace fmerge {
std::thread{[this, i](){worker_function(i);}}
);
}

// Use this thread as the sequential worker
sequential_function();

// Wait for threads to finish processing everything
for(auto &t: worker_threads) {
t.join();
Expand All @@ -41,12 +47,12 @@ namespace fmerge {
// Fetch a new task
std::unique_lock<std::mutex> op_lock(operations_mtx);
// We are done syncing.
if(queued_operations.size() == 0) return;
if(queued_parallel_operations.size() == 0) return;
// List of operations to apply to the file
auto op_set = queued_operations.begin();
auto op_set = queued_parallel_operations.begin();
auto filepath = op_set->first;
auto op_list = op_set->second;
queued_operations.erase(op_set);
queued_parallel_operations.erase(op_set);
op_lock.unlock();

DEBUG("[tid:" << tid << "] Processing file " << filepath << std::endl);
Expand All @@ -69,6 +75,56 @@ namespace fmerge {
}
}


void Syncer::sequential_function() {
for(auto op : queued_sequential_operations) {
const auto& filepath = op.first;
const auto& op_list = op.second;
DEBUG("[seq. thread] Processing file " << filepath << std::endl);

// Process that task

// For us to accurately reproduce the new file history, all operations
// have to be executed successfully. If this fails, we will have to resolve it
// or leave the file history in a dirty state, which will be corrected at
// the next database rebuild and merge.

bool successful = process_file(op_list);

if(!successful) {
LOG("[Error] File " << filepath << " is in a conflicted state!" << std::endl);
error_count++;
}
std::unique_lock<std::mutex> cb_lock(callback_mtx);
if(completion_callback) completion_callback(filepath, successful);
}
}


static bool is_sequential_operation(std::vector<FileOperation> ops) {
for(const auto& op : ops) {
if(op.type == FileOperationType::Delete) return true;
}
return false;
}


std::pair<SortedOperationSet, SortedOperationSet> Syncer::split_operations(SortedOperationSet &operation_map) {
SortedOperationSet parallel_ops{};
SortedOperationSet sequential_ops{};

for(const auto& op : operation_map) {
if(is_sequential_operation(op.second)) {
sequential_ops.emplace(op);
} else {
parallel_ops.emplace(op);
}
}

return {parallel_ops, sequential_ops};
}


bool Syncer::process_file(const std::vector<FileOperation> &ops) {
for(const auto& op : ops) {
std::string filepath{op.path};
Expand Down
6 changes: 5 additions & 1 deletion src/Syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ namespace fmerge {
Syncer(SortedOperationSet &operations, std::string _base_path, Connection &_peer_conn);
Syncer(SortedOperationSet &operations, std::string _base_path, Connection &_peer_conn, CompletionCallback _status_callback);

std::pair<SortedOperationSet, SortedOperationSet> split_operations(SortedOperationSet &operations);

void perform_sync();
void submit_file_transfer(const protocol::FileTransferPayload &ft_payload);
bool _submit_file_transfer(const protocol::FileTransferPayload &ft_payload);

int get_error_count() { return error_count.load(); }
private:
SortedOperationSet &queued_operations;
SortedOperationSet queued_parallel_operations{};
SortedOperationSet queued_sequential_operations{};
std::mutex operations_mtx;
// Status callback that is called after every processed file with the (completed, total) number of files
CompletionCallback completion_callback;
Expand All @@ -45,6 +48,7 @@ namespace fmerge {
std::atomic_int error_count{0};

void worker_function(int tid);
void sequential_function();
// Returns true if file was processed successfully
bool process_file(const std::vector<FileOperation> &ops);
};
Expand Down

0 comments on commit ceddc14

Please sign in to comment.