-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathdohlcvstream.go
139 lines (108 loc) · 2.88 KB
/
dohlcvstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
Package gotrade implements DOHLCV tick streams that record incoming ticks and notify subscribers.
*/
package gotrade
import (
"math"
"sync"
"time"
)
// DOHLCVStreamTickReceiver is implemented by types that accept a single
// DOHLCV tick through ReceiveTick.
type DOHLCVStreamTickReceiver interface {
// ReceiveTick hands one DOHLCV tick to the receiver.
ReceiveTick(tickData DOHLCV)
}
// DOHLCVStreamSubscriber is implemented by types that let a
// DOHLCVTickReceiver be registered for tick notifications.
type DOHLCVStreamSubscriber interface {
// AddTickSubscription registers subscriber with the implementer.
AddTickSubscription(subscriber DOHLCVTickReceiver)
}
// DataStreamHolder is implemented by types that expose the minimum and
// maximum float64 value they are tracking.
type DataStreamHolder interface {
// MinValue reports the tracked minimum.
MinValue() float64
// MaxValue reports the tracked maximum.
MaxValue() float64
}
// DateStreamHolder is implemented by types that expose the minimum and
// maximum date of the data they hold.
type DateStreamHolder interface {
// MinDate reports the minimum date.
MinDate() time.Time
// MaxDate reports the maximum date.
MaxDate() time.Time
}
// interDayBarType is the bar type carried by an InterDayDOHLCVStream.
type interDayBarType int
// intraDayBarType is the type of the MinuteBar constant.
type intraDayBarType int
// Bar type constants. All four live in one const block, so iota keeps
// counting across the change of type: MinuteBar is 0, DailyBar is 1,
// WeeklyBar is 2 and MonthlyBar is 3. WeeklyBar and MonthlyBar implicitly
// repeat the DailyBar expression and therefore have type interDayBarType.
const (
MinuteBar intraDayBarType = iota
DailyBar interDayBarType = iota
WeeklyBar
MonthlyBar
)
// DOHLCVStream stores every tick it receives in Data, tracks a running
// minimum and maximum from the ticks' L() and H() values, and forwards each
// tick to its subscribers.
type DOHLCVStream struct {
// Data holds the received ticks in arrival order.
Data []DOHLCV
// subscribers are notified by ReceiveTick for every tick.
subscribers []DOHLCVTickReceiver
// streamBarIndex is incremented once per received tick, before the
// subscribers are notified.
streamBarIndex int
// minValue is lowered by ReceiveTick whenever a tick's L() is below it.
minValue float64
// maxValue is raised by ReceiveTick whenever a tick's H() is above it.
maxValue float64
}
// InterDayDOHLCVStream is a DOHLCVStream tagged with an inter-day bar type.
type InterDayDOHLCVStream struct {
// Embedded so that the DOHLCVStream methods are promoted to this type.
*DOHLCVStream
// streamBarType is the bar type supplied at construction.
streamBarType interDayBarType
}
// NewInterDayDOHLCVStream returns an empty inter-day stream tagged with
// streamBarType.
//
// The running minimum starts at math.MaxFloat64 and the running maximum at
// -math.MaxFloat64, so any finite tick value replaces them. The maximum must
// not be seeded with math.SmallestNonzeroFloat64: that constant is the
// smallest positive float64, so a high of zero or below would never be
// recorded as the maximum.
func NewInterDayDOHLCVStream(streamBarType interDayBarType) *InterDayDOHLCVStream {
	return &InterDayDOHLCVStream{
		DOHLCVStream: &DOHLCVStream{
			minValue: math.MaxFloat64,
			maxValue: -math.MaxFloat64,
		},
		streamBarType: streamBarType,
	}
}
// NewDailyDOHLCVStream returns a new inter-day stream whose bar type is
// DailyBar.
func NewDailyDOHLCVStream() *InterDayDOHLCVStream {
return NewInterDayDOHLCVStream(DailyBar)
}
// NewWeeklyDOHLCVStream returns a new inter-day stream whose bar type is
// WeeklyBar.
func NewWeeklyDOHLCVStream() *InterDayDOHLCVStream {
return NewInterDayDOHLCVStream(WeeklyBar)
}
// NewMonthlyDOHLCVStream returns a new inter-day stream whose bar type is
// MonthlyBar.
func NewMonthlyDOHLCVStream() *InterDayDOHLCVStream {
return NewInterDayDOHLCVStream(MonthlyBar)
}
// ReceiveTick records tickData on the stream and fans it out to subscribers.
//
// The bar index is advanced, the tick is appended to Data, and the running
// minimum and maximum are updated from the tick's L() and H(). Every
// subscriber is then notified on its own goroutine through
// ReceiveDOHLCVTick, and the call returns only once all of them have
// finished.
func (p *DOHLCVStream) ReceiveTick(tickData DOHLCV) {
	p.streamBarIndex++
	p.Data = append(p.Data, tickData)

	if low := tickData.L(); low < p.minValue {
		p.minValue = low
	}
	if high := tickData.H(); high > p.maxValue {
		p.maxValue = high
	}

	// One goroutine per subscriber; block until every one has handled the tick.
	var pending sync.WaitGroup
	pending.Add(len(p.subscribers))
	for _, receiver := range p.subscribers {
		go func(r DOHLCVTickReceiver) {
			defer pending.Done()
			r.ReceiveDOHLCVTick(tickData, p.streamBarIndex)
		}(receiver)
	}
	pending.Wait()
}
// MinDate returns the date of the first tick held in Data.
//
// When the stream holds no ticks it returns the zero time.Time rather than
// panicking on an out-of-range index; callers can detect that case with
// IsZero.
//
// NOTE(review): this presumes ticks arrive in chronological order, so that
// the first element is the earliest — confirm against the code feeding the
// stream.
func (p *DOHLCVStream) MinDate() time.Time {
	if len(p.Data) == 0 {
		return time.Time{}
	}
	return p.Data[0].D()
}
// MaxDate returns the date of the last tick held in Data.
//
// When the stream holds no ticks it returns the zero time.Time rather than
// panicking on an out-of-range index; callers can detect that case with
// IsZero.
//
// NOTE(review): this presumes ticks arrive in chronological order, so that
// the last element is the latest — confirm against the code feeding the
// stream.
func (p *DOHLCVStream) MaxDate() time.Time {
	count := len(p.Data)
	if count == 0 {
		return time.Time{}
	}
	return p.Data[count-1].D()
}
// MinValue returns the stream's running minimum: the value the stream was
// created with, lowered by ReceiveTick whenever a tick's L() is below it.
func (p *DOHLCVStream) MinValue() float64 {
return p.minValue
}
// MaxValue returns the stream's running maximum: the value the stream was
// created with, raised by ReceiveTick whenever a tick's H() is above it.
func (p *DOHLCVStream) MaxValue() float64 {
return p.maxValue
}
// AddTickSubscription appends subscriber to the list notified by
// ReceiveTick. No de-duplication is performed, so a subscriber added twice
// is stored twice.
func (p *DOHLCVStream) AddTickSubscription(subscriber DOHLCVTickReceiver) {
p.subscribers = append(p.subscribers, subscriber)
}
// RemoveTickSubscription removes the first registered subscriber equal to
// subscriber, undoing one AddTickSubscription call. It does nothing when no
// such subscriber is registered.
//
// Subscribers are compared with ==, so two interface values match when they
// hold the same dynamic type and equal values (for pointer subscribers, the
// same pointer). The remaining subscribers are copied into a fresh slice so
// the previous backing array is left untouched.
func (p *DOHLCVStream) RemoveTickSubscription(subscriber DOHLCVTickReceiver) {
	for i, existing := range p.subscribers {
		if existing != subscriber {
			continue
		}
		remaining := make([]DOHLCVTickReceiver, 0, len(p.subscribers)-1)
		remaining = append(remaining, p.subscribers[:i]...)
		remaining = append(remaining, p.subscribers[i+1:]...)
		p.subscribers = remaining
		return
	}
}
// IntraDayDOHLCVStream is a DOHLCVStream tagged with an intra-day bar
// interval.
type IntraDayDOHLCVStream struct {
// Embedded so that the DOHLCVStream methods are promoted to this type.
*DOHLCVStream
// intraDayBarInterval is the bar interval supplied at construction; the
// constructor's parameter name indicates it is expressed in minutes.
intraDayBarInterval int
}
// NewIntraDayDOHLCVStream returns an empty intra-day stream whose bar
// interval is barIntervalInMins.
//
// The running minimum starts at math.MaxFloat64 and the running maximum at
// -math.MaxFloat64, so any finite tick value replaces them. The maximum must
// not be seeded with math.SmallestNonzeroFloat64: that constant is the
// smallest positive float64, so a high of zero or below would never be
// recorded as the maximum.
func NewIntraDayDOHLCVStream(barIntervalInMins int) *IntraDayDOHLCVStream {
	return &IntraDayDOHLCVStream{
		DOHLCVStream: &DOHLCVStream{
			minValue: math.MaxFloat64,
			maxValue: -math.MaxFloat64,
		},
		intraDayBarInterval: barIntervalInMins,
	}
}