-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
101 lines (79 loc) · 2.77 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"fmt"
gologger "log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/companieshouse/chs.go/kafka/resilience"
"github.com/companieshouse/chs.go/log"
"github.com/companieshouse/payment-reconciliation-consumer/config"
"github.com/companieshouse/payment-reconciliation-consumer/service"
)
// main wires up and runs the payment-reconciliation consumer(s).
//
// It loads the configuration, creates the main consumer service and, unless
// cfg.IsErrorConsumer is set, a second retry consumer service. Each service is
// started in its own goroutine and registered on a shared WaitGroup; main then
// blocks in waitForServiceClose, which forwards the close signal to both
// channels and waits on that WaitGroup.
//
// NOTE(review): every failure path below returns from main, so the process
// exits with status 0 even when start-up failed — confirm this is intended.
func main() {
	log.Namespace = "payment-reconciliation-consumer"

	// Push the Sarama logs into our custom writer
	sarama.Logger = gologger.New(&log.Writer{}, "[Sarama] ", gologger.LstdFlags)

	cfg, err := config.Get()
	if err != nil {
		log.Error(fmt.Errorf("error configuring service: %w. Exiting", err), nil)
		return
	}

	log.Info("initialising payment-reconciliation-consumer service...")

	// Both channels are buffered (capacity 1) so that waitForServiceClose can
	// hand over the close signal without blocking, even when the retry
	// consumer was never started and nothing reads from retryChannel.
	mainChannel := make(chan os.Signal, 1)
	retryChannel := make(chan os.Signal, 1)

	svc, err := service.New(cfg.PaymentProcessedTopic, cfg.PaymentReconciliationGroupName, cfg, nil)
	if err != nil {
		log.Error(fmt.Errorf("error initialising main consumer service: '%w'. Exiting", err), nil)
		return
	}

	var wg sync.WaitGroup

	// The retry consumer only runs alongside the main consumer when this
	// process is not configured as an error consumer.
	if !cfg.IsErrorConsumer {
		retrySvc, err := getRetryService(cfg)
		if err != nil {
			log.Error(fmt.Errorf("error initialising retry consumer service: '%w'. Exiting", err), nil)
			// The main service was already created; shut it down before exiting.
			svc.Shutdown(cfg.PaymentProcessedTopic)
			return
		}
		wg.Add(1)
		go retrySvc.Start(&wg, retryChannel)
	}

	wg.Add(1)
	go svc.Start(&wg, mainChannel)

	waitForServiceClose(&wg, mainChannel, retryChannel)

	log.Info("Application successfully shutdown")
}
// getRetryService builds the consumer service used for retries.
//
// It calls service.New with the same topic and group name that main uses for
// the main consumer, plus a resilience.ServiceRetry populated from
// cfg.RetryThrottleRate and cfg.MaxRetryAttempts.
//
// On failure it returns (nil, err) without logging: the caller in main already
// reports the error, so logging it here as well would record the same failure
// twice.
func getRetryService(cfg *config.Config) (*service.Service, error) {
	// NOTE(review): this positional literal depends on the field order of
	// resilience.ServiceRetry; prefer keyed fields once the names are confirmed.
	// NOTE(review): RetryThrottleRate is converted to time.Duration as-is, i.e.
	// a bare integer is a nanosecond count here — presumably the consumer
	// scales it to the intended unit; confirm.
	retry := &resilience.ServiceRetry{
		time.Duration(cfg.RetryThrottleRate),
		cfg.MaxRetryAttempts,
	}

	retrySvc, err := service.New(cfg.PaymentProcessedTopic, cfg.PaymentReconciliationGroupName, cfg, retry)
	if err != nil {
		return nil, err
	}

	return retrySvc, nil
}
// waitForServiceClose blocks until the process receives an interrupt or
// SIGTERM, forwards that signal to mainChannel and then retryChannel so the
// services (goroutines) listening on them can clean up (for example their
// consumers and producers), and finally waits on wg before returning.
//
// SIGKILL is deliberately not registered: it cannot be caught, so asking
// signal.Notify to relay it has no effect.
//
// The sends to mainChannel and retryChannel are plain blocking sends, so each
// channel needs either free buffer space or an active receiver; main creates
// both with a buffer of one.
func waitForServiceClose(wg *sync.WaitGroup, mainChannel, retryChannel chan os.Signal) {
	// Channel to fan-out interrupt/termination notifications
	notificationChannel := make(chan os.Signal, 1)
	signal.Notify(notificationChannel, os.Interrupt, syscall.SIGTERM)
	// Stop relaying signals once we are done; this runs after wg.Wait, so
	// signals are still absorbed during the graceful shutdown.
	defer signal.Stop(notificationChannel)

	// Block until a close signal arrives, then hand it to each consumer.
	notification := <-notificationChannel
	log.Info("Close signal received, fanning out...")

	log.Debug("Sending notification to main consumer channel")
	mainChannel <- notification

	log.Debug("Sending notification to retry consumer channel")
	retryChannel <- notification

	log.Info("Fan out completed")

	wg.Wait()
}