Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit {from,to}_vineyard for tensors and dataframes. #2419

Merged
merged 15 commits into from
Sep 7, 2021
5 changes: 5 additions & 0 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ jobs:
pytest $PYTEST_CONFIG mars/deploy/oscar/tests/test_local.py
mv .coverage build/.coverage.test_local.file
pytest $PYTEST_CONFIG -k "vineyard" \
mars/tensor/datastore/tests/test_datastore_execution.py \
mars/dataframe/datastore/tests/test_datastore_execution.py
mv .coverage build/.coverage.test_vineyard_op.file
coverage combine build/ && coverage report
fi
if [ -n "$WITH_RAY" ]; then
Expand Down
22 changes: 22 additions & 0 deletions mars/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import List, Dict

from ..typing import BandType, SessionType
from ..storage.base import StorageLevel


class Context(ABC):
Expand All @@ -30,6 +31,7 @@ class Context(ABC):
def __init__(self,
session_id: str = None,
supervisor_address: str = None,
worker_address: str = None,
current_address: str = None,
band: BandType = None):
if session_id is None:
Expand All @@ -47,6 +49,7 @@ def __init__(self,

self.session_id = session_id
self.supervisor_address = supervisor_address
self.worker_address = worker_address
self.current_address = current_address
self.band = band

Expand Down Expand Up @@ -130,6 +133,25 @@ def get_chunks_meta(self,
Meta list.
"""

@abstractmethod
def get_storage_info(self, address: str, level: StorageLevel):
"""
Get the customized storage backend info of requested storage backend level at given worker.

Parameters
----------
address: str
The worker address.
level: StorageLevel
The storage level to fetch the backend info.

Returns
-------
info: dict
Customized storage backend info dict of all workers. The key is
worker address, the value is the backend info dict.
"""

@abstractmethod
def create_remote_object(self,
name: str,
Expand Down
4 changes: 4 additions & 0 deletions mars/core/operand/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class SchedulingHint(Serializable):
expect_worker = StringField('expect_worker', default=None)
# will this operand be assigned a worker or not
reassign_worker = BoolField('reassign_worker', default=False)
# mark a op as fuseable
fuseable = BoolField('fuseable', default=True)
# True means control dependency, False means data dependency
_pure_depends = ListField('pure_depends', FieldTypes.bool, default=None)
# useful when setting chunk index as priority,
Expand All @@ -77,6 +79,8 @@ def all_hint_names(cls):
return list(cls._FIELDS)

def can_be_fused(self) -> bool:
if not self.fuseable:
return False
if self.reassign_worker:
return False
if self._pure_depends and \
Expand Down
1 change: 1 addition & 0 deletions mars/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .datasource.from_tensor import dataframe_from_tensor, series_from_tensor
from .datasource.from_index import series_from_index
from .datasource.from_records import from_records
from .datasource.from_vineyard import from_vineyard
from .datasource.read_csv import read_csv
from .datasource.read_sql import read_sql, read_sql_table, read_sql_query
from .datasource.read_parquet import read_parquet
Expand Down
205 changes: 205 additions & 0 deletions mars/dataframe/datasource/from_vineyard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import pandas as pd

from ... import opcodes as OperandDef
from ...core import OutputType
from ...core.context import get_context
from ...serialization.serializables import StringField
from ...tensor.datasource.from_vineyard import resolve_vineyard_socket
from ...utils import calc_nsplits, has_unknown_shape
from ..operands import DataFrameOperand, DataFrameOperandMixin
from ..utils import parse_index


try:
import vineyard
from vineyard.data.utils import normalize_dtype, from_json
except ImportError:
vineyard = None


class DataFrameFromVineyard(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.DATAFRAME_FROM_VINEYARD_CHUNK

# generated columns for metadata
generated_columns = ["id", "worker_address", "dtypes", "shape", "index", "columns"]

# vineyard ipc socket
vineyard_socket = StringField('vineyard_socket')

# ObjectID in vineyard
object_id = StringField('object_id')

def __init__(self, vineyard_socket=None, object_id=None, **kw):
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id,
_output_types=[OutputType.dataframe], **kw)

def check_inputs(self, inputs):
# no inputs
if inputs and len(inputs) > 0:
raise ValueError("DataFrame data source has no inputs")

def _new_chunks(self, inputs, kws=None, **kw):
shape = kw.get('shape', None)
self.extra_params['shape'] = shape # set shape to make the operand key different
return super()._new_chunks(inputs, kws=kws, **kw)

def _new_tileables(self, inputs, kws=None, **kw):
shape = kw.get('shape', None)
self.extra_params['shape'] = shape # set shape to make the operand key different
return super()._new_tileables(inputs, kws=kws, **kw)

def __call__(self, shape, dtypes=None, index_value=None, columns_value=None):
return self.new_dataframe(None, shape, dtypes=dtypes,
index_value=index_value,
columns_value=columns_value)

@classmethod
def tile(cls, op):
ctx = get_context()
workers = ctx.get_worker_addresses()

out_chunks = []
dtypes = pd.Series([np.dtype('O')] * len(cls.generated_columns), index=cls.generated_columns)
for index, worker in enumerate(workers):
chunk_op = op.copy().reset_key()
chunk_op.expect_worker = worker
out_chunk = chunk_op.new_chunk([], dtypes=dtypes,
shape=(1, len(cls.generated_columns)),
index=(index, 0),
index_value=parse_index(pd.RangeIndex(0, 1)),
columns_value=parse_index(pd.Index(cls.generated_columns)))
out_chunks.append(out_chunk)

new_op = op.copy().reset_key()
return new_op.new_dataframes(op.inputs, shape=(np.nan, np.nan), dtypes=dtypes,
chunks=out_chunks, nsplits=((np.nan,), (np.nan,)),
# use the same value as `read_csv`
index_value=parse_index(pd.RangeIndex(0, 1)),
columns_value=parse_index(pd.Index(cls.generated_columns)))

@classmethod
def execute(cls, ctx, op):
if vineyard is None:
raise RuntimeError('vineyard is not available')

socket = resolve_vineyard_socket(ctx, op)
client = vineyard.connect(socket)

meta = client.get_meta(vineyard.ObjectID(op.object_id))
chunks, dtypes = [], None
for idx in range(meta['partitions_-size']):
chunk_meta = meta['partitions_-%d' % idx]
columns = pd.Index(from_json(chunk_meta['columns_']))
shape = (np.nan, len(columns))
if not chunk_meta.islocal:
continue
if dtypes is None:
dtypes = []
for idx in range(len(columns)):
column_meta = chunk_meta['__values_-value-%d' % idx]
dtype = normalize_dtype(column_meta['value_type_'],
column_meta.get('value_type_meta_', None))
dtypes.append(dtype)
dtypes = pd.Series(dtypes, index=columns)
chunk_index = (chunk_meta['partition_index_row_'],
chunk_meta['partition_index_column_'])
# chunk: (chunk_id, worker_address, dtype, shape, index, columns)
chunks.append((repr(chunk_meta.id), ctx.worker_address, dtypes, shape, chunk_index, columns))

ctx[op.outputs[0].key] = pd.DataFrame(chunks, columns=cls.generated_columns)


class DataFrameFromVineyardChunk(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.TENSOR_FROM_VINEYARD_CHUNK

# vineyard ipc socket
vineyard_socket = StringField('vineyard_socket')

# ObjectID of chunk in vineyard
object_id = StringField('object_id')

def __init__(self, vineyard_socket=None, object_id=None, **kw):
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)

def __call__(self, meta):
return self.new_dataframe([meta])

@classmethod
def tile(cls, op):
if has_unknown_shape(*op.inputs):
yield

ctx = get_context()

in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
out_chunks = []
chunk_map = dict()
dtypes, columns = None, None
for chunk, infos in zip(op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys)):
for _, info in infos.iterrows():
chunk_op = op.copy().reset_key()
chunk_op.object_id = info["id"]
chunk_op.expect_worker = info["worker_address"]
dtypes = info["dtypes"]
columns = info["columns"]
shape = info["shape"]
chunk_index = info["index"]
chunk_map[chunk_index] = info["shape"]
out_chunk = chunk_op.new_chunk([chunk], shape=shape, index=chunk_index,
dtypes=dtypes,
index_value=parse_index(pd.RangeIndex(0, -1)),
columns_value=parse_index(pd.Index(columns)))
out_chunks.append(out_chunk)

nsplits = calc_nsplits(chunk_map)
shape = [np.sum(nsplit) for nsplit in nsplits]
new_op = op.copy().reset_key()
return new_op.new_dataframes(op.inputs, shape=shape, dtypes=dtypes,
chunks=out_chunks, nsplits=nsplits,
index_value=parse_index(pd.RangeIndex(0, -1)),
columns_value=parse_index(pd.Index(columns)))

@classmethod
def execute(cls, ctx, op):
if vineyard is None:
raise RuntimeError('vineyard is not available')

socket = resolve_vineyard_socket(ctx, op)
client = vineyard.connect(socket)

client = vineyard.connect(socket)
ctx[op.outputs[0].key] = client.get(vineyard.ObjectID(op.object_id))


def from_vineyard(df, vineyard_socket=None):
if vineyard is not None and isinstance(df, vineyard.Object): # pragma: no cover
if 'vineyard::GlobalDataFrame' not in df.typename:
raise TypeError('The input dataframe %r is not a vineyard\' GlobalDataFrame' % df)
object_id = df.id
else:
object_id = df
if vineyard is not None and isinstance(object_id, vineyard.ObjectID):
object_id = repr(object_id)
metaop = DataFrameFromVineyard(vineyard_socket=vineyard_socket, object_id=object_id,
dtype=np.dtype('byte'), gpu=False)
meta = metaop(shape=(np.nan,), dtypes=pd.Series([]),
index_value=parse_index(pd.Index([])),
columns_value=parse_index(pd.Index([])))
op = DataFrameFromVineyardChunk(vineyard_socket=vineyard_socket,
object_id=object_id, gpu=False)
return op(meta)
2 changes: 2 additions & 0 deletions mars/dataframe/datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ def _install():
from .to_csv import to_csv
from .to_sql import to_sql
from .to_parquet import to_parquet
from .to_vineyard import to_vineyard
from ..operands import DATAFRAME_TYPE, SERIES_TYPE

for cls in DATAFRAME_TYPE:
setattr(cls, 'to_csv', to_csv)
setattr(cls, 'to_sql', to_sql)
setattr(cls, 'to_parquet', to_parquet)
setattr(cls, 'to_vineyard', to_vineyard)

for cls in SERIES_TYPE:
setattr(cls, 'to_csv', to_csv)
Expand Down
40 changes: 40 additions & 0 deletions mars/dataframe/datastore/tests/test_datastore_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import fastparquet
except ImportError:
fastparquet = None
try:
import vineyard
except ImportError:
vineyard = None

from .... import dataframe as md
from ... import DataFrame
Expand Down Expand Up @@ -195,3 +199,39 @@ def test_to_parquet_fast_parquet_execution():
# test fastparquet
path = os.path.join(base_path, 'out-fastparquet-*.parquet')
df.to_parquet(path, engine='fastparquet', compression='gzip').execute()


@pytest.mark.skipif(vineyard is None, reason='vineyard not installed')
def test_vineyard_execution(setup):
raw = np.random.RandomState(0).rand(55, 55)

extra_config = {
'check_dtype': False,
'check_nsplits': False,
'check_shape': False,
'check_dtypes': False,
'check_columns_value': False,
'check_index_value': False,
}

with vineyard.deploy.local.start_vineyardd() as (_, vineyard_socket):
raw = pd.DataFrame({'a': np.arange(0, 55), 'b': np.arange(55, 110)})
a = md.DataFrame(raw, chunk_size=15)
a.execute() # n.b.: pre-execute

b = a.to_vineyard(vineyard_socket=vineyard_socket)
object_id = b.execute(extra_config=extra_config).fetch()[0][0]

c = md.from_vineyard(object_id, vineyard_socket=vineyard_socket)
df = c.execute(extra_config=extra_config).fetch()
pd.testing.assert_frame_equal(df, raw)

raw = pd.DataFrame({'a': np.arange(0, 55), 'b': np.arange(55, 110)})
a = md.DataFrame(raw, chunk_size=15) # n.b.: no pre-execute

b = a.to_vineyard(vineyard_socket=vineyard_socket)
object_id = b.execute(extra_config=extra_config).fetch()[0][0]

c = md.from_vineyard(object_id, vineyard_socket=vineyard_socket)
df = c.execute(extra_config=extra_config).fetch()
pd.testing.assert_frame_equal(df, raw)
Loading