-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathpatchenvelopes.go
354 lines (304 loc) · 12 KB
/
patchenvelopes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
// Copyright (c) 2023 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0
package msrv
import (
"encoding/json"
"fmt"
"sync"
uuid "github.com/satori/go.uuid"
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/pubsub"
"github.com/lf-edge/eve/pkg/pillar/types"
"github.com/lf-edge/eve/pkg/pillar/utils/generics"
)
// PatchEnvelopeURLPath is the route prefix used for patch envelopes.
// It is inserted between the metadata server address and the rest of the
// path when download URLs for patch envelope blobs are composed.
const PatchEnvelopeURLPath = "/eve/v1/patch/"
// PatchEnvelopes is a structure representing
// Patch Envelopes exposed to App instances via the metadata server.
// For more info check docs/PATCH-ENVELOPES.md.
// Must be created by calling NewPatchEnvelopes().
//
// Internally, the PatchEnvelopes structure stores envelopes which
// come from EdgeDevConfig parsed by zedagent. These envelopes contain
// both inline binary artifacts, which are ready to be downloaded by app instances,
// and volume references, which are handled by volumemgr.
// So the PatchEnvelopes struct has completedVolumes and contentTreeStatus to store
// information about all volumes and content trees handled by volumemgr, in order to link them
// with patch envelope volume references. ContentTreeStatus is used to retrieve the SHA of
// the underlying file.
//
// App instances access PatchEnvelopes via metadata server handlers, which call the
// PatchEnvelopes.Get() method to get the list of PatchEnvelopeInfo available
// to a certain App Instance; these are stored in currentState.
//
// PatchEnvelopes also has the updateStateNotificationCh channel
// to receive notifications about the need to update the PatchEnvelopes state.
// updateStateNotificationCh has a capacity of 1, so the update queue will never pile up.
// When updating state, we iterate through all envelopes, and remove envelopes which
// are marked in the envelopesToDelete boolean map (set).
// NewPatchEnvelopes() starts the goroutine processStateUpdate(), which reads from the channel and
// updates currentState to the desired one. In addition, this goroutine publishes status for every
// PatchEnvelope via pubsub. Note that PatchEnvelopes does not create a PubSub; rather, it uses the
// one provided to NewPatchEnvelopes(). So it does not have an agentName of its own, but could
// easily be split into a separate agent if needed.
// This way handlers can do the work of determining which patch envelopes actually need a change
// (if any) and hand the rest of the update, including the slow work, over to the goroutine.
//
// Note that this channel is only accessible from the outside by calling a function which returns
// a write-only channel, meaning that updateStateNotificationCh should not be
// read from anywhere except processStateUpdate(), so that there cannot be any deadlock.
type PatchEnvelopes struct {
// Embedded lock: updateState() takes it for writing, UpdateEnvelopes() for reading.
sync.RWMutex
// updateStateNotificationCh triggers updateState(); created with capacity 1.
updateStateNotificationCh chan struct{}
// envelopesToDelete holds UUIDs set to true by UpdateEnvelopes() when an envelope
// disappears; updateState() removes such envelopes and resets the flag to false.
envelopesToDelete *generics.LockedMap[uuid.UUID, bool]
// currentState is the state computed by updateState() and served by Get().
currentState *generics.LockedMap[uuid.UUID, types.PatchEnvelopeInfo]
// envelopes is the desired set of envelopes, replaced by UpdateEnvelopes().
envelopes *generics.LockedMap[uuid.UUID, types.PatchEnvelopeInfo]
// completedVolumes is keyed by VolumeID and maintained by UpdateVolumeStatus().
completedVolumes *generics.LockedMap[uuid.UUID, types.VolumeStatus]
// contentTreeStatus is keyed by ContentID and maintained by UpdateContentTree().
contentTreeStatus *generics.LockedMap[uuid.UUID, types.ContentTreeStatus]
// pubSub is the PubSub instance handed to NewPatchEnvelopes().
pubSub *pubsub.PubSub
log *base.LogObject
// pubPatchEnvelopeState is the publication used to publish/unpublish PatchEnvelopeInfo.
pubPatchEnvelopeState pubsub.Publication
}
// UpdateStateNotificationCh exposes the send-only side of the notification
// channel. Sending on it asks the background goroutine to refresh currentState.
func (pes *PatchEnvelopes) UpdateStateNotificationCh() chan<- struct{} {
	var notify chan<- struct{} = pes.updateStateNotificationCh
	return notify
}
// NewPatchEnvelopes returns a PatchEnvelopes structure and starts the goroutine
// which processes notifications from the update channel. The channel is
// buffered (capacity 1) so that a single pending notification can be queued
// without the writer waiting for the update to be processed.
//
// log is used for diagnostics and ps is the PubSub instance on which the
// PatchEnvelopeInfo publication is created.
//
// Returns nil if the publication cannot be created; the failure is logged so
// that callers do not end up with an unexplained nil.
func NewPatchEnvelopes(log *base.LogObject, ps *pubsub.PubSub) *PatchEnvelopes {
	pe := &PatchEnvelopes{
		updateStateNotificationCh: make(chan struct{}, 1),

		envelopesToDelete: generics.NewLockedMap[uuid.UUID, bool](),
		currentState:      generics.NewLockedMap[uuid.UUID, types.PatchEnvelopeInfo](),
		envelopes:         generics.NewLockedMap[uuid.UUID, types.PatchEnvelopeInfo](),
		completedVolumes:  generics.NewLockedMap[uuid.UUID, types.VolumeStatus](),
		contentTreeStatus: generics.NewLockedMap[uuid.UUID, types.ContentTreeStatus](),

		log:    log,
		pubSub: ps,
	}

	var err error
	pe.pubPatchEnvelopeState, err = pe.pubSub.NewPublication(pubsub.PublicationOptions{
		AgentName: agentName,
		TopicType: types.PatchEnvelopeInfo{},
	})
	if err != nil {
		// Previously this error was dropped silently; surface it before bailing out.
		if log != nil {
			log.Errorf("NewPatchEnvelopes: failed to create publication: %v", err)
		}
		return nil
	}

	// Only start the worker once the publication it relies on exists.
	go pe.processStateUpdate()

	return pe
}
// processStateUpdate runs forever: it waits for a notification on
// updateStateNotificationCh and recomputes the state for each one received.
func (pes *PatchEnvelopes) processStateUpdate() {
	for {
		<-pes.updateStateNotificationCh
		pes.updateState()
	}
}
// updateState brings currentState in line with the desired envelopes.
// It first drops (and unpublishes) every envelope flagged in envelopesToDelete,
// then recomputes, stores and publishes the state of every envelope in
// pes.envelopes. The whole operation runs under the write lock.
func (pes *PatchEnvelopes) updateState() {
	pes.Lock()
	defer pes.Unlock()

	// Phase 1: remove envelopes that were marked for deletion.
	for _, id := range pes.envelopesToDelete.Keys() {
		marked, _ := pes.envelopesToDelete.Load(id)
		if !marked {
			continue
		}
		if stale, found := pes.currentState.Load(id); found {
			pes.unpublishPatchEnvelopeInfo(&stale)
		}
		pes.currentState.Delete(id)
		// The entry is kept but its flag is cleared.
		pes.envelopesToDelete.Store(id, false)
	}

	// Phase 2: recompute and publish the state of each desired envelope.
	for _, id := range pes.envelopes.Keys() {
		envelope, found := pes.envelopes.Load(id)
		if !found {
			pes.log.Errorf("No entry in envelopes for %v to fetch", id)
			continue
		}

		state := types.PatchEnvelopeStateActive
		for _, ref := range envelope.VolumeRefs {
			blob, blobState := pes.blobFromVolumeRef(ref)
			if blob == nil {
				continue
			}
			// The envelope is only as far along as its least advanced blob.
			if blobState < state {
				state = blobState
			}
			// Replace a blob with the same file name, otherwise add it.
			idx := types.CompletedBinaryBlobIdxByName(envelope.BinaryBlobs, blob.FileName)
			if idx == -1 {
				envelope.BinaryBlobs = append(envelope.BinaryBlobs, *blob)
			} else {
				envelope.BinaryBlobs[idx] = *blob
			}
		}

		// When the controller asks us to store the patch envelope without
		// exposing it to the app instance, keep it in the Ready state.
		if state == types.PatchEnvelopeStateActive && envelope.State == types.PatchEnvelopeStateReady {
			state = types.PatchEnvelopeStateReady
		}

		// Any recorded error overrides the computed state.
		if len(envelope.Errors) > 0 {
			state = types.PatchEnvelopeStateError
			pes.log.Errorf("Errors: %v", envelope.Errors)
		}

		envelope.State = state
		pes.currentState.Store(id, envelope)
		pes.publishPatchEnvelopeInfo(&envelope)
	}
}
// publishPatchEnvelopeInfo publishes peInfo on the patch envelope state
// publication under the key returned by peInfo.Key().
// A nil peInfo is logged and ignored; publish failures are logged.
func (pes *PatchEnvelopes) publishPatchEnvelopeInfo(peInfo *types.PatchEnvelopeInfo) {
	if peInfo == nil {
		pes.log.Errorf("publishPatchEnvelopeInfo: nil peInfo")
		// Without this return the code below would dereference the nil pointer.
		return
	}

	key := peInfo.Key()
	pub := pes.pubPatchEnvelopeState

	if err := pub.Publish(key, *peInfo); err != nil {
		pes.log.Errorf("publishPatchEnvelopeInfo failed: %v", err)
	}
}
// unpublishPatchEnvelopeInfo withdraws peInfo from the patch envelope state
// publication. A nil peInfo, a key that is not currently published, and an
// Unpublish failure are each logged; none of them is returned to the caller.
func (pes *PatchEnvelopes) unpublishPatchEnvelopeInfo(peInfo *types.PatchEnvelopeInfo) {
	if peInfo == nil {
		pes.log.Errorf("unpublishPatchEnvelopeInfo: nil peInfo")
		return
	}

	key := peInfo.Key()

	// Only the presence of an item matters here; the lookup error is ignored.
	published, _ := pes.pubPatchEnvelopeState.Get(key)
	if published == nil {
		pes.log.Errorf("unpublishPatchEnvelopeInfo: key %s not found", key)
		return
	}

	err := pes.pubPatchEnvelopeState.Unpublish(key)
	if err != nil {
		pes.log.Errorf("unpublishPatchEnvelopeInfo failed: %v", err)
	}
}
// Get collects the patch envelopes from currentState that are in the Active
// state and list appUUID among their AllowedApps, and returns them wrapped in
// a PatchEnvelopeInfoList.
func (pes *PatchEnvelopes) Get(appUUID string) types.PatchEnvelopeInfoList {
	var visible []types.PatchEnvelopeInfo

	collect := func(_ uuid.UUID, info types.PatchEnvelopeInfo) bool {
		// Envelopes which are not activated are never exposed to app instances.
		if info.State == types.PatchEnvelopeStateActive {
			for _, app := range info.AllowedApps {
				if app == appUUID {
					visible = append(visible, info)
					break
				}
			}
		}
		// Always keep iterating over the remaining envelopes.
		return true
	}
	pes.currentState.Range(collect)

	return types.PatchEnvelopeInfoList{Envelopes: visible}
}
// blobFromVolumeRef resolves a volume reference into a completed binary blob
// and reports how far along that blob is:
//   - vr.ImageID is not a valid UUID: nil blob, Error state;
//   - no entry in completedVolumes:   nil blob, Recieved state;
//   - volume known, no content tree:  blob without FileSha, Downloading state;
//   - volume and content tree known:  blob with FileSha, Active state.
func (pes *PatchEnvelopes) blobFromVolumeRef(vr types.BinaryBlobVolumeRef) (*types.BinaryBlobCompleted, types.PatchEnvelopeState) {
	volUUID, err := uuid.FromString(vr.ImageID)
	if err != nil {
		pes.log.Errorf("Failed to compose volUUID from string %v", err)
		return nil, types.PatchEnvelopeStateError
	}

	volume, known := pes.completedVolumes.Load(volUUID)
	if !known {
		return nil, types.PatchEnvelopeStateRecieved
	}

	blob := &types.BinaryBlobCompleted{
		FileName:         vr.FileName,
		FileMetadata:     vr.FileMetadata,
		ArtifactMetadata: vr.ArtifactMetadata,
		URL:              volume.FileLocation,
		Size:             volume.TotalSize,
	}

	tree, known := pes.contentTreeStatus.Load(volume.ContentID)
	if !known {
		return blob, types.PatchEnvelopeStateDownloading
	}

	// The SHA comes from the content tree backing the volume.
	blob.FileSha = tree.ContentSha256
	return blob, types.PatchEnvelopeStateActive
}
// UpdateVolumeStatus keeps completedVolumes up to date. With deleteVolume set
// the entry for vs.VolumeID is removed; otherwise vs is stored, but only once
// its State has reached at least CREATED_VOLUME.
func (pes *PatchEnvelopes) UpdateVolumeStatus(vs types.VolumeStatus, deleteVolume bool) {
	switch {
	case deleteVolume:
		pes.completedVolumes.Delete(vs.VolumeID)
	case vs.State >= types.CREATED_VOLUME:
		pes.completedVolumes.Store(vs.VolumeID, vs)
	}
}
// UpdateEnvelopes replaces pes.envelopes with the envelopes from peInfo and
// marks every envelope that was present before but is missing from peInfo as
// one to be deleted. The actual state change (removal, publishing) happens
// later, when a notification is sent to updateStateNotificationCh.
//
// Envelopes whose PatchID is not a valid UUID are logged and skipped, so they
// are never stored under the zero UUID.
func (pes *PatchEnvelopes) UpdateEnvelopes(peInfo []types.PatchEnvelopeInfo) {
	// This method replaces pes.envelopes and marks envelopesToDelete, so it
	// needs the write lock rather than the read lock.
	pes.Lock()
	defer pes.Unlock()

	before := pes.envelopes.Keys()

	envelopes := generics.NewLockedMap[uuid.UUID, types.PatchEnvelopeInfo]()
	for _, pe := range peInfo {
		peUUID, err := uuid.FromString(pe.PatchID)
		if err != nil {
			pes.log.Errorf("UpdateEnvelopes: skipping envelope with invalid PatchID %q: %v", pe.PatchID, err)
			continue
		}
		envelopes.Store(peUUID, pe)
	}

	// Everything that was known before and is absent now has to go.
	toDelete, _ := generics.DiffSets(before, envelopes.Keys())
	for _, deleteUUID := range toDelete {
		pes.envelopesToDelete.Store(deleteUUID, true)
	}

	pes.envelopes = envelopes
}
// UpdateContentTree records ct in contentTreeStatus under ct.ContentID, or
// removes that entry when deleteCt is set. It only touches the map; the
// envelope state is recomputed separately, after a notification is sent to
// updateStateNotificationCh.
func (pes *PatchEnvelopes) UpdateContentTree(ct types.ContentTreeStatus, deleteCt bool) {
	if !deleteCt {
		pes.contentTreeStatus.Store(ct.ContentID, ct)
		return
	}
	pes.contentTreeStatus.Delete(ct.ContentID)
}
// EnvelopesInUsage returns the keys of all usages derived, via
// types.PatchEnvelopeUsageFromInfo, from the envelopes in pes.envelopes.
func (pes *PatchEnvelopes) EnvelopesInUsage() []string {
	var keys []string

	collectUsageKeys := func(_ uuid.UUID, info types.PatchEnvelopeInfo) bool {
		for _, u := range types.PatchEnvelopeUsageFromInfo(info) {
			keys = append(keys, u.Key())
		}
		return true
	}
	pes.envelopes.Range(collectUsageKeys)

	return keys
}
// PeInfoToDisplay is used together with patchEnvelopesJSONForAppInstance to
// marshal PatchEnvelopeInfoList in a format which is suitable for app instances.
// PatchEnvelopeInfo contains fields that we don't want to expose to app instances (like AllowedApps).
// We cannot use the json:"-" structure tag to omit AllowedApps from json marshaling since we use
// PatchEnvelopeInfo between zedagent and zedrouter to communicate new PatchEnvelopes from EdgeDevConfig.
// This communication is done via pubSub, which uses json marshaling to pass structures between
// processes, so using json:"-" would make AllowedApps "magically" disappear on zedrouter.
type PeInfoToDisplay struct {
PatchID string
Version string
BinaryBlobs []types.BinaryBlobCompleted
VolumeRefs []types.BinaryBlobVolumeRef
}
// patchEnvelopesJSONForAppInstance renders the given envelopes as the JSON
// list shown to app instances. Each envelope is reduced to a PeInfoToDisplay
// and the URL of every binary blob is rewritten to point at the metadata
// server download route. Blobs are copied first, so the input is not modified.
func patchEnvelopesJSONForAppInstance(pe types.PatchEnvelopeInfoList) ([]byte, error) {
	display := make([]PeInfoToDisplay, 0, len(pe.Envelopes))

	for _, env := range pe.Envelopes {
		// A nil BinaryBlobs stays nil so that it still marshals as null.
		var blobs []types.BinaryBlobCompleted
		if env.BinaryBlobs != nil {
			blobs = make([]types.BinaryBlobCompleted, len(env.BinaryBlobs))
			copy(blobs, env.BinaryBlobs)
		}

		for idx := range blobs {
			blobs[idx].URL = fmt.Sprintf("http://%s%sdownload/%s/%s", MetaDataServerIP, PatchEnvelopeURLPath, env.PatchID, blobs[idx].FileName)
		}

		display = append(display, PeInfoToDisplay{
			PatchID:     env.PatchID,
			Version:     env.Version,
			BinaryBlobs: blobs,
			VolumeRefs:  env.VolumeRefs,
		})
	}

	return json.Marshal(display)
}