Skip to content

Commit

Permalink
Threadless execution using coroutines (#134)
Browse files Browse the repository at this point in the history
* Workers need not register/unregister sock for every loop

* No need of explicit socket.settimeout(0) which is same as socket.setblocking(False)

* Remove settimeout assertion

* Only store sender side of Pipe().  Also ensure both end of the Pipe() are closed on shutdown

* Make now global. Also we seem to be using datetime.utcnow and time.time for similar purposes

* Use time.time throughout.  Remove incomplete test_cache_responses_plugin to avoid resource leak in tests

* Remove unused

* Wrap selector register/unregister within a context manager

* Refactor in preparation of threadless request handling

* MyPy generator fix

* Add --threadless flag

* Internally call them acceptors

* Internally use acceptors

* Add Threadless class.

Also no need to pass family over pipe to acceptors.

* Make threadless work for a single client :)

* Threadless is soon be our default

* Close client queue

* Use context manager for register/unregister

* Fix Acceptor tests broken after refactoring

* Use asyncio tasks to invoke ProtocolHandle.handle_events

This gives all client threads a chance to respond without
waiting for other handlers to return.

* Explicitly initialize event loop per Threadless process

* Mypy fixes

* Add ThreadlessWork abstract class implemented by ProtocolHandler

* Add benchmark.py

Avoid TIME_WAIT by properly shutting down the connection.

* Add benchmark.py as part of testing workflow

* When e2e encryption is enabled, unwrap socket before shutdown to ensure CLOSED state

* MyPy fixes, Union should have worked, but likely unwrap is not part of socket.socket hence

* Unwrap if wrapped before shutdown

* Unwrap if wrapped before shutdown

* socket.SHUT_RDWR will cause leaks

* MyPy

* Add instructions for monitor.sh

* Avoid recursive exception in new_socket_connection and only invoke plugins/shutdown if server connection was initialized

* Add Fast & Scalable section

* Update internal classes section

* Dont print out local dir path in help text :)

* Refactor

* Fix a bug where response parser for HTTP only requests was reused for pipelined requests resulting in a hang

* Add chrome_with_proxy.sh helper script

* Handle OSError during client.flush which can happen due to invalid protocol type for socket error

* Remove redundant e

* Add classmethods to quickly construct a parser object

* Don't raise from TcpConnection abstract class.

This allows both client/socket side of communication to handle
exceptions as necessary. We might refactor this again later to remove
redundant code :)

* Disable response parsing when TLS interception is enabled. See issue #127

* remove unused imports

* Within webserver parse pipelined requests only if we have a route

* Add ShortLinkPlugin plugin

* Add more shortlinks

* Add ShortLinkPlugin to README.md

* Add path forwarding too instead of leaving as excercise ;)

* Add shortlink to TOC

* Ensure no socket leaks

* Ensure no leaks

* Naming

* Default number of clients 1

* Avoid shortlinking localhost

* Stress more
  • Loading branch information
abhinavsingh authored Oct 16, 2019
1 parent 9d46cba commit a1bb659
Show file tree
Hide file tree
Showing 10 changed files with 1,014 additions and 359 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
run: |
# The GitHub editor is 127 chars wide
# W504 screams for line break after binary operators
flake8 --ignore=W504 --max-line-length=127 proxy.py plugin_examples.py tests.py setup.py
flake8 --ignore=W504 --max-line-length=127 proxy.py plugin_examples.py tests.py setup.py benchmark.py
# mypy compliance check
mypy --strict --ignore-missing-imports proxy.py plugin_examples.py tests.py setup.py
mypy --strict --ignore-missing-imports proxy.py plugin_examples.py tests.py setup.py benchmark.py
- name: Run Tests
run: pytest tests.py
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,3 @@ proxy.py.iml
*.pyc
ca-*.pem
https-*.pem
benchmark.py
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ coverage:
open htmlcov/index.html

lint:
flake8 --ignore=W504 --max-line-length=127 proxy.py plugin_examples.py tests.py setup.py
mypy --strict --ignore-missing-imports proxy.py plugin_examples.py tests.py setup.py
flake8 --ignore=W504 --max-line-length=127 proxy.py plugin_examples.py tests.py setup.py benchmark.py
mypy --strict --ignore-missing-imports proxy.py plugin_examples.py tests.py setup.py benchmark.py

autopep8:
autopep8 --recursive --in-place --aggressive proxy.py
Expand Down
142 changes: 108 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Table of Contents
* [Stable version](#stable-version-from-docker-hub)
* [Development version](#build-development-version-locally)
* [Plugin Examples](#plugin-examples)
* [ShortLinkPlugin](#shortlinkplugin)
* [ModifyPostDataPlugin](#modifypostdataplugin)
* [ProposedRestApiPlugin](#proposedrestapiplugin)
* [RedirectToCustomServerPlugin](#redirecttocustomserverplugin)
Expand All @@ -49,9 +50,16 @@ Table of Contents
* [End-to-End Encryption](#end-to-end-encryption)
* [TLS Interception](#tls-interception)
* [import proxy.py](#import-proxypy)
* [proxy.new_socket_connection](#proxynew_socket_connection)
* [proxy.socket_connection](#proxysocket_connection)
* [proxy.build_http_request](#proxybuild_http_request)
* [TCP Sockets](#tcp-sockets)
* [proxy.new_socket_connection](#proxynew_socket_connection)
* [proxy.socket_connection](#proxysocket_connection)
* [Http Client](#http-client)
* [proxy.build_http_request](#proxybuild_http_request)
* [proxy.build_http_response](#proxybuild_http_response)
* [Websocket Client](#websocket-client)
* [proxy.WebsocketFrame](#proxywebsocketframe)
* [proxy.WebsocketClient](#proxywebsocketclient)
* [Embed proxy.py](#embed-proxypy)
* [Plugin Developer and Contributor Guide](#plugin-developer-and-contributor-guide)
* [Everything is a plugin](#everything-is-a-plugin)
* [Internal Architecture](#internal-architecture)
Expand All @@ -68,6 +76,29 @@ Table of Contents
Features
========

- Fast & Scalable
- Scales by using all available cores on the system
- Threadless executions using coroutine
- Made to handle `tens-of-thousands` connections / sec
```
# On Macbook Pro 2015 / 2.8 GHz Intel Core i7
$ hey -n 10000 -c 100 http://localhost:8899/
Summary:
Total: 0.6157 secs
Slowest: 0.1049 secs
Fastest: 0.0007 secs
Average: 0.0055 secs
Requests/sec: 16240.5444
Total data: 800000 bytes
Size/request: 80 bytes
Response time histogram:
0.001 [1] |
0.011 [9565] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.022 [332] |■
```
- Lightweight
- Distributed as a single file module `~100KB`
- Uses only `~5-20MB` RAM
Expand Down Expand Up @@ -204,6 +235,35 @@ See [plugin_examples.py](https://github.com/abhinavsingh/proxy.py/blob/develop/p
All the examples below also works with `https` traffic but require additional flags and certificate generation.
See [TLS Interception](#tls-interception).
## ShortLinkPlugin
Add support for short links in your favorite browsers / applications.
Start `proxy.py` as:
```
$ proxy.py \
--plugins plugin_examples.ShortLinkPlugin
```
Now you can speed up your daily browsing experience by visiting your
favorite website using single character domain names :). This works
across all browsers.
Following short links are enabled by default:
Short Link | Destination URL
:--------: | :---------------:
a/ | amazon.com
i/ | instagram.com
l/ | linkedin.com
f/ | facebook.com
g/ | google.com
t/ | twitter.com
w/ | web.whatsapp.com
y/ | youtube.com
proxy/ | localhost:8899
## ModifyPostDataPlugin
Modifies POST request body before sending request to upstream server.
Expand Down Expand Up @@ -599,7 +659,9 @@ $ python
>>>
```
## proxy.new_socket_connection
## TCP Sockets
### proxy.new_socket_connection
Attempts to create an IPv4 connection, then IPv6 and
finally a dual stack connection to provided address.
Expand All @@ -610,7 +672,7 @@ finally a dual stack connection to provided address.
>>> conn.close()
```
## proxy.socket_connection
### proxy.socket_connection
`socket_connection` is a convenient decorator + context manager
around `new_socket_connection` which ensures `conn.close` is implicit.
Expand All @@ -630,17 +692,19 @@ As a decorator:
>>> ... [ use connection ] ...
```
## proxy.build_http_request
## Http Client
#### Generate HTTP GET request
### proxy.build_http_request
##### Generate HTTP GET request
```
>>> proxy.build_http_request(b'GET', b'/')
b'GET / HTTP/1.1\r\n\r\n'
>>>
```
#### Generate HTTP GET request with headers
##### Generate HTTP GET request with headers
```
>>> proxy.build_http_request(b'GET', b'/',
Expand All @@ -649,7 +713,7 @@ b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n'
>>>
```
#### Generate HTTP POST request with headers and body
##### Generate HTTP POST request with headers and body
```
>>> import json
Expand All @@ -659,6 +723,22 @@ b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n'
b'POST /form HTTP/1.1\r\nContent-type: application/json\r\n\r\n{"email": "[email protected]"}'
```
### proxy.build_http_response
TODO
## Websocket Client
### proxy.WebsocketFrame
TODO
### proxy.WebsocketClient
TODO
## Embed proxy.py
To start `proxy.py` server from imported `proxy.py` module, simply do:
```
Expand Down Expand Up @@ -710,14 +790,14 @@ mechanism. Its responsibility is to establish connection between client and
upstream [TcpServerConnection](https://github.com/abhinavsingh/proxy.py/blob/b03629fa0df1595eb4995427bc601063be7fdca9/proxy.py#L204-L227)
and invoke `HttpProxyBasePlugin` lifecycle hooks.
- `ProtocolHandler` threads are started by [Worker](https://github.com/abhinavsingh/proxy.py/blob/b03629fa0df1595eb4995427bc601063be7fdca9/proxy.py#L424-L472)
- `ProtocolHandler` threads are started by [Acceptor](https://github.com/abhinavsingh/proxy.py/blob/b03629fa0df1595eb4995427bc601063be7fdca9/proxy.py#L424-L472)
processes.
- `--num-workers` `Worker` processes are started by
- `--num-workers` `Acceptor` processes are started by
[AcceptorPool](https://github.com/abhinavsingh/proxy.py/blob/b03629fa0df1595eb4995427bc601063be7fdca9/proxy.py#L368-L421)
on start-up.
- `AcceptorPool` listens on server socket and pass the handler to `Worker` processes.
- `AcceptorPool` listens on server socket and pass the handler to `Acceptor` processes.
Workers are responsible for accepting new client connections and starting
`ProtocolHandler` thread.
Expand Down Expand Up @@ -748,33 +828,23 @@ Example:
```
$ pydoc3 proxy
Help on module proxy:
NAME
proxy
DESCRIPTION
proxy.py
~~~~~~~~
Lightweight, Programmable, TLS interceptor Proxy for HTTP(S), HTTP2, WebSockets protocols in a single Python file.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.

CLASSES
abc.ABC(builtins.object)
HttpProxyBasePlugin
HttpWebServerBasePlugin
DevtoolsFrontendPlugin
DevtoolsWebsocketPlugin
HttpWebServerPacFilePlugin
ProtocolHandlerPlugin
DevtoolsEventGeneratorPlugin
DevtoolsProtocolPlugin
HttpProxyPlugin
HttpWebServerPlugin
TcpConnection
TcpClientConnection
TcpServerConnection
WebsocketClient
ThreadlessWork
ProtocolHandler(threading.Thread, ThreadlessWork)
builtins.Exception(builtins.BaseException)
ProtocolException
HttpRequestRejected
Expand All @@ -789,17 +859,20 @@ CLASSES
WebsocketFrame
builtins.tuple(builtins.object)
ChunkParserStates
HttpMethods
HttpParserStates
HttpParserTypes
HttpProtocolTypes
HttpStatusCodes
TcpConnectionTypes
WebsocketOpcodes
contextlib.ContextDecorator(builtins.object)
socket_connection
multiprocessing.context.Process(multiprocessing.process.BaseProcess)
Worker
Acceptor
Threadless
threading.Thread(builtins.object)
ProtocolHandler
ProtocolHandler(threading.Thread, ThreadlessWork)
```
Frequently Asked Questions
Expand Down Expand Up @@ -905,8 +978,8 @@ usage: proxy.py [-h] [--backlog BACKLOG] [--basic-auth BASIC_AUTH]
[--pac-file-url-path PAC_FILE_URL_PATH] [--pid-file PID_FILE]
[--plugins PLUGINS] [--port PORT]
[--server-recvbuf-size SERVER_RECVBUF_SIZE]
[--static-server-dir STATIC_SERVER_DIR] [--timeout TIMEOUT]
[--version]
[--static-server-dir STATIC_SERVER_DIR] [--threadless]
[--timeout TIMEOUT] [--version]

proxy.py v1.2.0

Expand Down Expand Up @@ -991,10 +1064,11 @@ optional arguments:
value for faster downloads at the expense of increased
RAM.
--static-server-dir STATIC_SERVER_DIR
Default: /Users/abhinav/Dev/proxy.py/public. Static
server root directory. This option is only applicable
when static server is also enabled. See --enable-
static-server.
Default: "public" folder in directory where proxy.py
is placed. This option is only applicable when static
server is also enabled. See --enable-static-server.
--threadless Default: False. When disabled a new thread is spawned
to handle each client connection.
--timeout TIMEOUT Default: 10. Number of seconds after which an inactive
connection must be dropped. Inactivity is defined by
no data sent or received by the client.
Expand Down
95 changes: 95 additions & 0 deletions benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Programmable Proxy Server in a single Python file.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import argparse
import asyncio
import sys
from typing import List, Tuple

import proxy

DEFAULT_N = 1


def init_parser() -> argparse.ArgumentParser:
"""Initializes and returns argument parser."""
parser = argparse.ArgumentParser(
description='Benchmark opens N concurrent connections '
'to proxy.py web server. Currently, HTTP/1.1 '
'keep-alive connections are opened. Over each opened '
'connection multiple pipelined request / response '
'packets are exchanged with proxy.py web server.',
epilog='Proxy.py not working? Report at: %s/issues/new' % proxy.__homepage__
)
parser.add_argument(
'--n', '-n',
type=int,
default=DEFAULT_N,
help='Default: ' + str(DEFAULT_N) + '. See description above for meaning of N.'
)
return parser


class Benchmark:

def __init__(self, n: int = DEFAULT_N) -> None:
self.n = n
self.clients: List[Tuple[asyncio.StreamReader, asyncio.StreamWriter]] = []

async def open_connections(self) -> None:
for _ in range(self.n):
self.clients.append(await asyncio.open_connection('::', 8899))
print('Opened ' + str(self.n) + ' connections')

def send_requests(self) -> None:
for _, writer in self.clients:
writer.write(proxy.build_http_request(
proxy.httpMethods.GET, b'/'
))

async def recv_responses(self) -> None:
for reader, _ in self.clients:
response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
while response.state != proxy.httpParserStates.COMPLETE:
response.parse(await reader.read(proxy.DEFAULT_BUFFER_SIZE))

async def close_connections(self) -> None:
for reader, writer in self.clients:
writer.close()
await writer.wait_closed()
print('Closed ' + str(self.n) + ' connections')

async def run(self) -> None:
num_completed_requests_per_connection: int = 0
try:
await self.open_connections()
print('Exchanging request / response packets')
while True:
self.send_requests()
await self.recv_responses()
num_completed_requests_per_connection += 1
await asyncio.sleep(0.1)
finally:
await self.close_connections()
print('Exchanged ' + str(num_completed_requests_per_connection) +
' request / response per connection')


def main(input_args: List[str]) -> None:
args = init_parser().parse_args(input_args)
benchmark = Benchmark(n=args.n)
try:
asyncio.run(benchmark.run())
except KeyboardInterrupt:
pass


if __name__ == '__main__':
main(sys.argv[1:]) # pragma: no cover
Loading

0 comments on commit a1bb659

Please sign in to comment.