Skip to content

Commit

Permalink
fix(scheduler): company & security mismatch
Browse files Browse the repository at this point in the history
also enhanced empty float values handling
  • Loading branch information
fcote committed Feb 22, 2023
1 parent 4abd647 commit f3e92e3
Show file tree
Hide file tree
Showing 15 changed files with 1,419 additions and 166 deletions.
2 changes: 1 addition & 1 deletion scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func getBaseConfig() *Config {
Job: JobConfig{
FullSync: CronConfig{
Enabled: true,
Rule: "0 0 0 * * *",
Rule: "* * * * * *",
},
NewsSync: CronConfig{
Enabled: true,
Expand Down
11 changes: 3 additions & 8 deletions scheduler/go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module github.com/fcote/merlin/sheduler

go 1.19
go 1.20

require (
github.com/go-co-op/gocron v1.18.0
github.com/jackc/pgx/v4 v4.18.0
github.com/jackc/pgx/v5 v5.3.0
github.com/newrelic/go-agent/v3 v3.20.3
github.com/newrelic/go-agent/v3/integrations/logcontext-v2/nrzerolog v1.0.0
github.com/rs/zerolog v1.29.0
Expand All @@ -17,14 +17,9 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand Down
134 changes: 5 additions & 129 deletions scheduler/go.sum

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions scheduler/internal/domain/earning.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"

"github.com/fcote/merlin/sheduler/pkg/math"
"github.com/fcote/merlin/sheduler/pkg/pointer"
"github.com/fcote/merlin/sheduler/pkg/slices"
)
Expand Down Expand Up @@ -102,7 +103,7 @@ func (earnings Earnings) Times() []string {

func (earnings Earnings) EpsEstimates() []*float64 {
return slices.Map(earnings, func(e Earning) *float64 {
if e.EpsEstimate == 0 {
if math.IsEmpty(e.EpsEstimate) {
return nil
}
return pointer.To(e.EpsEstimate)
Expand All @@ -111,7 +112,7 @@ func (earnings Earnings) EpsEstimates() []*float64 {

func (earnings Earnings) Epss() []*float64 {
return slices.Map(earnings, func(e Earning) *float64 {
if e.Eps == 0 {
if math.IsEmpty(e.Eps) {
return nil
}
return pointer.To(e.Eps)
Expand All @@ -120,7 +121,7 @@ func (earnings Earnings) Epss() []*float64 {

func (earnings Earnings) EpsSurprisePercents() []*float64 {
return slices.Map(earnings, func(e Earning) *float64 {
if e.EpsSurprisePercent == 0 {
if math.IsEmpty(e.EpsSurprisePercent) {
return nil
}
return pointer.To(e.EpsSurprisePercent)
Expand All @@ -129,7 +130,7 @@ func (earnings Earnings) EpsSurprisePercents() []*float64 {

func (earnings Earnings) RevenueEstimates() []*float64 {
return slices.Map(earnings, func(e Earning) *float64 {
if e.RevenueEstimate == 0 {
if math.IsEmpty(e.RevenueEstimate) {
return nil
}
return pointer.To(e.RevenueEstimate)
Expand All @@ -138,7 +139,7 @@ func (earnings Earnings) RevenueEstimates() []*float64 {

func (earnings Earnings) Revenues() []*float64 {
return slices.Map(earnings, func(e Earning) *float64 {
if e.Revenue == 0 {
if math.IsEmpty(e.Revenue) {
return nil
}
return pointer.To(e.Revenue)
Expand All @@ -147,7 +148,7 @@ func (earnings Earnings) Revenues() []*float64 {

func (earnings Earnings) RevenueSurprisePercents() []*float64 {
return slices.Map(earnings, func(e Earning) *float64 {
if e.RevenueSurprisePercent == 0 {
if math.IsEmpty(e.RevenueSurprisePercent) {
return nil
}
return pointer.To(e.RevenueSurprisePercent)
Expand Down
3 changes: 2 additions & 1 deletion scheduler/internal/domain/financial.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package domain
import (
"fmt"

"github.com/fcote/merlin/sheduler/pkg/math"
"github.com/fcote/merlin/sheduler/pkg/pointer"
"github.com/fcote/merlin/sheduler/pkg/slices"
)
Expand Down Expand Up @@ -81,7 +82,7 @@ type Financials []Financial

func (financials Financials) Values() []*float64 {
return slices.Map(financials, func(f Financial) *float64 {
if f.Value == 0 {
if math.IsEmpty(f.Value) {
return nil
}
return pointer.To(f.Value)
Expand Down
14 changes: 7 additions & 7 deletions scheduler/internal/domain/historical_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (prices HistoricalPrices) Dates() []string {

func (prices HistoricalPrices) Opens() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.Open == 0 {
if math.IsEmpty(i.Open) {
return nil
}
return pointer.To(i.Open)
Expand All @@ -59,7 +59,7 @@ func (prices HistoricalPrices) Opens() []*float64 {

func (prices HistoricalPrices) Highs() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.High == 0 {
if math.IsEmpty(i.High) {
return nil
}
return pointer.To(i.High)
Expand All @@ -68,7 +68,7 @@ func (prices HistoricalPrices) Highs() []*float64 {

func (prices HistoricalPrices) Lows() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.Low == 0 {
if math.IsEmpty(i.Low) {
return nil
}
return pointer.To(i.Low)
Expand All @@ -77,7 +77,7 @@ func (prices HistoricalPrices) Lows() []*float64 {

func (prices HistoricalPrices) Closes() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.Close == 0 {
if math.IsEmpty(i.Close) {
return nil
}
return pointer.To(i.Close)
Expand All @@ -86,7 +86,7 @@ func (prices HistoricalPrices) Closes() []*float64 {

func (prices HistoricalPrices) Volumes() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.Volume == 0 {
if math.IsEmpty(i.Volume) {
return nil
}
return pointer.To(i.Volume)
Expand All @@ -95,7 +95,7 @@ func (prices HistoricalPrices) Volumes() []*float64 {

func (prices HistoricalPrices) Changes() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.Change == 0 {
if math.IsEmpty(i.Change) {
return nil
}
return pointer.To(i.Change)
Expand All @@ -104,7 +104,7 @@ func (prices HistoricalPrices) Changes() []*float64 {

func (prices HistoricalPrices) ChangePercents() []*float64 {
return slices.Map(prices, func(i HistoricalPrice) *float64 {
if i.ChangePercent == 0 {
if math.IsEmpty(i.ChangePercent) {
return nil
}
return pointer.To(i.ChangePercent)
Expand Down
10 changes: 7 additions & 3 deletions scheduler/internal/repository/fmp/company.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import (

"github.com/fcote/merlin/sheduler/internal/domain"
"github.com/fcote/merlin/sheduler/pkg/fmp"
"github.com/fcote/merlin/sheduler/pkg/maps"
)

func (r Repository) Companies(ctx context.Context, tickers []string) ([]domain.CompanyBase, error) {
companies, err := r.client.BatchCompanies(ctx, tickers)
if err != nil {
return nil, err
}
companiesByTicker := maps.GroupBy(companies, func(c fmp.Company) string {
return c.Symbol
})

result := make([]domain.CompanyBase, len(companies))
for i, company := range companies {
result[i] = CompanyBaseFromFMP(company)
result := make([]domain.CompanyBase, len(tickers))
for i, ticker := range tickers {
result[i] = CompanyBaseFromFMP(companiesByTicker[ticker])
}

return result, nil
Expand Down
11 changes: 9 additions & 2 deletions scheduler/internal/repository/fmp/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/fcote/merlin/sheduler/internal/domain"
"github.com/fcote/merlin/sheduler/pkg/fmp"
"github.com/fcote/merlin/sheduler/pkg/maps"
"github.com/fcote/merlin/sheduler/pkg/pointer"
)

Expand All @@ -28,15 +29,21 @@ func (r Repository) Securities(ctx context.Context, tickers []string) ([]domain.
if err != nil {
return nil, err
}
companiesByTicker := maps.GroupBy(companies, func(c fmp.Company) string {
return c.Symbol
})

stocks, err := r.client.BatchStocks(ctx, tickers)
if err != nil {
return nil, err
}
stocksByTicker := maps.GroupBy(stocks, func(s fmp.Stock) string {
return s.Symbol
})

result := make([]domain.SecurityBase, len(tickers))
for i := range stocks {
result[i] = SecurityBaseFromFMP(stocks[i], companies[i])
for i, ticker := range tickers {
result[i] = SecurityBaseFromFMP(stocksByTicker[ticker], companiesByTicker[ticker])
}

return result, nil
Expand Down
4 changes: 2 additions & 2 deletions scheduler/internal/repository/pg/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"fmt"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/fcote/merlin/sheduler/internal/usecase"
)
Expand Down
3 changes: 1 addition & 2 deletions scheduler/internal/usecase/earning.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func (uc EarningUsecase) sync(ctx context.Context, task domain.SecurityTask) (do
Map(rawEarnings, func(p domain.EarningBase) domain.Earning {
return domain.EarningFromBase(p, task.SecurityId)
})
result, err := s.
BatchInsertEarnings(ctx, earningInputs)
result, err := s.BatchInsertEarnings(ctx, earningInputs)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/jackc/pgx/v4/pgxpool"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog/log"

"github.com/go-co-op/gocron"
Expand Down Expand Up @@ -36,7 +36,7 @@ func main() {
if err != nil {
logger.Fatal().Msgf("failed to initialize database config: %v", err)
}
dbPool, err := pgxpool.ConnectConfig(context.Background(), pgConf)
dbPool, err := pgxpool.NewWithConfig(context.Background(), pgConf)
if err != nil {
logger.Fatal().Msgf("failed to initialize database: %v", err)
}
Expand Down
4 changes: 1 addition & 3 deletions scheduler/pkg/fmp/rate_limit_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func (t *RateLimitTimer) Wait() {
defer t.mu.Unlock()

t.nRequests++
glog.Get().Debug().Msgf("fmp | rate limit | %d/%d", t.nRequests, t.maxRequestsPerMin)

if t.nRequests >= t.maxRequestsPerMin {
timeToWait := time.Until(t.end)

glog.Get().Debug().Msgf("fmp | rate limit | waiting %.2fs", timeToWait.Seconds())

<-time.After(timeToWait)
Expand All @@ -53,8 +53,6 @@ func (t *RateLimitTimer) reset() {

t.nRequests = 0

glog.Get().Debug().Msg("fmp | rate limit | reset")

t.timer.Reset(timerDuration)
t.end = time.Now().Add(timerDuration)
}
10 changes: 10 additions & 0 deletions scheduler/pkg/maps/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package maps

func GroupBy[T any, C comparable](s []T, fn func(a T) C) map[C]T {
ret := make(map[C]T)
for _, input := range s {
key := fn(input)
ret[key] = input
}
return ret
}
10 changes: 10 additions & 0 deletions scheduler/pkg/math/empty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package math

import "math"

func IsEmpty(f float64) bool {
return f == 0 ||
math.IsNaN(f) ||
math.IsInf(f, 0) ||
math.IsInf(f, -1)
}
Loading

0 comments on commit f3e92e3

Please sign in to comment.