Skip to content

Commit

Permalink
Set a hardcoded size limit on TaskStream
Browse files Browse the repository at this point in the history
  • Loading branch information
veloman-yunkan committed Jul 5, 2022
1 parent 039325a commit cbf318d
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion src/zimcheck/checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ void ArticleChecker::detect_redundant_articles()

class TaskStream
{
public: // constants
const static size_t MAX_SIZE = 1000;

public: // functions
explicit TaskStream(ArticleChecker* ac)
: articleChecker(*ac)
Expand All @@ -542,7 +545,11 @@ class TaskStream
void addTask(zim::Entry entry)
{
assert(expectingMoreTasks);
std::lock_guard<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);

while ( inIsBlocked() )
waitUntilInIsUnblocked(lock);

taskQueue.push(entry);
unblockOut();
}
Expand All @@ -569,16 +576,31 @@ class TaskStream
}
}

bool inIsBlocked() const
{
return taskQueue.size() >= MAX_SIZE;
}

bool outIsBlocked() const
{
return expectingMoreTasks && taskQueue.empty();
}

void waitUntilInIsUnblocked(std::unique_lock<std::mutex>& lock)
{
cv.wait(lock);
}

void waitUntilOutIsUnblocked(std::unique_lock<std::mutex>& lock)
{
cv.wait(lock);
}

void unblockIn()
{
cv.notify_one();
}

void unblockOut()
{
cv.notify_one();
Expand All @@ -595,6 +617,7 @@ class TaskStream
{
t = std::make_shared<zim::Entry>(taskQueue.front());
taskQueue.pop();
unblockIn();
}
return t;
}
Expand All @@ -604,7 +627,18 @@ class TaskStream
std::queue<zim::Entry> taskQueue;
std::mutex mutex;
std::thread thread;

// cv is used for managing the blocking & unblocking of both ends of the
// task queue. This is possible with a single std::condition_variable due
// to the fact that both ends of the queue cannot be blocked at the same
// time:
// 1. when the input is blocked (because the queue is full), the consumer
// is free to proceed (thus unblocking the input).
// 2. when the output is blocked (becuase the queue is empty while more data
// has been promised) the producer is welcome to use the input end.
// Thus only the consumer or the producer waits on cv.
std::condition_variable cv;

std::atomic<bool> expectingMoreTasks;
};

Expand Down

0 comments on commit cbf318d

Please sign in to comment.