Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: TunnelCommunity multiprocessing #2607

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,17 @@ def load_communities(self):
load=True,
kargs=multichain_kwargs)

from Tribler.community.tunnel.hidden_community_multichain import HiddenTunnelCommunityMultichain
self.tunnel_community = self.dispersy.define_auto_load(
HiddenTunnelCommunityMultichain, dispersy_member, load=True, kargs=tunnel_kwargs)[0]
if self.session.get_tunnel_community_pooled():
# Load the pooled HiddenTunnelCommunityMultichain
from Tribler.community.tunnel.pooled_tunnel_community import PooledTunnelCommunity
self.tunnel_community = self.dispersy.define_auto_load(
PooledTunnelCommunity, dispersy_member, load=True, kargs=tunnel_kwargs)[0]
else:
# Load the normal HiddenTunnelCommunityMultichain
from Tribler.community.tunnel.hidden_community_multichain \
import HiddenTunnelCommunityMultichain
self.tunnel_community = self.dispersy.define_auto_load(
HiddenTunnelCommunityMultichain, dispersy_member, load=True, kargs=tunnel_kwargs)[0]
else:
keypair = self.dispersy.crypto.generate_key(u"curve25519")
dispersy_member = self.dispersy.get_member(private_key=self.dispersy.crypto.key_to_bin(keypair))
Expand Down
14 changes: 14 additions & 0 deletions Tribler/Core/SessionConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,20 @@ def get_tunnel_community_enabled(self):
"""
return self.sessconfig.get(u'tunnel_community', u'enabled')

def set_tunnel_community_pooled(self, value):
"""
Enable or disable subprocesses for the tunnel community.
:param value: A boolean indicating whether tunnel community pooling should be enabled
"""
self.sessconfig.set(u'tunnel_community', u'pooled', value)

def get_tunnel_community_pooled(self):
"""
Returns whether tunnel community pooling is enabled.
:return: A boolean indicating whether tunnel community pooling is enabled
"""
return self.sessconfig.get(u'tunnel_community', u'pooled')

#
# BarterCommunity settings
#
Expand Down
1 change: 1 addition & 0 deletions Tribler/Core/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
sessdefaults['tunnel_community']['socks5_listen_ports'] = [-1] * 5
sessdefaults['tunnel_community']['exitnode_enabled'] = False
sessdefaults['tunnel_community']['enabled'] = True
sessdefaults['tunnel_community']['pooled'] = False

# Multichain community section
sessdefaults['multichain'] = OrderedDict()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from Tribler.community.multichain.community import (MultiChainCommunity, MultiChainCommunityCrawler, CRAWL_REQUEST,
CRAWL_RESPONSE, CRAWL_RESUME)
from Tribler.community.multichain.conversion import EMPTY_HASH
from Tribler.community.tunnel.routing import Circuit, RelayRoute
from Tribler.community.tunnel.remotes.circuit import Circuit
from Tribler.community.tunnel.remotes.relayroute import RelayRoute
from Tribler.community.tunnel.tunnel_community import TunnelExitSocket
from Tribler.Test.test_as_server import AbstractServer
from Tribler.dispersy.message import DelayPacketByMissingMember
Expand Down
Empty file.
221 changes: 221 additions & 0 deletions Tribler/Test/Community/Tunnel/processes/test_childprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
from twisted.internet.defer import Deferred, inlineCallbacks

import Tribler.community.tunnel.processes

from Tribler.community.tunnel.processes import line_util
from Tribler.community.tunnel.processes.childprocess import ChildProcess
from Tribler.dispersy.util import blocking_call_on_reactor_thread
from Tribler.Test.test_as_server import AbstractServer


class MockTransport(object):

def __init__(self):
self.input = {}
self.deferred = Deferred()

def writeToChild(self, fd, data):
if fd in self.input.keys():
self.input[fd] = self.input[fd] + data
else:
self.input[fd] = data

if len(self.input) == 1:
self.deferred.callback(None)

def get_output_on(self, fd):
if fd in self.input.keys():
return self.input[fd]
else:
return ""


class MockChildProcess(ChildProcess):

def __init__(self):
self.transport = MockTransport()
self.input_callbacks = {1: self.on_generic,
4: self.on_ctrl,
6: self.on_data,
8: self.on_exit}
self.called_ctrl = False
self.called_data = False
self.called_exit = False

def on_ctrl(self, msg):
self.called_ctrl = True

def on_exit(self, msg):
self.called_exit = True

def on_data(self, msg):
self.called_data = True


class TestChildProcess(AbstractServer):

@classmethod
def setUpClass(cls):
"""
Set up a message that contains all 256 possible characters
"""
cls.message = "".join(chr(i) for i in xrange(256))

@blocking_call_on_reactor_thread
@inlineCallbacks
def setUp(self, annotate=True):
"""
Write all of the Subprocess output to strings instead of file descriptors.
"""
yield super(TestChildProcess, self).setUp(annotate=annotate)
self.process = MockChildProcess()

@blocking_call_on_reactor_thread
@inlineCallbacks
def test_data_out(self):
"""
Output data should be pack_data()'d.
"""
Tribler.community.tunnel.processes.CHILDFDS_ENABLED = True
reload(Tribler.community.tunnel.processes.childprocess)
self.process.write_data(TestChildProcess.message)

yield self.process.transport.deferred
sent = self.process.transport.get_output_on(5)

self.assertGreater(len(sent), 0)

_, decoded = line_util.unpack_complex(sent)

self.assertIsNotNone(decoded)
self.assertEquals(decoded, TestChildProcess.message)

@blocking_call_on_reactor_thread
@inlineCallbacks
def test_ctrl_out(self):
"""
Output data should be pack_data()'d.
"""
Tribler.community.tunnel.processes.CHILDFDS_ENABLED = True
reload(Tribler.community.tunnel.processes.childprocess)
self.process.write_ctrl(TestChildProcess.message)

yield self.process.transport.deferred
sent = self.process.transport.get_output_on(3)

self.assertGreater(len(sent), 0)

_, decoded = line_util.unpack_complex(sent)

self.assertIsNotNone(decoded)
self.assertEquals(decoded, TestChildProcess.message)

@blocking_call_on_reactor_thread
@inlineCallbacks
def test_exit_out(self):
"""
Output data should be pack_data()'d.
"""
Tribler.community.tunnel.processes.CHILDFDS_ENABLED = True
reload(Tribler.community.tunnel.processes.childprocess)
self.process.write_exit(TestChildProcess.message)

yield self.process.transport.deferred
sent = self.process.transport.get_output_on(7)

self.assertGreater(len(sent), 0)

_, decoded = line_util.unpack_complex(sent)

self.assertIsNotNone(decoded)
self.assertEquals(decoded, TestChildProcess.message)

@blocking_call_on_reactor_thread
@inlineCallbacks
def test_generic_data_out(self):
"""
Output data should be pack_data()'d.
"""
Tribler.community.tunnel.processes.CHILDFDS_ENABLED = False
reload(Tribler.community.tunnel.processes.childprocess)
self.process.write_data(TestChildProcess.message)

yield self.process.transport.deferred
sent = self.process.transport.get_output_on(0)

self.assertGreater(len(sent), 0)

_, decoded = line_util.unpack_complex(sent)

self.assertIsNotNone(decoded)
self.assertEquals(decoded, str(5) + TestChildProcess.message)

@blocking_call_on_reactor_thread
@inlineCallbacks
def test_generic_ctrl_out(self):
"""
Output data should be pack_data()'d.
"""
Tribler.community.tunnel.processes.CHILDFDS_ENABLED = False
reload(Tribler.community.tunnel.processes.childprocess)
self.process.write_ctrl(TestChildProcess.message)

yield self.process.transport.deferred
sent = self.process.transport.get_output_on(0)

self.assertGreater(len(sent), 0)

_, decoded = line_util.unpack_complex(sent)

self.assertIsNotNone(decoded)
self.assertEquals(decoded, str(3) + TestChildProcess.message)

@blocking_call_on_reactor_thread
@inlineCallbacks
def test_generic_exit_out(self):
"""
Output data should be pack_data()'d.
"""
Tribler.community.tunnel.processes.CHILDFDS_ENABLED = False
reload(Tribler.community.tunnel.processes.childprocess)
self.process.write_exit(TestChildProcess.message)

yield self.process.transport.deferred
sent = self.process.transport.get_output_on(0)

self.assertGreater(len(sent), 0)

_, decoded = line_util.unpack_complex(sent)

self.assertIsNotNone(decoded)
self.assertEquals(decoded, str(7) + TestChildProcess.message)

def test_on_generic_ctrl(self):
"""
A generic message should be forwarded to the correct stream.
"""
self.process.on_generic(str(4) + TestChildProcess.message)

self.assertTrue(self.process.called_ctrl)
self.assertFalse(self.process.called_data)
self.assertFalse(self.process.called_exit)

def test_on_generic_data(self):
"""
A generic message should be forwarded to the correct stream.
"""
self.process.on_generic(str(6) + TestChildProcess.message)

self.assertFalse(self.process.called_ctrl)
self.assertTrue(self.process.called_data)
self.assertFalse(self.process.called_exit)

def test_on_generic_exit(self):
"""
A generic message should be forwarded to the correct stream.
"""
self.process.on_generic(str(8) + TestChildProcess.message)

self.assertFalse(self.process.called_ctrl)
self.assertFalse(self.process.called_data)
self.assertTrue(self.process.called_exit)
114 changes: 114 additions & 0 deletions Tribler/Test/Community/Tunnel/processes/test_line_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import struct
import unittest

from Tribler.community.tunnel.processes.line_util import (fix_split,
pack_data,
unpack_data,
unpack_complex)

BINARY_STRING_ALL_CHARS = "".join([chr(i) for i in range(256)])


class TestLineUtil(unittest.TestCase):

def test_fix_split_correct_single(self):
args = [BINARY_STRING_ALL_CHARS]
out = fix_split(1, "", args)

self.assertEqual(out, args)

def test_fix_split_correct_double(self):
args = ["test", BINARY_STRING_ALL_CHARS]
out = fix_split(2, "", args)

self.assertEqual(out, args)

def test_fix_split_correct_many(self):
args = [BINARY_STRING_ALL_CHARS] * 20
out = fix_split(20, "", args)

self.assertEqual(out, args)

def test_fix_split_broken_single(self):
delim = chr(128)
args = ["test"] + BINARY_STRING_ALL_CHARS.split(delim)
out = fix_split(2, delim, args)

self.assertEqual(out, ["test", BINARY_STRING_ALL_CHARS])

def test_fix_split_broken_double(self):
delim = chr(128)
args = (["test"]
+ BINARY_STRING_ALL_CHARS.split(delim)
+ BINARY_STRING_ALL_CHARS.split(delim))
out = fix_split(2, delim, args)

self.assertEqual(out, ["test", BINARY_STRING_ALL_CHARS
+ delim
+ BINARY_STRING_ALL_CHARS])

def test_pack_data_empty(self):
out = pack_data("")

self.assertEqual(len(out), 9)

l = struct.unpack("Q", out[:8])[0]

self.assertEqual(l, 1)
self.assertEqual(out[-1], "\n")

def test_pack_data_full(self):
out = pack_data(BINARY_STRING_ALL_CHARS)

self.assertEqual(len(out), len(BINARY_STRING_ALL_CHARS) + 9)

l = struct.unpack("Q", out[:8])[0]

self.assertEqual(l, len(BINARY_STRING_ALL_CHARS) + 1)
self.assertEqual(out[8:-1], BINARY_STRING_ALL_CHARS)
self.assertEqual(out[-1], "\n")

def test_unpack_data_incomplete(self):
data = "0000000"
l, out = unpack_data(data)

self.assertGreater(l, len(data))
self.assertEqual(out, data)

def test_unpack_data_empty(self):
data = pack_data("")
l, out = unpack_data(data)

self.assertEqual(out, "")
self.assertEqual(l, len(out) + 9)
self.assertEqual(l, len(data))

def test_unpack_data_full(self):
data = pack_data(BINARY_STRING_ALL_CHARS)
l, out = unpack_data(data)

self.assertEqual(out, BINARY_STRING_ALL_CHARS)
self.assertEqual(l, len(out) + 9)
self.assertEqual(l, len(data))

def test_unpack_complex_incomplete(self):
data = pack_data(BINARY_STRING_ALL_CHARS)[:-2]
keep, share = unpack_complex(data)

self.assertEqual(keep, data)
self.assertEqual(share, None)

def test_unpack_complex_complete(self):
data = pack_data(BINARY_STRING_ALL_CHARS)
keep, share = unpack_complex(data)

self.assertEqual(keep, "")
self.assertEqual(share, BINARY_STRING_ALL_CHARS)

def test_unpack_complex_overflow(self):
remainder = "test"
data = pack_data(BINARY_STRING_ALL_CHARS)
keep, share = unpack_complex(data + remainder)

self.assertEqual(keep, remainder)
self.assertEqual(share, BINARY_STRING_ALL_CHARS)
Loading