-
Notifications
You must be signed in to change notification settings - Fork 415
/
main.go
344 lines (287 loc) · 10.1 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)
// BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
//
// In CQRS, one command must be handled by only one handler.
// When another handler with this command is added to command processor, error will be retuerned.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error {
// some random price, in production you probably will calculate in wiser way
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"Booked %s for %s from %s to %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked will be handled by OrderBeerOnRoomBooked event handler,
// in future RoomBooked may be handled by multiple event handler
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked is a event handler, which handles RoomBooked event and emits OrderBeer command.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, event *RoomBooked) error {
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler is a command handler, which handles OrderBeer command and emits BeerOrdered.
// BeerOrdered is not handled by any event handler, but we may use persistent Pub/Sub to handle it in the future.
type OrderBeerHandler struct {
eventBus *cqrs.EventBus
}
func (o OrderBeerHandler) HandlerName() string {
return "OrderBeerHandler"
}
func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
if rand.Int63n(10) == 0 {
// sometimes there is no beer left, command will be retried
return fmt.Errorf("no beer left for room %s, please try later", cmd.RoomId)
}
if err := o.eventBus.Publish(ctx, &BeerOrdered{
RoomId: cmd.RoomId,
Count: cmd.Count,
}); err != nil {
return err
}
log.Printf("%d beers ordered to room %s", cmd.Count, cmd.RoomId)
return nil
}
// BookingsFinancialReport is a read model, which calculates how much money we may earn from bookings.
// Like OrderBeerOnRoomBooked, it listens for RoomBooked event.
//
// This implementation is just writing to the memory. In production, you will probably will use some persistent storage.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked) error {
// Handle may be called concurrently, so it need to be thread safe.
b.lock.Lock()
defer b.lock.Unlock()
// When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
// GoChannel Pub/Sub provides exactly-once delivery,
// but let's make this example ready for other Pub/Sub implementations.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
cqrs.NewCommandHandler("BookRoomHandler", BookRoomHandler{eventBus}.Handle),
cqrs.NewCommandHandler("OrderBeerHandler", OrderBeerHandler{eventBus}.Handle),
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
cqrs.NewGroupEventHandler(OrderBeerOnRoomBooked{commandBus}.Handle),
cqrs.NewGroupEventHandler(NewBookingsFinancialReport().Handle),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
func publishCommands(commandBus *cqrs.CommandBus) func() {
i := 0
for {
i++
startDate, err := ptypes.TimestampProto(time.Now())
if err != nil {
panic(err)
}
endDate, err := ptypes.TimestampProto(time.Now().Add(time.Hour * 24 * 3))
if err != nil {
panic(err)
}
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}