-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathdtcontext.go
152 lines (136 loc) · 3.77 KB
/
dtcontext.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package dtcontext
import (
"context"
"errors"
"strings"
"sync"
"time"
"k8s.io/klog/v2"
"github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/edge/pkg/common/modules"
deviceconfig "github.com/kubeedge/kubeedge/edge/pkg/devicetwin/config"
"github.com/kubeedge/kubeedge/edge/pkg/devicetwin/dtcommon"
"github.com/kubeedge/kubeedge/edge/pkg/devicetwin/dttype"
)
// DTContext context for devicetwin
type DTContext struct {
GroupID string
NodeName string
CommChan map[string]chan interface{}
ConfirmChan chan interface{}
ConfirmMap *sync.Map
ModulesHealth *sync.Map
ModulesContext *context.Context
DeviceList *sync.Map
DeviceMutex *sync.Map
Mutex *sync.RWMutex
// DBConn *dtclient.Conn
State string
}
// InitDTContext init dtcontext
func InitDTContext() (*DTContext, error) {
return &DTContext{
GroupID: "",
NodeName: deviceconfig.Get().NodeName,
CommChan: make(map[string]chan interface{}),
ConfirmChan: make(chan interface{}, 1000),
ConfirmMap: &sync.Map{},
ModulesHealth: &sync.Map{},
DeviceList: &sync.Map{},
DeviceMutex: &sync.Map{},
Mutex: &sync.RWMutex{},
State: dtcommon.Disconnected,
}, nil
}
// CommTo communicate
func (dtc *DTContext) CommTo(dtmName string, content interface{}) error {
if v, exist := dtc.CommChan[dtmName]; exist {
v <- content
return nil
}
return errors.New("Not found chan to communicate")
}
// HeartBeat heartbeat to dtcontroller
func (dtc *DTContext) HeartBeat(dtmName string, content interface{}) error {
if strings.Compare(content.(string), "ping") == 0 {
dtc.ModulesHealth.Store(dtmName, time.Now().Unix())
klog.V(3).Infof("%s is healthy %v", dtmName, time.Now().Unix())
} else if strings.Compare(content.(string), "stop") == 0 {
klog.Infof("%s stop", dtmName)
return errors.New("stop")
}
return nil
}
// GetMutex get mutex
func (dtc *DTContext) GetMutex(deviceID string) (*sync.Mutex, bool) {
v, mutexExist := dtc.DeviceMutex.Load(deviceID)
if !mutexExist {
klog.Errorf("GetMutex device %s not exist", deviceID)
return nil, false
}
mutex, isMutex := v.(*sync.Mutex)
if isMutex {
return mutex, true
}
return nil, false
}
// Lock get the lock of the device
func (dtc *DTContext) Lock(deviceID string) bool {
deviceMutex, ok := dtc.GetMutex(deviceID)
if ok {
dtc.Mutex.RLock()
deviceMutex.Lock()
return true
}
return false
}
// Unlock remove the lock of the device
func (dtc *DTContext) Unlock(deviceID string) bool {
deviceMutex, ok := dtc.GetMutex(deviceID)
if ok {
deviceMutex.Unlock()
dtc.Mutex.RUnlock()
return true
}
return false
}
// LockAll get all lock
func (dtc *DTContext) LockAll() {
dtc.Mutex.Lock()
}
// UnlockAll get all lock
func (dtc *DTContext) UnlockAll() {
dtc.Mutex.Unlock()
}
// IsDeviceExist judge device is exist
func (dtc *DTContext) IsDeviceExist(deviceID string) bool {
_, ok := dtc.DeviceList.Load(deviceID)
return ok
}
// GetDevice get device
func (dtc *DTContext) GetDevice(deviceID string) (*dttype.Device, bool) {
d, ok := dtc.DeviceList.Load(deviceID)
if ok {
if device, isDevice := d.(*dttype.Device); isDevice {
return device, true
}
return nil, false
}
return nil, false
}
// Send send result
func (dtc *DTContext) Send(identity string, action string, module string, msg *model.Message) error {
dtMsg := &dttype.DTMessage{
Action: action,
Identity: identity,
Type: module,
Msg: msg}
return dtc.CommTo(module, dtMsg)
}
// BuildModelMessage build mode messages
func (dtc *DTContext) BuildModelMessage(group string, parentID string, resource string, operation string, content interface{}) *model.Message {
msg := model.NewMessage(parentID)
msg.BuildRouter(modules.TwinGroup, group, resource, operation)
msg.Content = content
return msg
}