Skip to content

Commit

Permalink
WIP: Fix migration calls
Browse files Browse the repository at this point in the history
The index migrations need access to both ES indices. This cannot be
provided by a proxy which is configured to serve only for one index.indices

This commit needs cleanup. Committing it to save the result.
  • Loading branch information
supersven committed Dec 18, 2024
1 parent 77a7a3a commit 1627c92
Showing 1 changed file with 27 additions and 37 deletions.
64 changes: 27 additions & 37 deletions services/brig/test/integration/API/Search.hs
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,9 @@ testMigrationToNewIndex opts brig migrateIndexCommand = do

-- Run Migrations
logger <- Log.create Log.StdOut
liftIO $ do
createCommand logger opts (ES.IndexName oldESIndex)
migrateIndexCommand logger opts (ES.IndexName oldESIndex)
withESProxyOnly [oldESIndex, newESIndex] opts $ \bothIndexUrl -> do
let opts' = (optsWithIndex "old") & Opt.elasticsearchLens . Opt.urlLens .~ (ES.Server bothIndexUrl)
liftIO $ migrateIndexCommand logger opts' (ES.IndexName newESIndex)

-- Phase 3: Using old index for search, writing to both indices, migrations have run
(phase3NonTeamUser, phase3TeamUser) <- withSettingsOverrides (optsWithIndex "both") $ do
Expand All @@ -706,6 +706,7 @@ testMigrationToNewIndex opts brig migrateIndexCommand = do

-- Phase 4: Using only new index
withSettingsOverrides (optsWithIndex "new") $ do
refreshIndex brig
-- Searching should work for phase1 users
assertCanFindByName brig phase1TeamUser1 phase1TeamUser2
assertCanFindByName brig phase1TeamUser1 phase1NonTeamUser
Expand All @@ -718,42 +719,23 @@ testMigrationToNewIndex opts brig migrateIndexCommand = do
assertCanFindByName brig phase1TeamUser1 phase3NonTeamUser
assertCanFindByName brig phase1TeamUser1 phase3TeamUser

createCommand :: Log.Logger -> Opt.Opts -> ES.IndexName -> IO ()
createCommand logger opts oldIndexName =
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ oldIndexName)
esOldConnectionSettings :: ESConnectionSettings = toESConnectionSettings esOldOpts
esNewConnectionSettings = esOldConnectionSettings {esIndex = newIndexName}
replicas = 2
shards = 2
refreshInterval = 5
esSettings =
IndexOpts.localElasticSettings
& IndexOpts.esConnection .~ esNewConnectionSettings
& IndexOpts.esIndexReplicas .~ ES.ReplicaCount replicas
& IndexOpts.esIndexShardCount .~ shards
& IndexOpts.esIndexRefreshInterval .~ refreshInterval
in runCommand logger $ Create esSettings opts.galley

runReindexFromAnotherIndex :: Log.Logger -> Opt.Opts -> ES.IndexName -> IO ()
runReindexFromAnotherIndex logger opts oldIndexName =
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ oldIndexName)
runReindexFromAnotherIndex logger opts newIndexName =
let esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens)
esOldConnectionSettings :: ESConnectionSettings = toESConnectionSettings esOldOpts
reindexSettings = ReindexFromAnotherIndexSettings esOldConnectionSettings newIndexName 5
in runCommand logger $ ReindexFromAnotherIndex reindexSettings
in do
runCommand logger $ ReindexFromAnotherIndex reindexSettings

runReindexFromDatabase ::
(ElasticSettings -> CassandraSettings -> Endpoint -> Command) ->
Log.Logger ->
Opt.Opts ->
ES.IndexName ->
IO ()
runReindexFromDatabase syncCommand logger opts oldIndexName =
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ oldIndexName)
esOldConnectionSettings :: ESConnectionSettings = toESConnectionSettings esOldOpts
esNewConnectionSettings = esOldConnectionSettings {esIndex = newIndexName}
runReindexFromDatabase syncCommand logger opts newIndexName =
let esNewOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ newIndexName)
esNewConnectionSettings :: ESConnectionSettings = toESConnectionSettings esNewOpts
replicas = 2
shards = 2
refreshInterval = 5
Expand Down Expand Up @@ -783,28 +765,36 @@ toESConnectionSettings opts = ESConnectionSettings {..}
esInsecureSkipVerifyTls = opts.insecureSkipVerifyTls
esCredentials = opts.credentials

-- TODO: Separate the index creation from proxying to reduce duplication withESProxyOnly
withESProxy :: (TestConstraints m, MonadUnliftIO m, HasCallStack) => Opt.Opts -> (Text -> Text -> m a) -> m a
withESProxy opts f = do
indexName <- randomHandle
createIndexWithMapping opts indexName oldMapping
mgr <- liftIO $ initHttpManagerWithTLSConfig opts.elasticsearch.insecureSkipVerifyTls opts.elasticsearch.caCert
(proxyPort, sock) <- liftIO Warp.openFreePort
bracket
(async $ liftIO $ Warp.runSettingsSocket Warp.defaultSettings sock $ indexProxyServer indexName opts mgr)
(async $ liftIO $ Warp.runSettingsSocket Warp.defaultSettings sock $ indexProxyServer [indexName] opts mgr)
cancel
(\_ -> f ("http://localhost:" <> Text.pack (show proxyPort)) indexName)

indexProxyServer :: Text -> Opt.Opts -> Manager -> Wai.Application
indexProxyServer idx opts mgr =
withESProxyOnly :: (TestConstraints m, MonadUnliftIO m, HasCallStack) => [Text] -> Opt.Opts -> (Text -> m a) -> m a
withESProxyOnly indexNames opts f = do
mgr <- liftIO $ initHttpManagerWithTLSConfig opts.elasticsearch.insecureSkipVerifyTls opts.elasticsearch.caCert
(proxyPort, sock) <- liftIO Warp.openFreePort
bracket
(async $ liftIO $ Warp.runSettingsSocket Warp.defaultSettings sock $ indexProxyServer indexNames opts mgr)
cancel
(\_ -> f ("http://localhost:" <> Text.pack (show proxyPort)))

indexProxyServer :: [Text] -> Opt.Opts -> Manager -> Wai.Application
indexProxyServer idxs opts mgr =
let toUri (ES.Server url) = either (error . show) id $ URI.parseURI URI.strictURIParserOptions (Text.encodeUtf8 url)
proxyURI = toUri (Opts.url (Opts.elasticsearch opts))
proxyToHost = URI.hostBS . URI.authorityHost . fromMaybe (error "No Host") . URI.uriAuthority $ proxyURI
proxyToPort = URI.portNumber . fromMaybe (URI.Port 9200) . URI.authorityPort . fromMaybe (error "No Host") . URI.uriAuthority $ proxyURI
proxyApp req =
pure $
if headMay (Wai.pathInfo req) == Just idx
then Wai.WPRProxyDestSecure (Wai.ProxyDest proxyToHost proxyToPort)
else Wai.WPRResponse (Wai.responseLBS HTTP.status400 [] $ "Refusing to proxy to path=" <> cs (Wai.rawPathInfo req))
proxyApp req | (headMay (Wai.pathInfo req)) `elem` [Just "_reindex", Just "_tasks"] = pure $ Wai.WPRProxyDestSecure (Wai.ProxyDest proxyToHost proxyToPort)
proxyApp req | (any (\idx -> (headMay (Wai.pathInfo req) == Just idx)) idxs) = pure $ Wai.WPRProxyDestSecure (Wai.ProxyDest proxyToHost proxyToPort)
proxyApp req = pure $ Wai.WPRResponse (Wai.responseLBS HTTP.status400 [] $ "Refusing to proxy to path=" <> cs (Wai.rawPathInfo req))
in waiProxyTo proxyApp Wai.defaultOnExc mgr

testWithBothIndices :: Opt.Opts -> Manager -> TestName -> WaiTest.Session a -> TestTree
Expand Down

0 comments on commit 1627c92

Please sign in to comment.