forked from NixOS/hydra
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstate.hh
549 lines (401 loc) · 15.2 KB
/
state.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <queue>
#include "db.hh"
#include "parsed-derivations.hh"
#include "pathlocks.hh"
#include "pool.hh"
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
/* Identifier of a build. */
using BuildID = unsigned int;

/* A wall-clock point in time. */
using system_time = std::chrono::time_point<std::chrono::system_clock>;

/* Atomic counter, used for the statistics and job counts below. */
using counter = std::atomic<unsigned long>;
/* Status of a build or of a build step. Some values apply only to
   builds or only to steps, as annotated. bsBusy is marked "not stored",
   which implies the other numbers are persisted, so never renumber them
   (NOTE(review): confirm against the database schema). */
enum BuildStatus {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,             // builds only
    bsAborted = 3,
    bsCancelled = 4,
    bsFailedWithOutput = 6,      // builds only
    bsTimedOut = 7,
    bsCachedFailure = 8,         // steps only
    bsUnsupported = 9,
    bsLogLimitExceeded = 10,
    bsNarSizeLimitExceeded = 11,
    bsNotDeterministic = 12,
    bsBusy = 100,                // not stored
};
/* Phase an in-progress build step is in. The values increase in the
   order the phases are listed. */
enum StepState {
    ssPreparing = 1,
    ssConnecting = 10,
    ssSendingInputs = 20,
    ssBuilding = 30,
    ssReceivingOutputs = 40,
    ssPostProcessing = 50,
};
/* The outcome of executing one build step. It is filled in by the build
   code (e.g. State::buildRemote takes it by non-const reference) and then
   consumed when recording the step (e.g. State::finishBuildStep). */
struct RemoteResult
{
    BuildStatus stepStatus = bsAborted;
    bool canRetry = false; // for bsAborted
    bool isCached = false; // for bsSuccess
    bool canCache = false; // for bsFailed
    std::string errorMsg; // for bsAborted

    unsigned int timesBuilt = 0;
    bool isNonDeterministic = false;

    time_t startTime = 0;
    time_t stopTime = 0;
    unsigned int overhead = 0;
    nix::Path logFile;

    /* The status to report for the build as a whole. This is the same as
       stepStatus, except that the step-only status bsCachedFailure is
       reported as bsFailed. */
    BuildStatus buildStatus() const
    {
        return stepStatus == bsCachedFailure ? bsFailed : stepStatus;
    }
};
/* Forward declarations. Build (below) holds a pointer to Step before Step
   is defined, and State refers to BuildOutput, which is not defined in
   this header. */
struct BuildOutput;
struct Step;
/* Scheduling bookkeeping for a jobset: an accumulated number of seconds
   and the number of scheduling shares the jobset has. */
class Jobset
{
public:

    typedef std::shared_ptr<Jobset> ptr;
    typedef std::weak_ptr<Jobset> wptr;

    /* 24 hours, in seconds. Presumably the window over which step times
       are accounted (see addStep/pruneSteps); TODO confirm in the .cc. */
    static const time_t schedulingWindow = 24 * 60 * 60;

private:

    std::atomic<time_t> seconds{0};
    std::atomic<unsigned int> shares{1};

    /* The start time and duration of the most recent build steps. */
    nix::Sync<std::map<time_t, time_t>> steps;

public:

    /* The jobset's accumulated seconds divided by its number of shares.
       shares starts at 1 and setShares() asserts that it stays positive. */
    double shareUsed() const
    {
        return static_cast<double>(seconds.load()) / shares.load();
    }

    /* Set the number of scheduling shares; must be positive. */
    void setShares(int shares_)
    {
        assert(shares_ > 0);
        shares = shares_;
    }

    time_t getSeconds() const { return seconds; }

    void addStep(time_t startTime, time_t duration);

    void pruneSteps();
};
/* A queued build: a request to build the top-level derivation drvPath
   for a particular project/jobset/job. Apart from drvPath, the fields are
   filled in by whoever creates the Build (presumably from the database;
   confirm at the call site). Scalars default to 0 so they are never
   indeterminate. */
struct Build
{
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;

    BuildID id = 0;
    nix::StorePath drvPath;
    std::map<std::string, nix::StorePath> outputs;
    std::string projectName, jobsetName, jobName;
    time_t timestamp = 0;
    unsigned int maxSilentTime = 0, buildTimeout = 0;
    int localPriority = 0, globalPriority = 0;

    /* The step for drvPath itself. */
    std::shared_ptr<Step> toplevel;

    Jobset::ptr jobset;

    std::atomic_bool finishedInDB{false};

    Build(nix::StorePath && drvPath) : drvPath(std::move(drvPath))
    { }

    /* "project:jobset:job". */
    std::string fullJobName() const
    {
        return projectName + ":" + jobsetName + ":" + jobName;
    }

    void propagatePriorities();
};
/* A build step: one derivation (drvPath) that has to be built for one or
   more queued builds. Mutable scheduling state lives in the
   lock-protected 'state' member. */
struct Step
{
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;

    nix::StorePath drvPath;
    std::unique_ptr<nix::Derivation> drv;
    std::unique_ptr<nix::ParsedDerivation> parsedDrv;
    std::set<std::string> requiredSystemFeatures;
    /* Initialised so that a Step is never observed with indeterminate flags. */
    bool preferLocalBuild = false;
    bool isDeterministic = false;
    std::string systemType; // concatenation of drv.platform and requiredSystemFeatures

    struct State
    {
        /* Whether the step has finished initialisation. */
        bool created = false;

        /* The build steps on which this step depends. */
        std::set<Step::ptr> deps;

        /* The build steps that depend on this step. */
        std::vector<Step::wptr> rdeps;

        /* Builds that have this step as the top-level derivation. */
        std::vector<Build::wptr> builds;

        /* Jobsets to which this step belongs. Used for determining
           scheduling priority. */
        std::set<Jobset::ptr> jobsets;

        /* Number of times we've tried this step. */
        unsigned int tries = 0;

        /* Point in time after which the step can be retried. */
        system_time after;

        /* The highest global priority of any build depending on this
           step. */
        int highestGlobalPriority{0};

        /* The highest local priority of any build depending on this
           step. */
        int highestLocalPriority{0};

        /* The lowest ID of any build depending on this step. */
        BuildID lowestBuildID{std::numeric_limits<BuildID>::max()};

        /* The time at which this step became runnable. */
        system_time runnableSince;

        /* The time that we last saw a machine that supports this
           step. */
        system_time lastSupported = std::chrono::system_clock::now();
    };

    std::atomic_bool finished{false}; // debugging

    nix::Sync<State> state;

    Step(const nix::StorePath & drvPath) : drvPath(drvPath)
    { }
};
void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps);
/* Call ‘visitor’ for a step and all its dependencies. */
void visitDependencies(std::function<void(Step::ptr)> visitor, Step::ptr step);
/* A build machine that build steps can be dispatched to (see
   supportsStep). The machine named "localhost" is treated specially
   (see isLocalhost). */
struct Machine
{
    typedef std::shared_ptr<Machine> ptr;

    bool enabled{true};

    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;
    std::string sshPublicHostKey;

    /* Runtime state of a machine: job counts, timing statistics and
       connection-failure tracking. */
    struct State {
        typedef std::shared_ptr<State> ptr;
        counter currentJobs{0};
        counter nrStepsDone{0};
        counter totalStepTime{0}; // total time for steps, including closure copying
        counter totalStepBuildTime{0}; // total build time for steps
        std::atomic<time_t> idleSince{0};

        struct ConnectInfo
        {
            system_time lastFailure, disabledUntil;
            /* Without an initializer this member is indeterminate whenever
               ConnectInfo is default-initialized. */
            unsigned int consecutiveFailures = 0;
        };
        nix::Sync<ConnectInfo> connectInfo;

        /* Mutex to prevent multiple threads from sending data to the
           same machine (which would be inefficient). */
        std::timed_mutex sendLock;
    };

    State::ptr state;

    /* Whether this machine may run 'step'. It must satisfy the step's
       platform and features, and the step must satisfy the machine's
       mandatory features. */
    bool supportsStep(Step::ptr step) const
    {
        /* Check that this machine is of the type required by the
           step. */
        if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
            return false;

        /* Check that the step requires all mandatory features of this
           machine. (Thus, a machine with the mandatory "benchmark"
           feature will *only* execute steps that require
           "benchmark".) The "preferLocalBuild" bit of a step is
           mapped to the "local" feature; thus machines that have
           "local" as a mandatory feature will only do
           preferLocalBuild steps. */
        for (auto & f : mandatoryFeatures)
            if (!step->requiredSystemFeatures.count(f)
                && !(f == "local" && step->preferLocalBuild))
                return false;

        /* Check that the machine supports all features required by
           the step. */
        for (auto & f : step->requiredSystemFeatures)
            if (!supportedFeatures.count(f)) return false;

        return true;
    }

    bool isLocalhost() const
    {
        return sshName == "localhost";
    }
};
class HydraConfig;


/* The queue runner's global state: queued builds, jobsets, build steps,
   build machines, the database pool and runtime statistics. Scalar
   members without an obvious default are initialised to 0/false so they
   are never indeterminate before State() or run() assigns them. */
class State
{
private:

    std::unique_ptr<HydraConfig> config;

    // FIXME: Make configurable.
    const unsigned int maxTries = 5;
    const unsigned int retryInterval = 60; // seconds
    const float retryBackoff = 3.0;
    const unsigned int maxParallelCopyClosure = 4;

    /* Time in seconds before unsupported build steps are aborted. */
    const unsigned int maxUnsupportedTime = 0;

    nix::Path hydraData, logDir;

    bool useSubstitutes = false;

    /* The queued builds. */
    typedef std::map<BuildID, Build::ptr> Builds;
    nix::Sync<Builds> builds;

    /* The jobsets. */
    typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
    nix::Sync<Jobsets> jobsets;

    /* All active or pending build steps (i.e. dependencies of the
       queued builds). Note that these are weak pointers. Steps are
       kept alive by being reachable from Builds or by being in
       progress. */
    typedef std::map<nix::StorePath, Step::wptr> Steps;
    nix::Sync<Steps> steps;

    /* Build steps that have no unbuilt dependencies. */
    typedef std::list<Step::wptr> Runnable;
    nix::Sync<Runnable> runnable;

    /* CV for waking up the dispatcher. */
    nix::Sync<bool> dispatcherWakeup;
    std::condition_variable dispatcherWakeupCV;

    /* PostgreSQL connection pool. */
    nix::Pool<Connection> dbPool;

    /* The build machines. */
    typedef std::map<std::string, Machine::ptr> Machines;
    nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr

    /* Various stats. */
    time_t startedAt = 0;
    counter nrBuildsRead{0};
    counter buildReadTimeMs{0};
    counter nrBuildsDone{0};
    counter nrStepsStarted{0};
    counter nrStepsDone{0};
    counter nrStepsBuilding{0};
    counter nrStepsCopyingTo{0};
    counter nrStepsCopyingFrom{0};
    counter nrStepsWaiting{0};
    counter nrUnsupportedSteps{0};
    counter nrRetries{0};
    counter maxNrRetries{0};
    counter totalStepTime{0}; // total time for steps, including closure copying
    counter totalStepBuildTime{0}; // total build time for steps
    counter nrQueueWakeups{0};
    counter nrDispatcherWakeups{0};
    counter dispatchTimeMs{0};
    counter bytesSent{0};
    counter bytesReceived{0};
    counter nrActiveDbUpdates{0};

    /* Specific build to do for --build-one (testing only). */
    BuildID buildOne = 0;

    /* Statistics per machine type for the Hydra auto-scaler. */
    struct MachineType
    {
        unsigned int runnable{0}, running{0};
        system_time lastActive;
        /* std::chrono::duration's default constructor leaves the count
           uninitialised, hence the explicit {0}. */
        std::chrono::seconds waitTime{0}; // time runnable steps have been waiting
    };

    nix::Sync<std::map<std::string, MachineType>> machineTypes;

    struct MachineReservation
    {
        typedef std::shared_ptr<MachineReservation> ptr;
        State & state;
        Step::ptr step;
        Machine::ptr machine;
        MachineReservation(State & state, Step::ptr step, Machine::ptr machine);
        ~MachineReservation();
    };

    struct ActiveStep
    {
        Step::ptr step;

        struct State
        {
            pid_t pid = -1;
            bool cancelled = false;
        };

        nix::Sync<State> state_;
    };

    nix::Sync<std::set<std::shared_ptr<ActiveStep>>> activeSteps_;

    std::atomic<time_t> lastDispatcherCheck{0};

    std::shared_ptr<nix::Store> localStore;
    std::shared_ptr<nix::Store> _destStore;

    size_t maxOutputSize = 0;
    size_t maxLogSize = 0;

    /* Steps that were busy while we encountered a PostgreSQL
       error. These need to be cleared at a later time to prevent them
       from showing up as busy until the queue runner is restarted. */
    nix::Sync<std::set<std::pair<BuildID, int>>> orphanedSteps;

    /* How often the build steps of a jobset should be repeated in
       order to detect non-determinism. */
    std::map<std::pair<std::string, std::string>, unsigned int> jobsetRepeats;

    bool uploadLogsToBinaryCache = false;

    /* Where to store GC roots. Defaults to
       /nix/var/nix/gcroots/per-user/$USER/hydra-roots, overridable
       via gc_roots_dir. */
    nix::Path rootsDir;

public:
    State();

private:

    nix::MaintainCount<counter> startDbUpdate();

    /* Return a store object to store build results. */
    nix::ref<nix::Store> getDestStore();

    void clearBusy(Connection & conn, time_t stopTime);

    void parseMachines(const std::string & contents);

    /* Thread to reload /etc/nix/machines periodically. */
    void monitorMachinesFile();

    unsigned int allocBuildStep(pqxx::work & txn, BuildID buildId);

    unsigned int createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step,
        const std::string & machine, BuildStatus status, const std::string & errorMsg = "",
        BuildID propagatedFrom = 0);

    void updateBuildStep(pqxx::work & txn, BuildID buildId, unsigned int stepNr, StepState stepState);

    void finishBuildStep(pqxx::work & txn, const RemoteResult & result, BuildID buildId, unsigned int stepNr,
        const std::string & machine);

    int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
        Build::ptr build, const nix::StorePath & drvPath, const std::string & outputName, const nix::StorePath & storePath);

    void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);

    void queueMonitor();

    void queueMonitorLoop();

    /* Check the queue for new builds. */
    bool getQueuedBuilds(Connection & conn,
        nix::ref<nix::Store> destStore, unsigned int & lastBuildId);

    /* Handle cancellation, deletion and priority bumps. */
    void processQueueChange(Connection & conn);

    BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
        const nix::Derivation & drv);

    Step::ptr createStep(nix::ref<nix::Store> store,
        Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
        Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
        std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);

    void failStep(
        Connection & conn,
        Step::ptr step,
        BuildID buildId,
        const RemoteResult & result,
        Machine::ptr machine,
        bool & stepFinished,
        bool & quit);

    Jobset::ptr createJobset(pqxx::work & txn,
        const std::string & projectName, const std::string & jobsetName);

    void processJobsetSharesChange(Connection & conn);

    void makeRunnable(Step::ptr step);

    /* The thread that selects and starts runnable builds. */
    void dispatcher();

    system_time doDispatch();

    void wakeDispatcher();

    void abortUnsupported();

    void builder(MachineReservation::ptr reservation);

    /* Perform the given build step. The result is sDone when the step
       is finished, sRetry when it should be retried, and sMaybeCancelled
       when it may have been cancelled (semantics per the enumerator
       names; see the definition for the exact conditions). */
    enum StepResult { sDone, sRetry, sMaybeCancelled };
    StepResult doBuildStep(nix::ref<nix::Store> destStore,
        MachineReservation::ptr reservation,
        std::shared_ptr<ActiveStep> activeStep);

    void buildRemote(nix::ref<nix::Store> destStore,
        Machine::ptr machine, Step::ptr step,
        unsigned int maxSilentTime, unsigned int buildTimeout,
        unsigned int repeats,
        RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
        std::function<void(StepState)> updateStep,
        NarMemberDatas & narMembers);

    void markSucceededBuild(pqxx::work & txn, Build::ptr build,
        const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);

    bool checkCachedFailure(Step::ptr step, Connection & conn);

    void notifyBuildStarted(pqxx::work & txn, BuildID buildId);

    void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
        const std::vector<BuildID> & dependentIds);

    /* Acquire the global queue runner lock. Returns null if somebody
       else holds it. */
    std::shared_ptr<nix::PathLocks> acquireGlobalLock();

    void dumpStatus(Connection & conn);

    void addRoot(const nix::StorePath & storePath);

public:

    void showStatus();

    void unlock();

    void run(BuildID buildOne = 0);
};