Skip to content

Commit

Permalink
Merge pull request #24 from rlratzel/branch-22.10-mg_updates
Browse files Browse the repository at this point in the history
Add initial MNMG support, consolidate APIs, minor refactoring, update tests
  • Loading branch information
rlratzel authored Sep 2, 2022
2 parents dcc3431 + 67fcef9 commit ad84042
Show file tree
Hide file tree
Showing 15 changed files with 1,766 additions and 450 deletions.
343 changes: 194 additions & 149 deletions python/gaas_client/client.py

Large diffs are not rendered by default.

86 changes: 50 additions & 36 deletions python/gaas_client/gaas_thrift.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@
3:list<i32> path_sizes
}
union DataframeRowIndex {
1:i32 int32_index
2:i64 int64_index
3:list<i32> int32_indices
4:list<i64> int64_indices
# FIXME: uniform_neighbor_sample may need to return indices as ints
# See: https://github.com/rapidsai/cugraph/issues/2654
struct UniformNeighborSampleResult {
1:list<i32> sources
2:list<i32> destinations
3:list<double> indices
}
union GraphVertexEdgeID {
1:i32 int32_id
2:i64 int64_id
3:list<i32> int32_ids
4:list<i64> int64_ids
}
union Value {
Expand All @@ -73,8 +81,12 @@
service GaasService {
##############################################################################
# Environment management
i32 uptime()
map<string, Value> get_server_info() throws (1:GaasError e),
i32 load_graph_creation_extensions(1:string extension_dir_path
) throws (1:GaasError e),
Expand All @@ -86,12 +98,18 @@
) throws (1:GaasError e),
##############################################################################
# Graph management
i32 create_graph() throws(1:GaasError e),
void delete_graph(1:i32 graph_id) throws (1:GaasError e),
list<i32> get_graph_ids() throws(1:GaasError e),
map<string, Value> get_graph_info(1:list<string> keys,
2:i32 graph_id
) throws(1:GaasError e),
void load_csv_as_vertex_data(1:string csv_file_name,
2:string delimiter,
3:list<string> dtypes,
Expand All @@ -114,16 +132,6 @@
9:list<string> names
) throws (1:GaasError e),
i32 get_num_edges(1:i32 graph_id) throws(1:GaasError e),
i32 get_num_vertices(1:i32 graph_id) throws(1:GaasError e),
Node2vecResult
node2vec(1:list<i32> start_vertices,
2:i32 max_depth,
3:i32 graph_id
) throws (1:GaasError e),
list<i32> get_edge_IDs_for_vertices(1:list<i32> src_vert_IDs,
2:list<i32> dst_vert_IDs,
3:i32 graph_id
Expand All @@ -134,33 +142,31 @@
3:string edge_weight_property,
4:double default_edge_weight,
5:bool allow_multi_edges,
6:i32 graph_id
6:bool renumber_graph,
7:bool add_edge_data,
8:i32 graph_id
) throws (1:GaasError e),
binary get_graph_vertex_dataframe_rows(1:DataframeRowIndex index_or_indices,
2:Value null_replacement_value,
3:i32 graph_id,
4:list<string> property_keys
) throws (1:GaasError e),
list<i64> get_graph_vertex_dataframe_shape(1:i32 graph_id
) throws (1:GaasError e),
binary get_graph_edge_dataframe_rows(1:DataframeRowIndex index_or_indices,
2:Value null_replacement_value
3:i32 graph_id,
4:list<string> property_keys
) throws (1:GaasError e),
binary get_graph_vertex_data(1:GraphVertexEdgeID vertex_id,
2:Value null_replacement_value,
3:i32 graph_id,
4:list<string> property_keys
) throws (1:GaasError e),
list<i64> get_graph_edge_dataframe_shape(1:i32 graph_id
) throws (1:GaasError e),
binary get_graph_edge_data(1:GraphVertexEdgeID edge_id,
2:Value null_replacement_value
3:i32 graph_id,
4:list<string> property_keys
) throws (1:GaasError e),
bool is_vertex_property(1:string property_key,
2:i32 graph_id) throws (1:GaasError e),
bool is_edge_property(1:string property_key,
2:i32 graph_id) throws (1:GaasError e),
##############################################################################
# Algos
BatchedEgoGraphsResult
batched_ego_graphs(1:list<i32> seeds,
2:i32 radius,
Expand All @@ -173,19 +179,27 @@
3:i32 graph_id
) throws (1:GaasError e),
UniformNeighborSampleResult
uniform_neighbor_sample(1:list<i32> start_list,
2:list<i32> fanout_vals,
3:bool with_replacement,
4:i32 graph_id
) throws (1:GaasError e),
##############################################################################
# Test/Debug
string get_graph_type(1:i32 graph_id) throws(1:GaasError e),
}
"""

# Load the GaaS Thrift specification on import. Syntax errors and other problems
# will be apparent immediately on import, and it allows any other module to
# import this and access the various types define in the Thrift specification
# import this and access the various types defined in the Thrift specification
# without being exposed to the thriftpy2 API.
spec = thriftpy2.load_fp(io.StringIO(gaas_thrift_spec),
module_name="gaas_thrift")


def create_server(handler, host, port):
def create_server(handler, host, port, client_timeout=90000):
"""
Return a server object configured to listen on host/port and use the handler
object to handle calls from clients. The handler object must have an
Expand All @@ -199,7 +213,7 @@ def create_server(handler, host, port):
"""
proto_factory = TBinaryProtocolFactory()
trans_factory = TBufferedTransportFactory()
client_timeout = 3000
client_timeout = client_timeout

processor = TProcessor(spec.GaasService, handler)
server_socket = TServerSocket(host=host, port=port,
Expand Down
65 changes: 64 additions & 1 deletion python/gaas_client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,72 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy

from gaas_client.gaas_thrift import spec

Value = spec.Value
DataframeRowIndex = spec.DataframeRowIndex
GraphVertexEdgeID = spec.GraphVertexEdgeID
BatchedEgoGraphsResult = spec.BatchedEgoGraphsResult
Node2vecResult = spec.Node2vecResult
UniformNeighborSampleResult = spec.UniformNeighborSampleResult


class UnionWrapper:
"""
Provides easy conversions between py objs and Thrift "unions".
"""
def get_py_obj(self):
not_members = set(["default_spec", "thrift_spec", "read", "write"])
attrs = [a for a in dir(self.union)
if not(a.startswith("_")) and a not in not_members]
for a in attrs:
val = getattr(self.union, a)
if val is not None:
return val

return None


class ValueWrapper(UnionWrapper):
def __init__(self, val, val_name="value"):
if isinstance(val, Value):
self.union = val
elif isinstance(val, int):
if val < 4294967296:
self.union = Value(int32_value=val)
else:
self.union = Value(int64_value=val)
elif isinstance(val, numpy.int32):
self.union = Value(int32_value=int(val))
elif isinstance(val, numpy.int64):
self.union = Value(int64_value=int(val))
elif isinstance(val, str):
self.union = Value(string_value=val)
elif isinstance(val, bool):
self.union = Value(bool_value=val)
else:
raise TypeError(f"{val_name} must be one of the "
"following types: [int, str, bool], got "
f"{type(val)}")


class GraphVertexEdgeIDWrapper(UnionWrapper):
def __init__(self, val, val_name="id"):
if isinstance(val, GraphVertexEdgeID):
self.union = val
elif isinstance(val, int):
if val >= 4294967296:
self.union = GraphVertexEdgeID(int64_id=val)
else:
self.union = GraphVertexEdgeID(int32_id=val)
elif isinstance(val, list):
# FIXME: this only check the first item, others could be larger
if val[0] >= 4294967296:
self.union = GraphVertexEdgeID(int64_ids=val)
else:
self.union = GraphVertexEdgeID(int32_ids=val)
else:
raise TypeError(f"{val_name} must be one of the "
"following types: [int, list<int>], got "
f"{type(val)}")
Loading

0 comments on commit ad84042

Please sign in to comment.