From c30839460e076afa935c442031e3067101f1761a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tadeusz=20So=C5=9Bnierz?=
Date: Tue, 7 Sep 2021 13:33:42 +0200
Subject: [PATCH] Add active user tracking and optional bridge blocking

---
 .../97404229e75e_add_account_activity.py      | 33 ++++++++
 mautrix_telegram/__main__.py                  | 39 +++++++++
 mautrix_telegram/config.py                    |  5 ++
 mautrix_telegram/db/__init__.py               |  3 +-
 mautrix_telegram/db/user_activity.py          | 80 +++++++++++++++++++
 mautrix_telegram/example-config.yaml          | 12 +++
 mautrix_telegram/portal/matrix.py             |  5 ++
 mautrix_telegram/portal/telegram.py           |  7 +-
 8 files changed, 182 insertions(+), 2 deletions(-)
 create mode 100644 alembic/versions/97404229e75e_add_account_activity.py
 create mode 100644 mautrix_telegram/db/user_activity.py

diff --git a/alembic/versions/97404229e75e_add_account_activity.py b/alembic/versions/97404229e75e_add_account_activity.py
new file mode 100644
index 00000000..1e34fa02
--- /dev/null
+++ b/alembic/versions/97404229e75e_add_account_activity.py
@@ -0,0 +1,33 @@
+"""add account activity
+
+Revision ID: 97404229e75e
+Revises: bfc0a39bfe02
+Create Date: 2021-09-07 10:38:41.655301
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '97404229e75e'
+down_revision = 'bfc0a39bfe02'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('user_activity',
+    sa.Column('puppet_id', sa.BigInteger(), nullable=False),
+    sa.Column('first_activity_ts', sa.BigInteger(), nullable=False),  # ms since epoch; too big for int32
+    sa.Column('last_activity_ts', sa.BigInteger(), nullable=False),  # ms since epoch; too big for int32
+    sa.PrimaryKeyConstraint('puppet_id')
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust!
### + op.drop_table('user_activity') + # ### end Alembic commands ### diff --git a/mautrix_telegram/__main__.py b/mautrix_telegram/__main__.py index 83ee0fb1..8aa85a06 100644 --- a/mautrix_telegram/__main__.py +++ b/mautrix_telegram/__main__.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from typing import Dict, Any +import asyncio from telethon import __version__ as __telethon_version__ from alchemysession import AlchemySessionContainer @@ -21,6 +22,7 @@ from mautrix.types import UserID, RoomID from mautrix.bridge import Bridge from mautrix.util.db import Base +from mautrix.util.opt_prometheus import Gauge from .web.provisioning import ProvisioningAPI from .web.public import PublicBridgeWebsite @@ -29,6 +31,7 @@ from .config import Config from .context import Context from .db import init as init_db +from .db.user_activity import UserActivity from .formatter import init as init_formatter from .matrix import MatrixHandler from .portal import Portal, init as init_portal @@ -41,6 +44,9 @@ except ImportError: prometheus = None +ACTIVE_USER_METRICS_INTERVAL_S = 5 +METRIC_ACTIVE_PUPPETS = Gauge('bridge_active_puppets_total', 'Number of active Telegram users bridged into Matrix') +METRIC_BLOCKING = Gauge('bridge_blocked', 'Is the bridge currently blocking messages') class TelegramBridge(Bridge): module = "mautrix_telegram" @@ -58,6 +64,9 @@ class TelegramBridge(Bridge): session_container: AlchemySessionContainer bot: Bot + periodic_active_metrics_task: asyncio.Task + is_blocked: bool = False + def prepare_db(self) -> None: super().prepare_db() init_db(self.db) @@ -92,6 +101,7 @@ def prepare_bridge(self) -> None: self.add_startup_actions(self.bot.start()) if self.config["bridge.resend_bridge_info"]: self.add_startup_actions(self.resend_bridge_info()) + self.add_startup_actions(self._loop_active_puppet_metric()) async def resend_bridge_info(self) -> None: 
self.config["bridge.resend_bridge_info"] = False
@@ -127,6 +137,47 @@ def is_bridge_ghost(self, user_id: UserID) -> bool:
     async def count_logged_in_users(self) -> int:
         return len([user for user in User.by_tgid.values() if user.tgid])
 
+    async def _update_active_puppet_metric(self) -> None:
+        """Recount the active puppets, then refresh the gauges and the blocked flag.
+
+        The bridge is only blocked while ``bridge.limits.block_on_limit_reached`` is
+        enabled and the active puppet count exceeds ``bridge.limits.max_puppet_limit``.
+        """
+        # get_active_count() expects (min_activity_days, max_activity_days), in that order.
+        active_users = UserActivity.get_active_count(
+            self.config['bridge.limits.min_puppet_activity_days'],
+            self.config['bridge.limits.puppet_inactivity_days'],
+        )
+
+        block_on_limit_reached = self.config['bridge.limits.block_on_limit_reached']
+        max_puppet_limit = self.config['bridge.limits.max_puppet_limit']
+        # `false` is not None, so test the flag's truthiness: otherwise the bridge would
+        # block even with block_on_limit_reached turned off.
+        self.is_blocked = (bool(block_on_limit_reached) and max_puppet_limit is not None
+                           and active_users > max_puppet_limit)
+        METRIC_BLOCKING.set(int(self.is_blocked))
+        self.log.debug(f"Current active puppet count is {active_users}")
+        METRIC_ACTIVE_PUPPETS.set(active_users)
+
+    async def _loop_active_puppet_metric(self) -> None:
+        """Run the active puppet check every ACTIVE_USER_METRICS_INTERVAL_S seconds.
+
+        Returns when the task is cancelled; other errors are logged and the loop goes on.
+        """
+        while True:
+            try:
+                await asyncio.sleep(ACTIVE_USER_METRICS_INTERVAL_S)
+            except asyncio.CancelledError:
+                return
+            self.log.info("Executing periodic active puppet metric check")
+            try:
+                await self._update_active_puppet_metric()
+            except asyncio.CancelledError:
+                return
+            except Exception as e:
+                self.log.exception(f"Error while checking: {e}")
+
+
     async def manhole_global_namespace(self, user_id: UserID) -> Dict[str, Any]:
         return {
             **await super().manhole_global_namespace(user_id),
diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py
index 03888d6c..06cc18bb 100644
--- a/mautrix_telegram/config.py
+++ b/mautrix_telegram/config.py
@@ -169,6 +169,11 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
 
         copy("bridge.command_prefix")
 
+        copy("bridge.limits.max_puppet_limit")
+        copy("bridge.limits.min_puppet_activity_days")
+        copy("bridge.limits.puppet_inactivity_days")
+        copy("bridge.limits.block_on_limit_reached")
+
         migrate_permissions = ("bridge.permissions" not in self
                                or "bridge.whitelist" in self
                                or "bridge.admins" in
self)
diff --git a/mautrix_telegram/db/__init__.py b/mautrix_telegram/db/__init__.py
index 67c9779a..d4102e95 100644
--- a/mautrix_telegram/db/__init__.py
+++ b/mautrix_telegram/db/__init__.py
@@ -23,9 +23,10 @@
 from .puppet import Puppet
 from .telegram_file import TelegramFile
 from .user import User, UserPortal, Contact
+from .user_activity import UserActivity
 
 
 def init(db_engine: Engine) -> None:
     for table in (Portal, Message, User, Contact, UserPortal, Puppet, TelegramFile, UserProfile,
-                  RoomState, BotChat):
+                  RoomState, BotChat, UserActivity):
         table.bind(db_engine)
diff --git a/mautrix_telegram/db/user_activity.py b/mautrix_telegram/db/user_activity.py
new file mode 100644
index 00000000..01633294
--- /dev/null
+++ b/mautrix_telegram/db/user_activity.py
@@ -0,0 +1,96 @@
+# mautrix-telegram - A Matrix-Telegram puppeting bridge
+# Copyright (C) 2019 Tulir Asokan
+# Copyright (C) 2021 Tadeusz Sośnierz
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from typing import Optional, Iterable
+
+from sqlalchemy import Column, Integer, BigInteger
+
+from mautrix.util.db import Base
+from mautrix.util.logging import TraceLogger
+
+from ..types import TelegramID
+
+import logging
+import datetime
+import time
+
+UPPER_ACTIVITY_LIMIT_MS = 60 * 1000 * 5  # 5 minutes
+ONE_DAY_MS = 24 * 60 * 60 * 1000
+
+
+class UserActivity(Base):
+    """Tracks the first and last activity of a puppet as millisecond Unix timestamps."""
+    __tablename__ = "user_activity"
+
+    log: TraceLogger = logging.getLogger("mau.user_activity")
+
+    puppet_id: TelegramID = Column(BigInteger, primary_key=True)
+    # Millisecond timestamps need 64 bits; a 32-bit INTEGER column overflows.
+    first_activity_ts: Optional[int] = Column(BigInteger)
+    last_activity_ts: Optional[int] = Column(BigInteger)
+
+    def update(self, activity_ts: int) -> None:
+        """Move ``last_activity_ts`` forward to ``activity_ts``; older values are ignored."""
+        if self.last_activity_ts is not None and self.last_activity_ts > activity_ts:
+            return
+
+        self.last_activity_ts = activity_ts
+        self.edit(last_activity_ts=self.last_activity_ts)
+
+    @classmethod
+    def update_for_puppet(cls, puppet: 'Puppet', activity_dt: datetime.datetime) -> None:
+        """Record ``activity_dt`` as activity of ``puppet``, creating its row if needed.
+
+        Timestamps more than UPPER_ACTIVITY_LIMIT_MS in the past are skipped
+        (presumably so that old/backfilled messages don't count as fresh activity).
+        """
+        activity_ts = int(activity_dt.timestamp() * 1000)
+
+        if (time.time() * 1000) - activity_ts > UPPER_ACTIVITY_LIMIT_MS:
+            return
+
+        cls.log.debug(f"Updating activity time for {puppet.id} to {activity_ts}")
+        obj = cls._select_one_or_none(cls.c.puppet_id == puppet.id)
+        if obj:
+            obj.update(activity_ts)
+        else:
+            obj = UserActivity(
+                puppet_id=puppet.id,
+                first_activity_ts=activity_ts,
+                last_activity_ts=activity_ts,
+            )
+            obj.insert()
+
+    @classmethod
+    def get_active_count(cls, min_activity_days: int, max_activity_days: Optional[int]) -> int:
+        """Count the puppets that are currently considered active.
+
+        A puppet counts when more than ``min_activity_days`` days lie between its first
+        and last activity, and its last activity is at most ``max_activity_days`` days
+        old. If ``max_activity_days`` is ``None``, puppets never become inactive.
+        """
+        current_ms = time.time() * 1000
+        min_days = min_activity_days or 0
+        active_count = 0
+        for user in cls._select_all():
+            # Both timestamps are in milliseconds, so subtract first, then convert to days.
+            activity_days = (user.last_activity_ts - user.first_activity_ts) / ONE_DAY_MS
+            # If max_activity_days is not set, they are always active
+            is_active = (max_activity_days is None
+                         or current_ms - user.last_activity_ts <= max_activity_days * ONE_DAY_MS)
+            if is_active and activity_days > min_days:
+                active_count += 1
+        return active_count
diff --git a/mautrix_telegram/example-config.yaml
b/mautrix_telegram/example-config.yaml index 7cc9e981..9d198a7d 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -443,6 +443,18 @@ bridge: - myusername - 12345678 + # Limit usage of the bridge + limits: + # The maximum number of bridge puppets that can be "active" before the limit is reached + max_puppet_limit: 0 + # The minimum amount of days a puppet must be active for before they are considered "active". + min_puppet_activity_days: 0 + # The number of days after a puppets last activity where they are considered inactive again. + puppet_inactivity_days: 30 + # Should the bridge block traffic when a limit has been reached + block_on_limit_reached: false + + # Telegram config telegram: # Get your own API keys at https://my.telegram.org/apps diff --git a/mautrix_telegram/portal/matrix.py b/mautrix_telegram/portal/matrix.py index 9ec59dbc..24715130 100644 --- a/mautrix_telegram/portal/matrix.py +++ b/mautrix_telegram/portal/matrix.py @@ -369,6 +369,11 @@ async def _send_bridge_error(self, msg: str) -> None: async def handle_matrix_message(self, sender: 'u.User', content: MessageEventContent, event_id: EventID) -> None: + if self.bridge.is_blocked: + self.log.debug(f"Bridge is blocked, dropping matrix event {event_id}") + await self._send_bridge_error(f"\u26a0 The bridge is blocked due to reaching its user limit") + return + try: await self._handle_matrix_message(sender, content, event_id) except RPCError as e: diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index 63eb770f..7182d0aa 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -44,7 +44,7 @@ from mautrix.bridge import NotificationDisabler from ..types import TelegramID -from ..db import Message as DBMessage, TelegramFile as DBTelegramFile +from ..db import Message as DBMessage, TelegramFile as DBTelegramFile, UserActivity from ..util import sane_mimetypes from ..context import Context 
from ..tgclient import TelegramClient @@ -613,6 +613,10 @@ async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int] async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet, evt: Message) -> None: + if self.bridge.is_blocked: + self.log.debug(f"Bridge is blocked, dropping telegram message {evt.id}") + return + if not self.mxid: self.log.trace("Got telegram message %d, but no room exists, creating...", evt.id) await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False) @@ -704,6 +708,7 @@ async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet await intent.redact(self.mxid, event_id) return + UserActivity.update_for_puppet(sender, evt.date) self.log.debug("Handled telegram message %d -> %s", evt.id, event_id) try: DBMessage(tgid=TelegramID(evt.id), mx_room=self.mxid, mxid=event_id,