Skip to content

Commit

Permalink
Add active user tracking and optional bridge blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
tadzik committed Sep 7, 2021
1 parent 66aa68f commit c308394
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 2 deletions.
33 changes: 33 additions & 0 deletions alembic/versions/97404229e75e_add_account_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""add account activity
Revision ID: 97404229e75e
Revises: bfc0a39bfe02
Create Date: 2021-09-07 10:38:41.655301
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '97404229e75e'
down_revision = 'bfc0a39bfe02'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('user_activity',
sa.Column('puppet_id', sa.BigInteger(), nullable=False),
sa.Column('first_activity_ts', sa.Integer(), nullable=False),
sa.Column('last_activity_ts', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('puppet_id')
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('user_activity')
# ### end Alembic commands ###
39 changes: 39 additions & 0 deletions mautrix_telegram/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Dict, Any
import asyncio

from telethon import __version__ as __telethon_version__
from alchemysession import AlchemySessionContainer

from mautrix.types import UserID, RoomID
from mautrix.bridge import Bridge
from mautrix.util.db import Base
from mautrix.util.opt_prometheus import Gauge

from .web.provisioning import ProvisioningAPI
from .web.public import PublicBridgeWebsite
Expand All @@ -29,6 +31,7 @@
from .config import Config
from .context import Context
from .db import init as init_db
from .db.user_activity import UserActivity
from .formatter import init as init_formatter
from .matrix import MatrixHandler
from .portal import Portal, init as init_portal
Expand All @@ -41,6 +44,9 @@
except ImportError:
prometheus = None

ACTIVE_USER_METRICS_INTERVAL_S = 5
METRIC_ACTIVE_PUPPETS = Gauge('bridge_active_puppets_total', 'Number of active Telegram users bridged into Matrix')
METRIC_BLOCKING = Gauge('bridge_blocked', 'Is the bridge currently blocking messages')

class TelegramBridge(Bridge):
module = "mautrix_telegram"
Expand All @@ -58,6 +64,9 @@ class TelegramBridge(Bridge):
session_container: AlchemySessionContainer
bot: Bot

periodic_active_metrics_task: asyncio.Task
is_blocked: bool = False

def prepare_db(self) -> None:
super().prepare_db()
init_db(self.db)
Expand Down Expand Up @@ -92,6 +101,7 @@ def prepare_bridge(self) -> None:
self.add_startup_actions(self.bot.start())
if self.config["bridge.resend_bridge_info"]:
self.add_startup_actions(self.resend_bridge_info())
self.add_startup_actions(self._loop_active_puppet_metric())

async def resend_bridge_info(self) -> None:
self.config["bridge.resend_bridge_info"] = False
Expand Down Expand Up @@ -127,6 +137,35 @@ def is_bridge_ghost(self, user_id: UserID) -> bool:
async def count_logged_in_users(self) -> int:
return len([user for user in User.by_tgid.values() if user.tgid])

async def _update_active_puppet_metric(self) -> None:
active_users = UserActivity.get_active_count(
self.config['bridge.limits.puppet_inactivity_days'],
self.config['bridge.limits.min_puppet_activity_days'],
)

block_on_limit_reached = self.config['bridge.limits.block_on_limit_reached']
max_puppet_limit = self.config['bridge.limits.max_puppet_limit']
if block_on_limit_reached is not None and max_puppet_limit is not None:
self.is_blocked = max_puppet_limit < active_users
METRIC_BLOCKING.set(int(self.is_blocked))
self.log.debug(f"Current active puppet count is {active_users}")
METRIC_ACTIVE_PUPPETS.set(active_users)

async def _loop_active_puppet_metric(self) -> None:
while True:
try:
await asyncio.sleep(ACTIVE_USER_METRICS_INTERVAL_S)
except asyncio.CancelledError:
return
self.log.info("Executing periodic active puppet metric check")
try:
await self._update_active_puppet_metric()
except asyncio.CancelledError:
return
except Exception as e:
self.log.exception(f"Error while checking: {e}")


async def manhole_global_namespace(self, user_id: UserID) -> Dict[str, Any]:
return {
**await super().manhole_global_namespace(user_id),
Expand Down
5 changes: 5 additions & 0 deletions mautrix_telegram/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:

copy("bridge.command_prefix")

copy("bridge.limits.max_puppet_limit")
copy("bridge.limits.min_puppet_activity_days")
copy("bridge.limits.puppet_inactivity_days")
copy("bridge.limits.block_on_limit_reached")

migrate_permissions = ("bridge.permissions" not in self
or "bridge.whitelist" in self
or "bridge.admins" in self)
Expand Down
3 changes: 2 additions & 1 deletion mautrix_telegram/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
from .puppet import Puppet
from .telegram_file import TelegramFile
from .user import User, UserPortal, Contact
from .user_activity import UserActivity


def init(db_engine: Engine) -> None:
for table in (Portal, Message, User, Contact, UserPortal, Puppet, TelegramFile, UserProfile,
RoomState, BotChat):
RoomState, BotChat, UserActivity):
table.bind(db_engine)
80 changes: 80 additions & 0 deletions mautrix_telegram/db/user_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2021 Tadeusz Sośnierz
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, Iterable

from sqlalchemy import Column, Integer, BigInteger

from mautrix.util.db import Base
from mautrix.util.logging import TraceLogger

from ..types import TelegramID

import logging
import datetime
import time

UPPER_ACTIVITY_LIMIT_MS = 60 * 1000 * 5 # 5 minutes
ONE_DAY_MS = 24 * 60 * 60 * 1000


class UserActivity(Base):
__tablename__ = "user_activity"

log: TraceLogger = logging.getLogger("mau.user_activity")

puppet_id: TelegramID = Column(BigInteger, primary_key=True)
first_activity_ts: Optional[int] = Column(Integer)
last_activity_ts: Optional[int] = Column(Integer)

def update(self, activity_ts: int) -> None:
if self.last_activity_ts > activity_ts:
return

self.last_activity_ts = activity_ts

self.edit(last_activity_ts=self.last_activity_ts)

@classmethod
def update_for_puppet(cls, puppet: 'Puppet', activity_dt: datetime) -> None:
activity_ts = int(activity_dt.timestamp() * 1000)

if (time.time() * 1000) - activity_ts > UPPER_ACTIVITY_LIMIT_MS:
return

cls.log.debug(f"Updating activity time for {puppet.id} to {activity_ts}")
obj = cls._select_one_or_none(cls.c.puppet_id == puppet.id)
if obj:
obj.update(activity_ts)
else:
obj = UserActivity(
puppet_id=puppet.id,
first_activity_ts=activity_ts,
last_activity_ts=activity_ts,
)
obj.insert()

@classmethod
def get_active_count(cls, min_activity_days: int, max_activity_days: Optional[int]) -> int:
current_ms = time.time() / 1000
active_count = 0
for user in cls._select_all():
activity_days = (user.last_activity_ts - user.first_activity_ts / 1000) / ONE_DAY_MS
# If maxActivityTime is not set, they are always active
is_active = max_activity_days is None or (current_ms - user.last_activity_ts) <= (max_activity_days * ONE_DAY_MS)
if is_active and activity_days > min_activity_days:
active_count += 1
return active_count
12 changes: 12 additions & 0 deletions mautrix_telegram/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,18 @@ bridge:
- myusername
- 12345678

# Limit usage of the bridge
limits:
# The maximum number of bridge puppets that can be "active" before the limit is reached
max_puppet_limit: 0
# The minimum amount of days a puppet must be active for before they are considered "active".
min_puppet_activity_days: 0
# The number of days after a puppets last activity where they are considered inactive again.
puppet_inactivity_days: 30
# Should the bridge block traffic when a limit has been reached
block_on_limit_reached: false


# Telegram config
telegram:
# Get your own API keys at https://my.telegram.org/apps
Expand Down
5 changes: 5 additions & 0 deletions mautrix_telegram/portal/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ async def _send_bridge_error(self, msg: str) -> None:

async def handle_matrix_message(self, sender: 'u.User', content: MessageEventContent,
event_id: EventID) -> None:
if self.bridge.is_blocked:
self.log.debug(f"Bridge is blocked, dropping matrix event {event_id}")
await self._send_bridge_error(f"\u26a0 The bridge is blocked due to reaching its user limit")
return

try:
await self._handle_matrix_message(sender, content, event_id)
except RPCError as e:
Expand Down
7 changes: 6 additions & 1 deletion mautrix_telegram/portal/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from mautrix.bridge import NotificationDisabler

from ..types import TelegramID
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile, UserActivity
from ..util import sane_mimetypes
from ..context import Context
from ..tgclient import TelegramClient
Expand Down Expand Up @@ -613,6 +613,10 @@ async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int]

async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet,
evt: Message) -> None:
if self.bridge.is_blocked:
self.log.debug(f"Bridge is blocked, dropping telegram message {evt.id}")
return

if not self.mxid:
self.log.trace("Got telegram message %d, but no room exists, creating...", evt.id)
await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False)
Expand Down Expand Up @@ -704,6 +708,7 @@ async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet
await intent.redact(self.mxid, event_id)
return

UserActivity.update_for_puppet(sender, evt.date)
self.log.debug("Handled telegram message %d -> %s", evt.id, event_id)
try:
DBMessage(tgid=TelegramID(evt.id), mx_room=self.mxid, mxid=event_id,
Expand Down

0 comments on commit c308394

Please sign in to comment.