forked from DeFacto-Team/Factom-Open-API
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
executable file
·348 lines (283 loc) · 8.42 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
package main
import (
"encoding/json"
"flag"
"os/user"
"time"
"github.com/DeFacto-Team/Factom-Open-API/api"
"github.com/DeFacto-Team/Factom-Open-API/config"
"github.com/DeFacto-Team/Factom-Open-API/model"
"github.com/DeFacto-Team/Factom-Open-API/pool"
"github.com/DeFacto-Team/Factom-Open-API/service"
"github.com/DeFacto-Team/Factom-Open-API/store"
"github.com/DeFacto-Team/Factom-Open-API/wallet"
"github.com/FactomProject/factom"
_ "github.com/lib/pq"
log "github.com/sirupsen/logrus"
)
const (
// number of minutes in Factom dblock
MinutesInBlock = 10
// number of background workers to fetch data from chains
WorkersCount = 4
)
// @title Factom Open API
// @version 1.0.0
// @description Open-source REST API for the Factom blockchain.
// @contact.name De Facto
// @contact.url https://de-facto.pro
// @contact.email [email protected]
// @license.name MIT
// @license.url https://github.com/DeFacto-Team/Factom-Open-API/blob/master/LICENSE
// @host localhost:8081
// @BasePath /v1
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name Authorization
func main() {
var err error
usr, err := user.Current()
if err != nil {
log.Error(err)
}
configFile := usr.HomeDir + "/.foa/config.yaml"
flag.StringVar(&configFile, "c", configFile, "config.yaml path")
flag.Parse()
startAPI(configFile)
}
func startAPI(configFile string) {
for {
var err error
var conf *config.Config
if conf, err = config.NewConfig(configFile); err != nil {
log.Fatal(err)
}
// Setup logger
log.SetLevel(log.Level(conf.API.LogLevel))
log.Info("Starting Factom Open API")
// Create store
store, err := store.NewStore(conf, true)
if err != nil {
log.Error("Database connection FAILED")
log.Fatal(err)
}
defer store.Close()
log.Info("Store created successfully")
// Create factom
if conf.Factom.URL != "" {
factom.SetFactomdServer(conf.Factom.URL)
}
if conf.Factom.User != "" && conf.Factom.Password != "" {
factom.SetFactomdRpcConfig(conf.Factom.User, conf.Factom.Password)
}
// Check factomd availability
heights, err := factom.GetHeights()
if err != nil {
log.Warn("FAILED connection to factomd node: ", conf.Factom.URL)
} else {
log.Info("Using factomd node: ", conf.Factom.URL,
" (DBlock=", heights.DirectoryBlockHeight, "/", heights.LeaderHeight,
", EntryBlock=", heights.EntryHeight, "/", heights.EntryBlockHeight, ")")
if heights.EntryBlockHeight-heights.EntryHeight > 1 {
log.Warn("Factomd node is not fully synced! API will not be able to write data on the blockchain or read actual data from the blockchain!")
}
}
// initialize wallet
wallet, err := wallet.NewWallet(conf)
if err != nil {
log.Warn(err)
log.Warn("You need to setup Es address in order to use API")
}
// Create services
s := service.NewService(store, wallet)
log.Info("Services created successfully")
// Initialize pool for history fetching chains
collector := pool.StartDispatcher(WorkersCount)
// Initialize single-thread background workers
die := make(chan bool)
go pingDB(store, die)
go fetchUnsyncedChains(s, collector, die)
go fetchChainUpdates(s, die)
go processQueue(s, die)
go clearQueue(s, die)
go completedCallbacks(s, die)
// Init REST API
api := api.NewAPI(conf, s, configFile)
// Start REST API
log.WithField("port", api.GetAPIInfo().Port).
WithField("version", api.GetAPIInfo().Version).
WithField("middleware", api.GetAPIInfo().MW).
Info("Starting REST API")
err = api.Start()
if err != nil {
log.Warn("Restarting Factom Open API")
close(die)
}
}
}
func fetchUnsyncedChains(s service.Service, collector pool.Collector, die chan bool) {
log.Info("Reseting all unsynced local chains to put it into pool")
err := s.ResetChainsParsingAtAPIStart()
if err != nil {
log.Error(err)
}
for {
select {
default:
log.Info("Fetching unsynced chains: iteration started")
t := false
chains := s.GetChains(&model.Chain{Synced: &t, WorkerID: -1, SentToPool: &t})
for _, c := range chains {
s.SetChainSentToPool(c)
collector.Work <- pool.Work{ID: c.ChainID, Job: c, Service: s}
}
time.Sleep(5 * time.Second)
case <-die:
return
}
}
}
func fetchChainUpdates(s service.Service, die chan bool) {
var currentMinute int // current minute
var currentMinuteEnd int // current minute after parsing ended
var currentDBlock int // current dblock
var latestDBlock int // latest fetched dblock
var sleepFor int // sleep timer
var err error
for {
select {
default:
log.Info("Updates parser: Iteration started")
// get current minute & dblock from Factom
currentMinute, currentDBlock, err = getMinuteAndHeight()
if err != nil {
continue
}
log.Info("Updates parser: currentMinute=", currentMinute, ", currentDBlock=", currentDBlock)
// if current dblock <= latest fetched dblock, then elections should occur and need to sleep 1 minute before next try
// on the first iteration latestDblock = 0, so this code won't run & new updates will be fetched when API started
for currentDBlock <= latestDBlock {
log.Info("Updates parser: Sleeping for 1 minute / currentDBlock=", currentDBlock, ", latestDBlock=", latestDBlock)
time.Sleep(1 * time.Minute)
currentMinute, currentDBlock, err = getMinuteAndHeight()
log.Info("Updates parser: currentMinute=", currentMinute, ", currentDBlock=", currentDBlock)
}
// if we are here, then latestDBlock > currentDBlock (i.e. new dblock appeared)
// parsing chains updates
chains := s.GetChains(&model.Chain{Status: model.ChainCompleted})
for _, c := range chains {
err := s.ParseNewChainEntries(c)
if err != nil {
log.Error(err)
}
}
// updating latest parsed dblock
latestDBlock = currentDBlock
// parsing may spend time, so check current minute
currentMinuteEnd, _, err = getMinuteAndHeight()
log.Debug("Updates parser: currentMinute=", currentMinuteEnd)
// if current minute was {8|9} and becomes {0|1|2|3…}, i.e. new block appeared during the parsing
// then no sleep in the end
if currentMinuteEnd < currentMinute {
sleepFor = 0
} else {
// else calculate sleep minutes before next block
// workaround: if minute == 0, then sleep for 1 minute instead of 11
if currentMinute == 0 {
currentMinute = 10
}
// + 1 needed for sleeping at least 1 minute
sleepFor = MinutesInBlock - currentMinute + 1
}
log.Info("Updates parser: Sleeping for ", sleepFor, " minute(s)")
time.Sleep(time.Duration(sleepFor) * time.Minute)
case <-die:
return
}
}
}
// Get all tasks from queue where processed_at == NULL
func processQueue(s service.Service, die chan bool) {
for {
select {
default:
log.Info("Processing queue: iteration started")
queue := s.GetQueueToProcess()
for _, q := range queue {
err := s.ProcessQueue(q)
if err != nil {
log.Error(err)
}
}
time.Sleep(5 * time.Second)
case <-die:
return
}
}
}
func clearQueue(s service.Service, die chan bool) {
for {
select {
default:
log.Info("Clearing queue: iteration started")
queue := s.GetQueueToClear()
for _, q := range queue {
s.ClearQueue(q)
}
time.Sleep(60 * time.Second)
case <-die:
return
}
}
}
func completedCallbacks(s service.Service, die chan bool) {
for {
select {
default:
log.Info("Completed callbacks: iteration started")
callbacks := s.GetCallbacks(&model.Callback{})
for _, c := range callbacks {
log.Debug("Completed callbacks: Entry ", c.EntryHash, " ", c.Entry.Status)
if c.Entry.Status == model.EntryCompleted {
s.SendCallback(c)
}
}
time.Sleep(30 * time.Second)
case <-die:
return
}
}
}
func getMinuteAndHeight() (int, int, error) {
var currentMinute float64
var dBlockHeight float64
var i interface{}
request := factom.NewJSON2Request("current-minute", 0, nil)
resp, err := factom.SendFactomdRequest(request)
if err != nil {
log.Error(err)
return 0, 0, nil
}
if err = json.Unmarshal(resp.JSONResult(), &i); err != nil {
log.Error(err)
return 0, 0, nil
}
m, _ := i.(map[string]interface{})
currentMinute = m["minute"].(float64)
dBlockHeight = m["directoryblockheight"].(float64)
return int(currentMinute), int(dBlockHeight), nil
}
func pingDB(s store.Store, die chan bool) {
for {
select {
default:
if err := s.Ping(); err != nil {
log.Error("Database connection FAILED")
log.Fatal(err)
}
time.Sleep(5 * time.Second)
case <-die:
return
}
}
}