Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finite task queue in zimcheck #309

Merged
merged 2 commits into from
Jul 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/zimcheck/checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ void ArticleChecker::detect_redundant_articles()

class TaskStream
{
public: // constants
const static size_t MAX_SIZE = 1000;

public: // functions
explicit TaskStream(ArticleChecker* ac)
: articleChecker(*ac)
Expand All @@ -542,16 +545,20 @@ class TaskStream
// Appends an entry to the task queue. While the queue holds MAX_SIZE or more
// entries (inIsBlocked()), waits on cv; the check is repeated in a loop after
// every wake-up. After pushing, notifies cv so that a thread waiting for a
// task can proceed.
// Asserts that expectingMoreTasks is still set, i.e. noMoreTasks() has not
// been called yet.
void addTask(zim::Entry entry)
{
assert(expectingMoreTasks);
std::lock_guard<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);

while ( inIsBlocked() )
waitUntilInIsUnblocked(lock);

taskQueue.push(entry);
unblock();
unblockOut();
}

// Declares that no further tasks will be added: clears expectingMoreTasks
// while holding the mutex, then wakes one thread waiting on cv.
void noMoreTasks()
{
std::lock_guard<std::mutex> lock(mutex);
expectingMoreTasks = false;
unblock();
unblockOut();
}

private: // types
Expand All @@ -569,32 +576,48 @@ class TaskStream
}
}

// Reports whether the input (producer) end must wait: true once the queue
// holds MAX_SIZE or more entries.
bool blocked() const
bool inIsBlocked() const
{
return taskQueue.size() >= MAX_SIZE;
}

// Reports whether the output (consumer) end must wait: true while more tasks
// are still expected (expectingMoreTasks) and the queue is currently empty.
bool outIsBlocked() const
{
return expectingMoreTasks && taskQueue.empty();
}

// Waits on cv using the caller's lock. No predicate is passed to wait();
// addTask() re-checks inIsBlocked() in a loop around this call.
void waitUntilUnblocked(std::unique_lock<std::mutex>& lock)
void waitUntilInIsUnblocked(std::unique_lock<std::mutex>& lock)
{
cv.wait(lock);
}

// Waits on cv using the caller's lock. No predicate is passed to wait(), so
// this returns on any wake-up; a caller that needs outIsBlocked() to be false
// afterwards has to re-check it itself.
void waitUntilOutIsUnblocked(std::unique_lock<std::mutex>& lock)
{
cv.wait(lock);
}

// Wakes one thread waiting on cv; called by getNextTask() right after it
// pops a task from the queue.
void unblock()
void unblockIn()
{
cv.notify_one();
}

// Wakes one thread waiting on cv; called by addTask() after pushing an entry
// and by noMoreTasks() after clearing expectingMoreTasks.
void unblockOut()
{
cv.notify_one();
}

// Removes and returns the task at the front of the queue. If the queue is
// empty while more tasks are still expected, waits on cv once beforehand.
// Returns a default-constructed Task if the queue is (still) empty after that.
Task getNextTask()
{
std::unique_lock<std::mutex> lock(mutex);
if ( blocked() )
waitUntilUnblocked(lock);
// NOTE(review): this wait is guarded by `if`, whereas addTask() loops with
// `while` around its wait. std::condition_variable::wait() may wake up
// spuriously, in which case a default-constructed Task is returned even
// though more tasks are still expected - confirm that callers do not treat
// that as end-of-stream, or re-check the condition in a loop.
if ( outIsBlocked() )
waitUntilOutIsUnblocked(lock);

Task t;
if ( !taskQueue.empty() )
{
t = std::make_shared<zim::Entry>(taskQueue.front());
taskQueue.pop();
// A slot has been freed: wake a thread that may be waiting in addTask().
unblockIn();
}
return t;
}
Expand All @@ -604,7 +627,18 @@ class TaskStream
std::queue<zim::Entry> taskQueue;
std::mutex mutex;
std::thread thread;

// cv is used for managing the blocking & unblocking of both ends of the
// task queue. This is possible with a single std::condition_variable due
// to the fact that both ends of the queue cannot be blocked at the same
// time:
// 1. when the input is blocked (because the queue is full), the consumer
// is free to proceed (thus unblocking the input).
// 2. when the output is blocked (because the queue is empty while more data
// has been promised) the producer is welcome to use the input end.
// Thus only the consumer or the producer waits on cv.
std::condition_variable cv;

std::atomic<bool> expectingMoreTasks;
};

Expand Down