From bdf8e519ff660ae2db80c798adfbe6e46a5b71f8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Aug 2019 10:54:26 +0100 Subject: [PATCH 01/35] Newsfile --- changelog.d/5850.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5850.misc diff --git a/changelog.d/5850.misc b/changelog.d/5850.misc new file mode 100644 index 000000000000..c4f879ca2f47 --- /dev/null +++ b/changelog.d/5850.misc @@ -0,0 +1 @@ +Retry well-known lookups if we have recently seen a valid well-known record for the server. From 8629a6897301ba1466ec69b155a7b1b057a20983 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2019 13:15:26 +0100 Subject: [PATCH 02/35] Fixup changelog and remove debug logging --- changelog.d/5850.misc | 1 - 1 file changed, 1 deletion(-) delete mode 100644 changelog.d/5850.misc diff --git a/changelog.d/5850.misc b/changelog.d/5850.misc deleted file mode 100644 index c4f879ca2f47..000000000000 --- a/changelog.d/5850.misc +++ /dev/null @@ -1 +0,0 @@ -Retry well-known lookups if we have recently seen a valid well-known record for the server. From 904340344df97a321d9cba4ced12fdc5e968a90a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 17:58:34 +0100 Subject: [PATCH 03/35] Stylish imports --- synapse/federation/sender/per_destination_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index fad980b89307..a0e708f25aa0 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -28,6 +28,7 @@ from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state +from synapse.logging.opentracing import extract_text_map, start_active_span_follows_from from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import UserPresenceState From 98afb1456125dde61597af1c820a9b5a7d2f8642 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 28 Jun 2019 17:15:31 +0100 Subject: [PATCH 04/35] Trace devices --- synapse/handlers/device.py | 38 +++++++++++++++++++++++++++++++++++++- synapse/storage/devices.py | 5 +++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5c1cf83c9dd1..4f1f14ae6b87 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -31,6 +31,7 @@ from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination +from synapse.util.tracerutils import TracerUtil, trace_defered_function from ._base import BaseHandler @@ -45,6 +46,7 @@ def __init__(self, hs): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + @trace_defered_function @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -56,6 +58,7 @@ def get_devices_by_user(self, user_id): defer.Deferred: list[dict[str, X]]: info on each device """ + TracerUtil.set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -64,8 +67,10 @@ def get_devices_by_user(self, user_id): for device in devices: _update_device_from_client_ips(device, ips) + TracerUtil.log_kv(device_map) return devices + @trace_defered_function @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -85,9 +90,14 @@ def get_device(self, user_id, device_id): raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) + + TracerUtil.set_tag("device", device) + TracerUtil.set_tag("ips", ips) + return device @measure_func("device.get_user_ids_changed") + @trace_defered_function @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -97,6 +107,9 @@ def get_user_ids_changed(self, user_id, from_token): user_id (str) from_token (StreamToken) """ + + TracerUtil("user_id", user_id) + TracerUtil.set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -133,7 +146,7 @@ def get_user_ids_changed(self, user_id, from_token): if etype != EventTypes.Member: continue possibly_left.add(state_key) - continue + continue # Fetch the current state at the time. try: @@ -148,6 +161,9 @@ def get_user_ids_changed(self, user_id, from_token): # special-case for an empty prev state: include all members # in the changed list if not event_ids: + TracerUtil.log_kv( + {"event": "encountered empty previous state", "room_id": room_id} + ) for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: @@ -200,6 +216,10 @@ def get_user_ids_changed(self, user_id, from_token): possibly_joined = [] possibly_left = [] + TracerUtil.log_kv( + {"changed": list(possibly_joined), "left": list(possibly_left)} + ) + return {"changed": list(possibly_joined), "left": list(possibly_left)} @@ -267,6 +287,7 @@ def check_device_registered( raise errors.StoreError(500, "Couldn't generate a device ID.") + @trace_defered_function @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -284,6 +305,8 @@ def delete_device(self, user_id, device_id): except errors.StoreError as e: if e.code == 404: # no match + TracerUtil.set_tag("error", True) + TracerUtil.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -296,6 +319,7 @@ def delete_device(self, user_id, device_id): yield self.notify_device_update(user_id, [device_id]) + @trace_defered_function @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -331,6 +355,8 @@ def delete_devices(self, user_id, device_ids): except errors.StoreError as e: if e.code == 404: # no match + TracerUtil.set_tag("error", True) + TracerUtil.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -451,12 +477,15 @@ def __init__(self, hs, device_handler): iterable=True, ) + @trace_defered_function @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ + TracerUtil.set_tag("origin", origin) + TracerUtil.set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -477,6 +506,13 @@ def incoming_device_list_update(self, origin, edu_content): if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + TracerUtil.set_tag("error", True) + TracerUtil.log_kv( + { + "message": "Got an update from a user which " + + "doesn't share a room with the current user." + } + ) logger.warning( "Got device list update edu for %r/%r, but don't share a room", user_id, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index e11881161da9..f686534cfed3 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from synapse.util.tracerutils import TracerUtil, trace_defered_function from six import iteritems @@ -321,6 +322,7 @@ def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + @trace_defered_function @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -352,6 +354,9 @@ def get_user_devices_from_cache(self, query_list): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) + TracerUtil.set_tag("in_cache", results) + TracerUtil.set_tag("not_in_cache", user_ids_not_in_cache) + return (user_ids_not_in_cache, results) @cachedInlineCallbacks(num_args=2, tree=True) From b0d2e26b7ab33d46f9bb358af46f3a03baf9f3ad Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 28 Jun 2019 17:16:07 +0100 Subject: [PATCH 05/35] Trace device messages. --- synapse/handlers/devicemessage.py | 4 ++++ synapse/rest/client/v2_alpha/sendtodevice.py | 8 +++++++ synapse/storage/deviceinbox.py | 23 ++++++++++++++++++++ 3 files changed, 35 insertions(+) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c7d56779b83f..d7ead2cf2736 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -28,6 +28,7 @@ ) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string +from synapse.util.tracerutils import TracerUtil, trace_defered_function logger = logging.getLogger(__name__) @@ -84,6 +85,7 @@ def on_direct_to_device_edu(self, origin, content): "to_device_key", stream_id, users=local_messages.keys() ) + @trace_defered_function @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): @@ -124,6 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages): else None, } + TracerUtil.log_kv(local_messages) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -132,6 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages): "to_device_key", stream_id, users=local_messages.keys() ) + TracerUtil.log_kv(remote_messages) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 2613648d821b..a7782cbd7cb3 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -20,6 +20,11 @@ from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.transactions import HttpTransactionCache +from synapse.util.tracerutils import ( + TracerUtil, + trace_defered_function_using_operation_name, + tag_args, +) from ._base import client_patterns @@ -42,7 +47,10 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() + @trace_defered_function_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): + TracerUtil.set_tag("message_type", message_type) + TracerUtil.set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 79bb0ea46db5..4a45926c45d3 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -22,6 +22,7 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function logger = logging.getLogger(__name__) @@ -72,6 +73,7 @@ def get_new_messages_for_device_txn(txn): "get_new_messages_for_device", get_new_messages_for_device_txn ) + @trace_defered_function @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -87,11 +89,15 @@ def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), None ) + + TracerUtil.set_tag("last_deleted_stream_id", last_deleted_stream_id) + if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: + TracerUtil.log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -107,6 +113,10 @@ def delete_messages_for_device_txn(txn): "delete_messages_for_device", delete_messages_for_device_txn ) + TracerUtil.log_kv( + {"message": "deleted {} messages for device".format(count), "count": count} + ) + # Update the cache, ensuring that we only ever increase the value last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 @@ -117,6 +127,7 @@ def delete_messages_for_device_txn(txn): return count + @trace_defered_function def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -132,16 +143,23 @@ def get_new_device_msgs_for_remote( in the stream the messages got to. """ + TracerUtil.set_tag("destination", destination) + TracerUtil.set_tag("last_stream_id", last_stream_id) + TracerUtil.set_tag("current_stream_id", current_stream_id) + TracerUtil.set_tag("limit", limit) + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: + TracerUtil.log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) + @trace_function def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -156,6 +174,9 @@ def get_new_messages_for_remote_destination_txn(txn): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: + TracerUtil.log_kv( + {"message": "Set stream position to current position"} + ) stream_pos = current_stream_id return (messages, stream_pos) @@ -164,6 +185,7 @@ def get_new_messages_for_remote_destination_txn(txn): get_new_messages_for_remote_destination_txn, ) + @trace_defered_function def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -214,6 +236,7 @@ def __init__(self, db_conn, hs): expiry_ms=30 * 60 * 1000, ) + @trace_defered_function @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination From 0b3d08867ed6f31c4745689a56cc112ce364a5ae Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 2 Jul 2019 17:34:48 +0100 Subject: [PATCH 06/35] Update to new access pattern --- synapse/handlers/device.py | 46 ++++++++++---------- synapse/handlers/devicemessage.py | 8 ++-- synapse/rest/client/v2_alpha/sendtodevice.py | 12 ++--- synapse/storage/deviceinbox.py | 30 ++++++------- synapse/storage/devices.py | 8 ++-- 5 files changed, 50 insertions(+), 54 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4f1f14ae6b87..fab78e65ee90 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -31,7 +31,7 @@ from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils from ._base import BaseHandler @@ -46,7 +46,7 @@ def __init__(self, hs): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -58,7 +58,7 @@ def get_devices_by_user(self, user_id): defer.Deferred: list[dict[str, X]]: info on each device """ - TracerUtil.set_tag("user_id", user_id) + tracerutils.set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -67,10 +67,10 @@ def get_devices_by_user(self, user_id): for device in devices: _update_device_from_client_ips(device, ips) - TracerUtil.log_kv(device_map) + tracerutils.log_kv(device_map) return devices - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -91,13 +91,13 @@ def get_device(self, user_id, device_id): ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) - TracerUtil.set_tag("device", device) - TracerUtil.set_tag("ips", ips) + tracerutils.set_tag("device", device) + tracerutils.set_tag("ips", ips) return device @measure_func("device.get_user_ids_changed") - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -108,8 +108,8 @@ def get_user_ids_changed(self, user_id, from_token): from_token (StreamToken) """ - TracerUtil("user_id", user_id) - TracerUtil.set_tag("from_token", from_token) + tracerutils("user_id", user_id) + tracerutils.set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -161,7 +161,7 @@ def get_user_ids_changed(self, user_id, from_token): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - TracerUtil.log_kv( + tracerutils.log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) for key, event_id in iteritems(current_state_ids): @@ -216,7 +216,7 @@ def get_user_ids_changed(self, user_id, from_token): possibly_joined = [] possibly_left = [] - TracerUtil.log_kv( + tracerutils.log_kv( {"changed": list(possibly_joined), "left": list(possibly_left)} ) @@ -287,7 +287,7 @@ def check_device_registered( raise errors.StoreError(500, "Couldn't generate a device ID.") - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -305,8 +305,8 @@ def delete_device(self, user_id, device_id): except errors.StoreError as e: if e.code == 404: # no match - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", "User doesn't have that device id.") + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -319,7 +319,7 @@ def delete_device(self, user_id, device_id): yield self.notify_device_update(user_id, [device_id]) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -355,8 +355,8 @@ def delete_devices(self, user_id, device_ids): except errors.StoreError as e: if e.code == 404: # no match - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", "User doesn't have that device id.") + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -477,15 +477,15 @@ def __init__(self, hs, device_handler): iterable=True, ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ - TracerUtil.set_tag("origin", origin) - TracerUtil.set_tag("edu_content", edu_content) + tracerutils.set_tag("origin", origin) + tracerutils.set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -506,8 +506,8 @@ def incoming_device_list_update(self, origin, edu_content): if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. - TracerUtil.set_tag("error", True) - TracerUtil.log_kv( + tracerutils.set_tag("error", True) + tracerutils.log_kv( { "message": "Got an update from a user which " + "doesn't share a room with the current user." diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index d7ead2cf2736..defced10f4c0 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -28,7 +28,7 @@ ) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils logger = logging.getLogger(__name__) @@ -85,7 +85,7 @@ def on_direct_to_device_edu(self, origin, content): "to_device_key", stream_id, users=local_messages.keys() ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): @@ -126,7 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages): else None, } - TracerUtil.log_kv(local_messages) + tracerutils.log_kv(local_messages) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -135,7 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages): "to_device_key", stream_id, users=local_messages.keys() ) - TracerUtil.log_kv(remote_messages) + tracerutils.log_kv(remote_messages) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index a7782cbd7cb3..9df38a8f5ef6 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -20,11 +20,7 @@ from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.transactions import HttpTransactionCache -from synapse.util.tracerutils import ( - TracerUtil, - trace_defered_function_using_operation_name, - tag_args, -) +import synapse.util.tracerutils as tracerutils from ._base import client_patterns @@ -47,10 +43,10 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @trace_defered_function_using_operation_name("sendToDevice") + @tracerutils.trace_defered_function_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): - TracerUtil.set_tag("message_type", message_type) - TracerUtil.set_tag("txn_id", txn_id) + tracerutils.set_tag("message_type", message_type) + tracerutils.set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 4a45926c45d3..8687b5a046c2 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -22,7 +22,7 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function +import synapse.util.tracerutils as tracerutils logger = logging.getLogger(__name__) @@ -73,7 +73,7 @@ def get_new_messages_for_device_txn(txn): "get_new_messages_for_device", get_new_messages_for_device_txn ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -90,14 +90,14 @@ def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): (user_id, device_id), None ) - TracerUtil.set_tag("last_deleted_stream_id", last_deleted_stream_id) + tracerutils.set_tag("last_deleted_stream_id", last_deleted_stream_id) if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: - TracerUtil.log_kv({"message": "No changes in cache since last check"}) + tracerutils.log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -113,7 +113,7 @@ def delete_messages_for_device_txn(txn): "delete_messages_for_device", delete_messages_for_device_txn ) - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "deleted {} messages for device".format(count), "count": count} ) @@ -127,7 +127,7 @@ def delete_messages_for_device_txn(txn): return count - @trace_defered_function + @tracerutils.trace_defered_function def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -143,23 +143,23 @@ def get_new_device_msgs_for_remote( in the stream the messages got to. """ - TracerUtil.set_tag("destination", destination) - TracerUtil.set_tag("last_stream_id", last_stream_id) - TracerUtil.set_tag("current_stream_id", current_stream_id) - TracerUtil.set_tag("limit", limit) + tracerutils.set_tag("destination", destination) + tracerutils.set_tag("last_stream_id", last_stream_id) + tracerutils.set_tag("current_stream_id", current_stream_id) + tracerutils.set_tag("limit", limit) has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: - TracerUtil.log_kv({"message": "No new messages in stream"}) + tracerutils.log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) - @trace_function + @tracerutils.trace_function def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -174,7 +174,7 @@ def get_new_messages_for_remote_destination_txn(txn): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "Set stream position to current position"} ) stream_pos = current_stream_id @@ -185,7 +185,7 @@ def get_new_messages_for_remote_destination_txn(txn): get_new_messages_for_remote_destination_txn, ) - @trace_defered_function + @tracerutils.trace_defered_function def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -236,7 +236,7 @@ def __init__(self, db_conn, hs): expiry_ms=30 * 60 * 1000, ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f686534cfed3..76488c9348d1 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils from six import iteritems @@ -322,7 +322,7 @@ def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -354,8 +354,8 @@ def get_user_devices_from_cache(self, query_list): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) - TracerUtil.set_tag("in_cache", results) - TracerUtil.set_tag("not_in_cache", user_ids_not_in_cache) + tracerutils.set_tag("in_cache", results) + tracerutils.set_tag("not_in_cache", user_ids_not_in_cache) return (user_ids_not_in_cache, results) From 34f6214d55f5db69d49898e6c015df87e91d14c4 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 3 Jul 2019 11:52:27 +0100 Subject: [PATCH 07/35] How did that half of the statement get deleted? --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index fab78e65ee90..acf9b02d3263 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -108,7 +108,7 @@ def get_user_ids_changed(self, user_id, from_token): from_token (StreamToken) """ - tracerutils("user_id", user_id) + tracerutils.set_tag("user_id", user_id) tracerutils.set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() From e358f00e0a5f9cc9517aee182aff2260dcd830a7 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 4 Jul 2019 14:24:39 +0100 Subject: [PATCH 08/35] These functions were not deferreds! --- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- synapse/storage/deviceinbox.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 9df38a8f5ef6..d58291fc856a 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -43,7 +43,7 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @tracerutils.trace_defered_function_using_operation_name("sendToDevice") + @tracerutils.trace_function_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): tracerutils.set_tag("message_type", message_type) tracerutils.set_tag("txn_id", txn_id) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 8687b5a046c2..6011312af9e4 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -127,7 +127,7 @@ def delete_messages_for_device_txn(txn): return count - @tracerutils.trace_defered_function + @tracerutils.trace_function def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): From 6a2ac727a57b3129b0b02be36ae02eafe3ca7661 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 4 Jul 2019 17:11:46 +0100 Subject: [PATCH 09/35] The great logging/ migration --- synapse/handlers/device.py | 46 ++++++++++---------- synapse/handlers/devicemessage.py | 8 ++-- synapse/rest/client/v2_alpha/sendtodevice.py | 8 ++-- synapse/storage/deviceinbox.py | 30 ++++++------- synapse/storage/devices.py | 8 ++-- 5 files changed, 50 insertions(+), 50 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index acf9b02d3263..226bfefd05f3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -31,7 +31,7 @@ from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing from ._base import BaseHandler @@ -46,7 +46,7 @@ def __init__(self, hs): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -58,7 +58,7 @@ def get_devices_by_user(self, user_id): defer.Deferred: list[dict[str, X]]: info on each device """ - tracerutils.set_tag("user_id", user_id) + opentracing.set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -67,10 +67,10 @@ def get_devices_by_user(self, user_id): for device in devices: _update_device_from_client_ips(device, ips) - tracerutils.log_kv(device_map) + opentracing.log_kv(device_map) return devices - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -91,13 +91,13 @@ def get_device(self, user_id, device_id): ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) - tracerutils.set_tag("device", device) - tracerutils.set_tag("ips", ips) + opentracing.set_tag("device", device) + opentracing.set_tag("ips", ips) return device @measure_func("device.get_user_ids_changed") - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -108,8 +108,8 @@ def get_user_ids_changed(self, user_id, from_token): from_token (StreamToken) """ - tracerutils.set_tag("user_id", user_id) - tracerutils.set_tag("from_token", from_token) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -161,7 +161,7 @@ def get_user_ids_changed(self, user_id, from_token): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - tracerutils.log_kv( + opentracing.log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) for key, event_id in iteritems(current_state_ids): @@ -216,7 +216,7 @@ def get_user_ids_changed(self, user_id, from_token): possibly_joined = [] possibly_left = [] - tracerutils.log_kv( + opentracing.log_kv( {"changed": list(possibly_joined), "left": list(possibly_left)} ) @@ -287,7 +287,7 @@ def check_device_registered( raise errors.StoreError(500, "Couldn't generate a device ID.") - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -305,8 +305,8 @@ def delete_device(self, user_id, device_id): except errors.StoreError as e: if e.code == 404: # no match - tracerutils.set_tag("error", True) - tracerutils.set_tag("reason", "User doesn't have that device id.") + opentracing.set_tag("error", True) + opentracing.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -319,7 +319,7 @@ def delete_device(self, user_id, device_id): yield self.notify_device_update(user_id, [device_id]) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -355,8 +355,8 @@ def delete_devices(self, user_id, device_ids): except errors.StoreError as e: if e.code == 404: # no match - tracerutils.set_tag("error", True) - tracerutils.set_tag("reason", "User doesn't have that device id.") + opentracing.set_tag("error", True) + opentracing.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -477,15 +477,15 @@ def __init__(self, hs, device_handler): iterable=True, ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ - tracerutils.set_tag("origin", origin) - tracerutils.set_tag("edu_content", edu_content) + opentracing.set_tag("origin", origin) + opentracing.set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -506,8 +506,8 @@ def incoming_device_list_update(self, origin, edu_content): if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. - tracerutils.set_tag("error", True) - tracerutils.log_kv( + opentracing.set_tag("error", True) + opentracing.log_kv( { "message": "Got an update from a user which " + "doesn't share a room with the current user." diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index defced10f4c0..edb4f069b9a8 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -28,7 +28,7 @@ ) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) @@ -85,7 +85,7 @@ def on_direct_to_device_edu(self, origin, content): "to_device_key", stream_id, users=local_messages.keys() ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): @@ -126,7 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages): else None, } - tracerutils.log_kv(local_messages) + opentracing.log_kv(local_messages) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -135,7 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages): "to_device_key", stream_id, users=local_messages.keys() ) - tracerutils.log_kv(remote_messages) + opentracing.log_kv(remote_messages) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index d58291fc856a..68bbcf4a48c2 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -20,7 +20,7 @@ from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.transactions import HttpTransactionCache -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing from ._base import client_patterns @@ -43,10 +43,10 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @tracerutils.trace_function_using_operation_name("sendToDevice") + @opentracing.trace_function_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): - tracerutils.set_tag("message_type", message_type) - tracerutils.set_tag("txn_id", txn_id) + opentracing.set_tag("message_type", message_type) + opentracing.set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 6011312af9e4..91c6cd4cf60c 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -22,7 +22,7 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) @@ -73,7 +73,7 @@ def get_new_messages_for_device_txn(txn): "get_new_messages_for_device", get_new_messages_for_device_txn ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -90,14 +90,14 @@ def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): (user_id, device_id), None ) - tracerutils.set_tag("last_deleted_stream_id", last_deleted_stream_id) + opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id) if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: - tracerutils.log_kv({"message": "No changes in cache since last check"}) + opentracing.log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -113,7 +113,7 @@ def delete_messages_for_device_txn(txn): "delete_messages_for_device", delete_messages_for_device_txn ) - tracerutils.log_kv( + opentracing.log_kv( {"message": "deleted {} messages for device".format(count), "count": count} ) @@ -127,7 +127,7 @@ def delete_messages_for_device_txn(txn): return count - @tracerutils.trace_function + @opentracing.trace_function def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -143,23 +143,23 @@ def get_new_device_msgs_for_remote( in the stream the messages got to. """ - tracerutils.set_tag("destination", destination) - tracerutils.set_tag("last_stream_id", last_stream_id) - tracerutils.set_tag("current_stream_id", current_stream_id) - tracerutils.set_tag("limit", limit) + opentracing.set_tag("destination", destination) + opentracing.set_tag("last_stream_id", last_stream_id) + opentracing.set_tag("current_stream_id", current_stream_id) + opentracing.set_tag("limit", limit) has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: - tracerutils.log_kv({"message": "No new messages in stream"}) + opentracing.log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) - @tracerutils.trace_function + @opentracing.trace_function def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -174,7 +174,7 @@ def get_new_messages_for_remote_destination_txn(txn): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: - tracerutils.log_kv( + opentracing.log_kv( {"message": "Set stream position to current position"} ) stream_pos = current_stream_id @@ -185,7 +185,7 @@ def get_new_messages_for_remote_destination_txn(txn): get_new_messages_for_remote_destination_txn, ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -236,7 +236,7 @@ def __init__(self, db_conn, hs): expiry_ms=30 * 60 * 1000, ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 76488c9348d1..3026bc6843a7 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing from six import iteritems @@ -322,7 +322,7 @@ def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -354,8 +354,8 @@ def get_user_devices_from_cache(self, query_list): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) - tracerutils.set_tag("in_cache", results) - tracerutils.set_tag("not_in_cache", user_ids_not_in_cache) + opentracing.set_tag("in_cache", results) + opentracing.set_tag("not_in_cache", user_ids_not_in_cache) return (user_ids_not_in_cache, results) From 09e40a0de347f045b3b5c3e98e3612d473c22f5a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 8 Jul 2019 14:00:36 +0100 Subject: [PATCH 10/35] Some tracing --- synapse/handlers/device.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 226bfefd05f3..fcd4e3e56595 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -397,6 +397,7 @@ def update_device(self, user_id, device_id, content): else: raise + @opentracing.trace_defered_function @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -412,6 +413,8 @@ def notify_device_update(self, user_id, device_ids): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) + opentracing.set_tag("hosts to update", hosts) + position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) ) @@ -431,6 +434,9 @@ def notify_device_update(self, user_id, device_ids): ) for host in hosts: self.federation_sender.send_device_messages(host) + opentracing.log_kv( + {"message": "sent device update to host", "host": host} + ) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): From 78a0123ec5c26c61b1f5f3f36fd20ca3b4367901 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 11 Jul 2019 11:55:14 +0100 Subject: [PATCH 11/35] Nicer tracing --- synapse/handlers/device.py | 19 ++++++++++++++++--- synapse/handlers/devicemessage.py | 8 ++++---- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index fcd4e3e56595..25f1441f7ad6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -306,7 +306,9 @@ def delete_device(self, user_id, device_id): if e.code == 404: # no match opentracing.set_tag("error", True) - opentracing.set_tag("reason", "User doesn't have that device id.") + opentracing.log_kv( + {"reason": "User doesn't have device id.", "device_id": device_id} + ) pass else: raise @@ -506,6 +508,15 @@ def incoming_device_list_update(self, origin, edu_content): device_id, origin, ) + + opentracing.set_tag("error", True) + opentracing.log_kv( + { + "message": "Got a device list update edu from a user and device which does not match the origin of the request.", + "user_id": user_id, + "device_id": device_id, + } + ) return room_ids = yield self.store.get_rooms_for_user(user_id) @@ -515,8 +526,9 @@ def incoming_device_list_update(self, origin, edu_content): opentracing.set_tag("error", True) opentracing.log_kv( { - "message": "Got an update from a user which " - + "doesn't share a room with the current user." + "message": "Got an update from a user for which " + + "we don't share any rooms", + "other user_id": user_id, } ) logger.warning( @@ -620,6 +632,7 @@ def user_device_resync(self, user_id): request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid """ + opentracing.log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index edb4f069b9a8..3e439279da85 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -85,10 +85,10 @@ def on_direct_to_device_edu(self, origin, content): "to_device_key", stream_id, users=local_messages.keys() ) - @opentracing.trace_defered_function @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): - + opentracing.set_tag("number of messages", len(messages)) + opentracing.set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} for user_id, by_device in messages.items(): @@ -126,7 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages): else None, } - opentracing.log_kv(local_messages) + opentracing.log_kv({"local_messages": local_messages}) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -135,7 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages): "to_device_key", stream_id, users=local_messages.keys() ) - opentracing.log_kv(remote_messages) + opentracing.log_kv({"remote_messages": remote_messages}) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 68bbcf4a48c2..1385f80e2719 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -43,7 +43,7 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @opentracing.trace_function_using_operation_name("sendToDevice") + @opentracing.trace_defered_function_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): opentracing.set_tag("message_type", message_type) opentracing.set_tag("txn_id", txn_id) From b9db310ed2cc4bedc61ef4a45d0e37c43094b407 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 15 Jul 2019 15:23:33 +0100 Subject: [PATCH 12/35] A little extra device_list tracing --- synapse/handlers/device.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 25f1441f7ad6..d932a0121b76 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -649,13 +649,20 @@ def user_device_resync(self, user_id): # eventually become consistent. return except FederationDeniedError as e: + opentracing.set_tag("error", True) + opentracing.log_kv({"reason": "FederationDeniedError"}) logger.info(e) return - except Exception: + except Exception as e: # TODO: Remember that we are now out of sync and try again # later + opentracing.set_tag("error", True) + opentracing.log_kv( + {"message": "Exception raised by federation request", "exception": e} + ) logger.exception("Failed to handle device list update for %s", user_id) return + opentracing.log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] From 6068b69cb296eaef5bd3a5921e7b5d3460cd9ff3 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 17 Jul 2019 13:56:29 +0100 Subject: [PATCH 13/35] Use better decorator names. --- synapse/handlers/device.py | 14 +++++++------- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- synapse/storage/deviceinbox.py | 10 +++++----- synapse/storage/devices.py | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d932a0121b76..1331dcfbb72d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -46,7 +46,7 @@ def __init__(self, hs): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -70,7 +70,7 @@ def get_devices_by_user(self, user_id): opentracing.log_kv(device_map) return devices - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -97,7 +97,7 @@ def get_device(self, user_id, device_id): return device @measure_func("device.get_user_ids_changed") - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -287,7 +287,7 @@ def check_device_registered( raise errors.StoreError(500, "Couldn't generate a device ID.") - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -321,7 +321,7 @@ def delete_device(self, user_id, device_id): yield self.notify_device_update(user_id, [device_id]) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -399,7 +399,7 @@ def update_device(self, user_id, device_id, content): else: raise - @opentracing.trace_defered_function + @opentracing.trace_deferred @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -485,7 +485,7 @@ def __init__(self, hs, device_handler): iterable=True, ) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 1385f80e2719..3bda83969d17 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -43,7 +43,7 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @opentracing.trace_defered_function_using_operation_name("sendToDevice") + @opentracing.trace_deferred_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): opentracing.set_tag("message_type", message_type) opentracing.set_tag("txn_id", txn_id) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 91c6cd4cf60c..1e0285b8bb93 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -73,7 +73,7 @@ def get_new_messages_for_device_txn(txn): "get_new_messages_for_device", get_new_messages_for_device_txn ) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -127,7 +127,7 @@ def delete_messages_for_device_txn(txn): return count - @opentracing.trace_function + @opentracing.trace def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -159,7 +159,7 @@ def get_new_device_msgs_for_remote( # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) - @opentracing.trace_function + @opentracing.trace def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -185,7 +185,7 @@ def get_new_messages_for_remote_destination_txn(txn): get_new_messages_for_remote_destination_txn, ) - @opentracing.trace_defered_function + @opentracing.trace_deferred def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -236,7 +236,7 @@ def __init__(self, db_conn, hs): expiry_ms=30 * 60 * 1000, ) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 3026bc6843a7..5863518f1bb7 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -322,7 +322,7 @@ def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - @opentracing.trace_defered_function + @trace @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. From e5155ee7e41d93df83764b67e7a5b44204b068a2 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 22 Jul 2019 13:38:47 +0100 Subject: [PATCH 14/35] Use unified trace method --- synapse/handlers/device.py | 14 +++++++------- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- synapse/storage/deviceinbox.py | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1331dcfbb72d..0e659dd81585 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -46,7 +46,7 @@ def __init__(self, hs): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -70,7 +70,7 @@ def get_devices_by_user(self, user_id): opentracing.log_kv(device_map) return devices - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -97,7 +97,7 @@ def get_device(self, user_id, device_id): return device @measure_func("device.get_user_ids_changed") - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -287,7 +287,7 @@ def check_device_registered( raise errors.StoreError(500, "Couldn't generate a device ID.") - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -321,7 +321,7 @@ def delete_device(self, user_id, device_id): yield self.notify_device_update(user_id, [device_id]) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -399,7 +399,7 @@ def update_device(self, user_id, device_id, content): else: raise - @opentracing.trace_deferred + @opentracing.trace @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -485,7 +485,7 @@ def __init__(self, hs, device_handler): iterable=True, ) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 3bda83969d17..fb79ffb5e6ec 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -43,7 +43,7 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @opentracing.trace_deferred_using_operation_name("sendToDevice") + @opentracing.trace_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): opentracing.set_tag("message_type", message_type) opentracing.set_tag("txn_id", txn_id) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 1e0285b8bb93..ad1e865bbdbf 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -73,7 +73,7 @@ def get_new_messages_for_device_txn(txn): "get_new_messages_for_device", get_new_messages_for_device_txn ) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -185,7 +185,7 @@ def get_new_messages_for_remote_destination_txn(txn): get_new_messages_for_remote_destination_txn, ) - @opentracing.trace_deferred + @opentracing.trace def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -236,7 +236,7 @@ def __init__(self, db_conn, hs): expiry_ms=30 * 60 * 1000, ) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination From c35f9d9184bfcbac9dd5cab36e64401bb0b3fe91 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 5 Aug 2019 13:48:09 +0100 Subject: [PATCH 15/35] Refactor return value so we don't create identical lists each time. --- synapse/handlers/device.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0e659dd81585..5f59959cc262 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -216,11 +216,11 @@ def get_user_ids_changed(self, user_id, from_token): possibly_joined = [] possibly_left = [] - opentracing.log_kv( - {"changed": list(possibly_joined), "left": list(possibly_left)} - ) + result = {"changed": list(possibly_joined), "left": list(possibly_left)} + + opentracing.log_kv(result) - return {"changed": list(possibly_joined), "left": list(possibly_left)} + return {result} class DeviceHandler(DeviceWorkerHandler): From 303fcce93a1857703a0a04668954a9720a54532a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 5 Aug 2019 13:56:04 +0100 Subject: [PATCH 16/35] String concatenation without the '+' --- synapse/handlers/device.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5f59959cc262..c09ffadc3ff8 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -512,7 +512,8 @@ def incoming_device_list_update(self, origin, edu_content): opentracing.set_tag("error", True) opentracing.log_kv( { - "message": "Got a device list update edu from a user and device which does not match the origin of the request.", + "message": "Got a device list update edu from a user and " + "device which does not match the origin of the request.", "user_id": user_id, "device_id": device_id, } @@ -527,7 +528,7 @@ def incoming_device_list_update(self, origin, edu_content): opentracing.log_kv( { "message": "Got an update from a user for which " - + "we don't share any rooms", + "we don't share any rooms", "other user_id": user_id, } ) From d7d84928a609df194a20fd12a48fa3ad09fa1367 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 5 Aug 2019 13:59:04 +0100 Subject: [PATCH 17/35] Use underscores. --- synapse/handlers/devicemessage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 3e439279da85..5ccb7711938e 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -87,7 +87,7 @@ def on_direct_to_device_edu(self, origin, content): @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): - opentracing.set_tag("number of messages", len(messages)) + opentracing.set_tag("number_of_messages", len(messages)) opentracing.set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} From 68d4c94709fef933dbf17c6f63eb806eda29cf3c Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 16:35:13 +0100 Subject: [PATCH 18/35] isort --- synapse/handlers/device.py | 2 +- synapse/handlers/devicemessage.py | 1 - synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- synapse/storage/deviceinbox.py | 2 +- synapse/storage/devices.py | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c09ffadc3ff8..d7e1935d0f11 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -18,6 +18,7 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api import errors from synapse.api.constants import EventTypes from synapse.api.errors import ( @@ -31,7 +32,6 @@ from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination -import synapse.logging.opentracing as opentracing from ._base import BaseHandler diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 5ccb7711938e..bb088154d452 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -28,7 +28,6 @@ ) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string -import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index fb79ffb5e6ec..db8103bd5dba 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -17,10 +17,10 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.transactions import HttpTransactionCache -import synapse.logging.opentracing as opentracing from ._base import client_patterns diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index ad1e865bbdbf..5f09b86bf0ac 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -19,10 +19,10 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache -import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 5863518f1bb7..8191ea7065af 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import synapse.logging.opentracing as opentracing from six import iteritems @@ -21,6 +20,7 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api.errors import StoreError from synapse.logging.opentracing import ( get_active_span_text_map, From 8869422e80029d84d204b9ae92463c83a4e381d5 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 16:40:03 +0100 Subject: [PATCH 19/35] Bad return type --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d7e1935d0f11..337988420e2f 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -220,7 +220,7 @@ def get_user_ids_changed(self, user_id, from_token): opentracing.log_kv(result) - return {result} + return result class DeviceHandler(DeviceWorkerHandler): From 08787f448acb92363afc4ba2971221b865ea7f1f Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 16:42:54 +0100 Subject: [PATCH 20/35] newsfile --- changelog.d/5853.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5853.misc diff --git a/changelog.d/5853.misc b/changelog.d/5853.misc new file mode 100644 index 000000000000..081012b3a695 --- /dev/null +++ b/changelog.d/5853.misc @@ -0,0 +1 @@ +Trace device lists and device updates. From 42c2acdecc2aadeca34a090f1124fb6a40452ac0 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 18:17:20 +0100 Subject: [PATCH 21/35] Use the import style. --- synapse/handlers/devicemessage.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index bb088154d452..01731cb2d0da 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -22,6 +22,7 @@ from synapse.api.errors import SynapseError from synapse.logging.opentracing import ( get_active_span_text_map, + log_kv, set_tag, start_active_span, whitelisted_homeserver, @@ -86,8 +87,8 @@ def on_direct_to_device_edu(self, origin, content): @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): - opentracing.set_tag("number_of_messages", len(messages)) - opentracing.set_tag("sender", sender_user_id) + set_tag("number_of_messages", len(messages)) + set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} for user_id, by_device in messages.items(): @@ -125,7 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages): else None, } - opentracing.log_kv({"local_messages": local_messages}) + log_kv({"local_messages": local_messages}) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -134,7 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages): "to_device_key", stream_id, users=local_messages.keys() ) - opentracing.log_kv({"remote_messages": remote_messages}) + log_kv({"remote_messages": remote_messages}) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. From 547f1251510b4460a442d68137de8689c0f84720 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 15 Aug 2019 11:23:42 +0100 Subject: [PATCH 22/35] Remove astray indent --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 337988420e2f..ce513d3abf53 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -146,7 +146,7 @@ def get_user_ids_changed(self, user_id, from_token): if etype != EventTypes.Member: continue possibly_left.add(state_key) - continue + continue # Fetch the current state at the time. try: From ff86fa6e67bc017f523fa5a370b3c17264532c6e Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 16 Aug 2019 18:21:21 +0100 Subject: [PATCH 23/35] Import style --- synapse/handlers/device.py | 70 ++++++++++---------- synapse/rest/client/v2_alpha/sendtodevice.py | 8 +-- synapse/storage/deviceinbox.py | 32 +++++---- synapse/storage/devices.py | 6 +- 4 files changed, 56 insertions(+), 60 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ce513d3abf53..5104306a8886 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -18,7 +18,6 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.api import errors from synapse.api.constants import EventTypes from synapse.api.errors import ( @@ -26,6 +25,7 @@ HttpResponseException, RequestSendFailed, ) +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async_helpers import Linearizer @@ -46,7 +46,7 @@ def __init__(self, hs): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() - @opentracing.trace + @trace @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -58,7 +58,7 @@ def get_devices_by_user(self, user_id): defer.Deferred: list[dict[str, X]]: info on each device """ - opentracing.set_tag("user_id", user_id) + set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -67,10 +67,10 @@ def get_devices_by_user(self, user_id): for device in devices: _update_device_from_client_ips(device, ips) - opentracing.log_kv(device_map) + log_kv(device_map) return devices - @opentracing.trace + @trace @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -91,13 +91,13 @@ def get_device(self, user_id, device_id): ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) - opentracing.set_tag("device", device) - opentracing.set_tag("ips", ips) + set_tag("device", device) + set_tag("ips", ips) return device @measure_func("device.get_user_ids_changed") - @opentracing.trace + @trace @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -108,8 +108,8 @@ def get_user_ids_changed(self, user_id, from_token): from_token (StreamToken) """ - opentracing.set_tag("user_id", user_id) - opentracing.set_tag("from_token", from_token) + set_tag("user_id", user_id) + set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -161,7 +161,7 @@ def get_user_ids_changed(self, user_id, from_token): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - opentracing.log_kv( + log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) for key, event_id in iteritems(current_state_ids): @@ -218,7 +218,7 @@ def get_user_ids_changed(self, user_id, from_token): result = {"changed": list(possibly_joined), "left": list(possibly_left)} - opentracing.log_kv(result) + log_kv(result) return result @@ -287,7 +287,7 @@ def check_device_registered( raise errors.StoreError(500, "Couldn't generate a device ID.") - @opentracing.trace + @trace @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -305,8 +305,8 @@ def delete_device(self, user_id, device_id): except errors.StoreError as e: if e.code == 404: # no match - opentracing.set_tag("error", True) - opentracing.log_kv( + set_tag("error", True) + log_kv( {"reason": "User doesn't have device id.", "device_id": device_id} ) pass @@ -321,7 +321,7 @@ def delete_device(self, user_id, device_id): yield self.notify_device_update(user_id, [device_id]) - @opentracing.trace + @trace @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -357,8 +357,8 @@ def delete_devices(self, user_id, device_ids): except errors.StoreError as e: if e.code == 404: # no match - opentracing.set_tag("error", True) - opentracing.set_tag("reason", "User doesn't have that device id.") + set_tag("error", True) + set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -399,7 +399,7 @@ def update_device(self, user_id, device_id, content): else: raise - @opentracing.trace + @trace @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -415,7 +415,7 @@ def notify_device_update(self, user_id, device_ids): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) - opentracing.set_tag("hosts to update", hosts) + set_tag("hosts to update", hosts) position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) @@ -436,9 +436,7 @@ def notify_device_update(self, user_id, device_ids): ) for host in hosts: self.federation_sender.send_device_messages(host) - opentracing.log_kv( - {"message": "sent device update to host", "host": host} - ) + log_kv({"message": "sent device update to host", "host": host}) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): @@ -485,15 +483,15 @@ def __init__(self, hs, device_handler): iterable=True, ) - @opentracing.trace + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ - opentracing.set_tag("origin", origin) - opentracing.set_tag("edu_content", edu_content) + set_tag("origin", origin) + set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -509,8 +507,8 @@ def incoming_device_list_update(self, origin, edu_content): origin, ) - opentracing.set_tag("error", True) - opentracing.log_kv( + set_tag("error", True) + log_kv( { "message": "Got a device list update edu from a user and " "device which does not match the origin of the request.", @@ -524,8 +522,8 @@ def incoming_device_list_update(self, origin, edu_content): if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. - opentracing.set_tag("error", True) - opentracing.log_kv( + set_tag("error", True) + log_kv( { "message": "Got an update from a user for which " "we don't share any rooms", @@ -633,7 +631,7 @@ def user_device_resync(self, user_id): request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid """ - opentracing.log_kv({"message": "Doing resync to update device list."}) + log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: @@ -650,20 +648,20 @@ def user_device_resync(self, user_id): # eventually become consistent. return except FederationDeniedError as e: - opentracing.set_tag("error", True) - opentracing.log_kv({"reason": "FederationDeniedError"}) + set_tag("error", True) + log_kv({"reason": "FederationDeniedError"}) logger.info(e) return except Exception as e: # TODO: Remember that we are now out of sync and try again # later - opentracing.set_tag("error", True) - opentracing.log_kv( + set_tag("error", True) + log_kv( {"message": "Exception raised by federation request", "exception": e} ) logger.exception("Failed to handle device list update for %s", user_id) return - opentracing.log_kv({"result": result}) + log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index db8103bd5dba..c415a974f07d 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -17,9 +17,9 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request +from synapse.logging.opentracing import trace, trace_using_operation_name from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_patterns @@ -43,10 +43,10 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @opentracing.trace_using_operation_name("sendToDevice") + @trace_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): - opentracing.set_tag("message_type", message_type) - opentracing.set_tag("txn_id", txn_id) + set_tag("message_type", message_type) + set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 5f09b86bf0ac..4fe0a493a24e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -19,7 +19,7 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache @@ -73,7 +73,7 @@ def get_new_messages_for_device_txn(txn): "get_new_messages_for_device", get_new_messages_for_device_txn ) - @opentracing.trace + @trace @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -90,14 +90,14 @@ def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): (user_id, device_id), None ) - opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id) + set_tag("last_deleted_stream_id", last_deleted_stream_id) if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: - opentracing.log_kv({"message": "No changes in cache since last check"}) + log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -113,7 +113,7 @@ def delete_messages_for_device_txn(txn): "delete_messages_for_device", delete_messages_for_device_txn ) - opentracing.log_kv( + log_kv( {"message": "deleted {} messages for device".format(count), "count": count} ) @@ -127,7 +127,7 @@ def delete_messages_for_device_txn(txn): return count - @opentracing.trace + @trace def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -143,23 +143,23 @@ def get_new_device_msgs_for_remote( in the stream the messages got to. """ - opentracing.set_tag("destination", destination) - opentracing.set_tag("last_stream_id", last_stream_id) - opentracing.set_tag("current_stream_id", current_stream_id) - opentracing.set_tag("limit", limit) + set_tag("destination", destination) + set_tag("last_stream_id", last_stream_id) + set_tag("current_stream_id", current_stream_id) + set_tag("limit", limit) has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: - opentracing.log_kv({"message": "No new messages in stream"}) + log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) - @opentracing.trace + @trace def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -174,9 +174,7 @@ def get_new_messages_for_remote_destination_txn(txn): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: - opentracing.log_kv( - {"message": "Set stream position to current position"} - ) + log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id return (messages, stream_pos) @@ -185,7 +183,7 @@ def get_new_messages_for_remote_destination_txn(txn): get_new_messages_for_remote_destination_txn, ) - @opentracing.trace + @trace def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -236,7 +234,7 @@ def __init__(self, db_conn, hs): expiry_ms=30 * 60 * 1000, ) - @opentracing.trace + @trace @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8191ea7065af..521a404338ff 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -20,10 +20,10 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.api.errors import StoreError from synapse.logging.opentracing import ( get_active_span_text_map, + set_tag, trace, whitelisted_homeserver, ) @@ -354,8 +354,8 @@ def get_user_devices_from_cache(self, query_list): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) - opentracing.set_tag("in_cache", results) - opentracing.set_tag("not_in_cache", user_ids_not_in_cache) + set_tag("in_cache", results) + set_tag("not_in_cache", user_ids_not_in_cache) return (user_ids_not_in_cache, results) From 395ee6a44a414c59bfa0a065ad5e7e2c57652bb3 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 16 Aug 2019 18:39:43 +0100 Subject: [PATCH 24/35] Missing import --- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index c415a974f07d..733802e967f6 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,7 +19,7 @@ from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request -from synapse.logging.opentracing import trace, trace_using_operation_name +from synapse.logging.opentracing import set_tag, trace, trace_using_operation_name from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_patterns From 83a011f16315bb3883bdc97d54f6cd9e342ce4ac Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 16 Aug 2019 18:41:28 +0100 Subject: [PATCH 25/35] Unused import --- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 733802e967f6..c528ce69065f 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,7 +19,7 @@ from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request -from synapse.logging.opentracing import set_tag, trace, trace_using_operation_name +from synapse.logging.opentracing import set_tag, trace_using_operation_name from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_patterns From 8769aa343d2aa4457d015165d877351c4febc42c Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 20 Aug 2019 10:18:16 +0100 Subject: [PATCH 26/35] Feature and simplification --- changelog.d/5853.feature | 1 + changelog.d/5853.misc | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 changelog.d/5853.feature delete mode 100644 changelog.d/5853.misc diff --git a/changelog.d/5853.feature b/changelog.d/5853.feature new file mode 100644 index 000000000000..80a04ae2eeed --- /dev/null +++ b/changelog.d/5853.feature @@ -0,0 +1 @@ +Opentracing for device list updates. diff --git a/changelog.d/5853.misc b/changelog.d/5853.misc deleted file mode 100644 index 081012b3a695..000000000000 --- a/changelog.d/5853.misc +++ /dev/null @@ -1 +0,0 @@ -Trace device lists and device updates. From 656e3b754ba606754850f941fa79830f9c14c76d Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 20 Aug 2019 10:19:56 +0100 Subject: [PATCH 27/35] Underscores --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5104306a8886..71a8f33da32e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -415,7 +415,7 @@ def notify_device_update(self, user_id, device_ids): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) - set_tag("hosts to update", hosts) + set_tag("target_hosts", hosts) position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) From fb8786351ad6731c17f32585a479195e36fd8f55 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 22 Aug 2019 11:18:32 +0100 Subject: [PATCH 28/35] Allow the passing of operation_name to trace --- synapse/logging/opentracing.py | 63 ++++---------------- synapse/rest/client/v2_alpha/sendtodevice.py | 4 +- 2 files changed, 15 insertions(+), 52 deletions(-) diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index dd296027a12d..a4cb4218c278 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -85,14 +85,14 @@ def interesting_function(*args, **kwargs): return something_usual_and_useful -Operation names can be explicitly set for functions by using -``trace_using_operation_name`` +Operation names can be explicitly set for a function by passing the +operation name to ``trace`` .. code-block:: python - from synapse.logging.opentracing import trace_using_operation_name + from synapse.logging.opentracing import trace - @trace_using_operation_name("A *much* better operation name") + @trace(operation_name="A *much* better operation name") def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful @@ -641,57 +641,16 @@ def extract_text_map(carrier): # Tracing decorators -def trace(func): +def trace(func=None, operation_name=None): """ Decorator to trace a function. Sets the operation name to that of the function's. """ - if opentracing is None: - return func - - @wraps(func) - def _trace_inner(self, *args, **kwargs): - if opentracing is None: - return func(self, *args, **kwargs) - - scope = start_active_span(func.__name__) - scope.__enter__() - - try: - result = func(self, *args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result): - scope.__exit__(None, None, None) - return result - - def err_back(result): - scope.span.set_tag(tags.ERROR, True) - scope.__exit__(None, None, None) - return result - result.addCallbacks(call_back, err_back) + if func and not operation_name: + operation_name = func.__name__ - else: - scope.__exit__(None, None, None) - - return result - - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - - return _trace_inner - - -def trace_using_operation_name(operation_name): - """Decorator to trace a function. Explicitely sets the operation_name.""" - - def trace(func): - """ - Decorator to trace a function. - Sets the operation name to that of the function's. - """ + def decorator(func): if opentracing is None: return func @@ -717,6 +676,7 @@ def err_back(result): return result result.addCallbacks(call_back, err_back) + else: scope.__exit__(None, None, None) @@ -728,7 +688,10 @@ def err_back(result): return _trace_inner - return trace + if func: + return decorator(func) + else: + return decorator def tag_args(func): diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index c528ce69065f..668c3af277f3 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,7 +19,7 @@ from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request -from synapse.logging.opentracing import set_tag, trace_using_operation_name +from synapse.logging.opentracing import set_tag, trace from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_patterns @@ -43,7 +43,7 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @trace_using_operation_name("sendToDevice") + @trace(operation_name="sendToDevice") def on_PUT(self, request, message_type, txn_id): set_tag("message_type", message_type) set_tag("txn_id", txn_id) From 1f5b9b0a2d05478b130deea06fa3e877465c9673 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 23 Aug 2019 16:38:11 +0100 Subject: [PATCH 29/35] Old import --- synapse/rest/client/v2_alpha/keys.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index b218a3f334d4..74522983d919 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -24,7 +24,7 @@ parse_json_object_from_request, parse_string, ) -from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import StreamToken from ._base import client_patterns @@ -69,7 +69,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace_using_operation_name("upload_keys") + @trace("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) From 47f8a59ebcdb855c9bc4ffa39b3eef6cd1f814ff Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 27 Aug 2019 13:37:31 +0100 Subject: [PATCH 30/35] Remove unused import --- synapse/federation/sender/per_destination_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index a0e708f25aa0..fad980b89307 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -28,7 +28,6 @@ from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state -from synapse.logging.opentracing import extract_text_map, start_active_span_follows_from from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import UserPresenceState From 071b04d9f8d0068713763a6e05b02582c24ab95a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 27 Aug 2019 18:35:50 +0100 Subject: [PATCH 31/35] Use the the keyword in the trace method --- synapse/rest/client/v2_alpha/keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 74522983d919..9dcfc13721ee 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -69,7 +69,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace("upload_keys") + @trace(operation_name="upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) From 4d68ac038a5f81ffc818a8d7409cee8cd175cafa Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 2 Sep 2019 11:58:38 +0100 Subject: [PATCH 32/35] Cleanup trace method --- synapse/logging/opentracing.py | 15 +++++++++------ synapse/rest/client/v2_alpha/keys.py | 2 +- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index a4cb4218c278..1fd20c39836d 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -92,7 +92,7 @@ def interesting_function(*args, **kwargs): from synapse.logging.opentracing import trace - @trace(operation_name="A *much* better operation name") + @trace(operation_name="a_better_operation_name") def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful @@ -641,19 +641,22 @@ def extract_text_map(carrier): # Tracing decorators -def trace(func=None, operation_name=None): +def trace(func=None, opname=None): """ Decorator to trace a function. - Sets the operation name to that of the function's. + Sets the operation name to that of the function's or that given + as operation_name. See the module's doc string for usage + examples. """ - if func and not operation_name: - operation_name = func.__name__ - def decorator(func): if opentracing is None: return func + # Doing this weird assignment thing to get around local variable + # referenced before assignment 'bug' raised by checkstyle + operation_name = opname if opname else func.__name__ + @wraps(func) def _trace_inner(self, *args, **kwargs): if opentracing is None: diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 9dcfc13721ee..7f67680efe37 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -69,7 +69,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace(operation_name="upload_keys") + @trace(opname="upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 668c3af277f3..d90e52ed1ab2 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -43,7 +43,7 @@ def __init__(self, hs): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @trace(operation_name="sendToDevice") + @trace(opname="sendToDevice") def on_PUT(self, request, message_type, txn_id): set_tag("message_type", message_type) set_tag("txn_id", txn_id) From 1c86773e75abe781c79e2fe97dc9fd6ec3f6860a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 2 Sep 2019 13:40:17 +0100 Subject: [PATCH 33/35] Merge was not black --- synapse/storage/devices.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 9dfac2d47165..41f62828bd5b 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -356,9 +356,8 @@ def get_user_devices_from_cache(self, query_list): set_tag("in_cache", results) set_tag("not_in_cache", user_ids_not_in_cache) - - return user_ids_not_in_cache, results + return user_ids_not_in_cache, results @cachedInlineCallbacks(num_args=2, tree=True) def _get_cached_user_device(self, user_id, device_id): From fa17018fef2adac8f47b285db7f74dd73d355d5a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 3 Sep 2019 10:04:09 +0100 Subject: [PATCH 34/35] opname not operation_name --- synapse/logging/opentracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 1fd20c39836d..c9917e97051a 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -92,7 +92,7 @@ def interesting_function(*args, **kwargs): from synapse.logging.opentracing import trace - @trace(operation_name="a_better_operation_name") + @trace(opname="a_better_operation_name") def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful From 1b5c89118cc870df6df6ea7a07331426dae6da0e Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 3 Sep 2019 10:06:07 +0100 Subject: [PATCH 35/35] Remove comment --- synapse/logging/opentracing.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index c9917e97051a..256b972aaa82 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -653,16 +653,14 @@ def decorator(func): if opentracing is None: return func - # Doing this weird assignment thing to get around local variable - # referenced before assignment 'bug' raised by checkstyle - operation_name = opname if opname else func.__name__ + _opname = opname if opname else func.__name__ @wraps(func) def _trace_inner(self, *args, **kwargs): if opentracing is None: return func(self, *args, **kwargs) - scope = start_active_span(operation_name) + scope = start_active_span(_opname) scope.__enter__() try: