-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstarkIO.go
281 lines (243 loc) · 7.28 KB
/
starkIO.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package stark
import (
"bytes"
"fmt"
"github.com/gogo/protobuf/jsonpb"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/pkg/errors"
starkpinata "github.com/will-rowe/stark/src/pinata"
)
// GetSnapshot returns the current database snapshot
// CID, which can be used to rebuild the current
// database instance.
//
// Note: if the database has no entries, the
// returned snapshot will be a nil string.
func (starkdb *Db) GetSnapshot() string {
starkdb.Lock()
defer starkdb.Unlock()
if starkdb.currentNumEntries == 0 {
return ""
}
return starkdb.snapshotCID
}
// GetNumEntries returns the number of entries
// in the current database instance.
func (starkdb *Db) GetNumEntries() int {
starkdb.Lock()
defer starkdb.Unlock()
return starkdb.currentNumEntries
}
// GetCIDs will return a map of keys to CIDs for
// all Records currently held in the database.
func (starkdb *Db) GetCIDs() map[string]string {
starkdb.Lock()
defer starkdb.Unlock()
return starkdb.cidLookup
}
// GetNodeAddr returns the public address of the
// underlying IPFS node for the starkDB
// instance.
func (starkdb *Db) GetNodeAddr() (string, error) {
if !starkdb.isOnline() {
return "", ErrNodeOffline
}
nodeID := starkdb.ipfsClient.PrintNodeID()
add, err := starkdb.ipfsClient.GetPublicIPv4Addr()
if err != nil {
return "", err
}
return fmt.Sprintf("%s/p2p/%s", add, nodeID), nil
}
// GetConnectedPeers returns the addresses of the
// currently connected IPFS peers.
func (starkdb *Db) GetConnectedPeers() ([]string, error) {
if !starkdb.isOnline() {
return nil, ErrNodeOffline
}
connectPeers, err := starkdb.ipfsClient.GetPeers(starkdb.ctx)
if err != nil {
return nil, err
}
return connectPeers, nil
}
// PinataPublish will issue an API call to the pinata
// pinByHash endpoint and pin the current database
// instance.
func (starkdb *Db) PinataPublish(apiKey, apiSecret string) (*starkpinata.APIResponse, error) {
hostAddress, err := starkdb.GetNodeAddr()
if err != nil {
return nil, err
}
pinataClient, err := starkpinata.NewClient(apiKey, apiSecret, hostAddress)
if err != nil {
return nil, err
}
meta := starkpinata.NewMetadata(starkdb.project)
resp, err := pinataClient.PinByHashWithMetadata(starkdb.snapshotCID, meta)
if err != nil {
return nil, err
}
return resp, nil
}
// Listen will start a subscription to the IPFS PubSub network
// for messages matching the current database's project. It
// tries pulling Records from the IPFS via the announced
// CIDs, then returns them via a channel to the caller.
//
// It returns the Record channel, an Error channel which reports
// errors during message processing and Record retrieval, as well
// as any error during the PubSub setup.
func (starkdb *Db) Listen(terminator chan struct{}) (chan *Record, chan error, error) {
if !starkdb.isOnline() {
return nil, nil, ErrNodeOffline
}
// subscribe the node to the starkDB project
if err := starkdb.ipfsClient.Subscribe(starkdb.ctx, starkdb.project); err != nil {
return nil, nil, err
}
// cidTracker skips listener over duplicate CIDs
cidTracker := make(map[string]struct{})
// create channels used to send Records and errors back to the caller
recChan := make(chan *Record, DefaultBufferSize)
errChan := make(chan error)
// process the incoming messages
go func() {
for {
select {
case msg := <-starkdb.ipfsClient.GetPSMchan():
// TODO: check sender peerID
//msg.From()
// get the CID
cid := string(msg.Data())
if _, ok := cidTracker[cid]; ok {
continue
}
cidTracker[cid] = struct{}{}
// collect the Record from IPFS
collectedRecord, err := starkdb.getRecordFromCID(cid)
if err != nil {
errChan <- err
} else {
// add a comment to say this Record was from PubSub
collectedRecord.AddComment(fmt.Sprintf("collected from %s via pubsub.", msg.From()))
// send the record on to the caller
recChan <- collectedRecord
}
case err := <-starkdb.ipfsClient.GetPSEchan():
errChan <- err
case <-terminator:
if err := starkdb.ipfsClient.Unsubscribe(); err != nil {
errChan <- err
}
close(recChan)
close(errChan)
return
}
}
}()
return recChan, errChan, nil
}
// Delete will delete an entry from starkdb. This involves
// removing the key and Record CID from the local store,
// as well as unpinning the Record from the IPFS.
//
// Note: I'm not sure how this behaves if the Record
// wasn't pinned in the IPFS in the first place.
func (starkdb *Db) Delete(key string) error {
starkdb.Lock()
defer starkdb.Unlock()
// check the local keystore for the provided key
cid, ok := starkdb.cidLookup[key]
if !ok {
return ErrNotFound(key)
}
// unlink the record CID from the project directory and update a snapshot
snapshotUpdate, err := starkdb.ipfsClient.RmLink(starkdb.ctx, starkdb.snapshotCID, key)
if err != nil {
return errors.Wrap(err, ErrSnapshotUpdate.Error())
}
starkdb.snapshotCID = snapshotUpdate
// unpin the file
if err := starkdb.ipfsClient.Unpin(starkdb.ctx, cid); err != nil {
return err
}
// TODO: if using a pinning service - will need to request an unpin there
// remove from the keystore
delete(starkdb.cidLookup, key)
starkdb.currentNumEntries--
return nil
}
// GetVersion returns the full version string
// for the current stark package.
func GetVersion() string {
return fmt.Sprintf("%d.%d.%d", major, minor, patch)
}
// GetBaseVersion returns the major minor
// version string for the current stark
// package.
func GetBaseVersion() string {
return fmt.Sprintf("%d.%d", major, minor)
}
// getRecordFromCID is a helper method that collects a Record from
// the IPFS using its CID string.
func (starkdb *Db) getRecordFromCID(cid string) (*Record, error) {
if len(cid) == 0 {
return nil, ErrNoCID
}
// retrieve the record data from the IPFS
retrievedNode, err := starkdb.ipfsClient.DagGet(starkdb.ctx, cid)
if err != nil {
return nil, err
}
// double check it's CBOR
cborNode, isCborNode := retrievedNode.(*cbor.Node)
if !isCborNode {
return nil, fmt.Errorf("%v: %v", ErrNodeFormat, cid)
}
// get JSON data from node
data, err := cborNode.MarshalJSON()
if err != nil {
return nil, err
}
// unmarshal it to a Record
record := &Record{}
um := &jsonpb.Unmarshaler{}
if err := um.Unmarshal(bytes.NewReader(data), record); err != nil {
return nil, err
}
// if it's an encrypted Record, see if we can decrypt
if record.GetEncrypted() {
if err := record.Decrypt(starkdb.cipherKey); err != nil {
return nil, errors.Wrap(err, ErrEncrypted.Error())
}
}
// add the pulled CID to this record
record.PreviousCID = cid
return record, nil
}
// publishAnnouncement will send a PubSub message on the topic
// of the database project.
func (starkdb *Db) publishAnnouncement(message []byte) error {
if !starkdb.isOnline() {
return ErrNodeOffline
}
if len(starkdb.project) == 0 {
return ErrNoProject
}
return starkdb.ipfsClient.SendMessage(starkdb.ctx, starkdb.project, message)
}
// isOnline returns true if the starkDB is in online mode
// and the IPFS daemon is reachable.
// TODO: this needs some more work.
func (starkdb *Db) isOnline() bool {
return starkdb.ipfsClient.Online()
}
// send2log will send a message to the log if one
// is attached.
func (starkdb *Db) send2log(msg interface{}) {
if msg == nil || starkdb.loggingChan == nil {
return
}
starkdb.loggingChan <- msg
}