Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WPB-10308] Use RabbitMQ queues for notifications #4272

Merged
Merged
Show file tree
Hide file tree
Changes from 88 commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
e7c1fc3
create rabbitmq exchange
battermann Sep 26, 2024
3d70ead
set up DLX
battermann Sep 26, 2024
70d96a9
wip
battermann Sep 26, 2024
adf32b7
create user queues on client add/update
battermann Sep 26, 2024
253e107
make it internal server error
battermann Sep 27, 2024
44b506c
make galley compile
battermann Sep 27, 2024
b8fac8e
make rabbitmq mandatory
battermann Sep 27, 2024
8ce0eb3
fix integration tests
battermann Sep 27, 2024
031f063
set correct DLX headers
battermann Sep 27, 2024
98ed71a
wip
battermann Sep 30, 2024
a36e176
Add a WIP test to consume notifs via `GET /events`
akshaymankar Sep 30, 2024
c02ac0a
cannon: Roughly implement subscribing notifs from RabbitMQ
akshaymankar Sep 30, 2024
cf9ed15
gundeck: Start implementing push to rabbitmq
akshaymankar Sep 30, 2024
94659ee
integration: Fix assertion to assert on a real event
akshaymankar Oct 1, 2024
d689885
integration: Use correct vHost in cannon
akshaymankar Oct 1, 2024
2ba7f1d
cannon: Ensure exchange exists and publish event correctly on WS
akshaymankar Oct 1, 2024
c6efecd
NotificationSubsystem: Align names for queues and extract them as top…
akshaymankar Oct 1, 2024
c4e4fe5
gundeck: Push events to RabbitMQ for compatible clients
akshaymankar Oct 1, 2024
78cfda9
integration: Assert that acked events don't come back
akshaymankar Oct 1, 2024
f900dc8
cannon: Forward client acks to rabbitmq
akshaymankar Oct 1, 2024
7bb78fd
WIP: Get rid of channel as an explicit param for NotificationSubsyste…
akshaymankar Oct 1, 2024
a5b1210
Get galley to compile.
fisx Oct 1, 2024
5642a47
Fix some easy TODOs.
fisx Oct 1, 2024
9d05cf6
Use these library better.
fisx Oct 2, 2024
f47dfd8
resolve rebase conflict better.
fisx Oct 2, 2024
a0239ab
fix ghc errors.
fisx Oct 2, 2024
c1bf85d
Extract rabbitmq channel lookup into helper.
fisx Oct 2, 2024
2f78dce
Move setupConsumableNotificationsClient from subsystems to gundeck.
fisx Oct 2, 2024
460ecd6
Revert "Get galley to compile."
akshaymankar Oct 2, 2024
818a77c
wire-subsystems: Fix compile errors in tests
akshaymankar Oct 2, 2024
e64cdc2
gundeck: doesn't depend on wire-subsystems (yet?)
akshaymankar Oct 2, 2024
670769b
brig: Remove unnecessary import
akshaymankar Oct 2, 2024
47190ff
cannon: Don't create the queue for clients, expect it to already be t…
akshaymankar Oct 2, 2024
d059fb9
cannon: close ws connection when something goes wrong
akshaymankar Oct 2, 2024
db306e8
Funky!
akshaymankar Oct 2, 2024
e368674
Undo funkiness: Create Wire.API.WebSocket with types for comms on the…
akshaymankar Oct 2, 2024
3ace353
WIP: cannon: try to use the new types from wire-api
akshaymankar Oct 2, 2024
ded4622
wire-api: Cabal file
akshaymankar Oct 2, 2024
108e241
gen nix stuff
fisx Oct 4, 2024
f5a48c8
ormolu
fisx Oct 4, 2024
8148e57
Is this a good way of representing websocket messages?
fisx Oct 4, 2024
1b590ed
Resolve TODO.
fisx Oct 4, 2024
fe26080
Refactor test, extend coverage.
fisx Oct 4, 2024
c3c231e
Suggestions for better module name; removed "websocket" reference fro…
fisx Oct 4, 2024
9a65550
Roundtrip tests for Message*To*.
fisx Oct 7, 2024
6e75fcc
Source comments.
fisx Oct 7, 2024
c4dc168
debug failing test [WIP]
fisx Oct 7, 2024
42142a5
refactor tests a bit, fix ack, fix typos
battermann Oct 7, 2024
d68e3a6
fix test, handle connection closed, format
battermann Oct 7, 2024
4508ea7
ping pong test
battermann Oct 7, 2024
b302c8e
Maintain stable connection to rabbitmq from cannon. [WIP]
fisx Oct 7, 2024
5ac11b0
Fix typo.
fisx Oct 8, 2024
a70071b
Tune tests.
fisx Oct 8, 2024
67f9187
Remove ping-pong stuff.
fisx Oct 8, 2024
310732c
Revert "Maintain stable connection to rabbitmq from cannon. [WIP]"
fisx Oct 8, 2024
f451f89
Test multiple acks and out of order acks
akshaymankar Oct 8, 2024
bd4adb2
cannon: Refactor code a little and more logging
akshaymankar Oct 8, 2024
0c9eadd
integration: Deal with events websocket more gracefully, ensure all a…
akshaymankar Oct 8, 2024
df36aee
cannon: Easier to understand cleanup code
akshaymankar Oct 8, 2024
f7cb1be
Add TODO for tomorrow
akshaymankar Oct 8, 2024
d329c79
cannon: Ensure invalid messages don't accumulate
akshaymankar Oct 9, 2024
21444ee
small re-org of code
akshaymankar Oct 9, 2024
0943ec9
Avoid using unsafeRange
akshaymankar Oct 9, 2024
b438882
Reduce top level functions
akshaymankar Oct 9, 2024
954b466
integration: Test that old and new clients can co-exist
akshaymankar Oct 9, 2024
e18fbf4
gundeck: Optimize number of calls to brig
akshaymankar Oct 9, 2024
97ee4fb
gundeck: Try to not kill brig
akshaymankar Oct 9, 2024
98c8e58
Fix typo
akshaymankar Oct 9, 2024
2c2d1a7
gundeck: Remove pairing comment
akshaymankar Oct 9, 2024
304886b
wire-api: Rename Wire.API.WebSocket -> Wire.API.Event.WebSocketProtocol
akshaymankar Oct 9, 2024
935af4e
gundeck: Don't configure dead-lettering while declaring queues
akshaymankar Oct 10, 2024
10d9504
More TODOs
akshaymankar Oct 10, 2024
e068179
Use direct exchange for user notifications
akshaymankar Oct 10, 2024
9510c87
integration: Test that users only get notifs meant for them
akshaymankar Oct 10, 2024
0fa079c
integration/Notifications: Allow waiting for notifs without a client
akshaymankar Oct 10, 2024
19197aa
Deflake newly written tests
akshaymankar Oct 10, 2024
bc13ae3
cannon: Don't declare the exchange, its not needed
akshaymankar Oct 10, 2024
2aa58f2
integration: Make cannon logLevel Warn again
akshaymankar Oct 10, 2024
a30c2f3
integrations: Throw error if websocket responds with invalid JSON
akshaymankar Oct 10, 2024
4991f1c
Add ticket number to FUTUREWORK
akshaymankar Oct 10, 2024
8f51bed
Add notification Id to rabbitmq notifs
akshaymankar Oct 10, 2024
c87d7b0
Makefile/clean-rabbit: Also cleanup exchanges
akshaymankar Oct 10, 2024
0cd93b1
gundeck: Get preexisting unit tests to pass
akshaymankar Oct 21, 2024
8d4ea13
Set correct expiration on events pushed to RabbitMQ
akshaymankar Oct 21, 2024
5c48522
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Oct 22, 2024
7c3632d
fix a typo in the changelog
mdimjasevic Oct 22, 2024
b4220fd
Fix a release note
mdimjasevic Oct 22, 2024
816e540
Add changelogs for public and internal API changes
mdimjasevic Oct 22, 2024
593f5f2
Move around routing key helper functions
mdimjasevic Oct 22, 2024
c465c92
Remove commented out code
mdimjasevic Oct 22, 2024
e628387
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Oct 22, 2024
bb5179f
Hi CI
mdimjasevic Oct 23, 2024
ea0f937
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Oct 24, 2024
581df47
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Oct 30, 2024
b74b7e6
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Oct 30, 2024
b8f80ac
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Oct 31, 2024
6b01607
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Nov 4, 2024
4334775
Merge remote-tracking branch 'origin/develop' into WPB-10308-use-rabb…
mdimjasevic Nov 11, 2024
f86217f
Hi CI
mdimjasevic Nov 12, 2024
61bacf2
Detect and flag missed notifications from RabbitMQ (#4317)
elland Nov 12, 2024
938fea4
Update a change log to reflect changes from PR #4317
mdimjasevic Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ install: init
./hack/bin/cabal-run-all-tests.sh
./hack/bin/cabal-install-artefacts.sh all

.PHONY: rabbit-clean
rabbit-clean:
rabbitmqadmin -f pretty_json list queues vhost name messages | jq -r '.[] | "rabbitmqadmin delete queue name=\(.name) --vhost=\(.vhost)"' | bash
.PHONY: clean-rabbit
clean-rabbit:
rabbitmqadmin -f pretty_json list queues vhost name \
| jq -r '.[] | "rabbitmqadmin delete queue name=\(.name) --vhost=\(.vhost)"' \
| bash
rabbitmqadmin -f pretty_json list exchanges name vhost \
| jq -r '.[] |select(.name | startswith("amq") | not) | select (.name != "") | "rabbitmqadmin delete exchange name=\(.name) --vhost=\(.vhost)"' \
| bash

# Clean
.PHONY: full-clean
Expand Down Expand Up @@ -85,7 +90,7 @@ cabal.project.local:
c: treefmt c-fast

.PHONY: c
c-fast:
c-fast:
cabal build $(WIRE_CABAL_BUILD_OPTIONS) $(package) || ( make clean-hint; false )
ifeq ($(test), 1)
./hack/bin/cabal-run-tests.sh $(package) $(testargs)
Expand Down
1 change: 1 addition & 0 deletions changelog.d/0-release-notes/WBP-10308
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Notifications are now also sent via RabbitMQ. Therefore, if federation is enabled, RabbitMQ is a required configuration in Brig.
1 change: 1 addition & 0 deletions changelog.d/1-api-changes/WPB-10308
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New endpoint `GET /events` for consuming events is added
2 changes: 2 additions & 0 deletions changelog.d/5-internal/WPB-10308
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- RabbitMQ queues have been introduced for client notifications
- New internal endpoint `POST /i/users/:uid/clients/:cid/consumable-notifications` is added
1 change: 1 addition & 0 deletions deploy/dockerephemeral/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ services:
ports:
- '127.0.0.1:5671:5671'
- '127.0.0.1:15671:15671'
- '127.0.0.1:15672:15672'
volumes:
- ./rabbitmq-config/rabbitmq.conf:/etc/rabbitmq/conf.d/20-wire.conf
- ./rabbitmq-config/certificates:/etc/rabbitmq/certificates
Expand Down
1 change: 1 addition & 0 deletions integration/integration.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ library
Test.Demo
Test.EJPD
Test.Errors
Test.Events
Test.ExternalPartner
Test.FeatureFlags
Test.FeatureFlags.AppLock
Expand Down
9 changes: 9 additions & 0 deletions integration/test/API/Galley.hs
Original file line number Diff line number Diff line change
Expand Up @@ -733,3 +733,12 @@ getTeamMembersCsv :: (HasCallStack, MakesValue user) => user -> String -> App Re
getTeamMembersCsv user tid = do
req <- baseRequest user Galley Versioned (joinHttpPath ["teams", tid, "members", "csv"])
submit "GET" req

-- | https://staging-nginz-https.zinfra.io/v6/api/swagger-ui/#/default/post_conversations__cnv_domain___cnv__typing
sendTypingStatus :: (HasCallStack, MakesValue user, MakesValue conv) => user -> conv -> String -> App Response
sendTypingStatus user conv status = do
convDomain <- objDomain conv
convId <- objId conv
req <- baseRequest user Galley Versioned (joinHttpPath ["conversations", convDomain, convId, "typing"])
submit "POST"
$ addJSONObject ["status" .= status] req
26 changes: 18 additions & 8 deletions integration/test/Notifications.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ assertNoNotifications u uc since0 p = do
awaitNotifications ::
(HasCallStack, MakesValue user, MakesValue client) =>
user ->
client ->
Maybe client ->
Maybe String ->
-- | Max no. of notifications
Int ->
Expand All @@ -62,11 +62,11 @@ awaitNotifications user client since0 n selector = do
| timeRemaining <= 0 = pure res0
| otherwise =
do
c <- make client & asString
c <- for client (asString . make)
notifs <-
getNotifications
user
def {since = since, client = Just c}
def {since = since, client = c}
`bindResponse` \resp -> asList (resp.json %. "notifications")
lastNotifId <- case notifs of
[] -> pure since
Expand All @@ -85,16 +85,26 @@ awaitNotifications user client since0 n selector = do
threadDelay 1_000
go (timeRemaining - 1) lastNotifId res

awaitNotification ::
awaitNotificationClient ::
(HasCallStack, MakesValue user, MakesValue client, MakesValue lastNotifId) =>
user ->
client ->
Maybe lastNotifId ->
(Value -> App Bool) ->
App Value
awaitNotification user client lastNotifId selector = do
awaitNotificationClient user client lastNotifId selector = do
since0 <- mapM objId lastNotifId
head <$> awaitNotifications user (Just client) since0 1 selector

awaitNotification ::
(HasCallStack, MakesValue user, MakesValue lastNotifId) =>
user ->
Maybe lastNotifId ->
(Value -> App Bool) ->
App Value
awaitNotification user lastNotifId selector = do
since0 <- mapM objId lastNotifId
head <$> awaitNotifications user client since0 1 selector
head <$> awaitNotifications user (Nothing :: Maybe ()) since0 1 selector

isDeleteUserNotif :: (MakesValue a) => a -> App Bool
isDeleteUserNotif n =
Expand Down Expand Up @@ -219,9 +229,9 @@ assertLeaveNotification ::
App ()
assertLeaveNotification fromUser conv user client leaver =
void
$ awaitNotification
$ awaitNotificationClient
user
client
(Just client)
noValue
( allPreds
[ isConvLeaveNotif,
Expand Down
10 changes: 10 additions & 0 deletions integration/test/SetupHelpers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,13 @@ uploadDownloadProfilePicture :: (HasCallStack, MakesValue usr) => usr -> App (St
uploadDownloadProfilePicture usr = do
(dom, key, _payload) <- uploadProfilePicture usr
downloadProfilePicture usr dom key

mkContextUserIds :: (MakesValue user) => [(String, user)] -> App String
mkContextUserIds =
fmap (intercalate "\n")
. traverse
( \(name, user) -> do
uid <- objQidObject user %. "id" & asString
domain <- objDomain user
pure $ name <> ": " <> uid <> "@" <> domain
)
16 changes: 7 additions & 9 deletions integration/test/Test/Conversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -492,26 +492,25 @@ testSynchroniseUserRemovalNotification domain = do
otherDomain <- make domain
[alice, bob] <- createAndConnectUsers [ownDomain, otherDomain]
runCodensity (acquireResources 1 resourcePool) $ \[dynBackend] -> do
(conv, charlie, client) <-
(conv, charlie) <-
runCodensity (startDynamicBackend dynBackend mempty) $ \_ -> do
charlie <- randomUser dynBackend.berDomain def
client <- objId $ bindResponse (addClient charlie def) $ getJSON 201
mapM_ (connectTwoUsers charlie) [alice, bob]
conv <-
postConversation alice (defProteus {qualifiedUsers = [bob, charlie]})
>>= getJSON 201
pure (conv, charlie, client)
pure (conv, charlie)

let newConvName = "The new conversation name"
bindResponse (changeConversationName alice conv newConvName) $ \resp ->
resp.status `shouldMatchInt` 200
bindResponse (removeMember alice conv charlie) $ \resp ->
resp.status `shouldMatchInt` 200
runCodensity (startDynamicBackend dynBackend mempty) $ \_ -> do
nameNotif <- awaitNotification charlie client noValue isConvNameChangeNotif
nameNotif <- awaitNotification charlie noValue isConvNameChangeNotif
nameNotif %. "payload.0.qualified_conversation" `shouldMatch` objQidObject conv
nameNotif %. "payload.0.data.name" `shouldMatch` newConvName
leaveNotif <- awaitNotification charlie client noValue isConvLeaveNotif
leaveNotif <- awaitNotification charlie noValue isConvLeaveNotif
leaveNotif %. "payload.0.qualified_conversation" `shouldMatch` objQidObject conv

testConvRenaming :: (HasCallStack) => App ()
Expand Down Expand Up @@ -648,19 +647,18 @@ testDeleteTeamConversationWithUnreachableRemoteMembers = do
notif %. "payload.0.qualified_from" `shouldMatch` objQidObject alice

runCodensity (acquireResources 1 resourcePool) $ \[dynBackend] -> do
(bob, bobClient) <- runCodensity (startDynamicBackend dynBackend mempty) $ \_ -> do
bob <- runCodensity (startDynamicBackend dynBackend mempty) $ \_ -> do
bob <- randomUser dynBackend.berDomain def
bobClient <- objId $ bindResponse (addClient bob def) $ getJSON 201
connectTwoUsers alice bob
mem <- bob %. "qualified_id"
void $ addMembers alice conv def {users = [mem]} >>= getBody 200
pure (bob, bobClient)
pure bob
withWebSocket alice $ \ws -> do
void $ deleteTeamConversation team conv alice >>= getBody 200
notif <- awaitMatch isConvDeleteNotif ws
assertNotification notif
void $ runCodensity (startDynamicBackend dynBackend mempty) $ \_ -> do
notif <- awaitNotification bob bobClient noValue isConvDeleteNotif
notif <- awaitNotification bob noValue isConvDeleteNotif
assertNotification notif

testDeleteTeamMemberLimitedEventFanout :: (HasCallStack) => App ()
Expand Down
Loading
Loading