Skip to content

Commit

Permalink
some improvements to client class
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Dec 6, 2018
1 parent 4463775 commit ca6b552
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions engineio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import six
from six.moves import urllib
import urllib3
import websocket

from . import exceptions
Expand All @@ -21,8 +20,8 @@ def signal_handler(sig, frame):
Disconnect all active clients and then invoke the original signal handler.
"""
for client in connected_clients:
client.disconnect()
original_signal_handler(sig, frame)
client.disconnect(abort=True)
return original_signal_handler(sig, frame)


original_signal_handler = signal.signal(signal.SIGINT, signal_handler)
Expand Down Expand Up @@ -171,16 +170,18 @@ def send(self, data, binary=None):
self._send_packet(packet.Packet(packet.MESSAGE, data=data,
binary=binary))

def disconnect(self):
def disconnect(self, abort=False):
"""Disconnect from the server."""
if self.state != 'disconnected':
if self.state == 'connected':
self._send_packet(packet.Packet(packet.CLOSE))
self.queue.put(None)
self.queue.join()
self.state = 'disconnecting'
if not abort:
self.queue.join()
if self.current_transport == 'websocket':
self.ws.close()
self.state = 'disconnecting'
self.read_loop_task.join()
if not abort:
self.read_loop_task.join()
self.state = 'disconnected'
try:
connected_clients.remove(self)
Expand Down Expand Up @@ -228,16 +229,14 @@ def _reset(self):

def _connect_polling(self, url, headers, engineio_path):
"""Establish a long-polling connection to the Engine.IO server."""
self.http = urllib3.PoolManager()
self.base_url = self._get_engineio_url(url, engineio_path, 'polling')
self.logger.info('Attempting polling connection to ' + self.base_url)
try:
r = self.http.request(
'GET', self.base_url + self._get_url_timestamp())
except urllib3.exceptions.MaxRetryError:
r = self._send_request(
'GET', self.base_url + self._get_url_timestamp())
if r is None:
self._reset()
six.raise_from(exceptions.ConnectionError(
'Connection refused by the server'), None)
raise exceptions.ConnectionError(
'Connection refused by the server')
if r.status != 200:
raise exceptions.ConnectionError(
'Unexpected status code %s in server response', r.status)
Expand Down Expand Up @@ -281,10 +280,9 @@ def read_loop():
while self.state == 'connected':
self.logger.info(
'Sending polling GET request to ' + self.base_url)
try:
r = self.http.request(
'GET', self.base_url + self._get_url_timestamp())
except urllib3.exceptions.MaxRetryError:
r = self._send_request(
'GET', self.base_url + self._get_url_timestamp())
if r is None:
self.logger.warning(
'Connection refused by the server, aborting')
self.queue.put(None)
Expand Down Expand Up @@ -422,6 +420,15 @@ def _send_packet(self, pkt):
packet.packet_names[pkt.packet_type],
pkt.data if not isinstance(pkt.data, bytes) else '<binary>')

def _send_request(self, method, url, headers=None, body=None):
import urllib3
if self.http is None:
self.http = urllib3.PoolManager()
try:
return self.http.request(method, url, headers=headers, body=body)
except urllib3.exceptions.MaxRetryError:
return None

def _create_queue(self):
"""Create the client's send queue."""
import queue
Expand Down Expand Up @@ -509,9 +516,14 @@ def _writer_task(self):
break
if self.current_transport == 'polling':
p = payload.Payload(packets=packets)
r = self.http.request(
r = self._send_request(
'POST', self.base_url, body=p.encode(),
headers={'Content-Type': 'application/octet-stream'})
if r is None:
self.logger.warning(
'Connection refused by the server, aborting')
self._reset()
break
if r.status != 200:
self.logger.warning('Unexpected status code %s in server '
'response, aborting', r.status)
Expand Down

0 comments on commit ca6b552

Please sign in to comment.