Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non-blocking embedded mode feature #159

Merged
merged 4 commits into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Table of Contents
* [End-to-End Encryption](#end-to-end-encryption)
* [TLS Interception](#tls-interception)
* [Embed proxy.py](#embed-proxypy)
* [Blocking Mode](#blocking-mode)
* [Non-blocking Mode](#non-blocking-mode)
* [Plugin Developer and Contributor Guide](#plugin-developer-and-contributor-guide)
* [Everything is a plugin](#everything-is-a-plugin)
* [Internal Architecture](#internal-architecture)
Expand Down Expand Up @@ -125,7 +127,7 @@ Features
- See [End-to-End Encryption](#end-to-end-encryption)
- Man-In-The-Middle
- Can decrypt TLS traffic between clients and upstream servers
- See [TLS Encryption](#tls-interception)
- See [TLS Interception](#tls-interception)
- Supported proxy protocols
- `http`
- `https`
Expand Down Expand Up @@ -162,7 +164,7 @@ Start proxy.py
## From command line when installed using PIP

When `proxy.py` is installed using `pip`,
an executable named `proxy` is added under your `$PATH`.
an executable named `proxy` is placed under your `$PATH`.

#### Run it

Expand Down Expand Up @@ -711,7 +713,9 @@ Now use CA flags with other
Embed proxy.py
==============

To start `proxy.py` in embedded mode:
## Blocking Mode

Start `proxy.py` in embedded mode by using `main` method:

```
from proxy.main import main
Expand All @@ -732,6 +736,29 @@ if __name__ == '__main__':
])
```

Note that:

1. Calling `main` is simply equivalent to starting `proxy.py` from command line.
2. `main` will block until `proxy.py` shuts down.

## Non-blocking Mode

Start `proxy.py` in embedded mode by using `start` method:

```
from proxy.main import start

if __name__ == '__main__':
with start([]):
# ... your logic here ...
```

Note that:

1. `start` is simply a context manager.
2. Is similar to calling `main` except `start` won't block.
3. It automatically shut down `proxy.py`.

Plugin Developer and Contributor Guide
======================================

Expand Down
13 changes: 6 additions & 7 deletions proxy/core/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class AcceptorPool:

def __init__(self, flags: Flags, work_klass: Type[ThreadlessWork]) -> None:
self.flags = flags
self.running: bool = False
self.socket: Optional[socket.socket] = None
self.acceptors: List[Acceptor] = []
self.work_queues: List[connection.Connection] = []
Expand Down Expand Up @@ -69,7 +68,7 @@ def start_workers(self) -> None:
event_queue=self.event_queue
)
acceptor.start()
logger.debug('Started acceptor process %d', acceptor.pid)
logger.debug('Started acceptor#%d process %d', acceptor_id, acceptor.pid)
self.acceptors.append(acceptor)
self.work_queues.append(work_queue[0])
logger.info('Started %d workers' % self.flags.num_workers)
Expand All @@ -90,6 +89,8 @@ def start_event_dispatcher(self) -> None:

def shutdown(self) -> None:
logger.info('Shutting down %d workers' % self.flags.num_workers)
for acceptor in self.acceptors:
acceptor.running.set()
if self.flags.enable_events:
assert self.event_dispatcher_shutdown
assert self.event_dispatcher_thread
Expand All @@ -104,7 +105,6 @@ def shutdown(self) -> None:

def setup(self) -> None:
"""Listen on port, setup workers and pass server socket to workers."""
self.running = True
self.listen()
if self.flags.enable_events:
self.start_event_dispatcher()
Expand Down Expand Up @@ -145,7 +145,7 @@ def __init__(
self.work_klass = work_klass
self.event_queue = event_queue

self.running = False
self.running = multiprocessing.Event()
self.selector: Optional[selectors.DefaultSelector] = None
self.sock: Optional[socket.socket] = None
self.threadless_process: Optional[multiprocessing.Process] = None
Expand Down Expand Up @@ -208,7 +208,6 @@ def run_once(self) -> None:
# logger.info('Work started for fd %d in %f seconds', fileno, time.time() - now)

def run(self) -> None:
self.running = True
self.selector = selectors.DefaultSelector()
fileno = recv_handle(self.work_queue)
self.work_queue.close()
Expand All @@ -221,7 +220,7 @@ def run(self) -> None:
self.selector.register(self.sock, selectors.EVENT_READ)
if self.flags.threadless:
self.start_threadless_process()
while self.running:
while not self.running.is_set():
self.run_once()
except KeyboardInterrupt:
pass
Expand All @@ -230,4 +229,4 @@ def run(self) -> None:
if self.flags.threadless:
self.shutdown_threadless_process()
self.sock.close()
self.running = False
logger.debug('Acceptor#%d shutdown', self.idd)
18 changes: 12 additions & 6 deletions proxy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:license: BSD, see LICENSE for more details.
"""
import base64
import contextlib
import importlib
import inspect
import ipaddress
Expand All @@ -16,7 +17,7 @@
import os
import sys
import time
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Generator

from .common.flags import Flags, init_parser
from .common.utils import text_, bytes_
Expand Down Expand Up @@ -101,7 +102,8 @@ def setup_logger(
logging.basicConfig(level=ll, format=log_format)


def main(input_args: List[str]) -> None:
@contextlib.contextmanager
def start(input_args: List[str]) -> Generator[None, None, None]:
if not is_py3():
print(
'DEPRECATION: "develop" branch no longer supports Python 2.7. Kindly upgrade to Python 3+. '
Expand All @@ -112,7 +114,6 @@ def main(input_args: List[str]) -> None:
'Please upgrade your Python as Python 2.7 won\'t be maintained after that date. '
'A future version of pip will drop support for Python 2.7.')
sys.exit(1)

args = init_parser().parse_args(input_args)

if args.version:
Expand Down Expand Up @@ -194,9 +195,7 @@ def main(input_args: List[str]) -> None:

try:
acceptor_pool.setup()
# TODO: Introduce cron feature instead of mindless sleep
while True:
time.sleep(2**10)
yield
except Exception as e:
logger.exception('exception', exc_info=e)
finally:
Expand All @@ -208,5 +207,12 @@ def main(input_args: List[str]) -> None:
os.remove(args.pid_file)


def main(input_args: List[str]) -> None:
with start(input_args):
# TODO: Introduce cron feature instead of mindless sleep
while True:
time.sleep(1)


def entry_point() -> None:
main(sys.argv[1:])
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
long_description_content_type='text/markdown',
download_url=__download_url__,
license=__license__,
packages=find_packages(exclude=['benchmark', 'dashboard', 'plugin_examples', 'tests']),
packages=find_packages(
exclude=[
'benchmark',
'dashboard',
'plugin_examples',
'tests'
]),
install_requires=open('requirements.txt', 'r').read().strip().split(),
entry_points={
'console_scripts': [
Expand Down