// Copyright (c) 2022 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0
package dpcmanager
import (
"context"
"fmt"
"net"
"time"
"github.com/eriknordmark/ipinfo"
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/conntester"
"github.com/lf-edge/eve/pkg/pillar/dpcreconciler"
"github.com/lf-edge/eve/pkg/pillar/flextimer"
"github.com/lf-edge/eve/pkg/pillar/netdump"
"github.com/lf-edge/eve/pkg/pillar/netmonitor"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
"github.com/lf-edge/eve/pkg/pillar/types"
"github.com/lf-edge/eve/pkg/pillar/zedcloud"
uuid "github.com/satori/go.uuid"
)
const (
errorTime = 60 * time.Second
warningTime = 40 * time.Second
watchdogPeriod = 25 * time.Second
)
// LastResortKey : key of the DPC used as a last resort.
const LastResortKey = "lastresort"
// ManualDPCKey : key used for DPC set manually by the user.
const ManualDPCKey = "manual"
var nilUUID = uuid.UUID{} // used as a constant
// DpcManager manages a list of received device port configurations.
// Note that a device port configuration (DevicePortConfig struct; abbreviated
// to DPC) represents the configuration of all (physical) network interfaces
// to be used for device management or to be shared by applications
// (i.e. excluding NIC pass-through).
// The goal is to select and apply a DPC with working external connectivity,
// so that EVE is able to access the controller, and, among the working DPCs,
// to prefer the one with the highest assigned priority (typically the last received).
// The manager uses ConnectivityTester to probe the connectivity status
// of the currently applied DPC. Based on the probing result, it may keep the DPC
// unchanged or fall back to a lower-priority but working configuration.
// Whenever a higher-priority DPC is available, the manager tests it
// periodically and switches to it as soon as the probing succeeds.
// A DPC is applied to the device state using DpcReconciler (see pillar/dpcreconciler).
// The reconciler is able to switch from one DPC to another.
// Lastly, NetworkMonitor is used to monitor the network stack for interesting
// events, such as link state changes, and to collect state information.
// The manager publishes the device network status (DeviceNetworkStatus struct;
// abbreviated to DNS), updated on every state change, including a switch to another DPC.
// DpcManager is a generic state machine, not tied to any particular network stack.
// Instead, the injected components NetworkMonitor, DpcReconciler and ConnTester
// perform all the probing, monitoring and network configuration operations.
type DpcManager struct {
Log *base.LogObject
Watchdog Watchdog
AgentName string
// Keep nil values to let DpcManager use default implementations.
// It is useful to override for unit testing purposes.
GeoService GeolocationService
// Minimum time that should pass after a DPC verification failure
// until the DPC is eligible for another round of verification.
// By default it is 5 minutes.
// It is useful to override for unit testing purposes.
// XXX Should we make this a global config parameter?
DpcMinTimeSinceFailure time.Duration
// NIM components that the manager interacts with
NetworkMonitor netmonitor.NetworkMonitor
DpcReconciler dpcreconciler.DpcReconciler
ConnTester conntester.ConnectivityTester
// Publications
PubDummyDevicePortConfig pubsub.Publication // for logging
PubDevicePortConfigList pubsub.Publication
PubDeviceNetworkStatus pubsub.Publication
// Metrics
ZedcloudMetrics *zedcloud.AgentMetrics
// Current configuration
dpcList types.DevicePortConfigList
adapters types.AssignableAdapters
globalCfg types.ConfigItemValueMap
hasGlobalCfg bool
rsConfig types.RadioSilence
rsStatus types.RadioSilence
enableLastResort bool
devUUID uuid.UUID
flowlogEnabled bool
clusterStatus types.EdgeNodeClusterStatus
// Boot-time configuration
dpclPresentAtBoot bool
// DPC verification
dpcVerify dpcVerify
// Current status
reconcileStatus dpcreconciler.ReconcileStatus
deviceNetStatus types.DeviceNetworkStatus
wwanStatus types.WwanStatus
// Channels
inputCommands chan inputCommand
networkEvents <-chan netmonitor.Event
// Timers
dpcTestTimer *time.Timer
dpcTestBetterTimer *time.Timer
pendingDpcTimer *time.Timer
geoTimer flextimer.FlexTickerHandle
dpcTestDuration time.Duration // Wait for DHCP address
dpcTestInterval time.Duration // Interval between connectivity tests.
dpcTestBetterInterval time.Duration // Look for lower/better index
geoRedoInterval time.Duration
geoRetryInterval time.Duration
lastPublishedLocInfo types.WwanLocationInfo
// Netdump
netDumper *netdump.NetDumper // nil if netdump is disabled
netdumpInterval time.Duration
lastNetdumpPub time.Time // last call to publishNetdump
startTime time.Time
}
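// A minimal wiring sketch (illustrative only; the concrete monitor, reconciler
// and tester implementations, as well as the agent name, are assumptions made
// for this example and are not mandated by this file):
//
//	func startDpcManager(ctx context.Context, log *base.LogObject, wd Watchdog,
//		monitor netmonitor.NetworkMonitor, reconciler dpcreconciler.DpcReconciler,
//		tester conntester.ConnectivityTester) (*DpcManager, error) {
//		mgr := &DpcManager{
//			Log:            log,
//			Watchdog:       wd,
//			AgentName:      "nim",
//			NetworkMonitor: monitor,
//			DpcReconciler:  reconciler,
//			ConnTester:     tester,
//		}
//		// Init prepares channels and timers and ingests persisted DPCs;
//		// Run spawns the event loop as a separate goroutine.
//		if err := mgr.Init(ctx); err != nil {
//			return nil, err
//		}
//		if err := mgr.Run(ctx); err != nil {
//			return nil, err
//		}
//		// Configuration is then fed in asynchronously, e.g.:
//		// mgr.UpdateGCP(gcp), mgr.AddDPC(dpc), mgr.UpdateAA(aa), ...
//		return mgr, nil
//	}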
// Watchdog : methods used by DpcManager to interact with Watchdog.
type Watchdog interface {
// RegisterFileWatchdog tells the watchdog about the touch file.
RegisterFileWatchdog(agentName string)
// StillRunning touches a file per agentName to signal that the event loop is still running.
// Those files are observed by the watchdog.
StillRunning(agentName string, warnTime, errTime time.Duration)
// CheckMaxTimeTopic verifies whether the time taken by a call has exceeded a reasonable limit.
CheckMaxTimeTopic(agentName, topic string, start time.Time,
warnTime, errTime time.Duration)
}
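// For unit tests, a no-op implementation of this interface is sufficient
// (illustrative sketch; such a stub is not defined in this package):
//
//	type noopWatchdog struct{}
//
//	func (noopWatchdog) RegisterFileWatchdog(agentName string) {}
//	func (noopWatchdog) StillRunning(agentName string, warnTime, errTime time.Duration) {}
//	func (noopWatchdog) CheckMaxTimeTopic(agentName, topic string, start time.Time,
//		warnTime, errTime time.Duration) {
//	}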
// WwanEvent is sent by WwanWatcher whenever there is new output coming
// from the wwan microservice.
type WwanEvent uint8
const (
// WwanEventUndefined : undefined event, will be ignored.
WwanEventUndefined WwanEvent = iota
// WwanEventNewStatus : new wwan status data are available,
// reload with WwanWatcher.LoadStatus()
WwanEventNewStatus
// WwanEventNewMetrics : new wwan metrics are available,
// reload with WwanWatcher.LoadMetrics()
WwanEventNewMetrics
// WwanEventNewLocationInfo : new location info published by wwan microservice,
// reload with WwanWatcher.LoadLocationInfo()
WwanEventNewLocationInfo
)
// WwanWatcher allows watching for output coming from the wwan microservice.
// The wwan microservice is a shell script and uses files for input/output
// instead of pubsub.
type WwanWatcher interface {
Watch(ctx context.Context) (<-chan WwanEvent, error)
LoadStatus() (types.WwanStatus, error)
LoadMetrics() (types.WwanMetrics, error)
LoadLocationInfo() (types.WwanLocationInfo, error)
}
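// A typical consumer loop looks roughly like the following (a sketch assuming
// that the watcher and manager variables are already constructed; error
// handling is trimmed for brevity):
//
//	events, err := watcher.Watch(ctx)
//	if err != nil {
//		return err
//	}
//	for ev := range events {
//		switch ev {
//		case WwanEventNewStatus:
//			if status, err := watcher.LoadStatus(); err == nil {
//				mgr.ProcessWwanStatus(status)
//			}
//		case WwanEventNewMetrics:
//			metrics, _ := watcher.LoadMetrics()
//			_ = metrics // forwarded elsewhere, e.g. published by NIM
//		case WwanEventNewLocationInfo:
//			locInfo, _ := watcher.LoadLocationInfo()
//			_ = locInfo
//		}
//	}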
// GeolocationService allows obtaining geolocation information based
// on an assigned IP address.
type GeolocationService interface {
// GetGeolocationInfo tries to obtain geolocation information
// corresponding to the given IP address.
GetGeolocationInfo(ipAddr net.IP) (*ipinfo.IPInfo, error)
}
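// The GeoService field of DpcManager can be overridden with a fake for unit
// testing, for example (illustrative sketch only):
//
//	type fakeGeoService struct{}
//
//	func (fakeGeoService) GetGeolocationInfo(ipAddr net.IP) (*ipinfo.IPInfo, error) {
//		return &ipinfo.IPInfo{}, nil
//	}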
type command uint8
const (
commandUndefined command = iota
commandAddDPC
commandDelDPC
commandUpdateGCP
commandUpdateAA
commandUpdateRS
commandUpdateDevUUID
commandProcessWwanStatus
commandUpdateFlowlogState
commandUpdateClusterStatus
)
type inputCommand struct {
cmd command
dpc types.DevicePortConfig // for commandAddDPC and commandDelDPC
gcp types.ConfigItemValueMap // for commandUpdateGCP
aa types.AssignableAdapters // for commandUpdateAA
rs types.RadioSilence // for commandUpdateRS
devUUID uuid.UUID // for commandUpdateDevUUID
wwanStatus types.WwanStatus // for commandProcessWwanStatus
flowlogEnabled bool // for commandUpdateFlowlogState
clusterStatus types.EdgeNodeClusterStatus // for commandUpdateClusterStatus
}
type dpcVerify struct {
inProgress bool
startedAt time.Time
cloudConnWorks bool
crucialIfs map[string]netmonitor.IfAttrs // key = ifName, change triggers restartVerify
}
// Init DpcManager
func (m *DpcManager) Init(ctx context.Context) error {
m.dpcVerify.crucialIfs = make(map[string]netmonitor.IfAttrs)
m.inputCommands = make(chan inputCommand, 10)
if m.GeoService == nil {
m.GeoService = &geoService{}
}
if m.DpcMinTimeSinceFailure == 0 {
m.DpcMinTimeSinceFailure = 5 * time.Minute
}
m.dpcList.CurrentIndex = -1
// We start assuming cloud connectivity works
m.dpcVerify.cloudConnWorks = true
// Keep timers inactive until we receive GCP.
m.dpcTestTimer = &time.Timer{}
m.dpcTestBetterTimer = &time.Timer{}
m.pendingDpcTimer = &time.Timer{}
m.geoTimer = flextimer.FlexTickerHandle{}
// Ingest persisted list of DPCs. ingestDPCList will return false
// to indicate the file is missing in /persist
m.dpclPresentAtBoot = m.ingestDPCList()
return nil
}
// Run DpcManager as a separate task with its own loop and a watchdog file.
func (m *DpcManager) Run(ctx context.Context) (err error) {
m.startTime = time.Now()
m.networkEvents = m.NetworkMonitor.WatchEvents(ctx, "dpc-reconciler")
go m.run(ctx)
return nil
}
func (m *DpcManager) run(ctx context.Context) {
wdName := m.AgentName + "-DpcManager"
stillRunning := time.NewTicker(watchdogPeriod)
m.Watchdog.StillRunning(wdName, warningTime, errorTime)
m.Watchdog.RegisterFileWatchdog(wdName)
// Run initial reconciliation.
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
for {
select {
case inputCmd := <-m.inputCommands:
switch inputCmd.cmd {
case commandUndefined:
m.Log.Warn("DpcManager: Received undefined command")
case commandAddDPC:
m.doAddDPC(ctx, inputCmd.dpc)
case commandDelDPC:
m.doDelDPC(ctx, inputCmd.dpc)
case commandUpdateGCP:
m.doUpdateGCP(ctx, inputCmd.gcp)
case commandUpdateAA:
m.doUpdateAA(ctx, inputCmd.aa)
case commandUpdateRS:
m.doUpdateRadioSilence(ctx, inputCmd.rs)
case commandUpdateDevUUID:
m.doUpdateDevUUID(ctx, inputCmd.devUUID)
case commandProcessWwanStatus:
m.processWwanStatus(ctx, inputCmd.wwanStatus)
case commandUpdateFlowlogState:
m.doUpdateFlowlogState(ctx, inputCmd.flowlogEnabled)
case commandUpdateClusterStatus:
m.doUpdateClusterStatus(ctx, inputCmd.clusterStatus)
}
m.resumeVerifyIfAsyncDone(ctx)
case <-m.reconcileStatus.ResumeReconcile:
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
m.resumeVerifyIfAsyncDone(ctx)
case _, ok := <-m.dpcTestTimer.C:
start := time.Now()
if !ok {
m.Log.Noticef("DPC test timer stopped?")
} else if m.dpcList.CurrentIndex == -1 {
m.Log.Tracef("Starting looking for working Device connectivity to cloud")
m.restartVerify(ctx, "looking for working DPC")
m.Log.Noticef("Looking for working done at index %d. Took %v",
m.dpcList.CurrentIndex, time.Since(start))
} else {
m.Log.Tracef("Starting test of Device connectivity to cloud")
err := m.testConnectivityToCloud(ctx)
if err == nil {
m.Log.Tracef("Device connectivity to cloud worked. Took %v",
time.Since(start))
} else {
m.Log.Noticef("Device connectivity to cloud failed (%v). Took %v",
err, time.Since(start))
}
}
m.Watchdog.CheckMaxTimeTopic(m.AgentName, "TestTimer", start,
warningTime, errorTime)
case _, ok := <-m.dpcTestBetterTimer.C:
start := time.Now()
if !ok {
m.Log.Noticef("DPC testBetterTimer stopped?")
} else if m.dpcList.CurrentIndex == 0 && !m.deviceNetStatus.HasErrors() {
m.Log.Tracef("DPC testBetterTimer at zero ignored")
} else {
m.Log.Noticef("Network testBetterTimer at index %d",
m.dpcList.CurrentIndex)
m.restartVerify(ctx, "looking for better DPC")
m.Log.Noticef("Network testBetterTimer done at index %d. Took %v",
m.dpcList.CurrentIndex, time.Since(start))
}
m.Watchdog.CheckMaxTimeTopic(m.AgentName, "TestBetterTimer", start,
warningTime, errorTime)
case _, ok := <-m.pendingDpcTimer.C:
start := time.Now()
if !ok {
m.Log.Noticef("Device port test timer stopped?")
} else {
m.Log.Trace("PendTimer at", start)
m.runVerify(ctx, "PendTimer fired")
}
m.Watchdog.CheckMaxTimeTopic(m.AgentName, "PendTimer", start,
warningTime, errorTime)
case <-m.geoTimer.C:
start := time.Now()
m.Log.Trace("GeoTimer at", start)
m.updateGeo()
m.Watchdog.CheckMaxTimeTopic(m.AgentName, "geoTimer", start,
warningTime, errorTime)
case event := <-m.networkEvents:
switch ev := event.(type) {
case netmonitor.IfChange:
ifName := ev.Attrs.IfName
if !m.adapters.Initialized {
continue
}
if !m.isInterfaceCrucial(ifName) {
delete(m.dpcVerify.crucialIfs, ifName)
continue
}
m.Log.Noticef("Crucial port %s changed", ifName)
newAttrs := ev.Attrs
// Read the previously recorded attributes before overwriting them,
// otherwise the comparison below could never detect a change.
prevAttrs, known := m.dpcVerify.crucialIfs[ifName]
m.dpcVerify.crucialIfs[ifName] = newAttrs
if !known ||
prevAttrs.AdminUp != newAttrs.AdminUp ||
prevAttrs.LowerUp != newAttrs.LowerUp ||
prevAttrs.Enslaved != newAttrs.Enslaved ||
prevAttrs.IfIndex != newAttrs.IfIndex {
m.Log.Noticef("Restarting network connectivity verification "+
"because port %s is crucial to network configuration", ifName)
reasonForVerify := fmt.Sprintf("crucial interface %s changed", ifName)
m.restartVerify(ctx, reasonForVerify)
m.updateDNS()
}
case netmonitor.AddrChange:
ifAttrs, err := m.NetworkMonitor.GetInterfaceAttrs(ev.IfIndex)
if err != nil {
m.Log.Warnf("Failed to get attributes for ifIndex %d: %v",
ev.IfIndex, err)
continue
}
if m.isInterfaceCrucial(ifAttrs.IfName) {
if dpc := m.currentDPC(); dpc != nil {
reasonForVerify := "IP address change for interface " + ifAttrs.IfName
switch dpc.State {
case types.DPCStateIPDNSWait,
types.DPCStatePCIWait,
types.DPCStateIntfWait,
types.DPCStateWwanWait:
// Note that DPCStatePCIWait and DPCStateIntfWait can also be returned
// in scenarios where some ports are in PCIBack while others
// are waiting for IP addresses.
// For the sake of the ports not in PCIBack it makes sense to retest the DPC.
m.runVerify(ctx, reasonForVerify)
case types.DPCStateFail:
m.restartVerify(ctx, reasonForVerify)
}
}
}
m.updateDNS()
case netmonitor.DNSInfoChange:
m.updateDNS()
}
case <-ctx.Done():
return
case <-stillRunning.C:
}
m.Watchdog.StillRunning(wdName, warningTime, errorTime)
}
}
func (m *DpcManager) reconcilerArgs() dpcreconciler.Args {
args := dpcreconciler.Args{
GCP: m.globalCfg,
AA: m.adapters,
RS: m.rsConfig,
FlowlogEnabled: m.flowlogEnabled,
ClusterStatus: m.clusterStatus,
}
if m.currentDPC() != nil {
args.DPC = *m.currentDPC()
}
return args
}
// AddDPC : add a new DPC into the list of configurations to work with.
// It will be added into the list at a position determined by the TimePriority
// attribute. The higher the timestamp is, the higher the priority is.
func (m *DpcManager) AddDPC(dpc types.DevicePortConfig) {
m.inputCommands <- inputCommand{
cmd: commandAddDPC,
dpc: dpc,
}
}
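// For example (a sketch; the port list is omitted and the chosen key is only
// illustrative):
//
//	mgr.AddDPC(types.DevicePortConfig{
//		Key:          ManualDPCKey,
//		TimePriority: time.Now(),
//		// Ports: per-port configuration would go here.
//	})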
// DelDPC : remove DPC from the list of configurations to work with.
func (m *DpcManager) DelDPC(dpc types.DevicePortConfig) {
m.inputCommands <- inputCommand{
cmd: commandDelDPC,
dpc: dpc,
}
}
// UpdateGCP : apply an updated set of global configuration properties.
// These properties decide, for example, how often to probe the connectivity
// status, whether to allow SSH access, etc.
func (m *DpcManager) UpdateGCP(gcp types.ConfigItemValueMap) {
m.inputCommands <- inputCommand{
cmd: commandUpdateGCP,
gcp: gcp,
}
}
// UpdateAA : apply an updated set of assignable adapters. This list
// contains low-level information about all the physical adapters,
// such as their names in the kernel, PCI addresses, etc.
func (m *DpcManager) UpdateAA(aa types.AssignableAdapters) {
m.inputCommands <- inputCommand{
cmd: commandUpdateAA,
aa: aa,
}
}
// UpdateRadioSilence : apply an updated radio silence configuration.
// When radio silence is set to ON, all wireless ports should be configured
// with radio transmission disabled.
func (m *DpcManager) UpdateRadioSilence(rs types.RadioSilence) {
m.inputCommands <- inputCommand{
cmd: commandUpdateRS,
rs: rs,
}
}
// UpdateDevUUID : apply an update of the UUID assigned to the device by the controller.
func (m *DpcManager) UpdateDevUUID(devUUID uuid.UUID) {
m.inputCommands <- inputCommand{
cmd: commandUpdateDevUUID,
devUUID: devUUID,
}
}
// ProcessWwanStatus : process an update of cellular connectivity status.
func (m *DpcManager) ProcessWwanStatus(wwanStatus types.WwanStatus) {
m.inputCommands <- inputCommand{
cmd: commandProcessWwanStatus,
wwanStatus: wwanStatus,
}
}
// UpdateFlowlogState : handle flow logging being turned on/off.
func (m *DpcManager) UpdateFlowlogState(flowlogEnabled bool) {
m.inputCommands <- inputCommand{
cmd: commandUpdateFlowlogState,
flowlogEnabled: flowlogEnabled,
}
}
// UpdateClusterStatus : apply an updated cluster status.
func (m *DpcManager) UpdateClusterStatus(status types.EdgeNodeClusterStatus) {
m.inputCommands <- inputCommand{
cmd: commandUpdateClusterStatus,
clusterStatus: status,
}
}
// GetDNS returns device network state information.
func (m *DpcManager) GetDNS() types.DeviceNetworkStatus {
return m.deviceNetStatus
}
func (m *DpcManager) doUpdateGCP(ctx context.Context, gcp types.ConfigItemValueMap) {
firstGCP := !m.hasGlobalCfg
m.globalCfg = gcp
m.hasGlobalCfg = true
testInterval := time.Second *
time.Duration(m.globalCfg.GlobalValueInt(types.NetworkTestInterval))
testBetterInterval := time.Second *
time.Duration(m.globalCfg.GlobalValueInt(types.NetworkTestBetterInterval))
testDuration := time.Second *
time.Duration(m.globalCfg.GlobalValueInt(types.NetworkTestDuration))
// We refresh the geolocation information when the underlay (i.e. public)
// IP address(es) change, plus periodically with this interval.
geoRedoInterval := time.Second *
time.Duration(m.globalCfg.GlobalValueInt(types.NetworkGeoRedoTime))
// Interval for Geo retries after failure etc. Should be less than geoRedoInterval.
geoRetryInterval := time.Second *
time.Duration(m.globalCfg.GlobalValueInt(types.NetworkGeoRetryTime))
fallbackAnyEth := m.globalCfg.GlobalValueTriState(types.NetworkFallbackAnyEth)
m.enableLastResort = fallbackAnyEth == types.TS_ENABLED
if m.dpcTestInterval != testInterval {
if testInterval == 0 {
m.Log.Warn("NOT running TestTimer")
m.dpcTestTimer = &time.Timer{}
} else {
m.Log.Functionf("Starting TestTimer: %v", testInterval)
m.dpcTestTimer = time.NewTimer(testInterval)
}
m.dpcTestInterval = testInterval
}
if m.dpcTestBetterInterval != testBetterInterval {
if testBetterInterval == 0 {
m.Log.Warn("NOT running TestBetterTimer")
m.dpcTestBetterTimer = &time.Timer{}
} else {
m.Log.Functionf("Starting TestBetterTimer: %v", testBetterInterval)
m.dpcTestBetterTimer = time.NewTimer(testBetterInterval)
}
m.dpcTestBetterInterval = testBetterInterval
}
if m.dpcTestDuration != testDuration {
if testDuration == 0 {
m.Log.Warn("NOT running PendingTimer")
}
m.dpcTestDuration = testDuration
}
if m.geoRetryInterval != geoRetryInterval {
if geoRetryInterval == 0 {
m.Log.Warn("NOT running GeoTimer")
m.geoTimer = flextimer.FlexTickerHandle{}
} else {
m.Log.Functionf("Starting GeoTimer: %v", geoRetryInterval)
geoMax := float64(geoRetryInterval)
geoMin := geoMax * 0.3
m.geoTimer = flextimer.NewRangeTicker(time.Duration(geoMin),
time.Duration(geoMax))
}
m.geoRetryInterval = geoRetryInterval
}
m.geoRedoInterval = geoRedoInterval
m.reinitNetdumper()
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
// If we have persisted DPCs then go ahead and pick a working one
// with the highest priority, do not wait for dpcTestTimer to fire.
if firstGCP && m.currentDPC() == nil && len(m.dpcList.PortConfigList) > 0 {
m.restartVerify(ctx, "looking for working (persisted) DPC")
}
}
func (m *DpcManager) doUpdateAA(ctx context.Context, adapters types.AssignableAdapters) {
m.adapters = adapters
// In case a verification is in progress and is waiting for ports to return from pciback.
if dpc := m.currentDPC(); dpc != nil {
if dpc.State == types.DPCStatePCIWait || dpc.State == types.DPCStateIntfWait {
m.runVerify(ctx, "assignable adapters were updated")
}
}
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
}
func (m *DpcManager) resumeVerifyIfAsyncDone(ctx context.Context) {
if dpc := m.currentDPC(); dpc != nil {
asyncInProgress := m.reconcileStatus.AsyncInProgress
if dpc.State == types.DPCStateAsyncWait && !asyncInProgress {
// Config is ready, continue verification.
m.runVerify(ctx, "async ops no longer in progress")
}
}
}
func (m *DpcManager) doUpdateDevUUID(ctx context.Context, newUUID uuid.UUID) {
m.devUUID = newUUID
// Netdumper uses a different publish period after onboarding.
m.reinitNetdumper()
}
func (m *DpcManager) reinitNetdumper() {
gcp := m.globalCfg
netDumper := m.netDumper
netdumpEnabled := gcp.GlobalValueBool(types.NetDumpEnable)
if netdumpEnabled {
if netDumper == nil {
netDumper = &netdump.NetDumper{}
// Determine when DpcManager last published anything.
var err error
m.lastNetdumpPub, err = netDumper.LastPublishAt(
m.netDumpOKTopic(), m.netDumpFailTopic())
if err != nil {
m.Log.Warn(err)
}
}
isOnboarded := m.devUUID != nilUUID
if isOnboarded {
m.netdumpInterval = time.Second *
time.Duration(gcp.GlobalValueInt(types.NetDumpTopicPostOnboardInterval))
} else {
m.netdumpInterval = time.Second *
time.Duration(gcp.GlobalValueInt(types.NetDumpTopicPreOnboardInterval))
}
maxCount := gcp.GlobalValueInt(types.NetDumpTopicMaxCount)
netDumper.MaxDumpsPerTopic = int(maxCount)
} else {
netDumper = nil
}
m.netDumper = netDumper
}
func (m *DpcManager) doUpdateFlowlogState(ctx context.Context, flowlogEnabled bool) {
m.flowlogEnabled = flowlogEnabled
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
}
func (m *DpcManager) doUpdateClusterStatus(ctx context.Context,
status types.EdgeNodeClusterStatus) {
m.clusterStatus = status
m.reconcileStatus = m.DpcReconciler.Reconcile(ctx, m.reconcilerArgs())
}