diff --git a/mautrix_telegram/__main__.py b/mautrix_telegram/__main__.py index 5913c18a..6ce33bfd 100644 --- a/mautrix_telegram/__main__.py +++ b/mautrix_telegram/__main__.py @@ -61,13 +61,14 @@ class TelegramBridge(Bridge): matrix_class = MatrixHandler config: Config + context: Context session_container: AlchemySessionContainer bot: Bot periodic_active_metrics_task: asyncio.Task is_blocked: bool = False - periodic_sync_task: asyncio.Task + periodic_sync_task: asyncio.Task = None def prepare_db(self) -> None: super().prepare_db() @@ -76,29 +77,28 @@ def prepare_db(self) -> None: engine=self.db, table_base=Base, session=False, table_prefix="telethon_", manage_tables=False) - def _prepare_website(self, context: Context) -> None: + def _prepare_website(self) -> None: if self.config["appservice.public.enabled"]: public_website = PublicBridgeWebsite(self.loop) self.az.app.add_subapp(self.config["appservice.public.prefix"], public_website.app) - context.public_website = public_website + self.context.public_website = public_website if self.config["appservice.provisioning.enabled"]: - provisioning_api = ProvisioningAPI(context) + provisioning_api = ProvisioningAPI(self.context) self.az.app.add_subapp(self.config["appservice.provisioning.prefix"], provisioning_api.app) - context.provisioning_api = provisioning_api + self.context.provisioning_api = provisioning_api def prepare_bridge(self) -> None: self.bot = init_bot(self.config) - context = Context(self.az, self.config, self.loop, self.session_container, self, self.bot) - self._prepare_website(context) - self.matrix = context.mx = MatrixHandler(context) + self.context = Context(self.az, self.config, self.loop, self.session_container, self, self.bot) + self._prepare_website() + self.matrix = self.context.mx = MatrixHandler(self.context) - init_abstract_user(context) - init_formatter(context) - init_portal(context) - self.add_startup_actions(init_puppet(context)) - self.add_startup_actions(init_user(context)) + init_abstract_user(self.context) + init_formatter(self.context) + init_portal(self.context) + self.add_startup_actions(init_puppet(self.context)) if self.bot: self.add_startup_actions(self.bot.start()) @@ -109,6 +109,22 @@ def prepare_bridge(self) -> None: if self.config['bridge.limits.enable_activity_tracking'] is not False: self.periodic_sync_task = self.loop.create_task(self._loop_active_puppet_metric()) + async def start(self) -> None: + await super().start() + + semaphore = None + concurrency = self.config['telegram.connection.concurrent_connections_startup'] + if concurrency: + semaphore = asyncio.Semaphore(concurrency) + await semaphore.acquire() + + async def sem_task(task): + if not semaphore: + return await task + async with semaphore: + return await task + + await asyncio.gather(*(sem_task(task) for task in init_user(self.context))) async def resend_bridge_info(self) -> None: self.config["bridge.resend_bridge_info"] = False diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index c4b6d820..7ad723e1 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -212,7 +212,6 @@ def do_update(self, helper: ConfigUpdateHelper) -> None: copy("telegram.connection.retry_delay") copy("telegram.connection.flood_sleep_threshold") copy("telegram.connection.request_retries") - copy("telegram.connection.block_startup") copy("telegram.connection.concurrent_connections_startup") copy("telegram.device_info.device_model") diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index 0241da79..e6aecf80 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -505,10 +505,6 @@ telegram: # is not recommended, since some requests can always trigger a call fail (such as searching # for messages). request_retries: 5 - # Should the bridge block startup until all telegram connections have been made for all puppets. - # This should be disable for bridges with a large amount of puppets to prevent an extended startup - # period. Defaults to true - block_startup: true # How many concurrent connections should be handled on startup. Set to 0 to allow unlimited connections # Defualts to 0 concurrent_connections_startup: 0 diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 836a9ed8..c3d22e8d 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -13,7 +13,6 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from asyncio.futures import Future from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast, TYPE_CHECKING) from datetime import datetime, timezone @@ -684,27 +683,6 @@ def init(context: 'Context') -> Future: global config config = context.config User.bridge = context.bridge - concurrency = config.get("telegram.connection.concurrent_connections_startup", 0) - semaphore = None - block_startup = config.get("telegram.connection.block_startup", True) - if concurrency: - semaphore = asyncio.Semaphore(concurrency) - - async def sem_task(task, index): - async with semaphore: - print(f"sem_task: ${index}") - return await task - - tasks = (User.from_db(db_user).try_ensure_started() + return (User.from_db(db_user).try_ensure_started() for db_user in DBUser.all_with_tgid()) - future = None - if semaphore: - future = asyncio.gather(*(sem_task(tasks[i], i) for i in range(0, len(tasks)))) - else: - future = asyncio.gather(*tasks) - - if block_startup: - return future - else: - asyncio.create_task(future)