Skip to content

Commit

Permalink
Merge branch 'automatic-backfill'
Browse files Browse the repository at this point in the history
Fixes #476
Fixes #477
  • Loading branch information
tulir committed Jul 29, 2020
2 parents 993354b + 9848f8b commit be3b135
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 91 deletions.
6 changes: 3 additions & 3 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
* [ ] Buttons
* [x] Message deletions
* [x] Message edits
* [ ] Message history
* [x] Message history
* [x] Manually (`!tg backfill`)
* [ ] Automatically when creating portal
* [ ] Automatically for missed messages
* [x] Automatically when creating portal
* [x] Automatically for missed messages
* [x] Avatars
* [x] Presence
* [x] Typing notifications
Expand Down
4 changes: 3 additions & 1 deletion mautrix_telegram/abstract_user.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -426,6 +426,8 @@ async def update_message(self, original_update: UpdateMessage) -> None:
self.log.debug(f"Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log)
return

await portal.backfill_lock.wait(update.id)

if isinstance(update, MessageService):
if isinstance(update.action, MessageActionChannelMigrateFrom):
self.log.trace(f"Ignoring action %s to %s by %d", update.action, portal.tgid_log,
Expand Down
22 changes: 15 additions & 7 deletions mautrix_telegram/commands/telegram/misc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -185,8 +185,10 @@ async def sync(evt: CommandEvent) -> EventID:
sync_only = None

if not sync_only or sync_only == "chats":
await evt.sender.sync_dialogs(synchronous_create=True)
await evt.reply("Synchronizing chats...")
await evt.sender.sync_dialogs()
if not sync_only or sync_only == "contacts":
await evt.reply("Synchronizing contacts...")
await evt.sender.sync_contacts()
if not sync_only or sync_only == "me":
await evt.sender.update_info()
Expand Down Expand Up @@ -311,18 +313,20 @@ async def vote(evt: CommandEvent) -> EventID:


@command_handler(help_section=SECTION_MISC, help_args="<_emoji_>",
help_text="Roll a dice (\U0001F3B2) or throw a dart (\U0001F3AF) "
"on the Telegram servers.")
help_text="Roll a dice (\U0001F3B2), kick a football (\u26BD\uFE0F) or throw a "
"dart (\U0001F3AF) or basketball (\U0001F3C0) on the Telegram servers.")
async def random(evt: CommandEvent) -> EventID:
if not evt.is_portal:
return await evt.reply("You can only roll dice in portal rooms")
return await evt.reply("You can only randomize values in portal rooms")
portal = po.Portal.get_by_mxid(evt.room_id)
arg = evt.args[0] if len(evt.args) > 0 else "dice"
emoticon = {
"dart": "\U0001F3AF",
"dice": "\U0001F3B2",
"ball": "\U0001F3C0",
"basketball": "\U0001F3C0",
"football": "\u26BD",
"soccer": "\u26BD",
}.get(arg, arg)
try:
await evt.sender.client.send_media(await portal.get_input_entity(evt.sender),
Expand All @@ -331,15 +335,19 @@ async def random(evt: CommandEvent) -> EventID:
return await evt.reply("Invalid emoji for randomization")


@command_handler(help_section=SECTION_PORTAL_MANAGEMENT,
@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, help_args="[_limit_]",
help_text="Backfill messages from Telegram history.")
async def backfill(evt: CommandEvent) -> None:
if not evt.is_portal:
await evt.reply("You can only use backfill in portal rooms")
return
try:
limit = int(evt.args[0])
except (ValueError, IndexError):
limit = -1
portal = po.Portal.get_by_mxid(evt.room_id)
try:
await portal.backfill(evt.sender)
await portal.backfill(evt.sender, limit=limit)
except TakeoutInitDelayError:
msg = ("Please accept the data export request from a mobile device, "
"then re-run the backfill command.")
Expand Down
14 changes: 12 additions & 2 deletions mautrix_telegram/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -89,7 +89,12 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
copy("bridge.sync_channel_members")
copy("bridge.skip_deleted_members")
copy("bridge.startup_sync")
copy("bridge.sync_dialog_limit")
if "bridge.sync_dialog_limit" in self:
base["bridge.sync_create_limit"] = self["bridge.sync_dialog_limit"]
base["bridge.sync_update_limit"] = self["bridge.sync_dialog_limit"]
else:
copy("bridge.sync_update_limit")
copy("bridge.sync_create_limit")
copy("bridge.sync_direct_chats")
copy("bridge.max_telegram_delete")
copy("bridge.sync_matrix_state")
Expand All @@ -113,6 +118,11 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
copy("bridge.delivery_receipts")
copy("bridge.delivery_error_reports")
copy("bridge.resend_bridge_info")
copy("bridge.backfill.invite_own_puppet")
copy("bridge.backfill.takeout_limit")
copy("bridge.backfill.initial_limit")
copy("bridge.backfill.missed_limit")
copy("bridge.backfill.disable_notifications")

copy("bridge.initial_power_level_overrides.group")
copy("bridge.initial_power_level_overrides.user")
Expand Down
34 changes: 31 additions & 3 deletions mautrix_telegram/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ bridge:
# Maximum number of members to sync per portal when starting up. Other members will be
# synced when they send messages. The maximum is 10000, after which the Telegram server
# will not send any more members.
# Defaults to no local limit (-> limited to 10000 by server)
max_initial_member_sync: -1
# -1 means no limit (which means it's limited to 10000 by the server)
max_initial_member_sync: 100
# Whether or not to sync the member list in channels.
# If no channel admins have logged into the bridge, the bridge won't be able to sync the member
# list regardless of this setting.
Expand All @@ -142,7 +142,10 @@ bridge:
startup_sync: true
# Number of most recently active dialogs to check when syncing chats.
# Set to 0 to remove limit.
sync_dialog_limit: 30
sync_update_limit: 0
# Number of most recently active dialogs to create portals for when syncing chats.
# Set to 0 to remove limit.
sync_create_limit: 30
# Whether or not to sync and create portals for direct chats at startup.
sync_direct_chats: false
# The maximum number of simultaneous Telegram deletions to handle.
Expand Down Expand Up @@ -232,6 +235,31 @@ bridge:
# This field will automatically be changed back to false after it,
# except if the config file is not writable.
resend_bridge_info: false
# Settings for backfilling messages from Telegram.
backfill:
# Whether or not the Telegram ghosts of logged in Matrix users should be
# invited to private chats when backfilling history from Telegram. This is
# usually needed to prevent rate limits and to allow timestamp massaging.
invite_own_puppet: true
# Maximum number of messages to backfill without using a takeout.
# The first time a takeout is used, the user has to manually approve it from a different
# device. If initial_limit or missed_limit are higher than this value, the bridge will ask
# the user to accept the takeout after logging in before syncing any chats.
takeout_limit: 100
# Maximum number of messages to backfill initially.
# Set to 0 to disable backfilling when creating portal, or -1 to disable the limit.
#
# N.B. Initial backfill will only start after member sync. Make sure your
# max_initial_member_sync is set to a low enough value so it doesn't take forever.
initial_limit: 0
# Maximum number of messages to backfill if messages were missed while the bridge was
# disconnected. Note that this only works for logged in users and only if the chat isn't
# older than sync_update_limit
# Set to 0 to disable backfilling missed messages.
missed_limit: 50
# If using double puppeting, should notifications be disabled
# while the initial backfill is in progress?
disable_notifications: false

# Overrides for base power levels.
initial_power_level_overrides:
Expand Down
9 changes: 6 additions & 3 deletions mautrix_telegram/portal/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from mautrix.types import (RoomID, RoomAlias, UserID, EventID, EventType, MessageEventContent,
PowerLevelStateEventContent, ContentURI)
from mautrix.util.simple_template import SimpleTemplate
from mautrix.util.simple_lock import SimpleLock
from mautrix.util.logging import TraceLogger

from ..types import TelegramID
Expand Down Expand Up @@ -93,7 +94,7 @@ class BasePortal(ABC):
avatar_url: Optional[ContentURI]
encrypted: bool
deleted: bool
backfilling: bool
backfill_lock: SimpleLock
backfill_leave: Optional[Set[IntentAPI]]
log: TraceLogger

Expand Down Expand Up @@ -127,7 +128,8 @@ def __init__(self, tgid: TelegramID, peer_type: str, tg_receiver: Optional[Teleg
self._main_intent = None
self.deleted = False
self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid)
self.backfilling = False
self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s",
log=self.log, loop=self.loop)
self.backfill_leave = None

self.dedup = PortalDedup(self)
Expand Down Expand Up @@ -531,7 +533,8 @@ def handle_matrix_power_levels(self, sender: 'u.User', new_levels: Dict[UserID,
pass

@abstractmethod
def backfill(self, source: 'AbstractUser') -> Awaitable[None]:
def backfill(self, source: 'AbstractUser', is_initial: bool = False,
limit: Optional[int] = None, last_id: Optional[int] = None) -> Awaitable[None]:
pass

@abstractmethod
Expand Down
74 changes: 42 additions & 32 deletions mautrix_telegram/portal/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,12 @@ async def _update_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat
if changed:
self.save()
await self.update_bridge_info()
puppet = p.Puppet.get_by_custom_mxid(user.mxid)
if puppet:
try:
await puppet.intent.ensure_joined(self.mxid)
except Exception:
self.log.exception("Failed to ensure %s is joined to portal", user.mxid)
if self.sync_matrix_state:
await self.main_intent.get_joined_members(self.mxid)

async def create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User] = None,
invites: InviteList = None, update_if_exists: bool = True,
synchronous: bool = False) -> Optional[str]:
invites: InviteList = None, update_if_exists: bool = True
) -> Optional[RoomID]:
if self.mxid:
if update_if_exists:
if not entity:
Expand All @@ -240,10 +234,7 @@ async def create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat,
self.log.exception(f"Failed to get entity through {user.tgid} for update")
return self.mxid
update = self.update_matrix_room(user, entity, self.peer_type == "user")
if synchronous:
await update
else:
asyncio.ensure_future(update, loop=self.loop)
self.loop.create_task(update)
await self.invite_to_matrix(invites or [])
return self.mxid
async with self._room_create_lock:
Expand Down Expand Up @@ -392,28 +383,40 @@ async def _create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat
if not config["bridge.federate_rooms"]:
creation_content["m.federate"] = False

room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset,
is_direct=direct, invitees=invites or [],
name=self.title, topic=self.about,
initial_state=initial_state,
creation_content=creation_content)
if not room_id:
raise Exception(f"Failed to create room")
with self.backfill_lock:
room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset,
is_direct=direct, invitees=invites or [],
name=self.title, topic=self.about,
initial_state=initial_state,
creation_content=creation_content)
if not room_id:
raise Exception(f"Failed to create room")

if self.encrypted and self.matrix.e2ee and direct:
try:
await self.az.intent.ensure_joined(room_id)
except Exception:
self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")
if self.encrypted and self.matrix.e2ee and direct:
try:
await self.az.intent.ensure_joined(room_id)
except Exception:
self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")

self.mxid = room_id
self.by_mxid[self.mxid] = self
self.save()
await self.az.state_store.set_power_levels(self.mxid, power_levels)
user.register_portal(self)
asyncio.ensure_future(self.update_matrix_room(user, entity, direct, puppet,
levels=power_levels, users=users,
participants=participants), loop=self.loop)
self.mxid = room_id
self.by_mxid[self.mxid] = self
self.save()
await self.az.state_store.set_power_levels(self.mxid, power_levels)
user.register_portal(self)

update_room = self.loop.create_task(self.update_matrix_room(
user, entity, direct, puppet,
levels=power_levels, users=users, participants=participants))

if config["bridge.backfill.initial_limit"] > 0:
self.log.debug("Initial backfill is enabled. Waiting for room members to sync "
"and then starting backfill")
await update_room

try:
await self.backfill(user, is_initial=True)
except Exception:
self.log.exception("Failed to backfill new portal")

return self.mxid

Expand Down Expand Up @@ -545,6 +548,13 @@ async def _sync_telegram_users(self, source: 'AbstractUser', users: List[User])
if user:
await self.invite_to_matrix(user.mxid)

puppet = p.Puppet.get_by_custom_mxid(user.mxid)
if puppet:
try:
await puppet.intent.ensure_joined(self.mxid)
except Exception:
self.log.exception("Failed to ensure %s is joined to portal", user.mxid)

# We can't trust the member list if any of the following cases is true:
# * There are close to 10 000 users, because Telegram might not be sending all members.
# * The member sync count is limited, because then we might ignore some members.
Expand Down
Loading

0 comments on commit be3b135

Please sign in to comment.