Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WPB-11810 Delete federation queues in CI #4374

13 changes: 9 additions & 4 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ export NIX_CONFIG='extra-experimental-features = nix-command'

[[ -d "$layout_dir" ]] || mkdir -p "$layout_dir"

if [[ ! -d "$env_dir" || ! -f "$layout_dir/nix-rebuild" || "$store_paths" != $(< "$layout_dir/nix-rebuild" ) ]]; then
if [[ ! -d "$env_dir" || ! -f "$layout_dir/nix-rebuild" || "$store_paths" != $(<"$layout_dir/nix-rebuild") ]]; then
bcmd=nix
if command -v nom &> /dev/null; then
if command -v nom &>/dev/null; then
if [[ "${USE_NOM}" != "0" ]]; then
bcmd=nom
fi
fi
echo "🔧 Building environment"
$bcmd build -f nix wireServer.devEnv -Lv --out-link ./.env
echo "$store_paths" > "$layout_dir/nix-rebuild"
echo "$store_paths" >"$layout_dir/nix-rebuild"
fi

PATH_add "./.env/bin"
Expand All @@ -49,8 +49,13 @@ export LANG=en_US.UTF-8
export RABBITMQ_USERNAME=guest
export RABBITMQ_PASSWORD=alpaca-grapefruit

# Redis
export RABBITMQ_USERNAME_V0=guest
export RABBITMQ_PASSWORD_V0=alpaca-grapefruit

export RABBITMQ_USERNAME_V1=guest
export RABBITMQ_PASSWORD_V1=alpaca-grapefruit

# Redis
export REDIS_PASSWORD=very-secure-redis-cluster-password
export REDIS_ADDITIONAL_WRITE_PASSWORD=very-secure-redis-master-password

Expand Down
1 change: 1 addition & 0 deletions changelog.d/5-internal/WPB-11810
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Delete federation V0 and V1 queues after integration tests
14 changes: 14 additions & 0 deletions charts/integration/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ data:
rabbitmq:
host: rabbitmq
adminPort: 15671
tls: true
vHost: /

rabbitmq-v0:
host: rabbitmq.wire-federation-v0.svc.cluster.local
adminPort: 15672
tls: false
vHost: /

rabbitmq-v1:
host: rabbitmq.wire-federation-v1.svc.cluster.local
adminPort: 15672
tls: false
vHost: /

backendTwo:

Expand Down
14 changes: 14 additions & 0 deletions charts/integration/templates/integration-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ spec:
secretKeyRef:
name: brig
key: rabbitmqPassword
- name: RABBITMQ_USERNAME_V0
value: "wire-server"
- name: RABBITMQ_PASSWORD_V0
valueFrom:
secretKeyRef:
name: rabbitmq-v0
key: rabbitmq-password
- name: RABBITMQ_USERNAME_V1
value: "wire-server"
- name: RABBITMQ_PASSWORD_V1
valueFrom:
secretKeyRef:
name: rabbitmq-v1
key: rabbitmq-password
{{- if hasKey .Values.secrets "redisUsername" }}
- name: REDIS_USERNAME
valueFrom:
Expand Down
6 changes: 6 additions & 0 deletions hack/bin/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ summary() {
done
}

# Copy the secrets from the wire-federation-v0 namespace to the current namespace to be able to delete RabbitMQ queues that are created by the integration tests to avoid overflows
kubectl -n "$NAMESPACE" delete --force secret rabbitmq-v0 || true
kubectl -n wire-federation-v0 get secrets rabbitmq -ojson | jq 'del(.metadata.namespace) | del(.metadata.resourceVersion) | del(.metadata.uid) | .metadata.name="rabbitmq-v0"' | kubectl -n "$NAMESPACE" apply -f -
kubectl -n "$NAMESPACE" delete --force secret rabbitmq-v1 || true
kubectl -n wire-federation-v1 get secrets rabbitmq -ojson | jq 'del(.metadata.namespace) | del(.metadata.resourceVersion) | del(.metadata.uid) | .metadata.name="rabbitmq-v1"' | kubectl -n "$NAMESPACE" apply -f -

# Run tests in parallel using GNU parallel (see https://www.gnu.org/software/parallel/)
# The below commands are a little convoluted, but we wish to:
# - run integration tests. If they fail, keep track of this, but still go and get logs, so we see what failed
Expand Down
5 changes: 4 additions & 1 deletion integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,10 @@ killConnection backend = do
port = 0,
adminPort = fromIntegral rc.adminPort,
vHost = Text.pack backend.berVHost,
tls = Just $ RabbitMqTlsOpts Nothing True
tls =
if rc.tls
then Just $ RabbitMqTlsOpts Nothing True
else Nothing
}
servantClient <- liftIO $ mkRabbitMqAdminClientEnv opts
name <- do
Expand Down
2 changes: 2 additions & 0 deletions integration/test/Testlib/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ mkGlobalEnv cfgFile = do
gServicesCwdBase = devEnvProjectRoot <&> (</> "services"),
gBackendResourcePool = resourcePool,
gRabbitMQConfig = intConfig.rabbitmq,
gRabbitMQConfigV0 = intConfig.rabbitmqV0,
gRabbitMQConfigV1 = intConfig.rabbitmqV1,
gTempDir = tempDir,
gTimeOutSeconds = timeOutSeconds
}
Expand Down
9 changes: 4 additions & 5 deletions integration/test/Testlib/ResourcePool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import Data.Functor
import Data.IORef
import qualified Data.Set as Set
import Data.String
import qualified Data.Text as T
import Data.Tuple
import Database.CQL.IO
import GHC.Stack (HasCallStack)
Expand Down Expand Up @@ -85,13 +84,13 @@ deleteAllRabbitMQQueues rc resource = do
{ host = rc.host,
port = 0,
adminPort = fromIntegral rc.adminPort,
vHost = T.pack resource.berVHost,
vHost = fromString resource.berVHost,
tls = Just $ RabbitMqTlsOpts Nothing True
}
client <- mkRabbitMqAdminClientEnv opts
queues <- listQueuesByVHost client (T.pack resource.berVHost) Nothing Nothing
for_ queues $ \queue ->
deleteQueue client (T.pack resource.berVHost) queue.name
queuesPage <- listQueuesByVHost client (fromString resource.berVHost) (fromString "") False 100 1
for_ queuesPage.items $ \queue ->
deleteQueue client (fromString resource.berVHost) queue.name

deleteAllDynamicBackendConfigs :: BackendResource -> Client ()
deleteAllDynamicBackendConfigs resource = write cql (defQueryParams LocalQuorum ())
Expand Down
46 changes: 46 additions & 0 deletions integration/test/Testlib/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import Data.Foldable
import Data.Function
import Data.Functor
import Data.List
import Data.Maybe (fromMaybe)
import Data.String (IsString (fromString))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
import Network.AMQP.Extended
import Network.RabbitMqAdmin
import RunAllTests
import System.Directory
import System.Environment
Expand Down Expand Up @@ -133,11 +139,51 @@ runTests tests mXMLOutput cfg = do
pure (TestSuiteReport [TestCaseReport qname TestSuccess tm])
writeChan output Nothing
wait displayThread
deleteFederationV0AndV1Queues genv
printReport report
mapM_ (saveXMLReport report) mXMLOutput
when (any (\testCase -> testCase.result /= TestSuccess) report.cases) $
exitFailure

deleteFederationV0AndV1Queues :: GlobalEnv -> IO ()
deleteFederationV0AndV1Queues env = do
let testDomains = env.gDomain1 : env.gDomain2 : env.gDynamicDomains
putStrLn "Attempting to delete federation V0 queues..."
(mV0User, mV0Pass) <- readCredsFromEnvWithSuffix "V0"
fromMaybe (putStrLn "No or incomplete credentials for fed V0 RabbitMQ") $
deleteFederationQueues testDomains env.gRabbitMQConfigV0 <$> mV0User <*> mV0Pass

putStrLn "Attempting to delete federation V1 queues..."
(mV1User, mV1Pass) <- readCredsFromEnvWithSuffix "V1"
fromMaybe (putStrLn "No or incomplete credentials for fed V1 RabbitMQ") $
deleteFederationQueues testDomains env.gRabbitMQConfigV1 <$> mV1User <*> mV1Pass
where
readCredsFromEnvWithSuffix :: String -> IO (Maybe Text, Maybe Text)
readCredsFromEnvWithSuffix suffix =
(,)
<$> (fmap fromString <$> lookupEnv ("RABBITMQ_USERNAME_" <> suffix))
<*> (fmap fromString <$> lookupEnv ("RABBITMQ_PASSWORD_" <> suffix))

deleteFederationQueues :: [String] -> RabbitMQConfig -> Text -> Text -> IO ()
deleteFederationQueues testDomains rc username password = do
let opts =
RabbitMqAdminOpts
{ host = rc.host,
port = 0,
adminPort = fromIntegral rc.adminPort,
vHost = fromString rc.vHost,
tls =
if rc.tls
then Just (RabbitMqTlsOpts Nothing True)
else Nothing
}
client <- mkRabbitMqAdminClientEnvWithCreds opts username password
for_ testDomains $ \domain -> do
page <- client.listQueuesByVHost (fromString rc.vHost) (fromString $ "^backend-notifications\\." <> domain <> "$") True 100 1
for_ page.items $ \queue -> do
putStrLn $ "Deleting queue " <> T.unpack queue.name
void $ deleteQueue client (fromString rc.vHost) queue.name

doListTests :: [(String, String, String, x)] -> IO ()
doListTests tests = for_ tests $ \(qname, _desc, _full, _) -> do
putStrLn qname
Expand Down
12 changes: 11 additions & 1 deletion integration/test/Testlib/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ instance FromJSON DynamicBackendConfig

data RabbitMQConfig = RabbitMQConfig
{ host :: String,
adminPort :: Word16
adminPort :: Word16,
tls :: Bool,
vHost :: String
}
deriving (Show)

Expand All @@ -100,6 +102,8 @@ instance FromJSON RabbitMQConfig where
RabbitMQConfig
<$> ob .: fromString "host"
<*> ob .: fromString "adminPort"
<*> ob .: fromString "tls"
<*> ob .: fromString "vHost"

-- | Initialised once per testsuite.
data GlobalEnv = GlobalEnv
Expand All @@ -115,6 +119,8 @@ data GlobalEnv = GlobalEnv
gServicesCwdBase :: Maybe FilePath,
gBackendResourcePool :: ResourcePool BackendResource,
gRabbitMQConfig :: RabbitMQConfig,
gRabbitMQConfigV0 :: RabbitMQConfig,
gRabbitMQConfigV1 :: RabbitMQConfig,
gTempDir :: FilePath,
gTimeOutSeconds :: Int
}
Expand All @@ -127,6 +133,8 @@ data IntegrationConfig = IntegrationConfig
integrationTestHostName :: String,
dynamicBackends :: Map String DynamicBackendConfig,
rabbitmq :: RabbitMQConfig,
rabbitmqV0 :: RabbitMQConfig,
rabbitmqV1 :: RabbitMQConfig,
cassandra :: CassandraConfig
}
deriving (Show, Generic)
Expand All @@ -142,6 +150,8 @@ instance FromJSON IntegrationConfig where
<*> o .: fromString "integrationTestHostName"
<*> o .: fromString "dynamicBackends"
<*> o .: fromString "rabbitmq"
<*> o .: fromString "rabbitmq-v0"
<*> o .: fromString "rabbitmq-v1"
<*> o .: fromString "cassandra"

data ServiceMap = ServiceMap
Expand Down
9 changes: 6 additions & 3 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Network.AMQP.Extended
withConnection,
openConnectionWithRetries,
mkRabbitMqAdminClientEnv,
mkRabbitMqAdminClientEnvWithCreds,
mkRabbitMqChannelMVar,
demoteOpts,
RabbitMqTlsOpts (..),
Expand Down Expand Up @@ -91,9 +92,8 @@ instance FromJSON RabbitMqAdminOpts where
<*> parseTlsJson v
<*> v .: "adminPort"

mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = do
(username, password) <- readCredsFromEnv
mkRabbitMqAdminClientEnvWithCreds :: RabbitMqAdminOpts -> Text -> Text -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds opts username password = do
mTlsSettings <- traverse (mkTLSSettings opts.host) opts.tls
let (protocol, managerSettings) = case mTlsSettings of
Nothing -> (Servant.Http, HTTP.defaultManagerSettings)
Expand All @@ -107,6 +107,9 @@ mkRabbitMqAdminClientEnv opts = do
(either throwM pure <=< flip runClientM clientEnv)
(toServant $ adminClient basicAuthData)

mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = readCredsFromEnv >>= uncurry (mkRabbitMqAdminClientEnvWithCreds opts)

-- | When admin opts are needed use `AmqpEndpoint Identity`, otherwise use
-- `AmqpEndpoint NoAdmin`.
data AmqpEndpoint = AmqpEndpoint
Expand Down
30 changes: 23 additions & 7 deletions libs/extended/src/Network/RabbitMqAdmin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,36 @@ type VHost = Text

type QueueName = Text

data Page a = Page {items :: [a], page :: Int, pageCount :: Int}
deriving (Show, Eq, Generic)

instance (FromJSON a) => FromJSON (Page a) where
parseJSON =
genericParseJSON $
defaultOptions
{ fieldLabelModifier = camelTo2 '_'
}

instance (ToJSON a) => ToJSON (Page a) where
toJSON =
genericToJSON $
defaultOptions
{ fieldLabelModifier = camelTo2 '_'
}

-- | Upstream Docs:
-- https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.12.0/deps/rabbitmq_management/priv/www/api/index.html
data AdminAPI route = AdminAPI
{ -- | NOTE: This endpoint can be made paginated, but that complicates
-- consumer code a little. This might be needed for performance tuning
-- later, but perhaps not.
listQueuesByVHost ::
{ listQueuesByVHost ::
route
:- "api"
:> "queues"
:> Capture "vhost" VHost
:> QueryParam "name" Text
:> QueryParam "use_regex" Bool
:> Get '[JSON] [Queue],
:> QueryParam' '[Required, Strict] "name" Text
:> QueryParam' '[Required, Strict] "use_regex" Bool
:> QueryParam' '[Required, Strict] "page_size" Int
:> QueryParam' '[Required, Strict] "page" Int
:> Get '[JSON] (Page Queue),
deleteQueue ::
route
:- "api"
Expand Down
16 changes: 10 additions & 6 deletions services/background-worker/src/Wire/BackendNotificationPusher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,18 @@ getRemoteDomains adminClient = do
handlers =
skipAsyncExceptions
<> [logRetries (const $ pure True) logErrr]
recovering policy handlers $ const go
recovering policy handlers $ const $ go [] 1
where
go :: AppT IO [Domain]
go = do
go :: [Domain] -> Int -> AppT IO [Domain]
go domains pageNumber = do
vhost <- asks rabbitmqVHost
queues <- liftIO $ listQueuesByVHost adminClient vhost (Just "backend-notifications\\..*") (Just True)
let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues
catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes
queuesPage <- liftIO $ listQueuesByVHost adminClient vhost "^backend-notifications\\..*" True 100 pageNumber
let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queuesPage.items
newDomains <- catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes
let domainsSoFar = newDomains <> domains
if queuesPage.page >= queuesPage.pageCount
then pure domainsSoFar
else go domainsSoFar (pageNumber + 1)
logInvalidDomain d e =
Log.warn $
Log.msg (Log.val "Found invalid domain in a backend notifications queue name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,18 @@ mockApi mockAdmin =
deleteConnection = mockDeleteConnection mockAdmin
}

mockListQueuesByVHost :: MockRabbitMqAdmin -> Text -> Maybe Text -> Maybe Bool -> Servant.Handler [Queue]
mockListQueuesByVHost MockRabbitMqAdmin {..} vhost _ _ = do
mockListQueuesByVHost :: MockRabbitMqAdmin -> Text -> Text -> Bool -> Int -> Int -> Servant.Handler (Page Queue)
mockListQueuesByVHost MockRabbitMqAdmin {..} vhost _ _ _ _ = do
atomically $ modifyTVar listQueuesVHostCalls (<> [vhost])
readTVarIO broken >>= \case
True -> throwError $ Servant.err500
False -> pure $ map (\n -> Queue n vhost) queues
False ->
pure
Page
{ items = map (\n -> Queue n vhost) queues,
pageCount = 1,
page = 1
}

mockListDeleteQueue :: MockRabbitMqAdmin -> Text -> Text -> Servant.Handler NoContent
mockListDeleteQueue _ _ _ = do
Expand Down
14 changes: 14 additions & 0 deletions services/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ dynamicBackends:
rabbitmq:
host: localhost
adminPort: 15671
tls: true
vHost: /

rabbitmq-v0:
host: localhost
adminPort: 15672
tls: false
vHost: federation-v0

rabbitmq-v1:
host: localhost
adminPort: 15672
tls: false
vHost: federation-v1

cassandra:
host: 127.0.0.1
Expand Down
Loading