-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstateManager.go
151 lines (121 loc) · 3.03 KB
/
stateManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package main
import (
"fmt"
"sync"
)
// State is the mutable local state tracked by a StateManager. It is a plain
// value type: GetCurrentState returns it by value and InitiateSnapshot stores
// a copy of it inside each Snapshot.
type State struct {
// Counter starts at zero and is advanced by StateManager.IncrementCounter.
Counter int
// HasToken is written by StateManager.SetHasToken and read by GetHasToken.
HasToken bool
}
// Snapshot is one recorded snapshot held by a StateManager: a copy of the
// local State plus one Queue per peer.
// NOTE(review): the marker/queue bookkeeping looks like a Chandy-Lamport style
// distributed snapshot — confirm against the callers.
type Snapshot struct {
// ID is the identifier the snapshot was created with; it is also the key
// under which the StateManager stores the snapshot.
ID int
// State is the copy of the manager's current state taken by InitiateSnapshot.
State State
// Complete is set to false by InitiateSnapshot. Nothing in this file sets it
// to true; IsSnapshotComplete does not consult it.
Complete bool
// BlockedQueues is incremented once per CloseChannelInSnapshot call and is
// compared against the peer count by IsSnapshotComplete.
BlockedQueues int
// Queues holds one queue per peer, keyed by peer ID.
Queues map[int]*Queue
}
// StateManager owns the local State and the table of snapshots. Every method
// in this file takes mu before touching currentState or snapshots. Because it
// contains a mutex it must be used through a pointer and never copied.
type StateManager struct {
// currentState is the live state; snapshots capture copies of it.
currentState State
// snapshots maps snapshot ID to the recorded snapshot.
snapshots map[int]*Snapshot
// peerManager supplies the peer list used to create per-peer queues.
peerManager *PeerManager
// mu guards currentState and snapshots (read lock for getters, write lock
// for mutators).
mu sync.RWMutex
}
// NewStateManager builds a StateManager bound to peerManager. The returned
// manager starts with a zero-valued State (Counter 0, HasToken false) and an
// empty, non-nil snapshot table.
func NewStateManager(peerManager *PeerManager) *StateManager {
	sm := new(StateManager) // currentState is left at its zero value
	sm.snapshots = map[int]*Snapshot{}
	sm.peerManager = peerManager
	return sm
}
// IncrementCounter adds one to the current state's Counter while holding the
// write lock.
func (sm *StateManager) IncrementCounter() {
	sm.mu.Lock()
	sm.currentState.Counter += 1
	sm.mu.Unlock()
}
// SetHasToken overwrites the HasToken flag of the current state with value,
// holding the write lock for the duration of the assignment.
func (sm *StateManager) SetHasToken(value bool) {
	sm.mu.Lock()
	sm.currentState.HasToken = value
	sm.mu.Unlock()
}
// GetHasToken reports the HasToken flag of the current state, read under the
// read lock.
func (sm *StateManager) GetHasToken() bool {
	sm.mu.RLock()
	hasToken := sm.currentState.HasToken
	sm.mu.RUnlock()
	return hasToken
}
// GetCurrentState returns a copy of the current state, taken under the read
// lock. Because State is returned by value, later changes to the manager do
// not affect the returned value.
func (sm *StateManager) GetCurrentState() State {
	sm.mu.RLock()
	current := sm.currentState
	sm.mu.RUnlock()
	return current
}
// InitiateSnapshot creates the snapshot identified by snapshotID, registers it
// in the manager's table and returns it.
//
// The snapshot captures a copy of the current state, is marked not complete,
// and receives one freshly created queue per peer reported by the peer
// manager, keyed by peer ID. An existing entry with the same ID is replaced.
// The write lock is held for the whole call, including the GetPeers call.
func (sm *StateManager) InitiateSnapshot(snapshotID int) *Snapshot {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	snap := new(Snapshot) // Complete and BlockedQueues start at their zero values
	snap.ID = snapshotID
	snap.State = sm.currentState
	snap.Queues = map[int]*Queue{}

	// One recording queue per known peer.
	for _, p := range sm.peerManager.GetPeers() {
		snap.Queues[p.ID] = NewQueue()
	}

	sm.snapshots[snapshotID] = snap
	return snap
}
// GetSnapshot looks up the snapshot stored under snapshotID. The boolean is
// false (and the pointer nil) when no such snapshot is registered. The
// returned pointer is the manager's own Snapshot, not a copy.
func (sm *StateManager) GetSnapshot(snapshotID int) (*Snapshot, bool) {
	sm.mu.RLock()
	snap, found := sm.snapshots[snapshotID]
	sm.mu.RUnlock()
	return snap, found
}
// GetSnapshots returns a shallow copy of the snapshot table. The returned map
// is new, so callers may add or delete entries without affecting the manager,
// but the *Snapshot values are shared with it.
func (sm *StateManager) GetSnapshots() map[int]*Snapshot {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	out := make(map[int]*Snapshot, len(sm.snapshots))
	for key := range sm.snapshots {
		out[key] = sm.snapshots[key]
	}
	return out
}
// RecordMessageInSnapshot appends message to the queue that snapshot
// snapshotID keeps for sender senderID.
//
// It returns an error when the snapshot is unknown or has no queue for the
// sender; otherwise it returns whatever the queue's Append returns. The write
// lock is held for the whole call.
func (sm *StateManager) RecordMessageInSnapshot(snapshotID, senderID int, message interface{}) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	snap, ok := sm.snapshots[snapshotID]
	if !ok {
		return fmt.Errorf("snapshot %d does not exist", snapshotID)
	}

	if q, found := snap.Queues[senderID]; found {
		return q.Append(message)
	}
	return fmt.Errorf("queue for sender %d does not exist in snapshot %d", senderID, snapshotID)
}
// CloseChannelInSnapshot marks the queue that snapshot snapshotID keeps for
// sender senderID as blocked (SetBlock(true)) and bumps the snapshot's
// BlockedQueues counter by one.
//
// It returns an error when the snapshot is unknown or has no queue for the
// sender. The counter is incremented on every successful call, including
// repeated calls for the same sender.
func (sm *StateManager) CloseChannelInSnapshot(snapshotID, senderID int) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	snap, ok := sm.snapshots[snapshotID]
	if !ok {
		return fmt.Errorf("snapshot %d does not exist", snapshotID)
	}

	q, found := snap.Queues[senderID]
	if !found {
		return fmt.Errorf("queue for sender %d does not exist in snapshot %d", senderID, snapshotID)
	}

	q.SetBlock(true)
	snap.BlockedQueues++
	return nil
}
// IsSnapshotComplete reports whether snapshot snapshotID exists and its
// BlockedQueues counter is exactly npeers. An unknown snapshot ID yields
// false. The snapshot's Complete field is neither read nor written here.
func (sm *StateManager) IsSnapshotComplete(snapshotID int, npeers int) bool {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	snap, ok := sm.snapshots[snapshotID]
	return ok && snap.BlockedQueues == npeers
}