Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new thread_pool handler #416

Merged
merged 4 commits into from
Nov 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,28 @@ jobs:
- name: Install dependencies
run: pip install setuptools==39.2.0 --force-reinstall

- name: Install Python 2 dependencies
if: ${{ contains(matrix.python-version, '2.7') }}
# certifi dropped support for Python 2 in 2020.4.5.2 but only started
# using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with
# Python 2 support.
run: pip install certifi==2021.10.8 requests==2.27.1 incremental==21.3.0

- name: Install Python 3.4 dependencies
if: ${{ contains(matrix.python-version, '3.4') }}
# certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18
run: pip install certifi==2021.10.8 "typing-extensions<4" incremental==21.3.0

- name: Install Python 3.5 dependencies
if: ${{ contains(matrix.python-version, '3.5') }}
# typing-extensions dropped support for Python 3.5 in version 4
run: pip install "typing-extensions<4"

- name: Install Python 3.6 dependencies
if: ${{ contains(matrix.python-version, '3.6') }}
# typing-extensions dropped support for Python 3.6 in version 4.2
run: pip install "typing-extensions<4.2"

- name: Set the framework
run: echo ${{ matrix.framework }} >> $GITHUB_ENV

Expand Down Expand Up @@ -232,27 +254,5 @@ jobs:
if: ${{ contains(matrix.framework, 'FASTAPI_VERSION') }}
run: pip install fastapi==$FASTAPI_VERSION

- name: Install Python 2 dependencies
if: ${{ contains(matrix.python-version, '2.7') }}
# certifi dropped support for Python 2 in 2020.4.5.2 but only started
# using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with
# Python 2 support.
run: pip install certifi==2021.10.8 requests==2.27.1

- name: Install Python 3.4 dependencies
if: ${{ contains(matrix.python-version, '3.4') }}
# certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18
run: pip install certifi==2021.10.8 "typing-extensions<4"

- name: Install Python 3.5 dependencies
if: ${{ contains(matrix.python-version, '3.5') }}
# typing-extensions dropped support for Python 3.5 in version 4
run: pip install "typing-extensions<4"

- name: Install Python 3.6 dependencies
if: ${{ contains(matrix.python-version, '3.6') }}
# typing-extensions dropped support for Python 3.6 in version 4.2
run: pip install "typing-extensions<4.2"

- name: Run tests
run: python setup.py test
29 changes: 26 additions & 3 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems, defaultJSONEncode


__version__ = '0.16.3'
__version__ = '0.16.4beta'
__log_name__ = 'rollbar'
log = logging.getLogger(__log_name__)

Expand Down Expand Up @@ -124,7 +124,7 @@ def wrap(*args, **kwargs):
from twisted.internet.ssl import CertificateOptions
from twisted.internet import task, defer, ssl, reactor
from zope.interface import implementer

@implementer(IPolicyForHTTPS)
class VerifyHTTPS(object):
def __init__(self):
Expand Down Expand Up @@ -275,7 +275,12 @@ def _get_fastapi_request():
'root': None, # root path to your code
'branch': None, # git branch name
'code_version': None,
'handler': 'default', # 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted' or 'httpx'
# 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted', 'httpx' or 'thread_pool'
# 'async' requires Python 3.4 or higher.
# 'httpx' requires Python 3.7 or higher.
# 'thread_pool' requires Python 3.2 or higher.
'handler': 'default',
'thread_pool_workers': None,
'endpoint': DEFAULT_ENDPOINT,
'timeout': DEFAULT_TIMEOUT,
'agent.log_file': 'log.rollbar',
Expand Down Expand Up @@ -383,6 +388,9 @@ def init(access_token, environment='production', scrub_fields=None, url_fields=N

if SETTINGS.get('handler') == 'agent':
agent_log = _create_agent_log()
elif SETTINGS.get('handler') == 'thread_pool':
from rollbar.lib.thread_pool import init_pool
init_pool(SETTINGS.get('thread_pool_workers', None))

if not SETTINGS['locals']['safelisted_types'] and SETTINGS['locals']['whitelisted_types']:
warnings.warn('whitelisted_types deprecated use safelisted_types instead', DeprecationWarning)
Expand Down Expand Up @@ -523,6 +531,7 @@ def send_payload(payload, access_token):
- 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine)
- 'twisted': calls _send_payload_twisted() (which makes an async HTTP request using Twisted and Treq)
- 'httpx': calls _send_payload_httpx() (which makes an async HTTP request using HTTPX)
- 'thread_pool': uses a pool of worker threads to make HTTP requests off the main thread. Returns immediately.
"""
payload = events.on_payload(payload)
if payload is False:
Expand Down Expand Up @@ -569,6 +578,8 @@ def send_payload(payload, access_token):
_send_payload_async(payload_str, access_token)
elif handler == 'thread':
_send_payload_thread(payload_str, access_token)
elif handler == 'thread_pool':
_send_payload_thread_pool(payload_str, access_token)
else:
# default to 'thread'
_send_payload_thread(payload_str, access_token)
Expand Down Expand Up @@ -1510,6 +1521,18 @@ def _send_payload_thread(payload_str, access_token):
thread.start()


def _send_payload_pool(payload_str, access_token):
try:
_post_api('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _send_payload_thread_pool(payload_str, access_token):
from rollbar.lib.thread_pool import submit
submit(_send_payload_pool, payload_str, access_token)


def _send_payload_appengine(payload_str, access_token):
try:
_post_api_appengine('item/', payload_str, access_token=access_token)
Expand Down
38 changes: 38 additions & 0 deletions rollbar/lib/thread_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import os
import sys
from concurrent.futures import ThreadPoolExecutor

_pool = None # type: ThreadPoolExecutor|None

log = logging.getLogger(__name__)


def init_pool(max_workers):
"""
Creates the thread pool with the max workers.

:type max_workers: int|None
:param max_workers: If max_workers is None it will use the logic from the standard library to calculate the number
of threads. However, we ported the logic from Python 3.5 to earlier versions.
"""
if max_workers is None and sys.version_info < (3, 5):
max_workers = (os.cpu_count() or 1) * 5

global _pool
_pool = ThreadPoolExecutor(max_workers)


def submit(worker, payload_str, access_token):
"""
Submit a new task to the thread pool.

:type worker: function
:type payload_str: str
:type access_token: str
"""
global _pool
if _pool is None:
log.warning('pyrollbar: Thead pool not initialized. Please ensure init_pool() is called prior to submit().')
return
_pool.submit(worker, payload_str, access_token)
26 changes: 26 additions & 0 deletions rollbar/test/test_rollbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,32 @@ def _raise():

send_payload_httpx.assert_called_once()

@unittest.skipUnless(sys.version_info >= (3, 6), 'assert_called_once support requires Python3.6+')
@mock.patch('rollbar._send_payload_thread_pool')
def test_thread_pool_handler(self, send_payload_thread_pool):
def _raise():
try:
raise Exception('foo')
except:
rollbar.report_exc_info()
rollbar.SETTINGS['handler'] = 'thread_pool'
_raise()

send_payload_thread_pool.assert_called_once()

@unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+')
def test_thread_pool_submit(self):
from rollbar.lib.thread_pool import init_pool, submit
init_pool(1)
ran = {'nope': True} # dict used so it is not shadowed in run

def run(payload_str, access_token):
ran['nope'] = False

submit(run, 'foo', 'bar')
self.assertFalse(ran['nope'])


@mock.patch('rollbar.send_payload')
def test_args_constructor(self, send_payload):

Expand Down