Skip to content

Commit

Permalink
Revisit {from,to}_vineyard for tensors and dataframes (mars-project#2419
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sighingnow authored Sep 7, 2021
1 parent ef9e228 commit 7c43c8d
Show file tree
Hide file tree
Showing 29 changed files with 947 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ jobs:
pytest $PYTEST_CONFIG mars/deploy/oscar/tests/test_local.py
mv .coverage build/.coverage.test_local.file
pytest $PYTEST_CONFIG -k "vineyard" \
mars/tensor/datastore/tests/test_datastore_execution.py \
mars/dataframe/datastore/tests/test_datastore_execution.py
mv .coverage build/.coverage.test_vineyard_op.file
coverage combine build/ && coverage report
fi
if [ -n "$WITH_RAY" ]; then
Expand Down
22 changes: 22 additions & 0 deletions mars/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import List, Dict

from ..typing import BandType, SessionType
from ..storage.base import StorageLevel


class Context(ABC):
Expand All @@ -30,6 +31,7 @@ class Context(ABC):
def __init__(self,
session_id: str = None,
supervisor_address: str = None,
worker_address: str = None,
current_address: str = None,
band: BandType = None):
if session_id is None:
Expand All @@ -47,6 +49,7 @@ def __init__(self,

self.session_id = session_id
self.supervisor_address = supervisor_address
self.worker_address = worker_address
self.current_address = current_address
self.band = band

Expand Down Expand Up @@ -130,6 +133,25 @@ def get_chunks_meta(self,
Meta list.
"""

@abstractmethod
def get_storage_info(self, address: str, level: StorageLevel):
"""
Get the customized storage backend info of requested storage backend level at given worker.
Parameters
----------
address: str
The worker address.
level: StorageLevel
The storage level to fetch the backend info.
Returns
-------
info: dict
Customized storage backend info dict of all workers. The key is
worker address, the value is the backend info dict.
"""

@abstractmethod
def create_remote_object(self,
name: str,
Expand Down
4 changes: 4 additions & 0 deletions mars/core/operand/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class SchedulingHint(Serializable):
expect_worker = StringField('expect_worker', default=None)
# will this operand be assigned a worker or not
reassign_worker = BoolField('reassign_worker', default=False)
# mark a op as fuseable
fuseable = BoolField('fuseable', default=True)
# True means control dependency, False means data dependency
_pure_depends = ListField('pure_depends', FieldTypes.bool, default=None)
# useful when setting chunk index as priority,
Expand All @@ -77,6 +79,8 @@ def all_hint_names(cls):
return list(cls._FIELDS)

def can_be_fused(self) -> bool:
if not self.fuseable:
return False
if self.reassign_worker:
return False
if self._pure_depends and \
Expand Down
1 change: 1 addition & 0 deletions mars/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .datasource.from_tensor import dataframe_from_tensor, series_from_tensor
from .datasource.from_index import series_from_index
from .datasource.from_records import from_records
from .datasource.from_vineyard import from_vineyard
from .datasource.read_csv import read_csv
from .datasource.read_sql import read_sql, read_sql_table, read_sql_query
from .datasource.read_parquet import read_parquet
Expand Down
205 changes: 205 additions & 0 deletions mars/dataframe/datasource/from_vineyard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import pandas as pd

from ... import opcodes as OperandDef
from ...core import OutputType
from ...core.context import get_context
from ...serialization.serializables import StringField
from ...tensor.datasource.from_vineyard import resolve_vineyard_socket
from ...utils import calc_nsplits, has_unknown_shape
from ..operands import DataFrameOperand, DataFrameOperandMixin
from ..utils import parse_index


try:
import vineyard
from vineyard.data.utils import normalize_dtype, from_json
except ImportError:
vineyard = None


class DataFrameFromVineyard(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.DATAFRAME_FROM_VINEYARD_CHUNK

# generated columns for metadata
generated_columns = ["id", "worker_address", "dtypes", "shape", "index", "columns"]

# vineyard ipc socket
vineyard_socket = StringField('vineyard_socket')

# ObjectID in vineyard
object_id = StringField('object_id')

def __init__(self, vineyard_socket=None, object_id=None, **kw):
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id,
_output_types=[OutputType.dataframe], **kw)

def check_inputs(self, inputs):
# no inputs
if inputs and len(inputs) > 0:
raise ValueError("DataFrame data source has no inputs")

def _new_chunks(self, inputs, kws=None, **kw):
shape = kw.get('shape', None)
self.extra_params['shape'] = shape # set shape to make the operand key different
return super()._new_chunks(inputs, kws=kws, **kw)

def _new_tileables(self, inputs, kws=None, **kw):
shape = kw.get('shape', None)
self.extra_params['shape'] = shape # set shape to make the operand key different
return super()._new_tileables(inputs, kws=kws, **kw)

def __call__(self, shape, dtypes=None, index_value=None, columns_value=None):
return self.new_dataframe(None, shape, dtypes=dtypes,
index_value=index_value,
columns_value=columns_value)

@classmethod
def tile(cls, op):
ctx = get_context()
workers = ctx.get_worker_addresses()

out_chunks = []
dtypes = pd.Series([np.dtype('O')] * len(cls.generated_columns), index=cls.generated_columns)
for index, worker in enumerate(workers):
chunk_op = op.copy().reset_key()
chunk_op.expect_worker = worker
out_chunk = chunk_op.new_chunk([], dtypes=dtypes,
shape=(1, len(cls.generated_columns)),
index=(index, 0),
index_value=parse_index(pd.RangeIndex(0, 1)),
columns_value=parse_index(pd.Index(cls.generated_columns)))
out_chunks.append(out_chunk)

new_op = op.copy().reset_key()
return new_op.new_dataframes(op.inputs, shape=(np.nan, np.nan), dtypes=dtypes,
chunks=out_chunks, nsplits=((np.nan,), (np.nan,)),
# use the same value as `read_csv`
index_value=parse_index(pd.RangeIndex(0, 1)),
columns_value=parse_index(pd.Index(cls.generated_columns)))

@classmethod
def execute(cls, ctx, op):
if vineyard is None:
raise RuntimeError('vineyard is not available')

socket = resolve_vineyard_socket(ctx, op)
client = vineyard.connect(socket)

meta = client.get_meta(vineyard.ObjectID(op.object_id))
chunks, dtypes = [], None
for idx in range(meta['partitions_-size']):
chunk_meta = meta['partitions_-%d' % idx]
columns = pd.Index(from_json(chunk_meta['columns_']))
shape = (np.nan, len(columns))
if not chunk_meta.islocal:
continue
if dtypes is None:
dtypes = []
for idx in range(len(columns)):
column_meta = chunk_meta['__values_-value-%d' % idx]
dtype = normalize_dtype(column_meta['value_type_'],
column_meta.get('value_type_meta_', None))
dtypes.append(dtype)
dtypes = pd.Series(dtypes, index=columns)
chunk_index = (chunk_meta['partition_index_row_'],
chunk_meta['partition_index_column_'])
# chunk: (chunk_id, worker_address, dtype, shape, index, columns)
chunks.append((repr(chunk_meta.id), ctx.worker_address, dtypes, shape, chunk_index, columns))

ctx[op.outputs[0].key] = pd.DataFrame(chunks, columns=cls.generated_columns)


class DataFrameFromVineyardChunk(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.TENSOR_FROM_VINEYARD_CHUNK

# vineyard ipc socket
vineyard_socket = StringField('vineyard_socket')

# ObjectID of chunk in vineyard
object_id = StringField('object_id')

def __init__(self, vineyard_socket=None, object_id=None, **kw):
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)

def __call__(self, meta):
return self.new_dataframe([meta])

@classmethod
def tile(cls, op):
if has_unknown_shape(*op.inputs):
yield

ctx = get_context()

in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
out_chunks = []
chunk_map = dict()
dtypes, columns = None, None
for chunk, infos in zip(op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys)):
for _, info in infos.iterrows():
chunk_op = op.copy().reset_key()
chunk_op.object_id = info["id"]
chunk_op.expect_worker = info["worker_address"]
dtypes = info["dtypes"]
columns = info["columns"]
shape = info["shape"]
chunk_index = info["index"]
chunk_map[chunk_index] = info["shape"]
out_chunk = chunk_op.new_chunk([chunk], shape=shape, index=chunk_index,
dtypes=dtypes,
index_value=parse_index(pd.RangeIndex(0, -1)),
columns_value=parse_index(pd.Index(columns)))
out_chunks.append(out_chunk)

nsplits = calc_nsplits(chunk_map)
shape = [np.sum(nsplit) for nsplit in nsplits]
new_op = op.copy().reset_key()
return new_op.new_dataframes(op.inputs, shape=shape, dtypes=dtypes,
chunks=out_chunks, nsplits=nsplits,
index_value=parse_index(pd.RangeIndex(0, -1)),
columns_value=parse_index(pd.Index(columns)))

@classmethod
def execute(cls, ctx, op):
if vineyard is None:
raise RuntimeError('vineyard is not available')

socket = resolve_vineyard_socket(ctx, op)
client = vineyard.connect(socket)

client = vineyard.connect(socket)
ctx[op.outputs[0].key] = client.get(vineyard.ObjectID(op.object_id))


def from_vineyard(df, vineyard_socket=None):
if vineyard is not None and isinstance(df, vineyard.Object): # pragma: no cover
if 'vineyard::GlobalDataFrame' not in df.typename:
raise TypeError('The input dataframe %r is not a vineyard\' GlobalDataFrame' % df)
object_id = df.id
else:
object_id = df
if vineyard is not None and isinstance(object_id, vineyard.ObjectID):
object_id = repr(object_id)
metaop = DataFrameFromVineyard(vineyard_socket=vineyard_socket, object_id=object_id,
dtype=np.dtype('byte'), gpu=False)
meta = metaop(shape=(np.nan,), dtypes=pd.Series([]),
index_value=parse_index(pd.Index([])),
columns_value=parse_index(pd.Index([])))
op = DataFrameFromVineyardChunk(vineyard_socket=vineyard_socket,
object_id=object_id, gpu=False)
return op(meta)
2 changes: 2 additions & 0 deletions mars/dataframe/datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ def _install():
from .to_csv import to_csv
from .to_sql import to_sql
from .to_parquet import to_parquet
from .to_vineyard import to_vineyard
from ..operands import DATAFRAME_TYPE, SERIES_TYPE

for cls in DATAFRAME_TYPE:
setattr(cls, 'to_csv', to_csv)
setattr(cls, 'to_sql', to_sql)
setattr(cls, 'to_parquet', to_parquet)
setattr(cls, 'to_vineyard', to_vineyard)

for cls in SERIES_TYPE:
setattr(cls, 'to_csv', to_csv)
Expand Down
40 changes: 40 additions & 0 deletions mars/dataframe/datastore/tests/test_datastore_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import fastparquet
except ImportError:
fastparquet = None
try:
import vineyard
except ImportError:
vineyard = None

from .... import dataframe as md
from ... import DataFrame
Expand Down Expand Up @@ -195,3 +199,39 @@ def test_to_parquet_fast_parquet_execution():
# test fastparquet
path = os.path.join(base_path, 'out-fastparquet-*.parquet')
df.to_parquet(path, engine='fastparquet', compression='gzip').execute()


@pytest.mark.skipif(vineyard is None, reason='vineyard not installed')
def test_vineyard_execution(setup):
raw = np.random.RandomState(0).rand(55, 55)

extra_config = {
'check_dtype': False,
'check_nsplits': False,
'check_shape': False,
'check_dtypes': False,
'check_columns_value': False,
'check_index_value': False,
}

with vineyard.deploy.local.start_vineyardd() as (_, vineyard_socket):
raw = pd.DataFrame({'a': np.arange(0, 55), 'b': np.arange(55, 110)})
a = md.DataFrame(raw, chunk_size=15)
a.execute() # n.b.: pre-execute

b = a.to_vineyard(vineyard_socket=vineyard_socket)
object_id = b.execute(extra_config=extra_config).fetch()[0][0]

c = md.from_vineyard(object_id, vineyard_socket=vineyard_socket)
df = c.execute(extra_config=extra_config).fetch()
pd.testing.assert_frame_equal(df, raw)

raw = pd.DataFrame({'a': np.arange(0, 55), 'b': np.arange(55, 110)})
a = md.DataFrame(raw, chunk_size=15) # n.b.: no pre-execute

b = a.to_vineyard(vineyard_socket=vineyard_socket)
object_id = b.execute(extra_config=extra_config).fetch()[0][0]

c = md.from_vineyard(object_id, vineyard_socket=vineyard_socket)
df = c.execute(extra_config=extra_config).fetch()
pd.testing.assert_frame_equal(df, raw)
Loading

0 comments on commit 7c43c8d

Please sign in to comment.