Skip to content

Commit

Permalink
Added option to set client connection timeout when making APIs calls …
Browse files Browse the repository at this point in the history
…with a default of 90 seconds.
  • Loading branch information
rlratzel committed May 18, 2022
1 parent b8a923e commit 79cba4f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 19 deletions.
12 changes: 9 additions & 3 deletions python/gaas_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,24 @@ def wrapped_method(self, *args, **kwargs):
return ret_val
return wrapped_method

def open(self):
def open(self, call_timeout=90000):
"""
Opens a connection to the server at self.host/self.port if one is not
already established. close() must be called in order to allow other
connections from other clients to be made.
This call does nothing if a connection to the server is already open.
Note: all APIs that access the server will call this method
automatically, followed automatically by a call to close(), so calling
this method should not be necessary. close() is not automatically called
if self.hold_open is False.
Parameters
----------
None
call_timeout : int (default is 90000)
Time in millisecods that calls to the server using this open
connection must return by.
Returns
-------
Expand All @@ -103,9 +107,11 @@ def open(self):
>>> # clients cannot connect until a client API call completes or
>>> # close() is manually called.
>>> client.open()
"""
if self.__client is None:
self.__client = create_client(self.host, self.port)
self.__client = create_client(self.host, self.port,
call_timeout=call_timeout)

def close(self):
"""
Expand Down
10 changes: 8 additions & 2 deletions python/gaas_client/gaas_thrift.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,19 @@ def create_server(handler, host, port):
"""
return make_server(spec.GaasService, handler, host, port)

def create_client(host, port):

def create_client(host, port, call_timeout=90000):
"""
Return a client object that will make calls on a server listening on
host/port.
The call_timeout value defaults to 90 seconds, and is used for setting the
timeout for server API calls when using the client created here - if a call
does not return in call_timeout milliseconds, an exception is raised.
"""
try:
return make_client(spec.GaasService, host=host, port=port)
return make_client(spec.GaasService, host=host, port=port,
timeout=call_timeout)
except thriftpy2.transport.TTransportException:
# Rasie a GaaS exception in order to completely encapsulate all Thrift
# details in this module. If this was not done, callers of this function
Expand Down
6 changes: 4 additions & 2 deletions python/gaas_server/gaas_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pathlib import Path
import importlib
import time
import traceback

import cudf
import cugraph
Expand Down Expand Up @@ -100,9 +101,10 @@ def call_graph_creation_extension(self, func_name,
func_kwargs = eval(func_kwargs_repr)
try:
graph_obj = func(*func_args, **func_kwargs)
except:
except Exception:
# FIXME: raise a more detailed error
raise GaasError(f"error running {func_name}")
raise GaasError(f"error running {func_name} : "
f"{traceback.format_exc()}")
return self.__add_graph(graph_obj)

raise GaasError(f"{func_name} is not a graph creation extension")
Expand Down
24 changes: 24 additions & 0 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ def my_graph_creation_function(arg1, arg2):
return pG
"""

graph_creation_extension_long_running_file_contents = """
import time
import cudf
from cugraph.experimental import PropertyGraph
def long_running_graph_creation_function():
time.sleep(10)
pG = PropertyGraph()
return pG
"""

@pytest.fixture(scope="module")
def graph_creation_extension1():
with TemporaryDirectory() as tmp_extension_dir:
Expand All @@ -68,3 +79,16 @@ def graph_creation_extension2():
flush=True)

yield tmp_extension_dir

@pytest.fixture(scope="module")
def graph_creation_extension_long_running():
with TemporaryDirectory() as tmp_extension_dir:
# write graph creation extension .py file
graph_creation_extension_file = open(
Path(tmp_extension_dir)/"long_running_graph_creation_extension.py",
"w")
print(graph_creation_extension_long_running_file_contents,
file=graph_creation_extension_file,
flush=True)

yield tmp_extension_dir
48 changes: 36 additions & 12 deletions python/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,40 +184,64 @@ def test_extract_subgraph(client_with_csv_loaded):
assert Gid in client.get_graph_ids()


def test_call_graph_creation_extension(client):
def test_load_and_call_graph_creation_extension(client,
graph_creation_extension2):
"""
Ensure the graph creation extension preloaded by the server fixture is
callable.
Tests calling a user-defined server-side graph creation extension from the
GaaS client.
"""
# The graph_creation_extension returns the tmp dir created which contains
# the extension
extension_dir = graph_creation_extension2

num_files_loaded = client.load_graph_creation_extensions(extension_dir)
assert num_files_loaded == 1

new_graph_ID = client.call_graph_creation_extension(
"custom_graph_creation_function")
"my_graph_creation_function", "a", "b")

assert new_graph_ID in client.get_graph_ids()

# Inspect the PG and ensure it was created from
# custom_graph_creation_function
# Inspect the PG and ensure it was created from my_graph_creation_function
# FIXME: add client APIs to allow for a more thorough test of the graph
assert client.get_num_edges(new_graph_ID) == 3
assert client.get_num_edges(new_graph_ID) == 2


def test_load_and_call_graph_creation_extension(client,
graph_creation_extension2):
def test_load_and_call_graph_creation_long_running_extension(
client,
graph_creation_extension_long_running):
"""
Tests calling a user-defined server-side graph creation extension from the
GaaS client.
"""
# The graph_creation_extension returns the tmp dir created which contains
# the extension
extension_dir = graph_creation_extension2
extension_dir = graph_creation_extension_long_running

num_files_loaded = client.load_graph_creation_extensions(extension_dir)
assert num_files_loaded == 1

new_graph_ID = client.call_graph_creation_extension(
"my_graph_creation_function", "a", "b")
"long_running_graph_creation_function")

assert new_graph_ID in client.get_graph_ids()

# Inspect the PG and ensure it was created from my_graph_creation_function
# FIXME: add client APIs to allow for a more thorough test of the graph
assert client.get_num_edges(new_graph_ID) == 2
assert client.get_num_edges(new_graph_ID) == 0


def test_call_graph_creation_extension(client):
"""
Ensure the graph creation extension preloaded by the server fixture is
callable.
"""
new_graph_ID = client.call_graph_creation_extension(
"custom_graph_creation_function")

assert new_graph_ID in client.get_graph_ids()

# Inspect the PG and ensure it was created from
# custom_graph_creation_function
# FIXME: add client APIs to allow for a more thorough test of the graph
assert client.get_num_edges(new_graph_ID) == 3

0 comments on commit 79cba4f

Please sign in to comment.