-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathThreadPool_basics.cpp
157 lines (136 loc) · 4.69 KB
/
ThreadPool_basics.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <vector>
#include <poolqueue/ThreadPool.hpp>
using poolqueue::ThreadPool;
using poolqueue::Promise;
int main() {
ThreadPool tp;
// The default size of the pool is the number of hardware threads.
std::cout << tp.getThreadCount() << " default threads\n";
// The size of the pool can be changed if desired.
tp.setThreadCount(5);
assert(tp.getThreadCount() == 5);
std::cout << "Changed to " << tp.getThreadCount() << " threads\n";
#if 0
// DON'T DO THIS - Don't change the thread count from within a pool
// thread.
tp.post([&tp]() {
tp.setThreadCount(1);
return nullptr;
});
#endif
// Use post() to queue a function to execute in the pool.
std::cout << "main() thread " << std::this_thread::get_id() << '\n';
tp.post([]() {
std::cout << "function running in thread " << std::this_thread::get_id() << '\n';
return nullptr;
});
#if 0
// DON'T DO THIS - Functions to execute in the pool should not have
// an argument.
tp.post([](const std::string& s) {
return nullptr;
});
#endif
// index() returns the thread index in the pool, -1 if not a
// pool thread.
assert(tp.index() == -1);
tp.post([&tp]() {
const auto index = tp.index();
assert(index >= 0 && index < tp.getThreadCount());
std::cout << "posted thread index " << index << '\n';
return nullptr;
});
// dispatch() will execute its function argument immediately if
// calling thread is in the pool; otherwise it will call post().
tp.dispatch([&tp]() {
const auto index = tp.index();
assert(index >= 0 && index < tp.getThreadCount());
std::cout << "dispatched thread index " << index << '\n';
return nullptr;
});
tp.post([&tp]() {
// Calling dispatch() here invokes the function synchronously.
std::cout << "calling dispatch() from " << tp.index() << "...\n";
tp.dispatch([&tp]() {
std::cout << "...executes synchronously on " << tp.index() << '\n';
return nullptr;
});
return nullptr;
});
// wrap() transforms a function into a new function that dispatches
// the original function.
auto wrapped = tp.wrap([&tp]() {
std::cout << "wrapped function on " << tp.index() << '\n';
return nullptr;
});
wrapped();
// post() and dispatch() return a Promise that settles with the
// result of the function.
Promise p0 = tp.post([]() {
return std::string("foo");
});
p0.then([](const std::string& s) {
// Be aware that chained Promises will not necessarily
// execute on a ThreadPool thread (though they often will).
// If the returned Promise p0 is settled before then() is
// called, this function will execute on the current thread.
//
// If you really want callbacks to execute in the pool then
// you must invoke them with a ThreadPool method.
std::cout << "posted function returned " << s << '\n';
return nullptr;
});
Promise p1 = tp.dispatch([]() -> std::nullptr_t {
throw std::runtime_error("bar");
});
p1.except([](const std::exception_ptr& e) {
// Be aware that chained Promises will not necessarily
// execute on a ThreadPool thread (though they often will).
try {
if (e)
std::rethrow_exception(e);
}
catch (const std::exception& e) {
std::cout << "dispatched function threw " << e.what() << '\n';
}
return nullptr;
});
// synchronize() is an asynchronous barrier. It ensures that any
// functions queued on the ThreadPool before the call complete
// before any functions queued after the call begin. However, by
// itself it does *not* block - i.e. there is no guarantee what
// is or is not executing at the moment synchronize() returns.
tp.synchronize();
tp.post([]() {
// When this executes, which may not be until after the
// below synchronize() call returns, it will not overlap
// with any other ThreadPool function.
return nullptr;
});
tp.synchronize();
// synchronize() does return a std::shared_future<void> that *can*
// be used to block until the pool is idle.
std::atomic<int> counter(0);
for (int i = 0; i < 4; ++i) {
tp.post([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
++counter;
return nullptr;
});
}
std::shared_future<void> future = tp.synchronize();
future.wait();
assert(counter == 4);
#if 0
// DON'T DO THIS - Do not block inside a ThreadPool thread.
tp.post([&tp]() {
tp.synchronize().wait();
return nullptr;
});
#endif
return 0;
}