Skip to content

Commit

Permalink
Merge pull request #8 from rlratzel/branch-22.06-updates_for_dgl_1
Browse files Browse the repository at this point in the history
Add ability to load graphs server-side using custom functions
  • Loading branch information
rlratzel authored May 22, 2022
2 parents b32b286 + 79cba4f commit 1ecf61a
Show file tree
Hide file tree
Showing 16 changed files with 985 additions and 316 deletions.
2 changes: 1 addition & 1 deletion python/gaas_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .client import GaasClient
from gaas_client.client import GaasClient
149 changes: 142 additions & 7 deletions python/gaas_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

from functools import wraps

from . import defaults
from .gaas_thrift import create_client
from gaas_client import defaults
from gaas_client.gaas_thrift import create_client


class GaasClient:
Expand Down Expand Up @@ -76,20 +76,24 @@ def wrapped_method(self, *args, **kwargs):
return ret_val
return wrapped_method

def open(self):
def open(self, call_timeout=90000):
"""
Opens a connection to the server at self.host/self.port if one is not
already established. close() must be called in order to allow other
connections from other clients to be made.
This call does nothing if a connection to the server is already open.
Note: all APIs that access the server will call this method
automatically, followed automatically by a call to close(), so calling
this method should not be necessary. close() is not automatically called
if self.hold_open is False.
Parameters
----------
None
call_timeout : int (default is 90000)
Time in millisecods that calls to the server using this open
connection must return by.
Returns
-------
Expand All @@ -103,12 +107,15 @@ def open(self):
>>> # clients cannot connect until a client API call completes or
>>> # close() is manually called.
>>> client.open()
"""
if self.__client is None:
self.__client = create_client(self.host, self.port)
self.__client = create_client(self.host, self.port,
call_timeout=call_timeout)

def close(self):
"""Closes a connection to the server if one has been established, allowing
"""
Closes a connection to the server if one has been established, allowing
other clients to access the server. This method is called automatically
for all APIs that access the server if self.hold_open is False.
Expand Down Expand Up @@ -138,6 +145,131 @@ def close(self):
self.__client.close()
self.__client = None

############################################################################
# Environment management
@__server_connection
def uptime(self):
"""
Return the server uptime in seconds. This is often used as a "ping".
Parameters
----------
None
Returns
-------
uptime : int
The time in seconds the server has been running.
Examples
--------
>>> from gaas_client import GaasClient
>>> client = GaasClient()
>>> client.uptime()
>>> 32
"""
return self.__client.uptime()

@__server_connection
def load_graph_creation_extensions(self, extension_dir_path):
"""
Loads the extensions for graph creation present in the directory
specified by extension_dir_path.
Parameters
----------
extension_dir_path : string
Path to the directory containing the extension files (.py source
files). This directory must be readable by the server.
Returns
-------
num_files_read : int
Number of extension files read in the extension_dir_path directory.
Examples
--------
>>> from gaas_client import GaasClient
>>> client = GaasClient()
>>> num_files_read = client.load_graph_creation_extensions(
... "/some/server/side/directory")
>>>
"""
return self.__client.load_graph_creation_extensions(extension_dir_path)

@__server_connection
def unload_graph_creation_extensions(self):
"""
Removes all extensions for graph creation previously loaded.
Parameters
----------
None
Returns
-------
None
Examples
--------
>>> from gaas_client import GaasClient
>>> client = GaasClient()
>>> client.unload_graph_creation_extensions()
>>>
"""
return self.__client.unload_graph_creation_extensions()

@__server_connection
def call_graph_creation_extension(self, func_name,
*func_args, **func_kwargs):
"""
Calls a graph creation extension on the server that was previously
loaded by a prior call to load_graph_creation_extensions(), then returns
the graph ID of the graph created by the extension.
Parameters
----------
func_name : string
The name of the server-side extension function loaded by a prior
call to load_graph_creation_extensions(). All graph creation
extension functions are expected to return a new graph.
*func_args : string, int, list, dictionary (optional)
The positional args to pass to func_name. Note that func_args are
converted to their string representation using repr() on the client,
then restored to python objects on the server using eval(), and
therefore only objects that can be restored server-side with eval()
are supported.
**func_kwargs : string, int, list, dictionary
The keyword args to pass to func_name. Note that func_kwargs are
converted to their string representation using repr() on the client,
then restored to python objects on the server using eval(), and
therefore only objects that can be restored server-side with eval()
are supported.
Returns
-------
graph_id : int
unique graph ID
Examples
--------
>>> from gaas_client import GaasClient
>>> client = GaasClient()
>>> # Load the extension file containing "my_complex_create_graph()"
>>> client.load_graph_creation_extensions("/some/server/side/directory")
>>> new_graph_id = client.call_graph_creation_extension(
... "my_complex_create_graph",
... "/path/to/csv/on/server/graph.csv",
... clean_data=True)
>>>
"""
func_args_repr = repr(func_args)
func_kwargs_repr = repr(func_kwargs)
return self.__client.call_graph_creation_extension(
func_name, func_args_repr, func_kwargs_repr)

############################################################################
# Graph management
@__server_connection
Expand Down Expand Up @@ -460,8 +592,9 @@ def extract_subgraph(self,
Examples
--------
>>>
"""
# FIXME: finish docstring above

# FIXME: convert defaults to type needed by the Thrift API. These will
# be changing to different types.
create_using = create_using or ""
Expand Down Expand Up @@ -504,6 +637,8 @@ def node2vec(self, start_vertices, max_depth, graph_id=defaults.graph_id):
--------
>>>
"""
# FIXME: finish docstring above

# start_vertices must be a list (cannot just be an iterable), and assume
# return value is tuple of python lists on host.
if not isinstance(start_vertices, list):
Expand Down
2 changes: 1 addition & 1 deletion python/gaas_client/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

host = "127.0.0.1"
host = "localhost"
port = 9090
graph_id = 0
3 changes: 2 additions & 1 deletion python/gaas_client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .gaas_thrift import spec
from gaas_client.gaas_thrift import spec

# FIXME: add more fine-grained exceptions!
GaasError = spec.GaasError
42 changes: 38 additions & 4 deletions python/gaas_client/gaas_thrift.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
import thriftpy2
from thriftpy2.rpc import make_server, make_client


# This is the Thrift input file as a string rather than a separate file. This
# allows the Thrift input to be contained within the module that's responsible
# for all Thrift-specific details rather than a separate .thrift file.
#
# thriftpy2 (https://github.com/Thriftpy/thriftpy2) is being used here instead
# of Apache Thrift since it offers an easier-to-use API exclusively for Python
# which is still compatible with servers/cleints using Apache Thrift (Apache
# Thrift can be used from a variety of different languages) and offers roughly
# the same performance.
# Thrift can be used from a variety of different languages) while offering
# approximately the same performance.
#
# See the Apache Thrift tutorial for Python for examples:
# https://thrift.apache.org/tutorial/py.html
Expand All @@ -43,6 +44,8 @@
service GaasService {
i32 uptime()
i32 create_graph() throws(1:GaasError e),
void delete_graph(1:i32 graph_id) throws (1:GaasError e),
Expand Down Expand Up @@ -84,6 +87,16 @@
5:bool allow_multi_edges,
6:i32 graph_id
) throws (1:GaasError e),
i32 load_graph_creation_extensions(1:string extension_dir_path
) throws (1:GaasError e),
void unload_graph_creation_extensions(),
i32 call_graph_creation_extension(1:string func_name,
2:string func_args_repr,
3:string func_kwargs_repr
) throws (1:GaasError e),
}
"""

Expand All @@ -108,9 +121,30 @@ def create_server(handler, host, port):
"""
return make_server(spec.GaasService, handler, host, port)

def create_client(host, port):

def create_client(host, port, call_timeout=90000):
"""
Return a client object that will make calls on a server listening on
host/port.
The call_timeout value defaults to 90 seconds, and is used for setting the
timeout for server API calls when using the client created here - if a call
does not return in call_timeout milliseconds, an exception is raised.
"""
return make_client(spec.GaasService, host=host, port=port)
try:
return make_client(spec.GaasService, host=host, port=port,
timeout=call_timeout)
except thriftpy2.transport.TTransportException:
# Rasie a GaaS exception in order to completely encapsulate all Thrift
# details in this module. If this was not done, callers of this function
# would have to import thriftpy2 in order to catch the
# TTransportException, which then leaks thriftpy2.
#
# NOTE: normally the GaasError exception is imported from the
# gaas_client.exceptions module, but since
# gaas_client.exceptions.GaasError is actually defined from the spec in
# this module, just use it directly from spec.
#
# FIXME: this exception could use more detail
raise spec.GaasError("could not create a client session with a "
"GaaS server")
2 changes: 1 addition & 1 deletion python/gaas_client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .gaas_thrift import spec
from gaas_client.gaas_thrift import spec

Node2vecResult = spec.Node2vecResult
Loading

0 comments on commit 1ecf61a

Please sign in to comment.