Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MIX]manylinux build fix #63

Merged
merged 10 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/aliyun-source.list
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-security main restricted
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-security main restricted
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-security universe
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-security universe
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-security multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-security multiverse
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ env:
jobs:
streaming-debian-manylinux-pipeline:
timeout-minutes: 120
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
container: quay.io/pypa/manylinux_2_24_x86_64

steps:
- uses: actions/checkout@v2

- name: Apt get update and Install bazel
run: |
cp /etc/apt/sources.list /etc/apt/sources.list.bak
cp .github/sources.list /etc/apt/sources.list
cp .github/ubuntu-source.list /etc/apt/sources.list
apt-get update
apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git
apt-get update
apt-get install -yq gcc g++
sh scripts/install-bazel.sh

Expand Down
8 changes: 0 additions & 8 deletions .github/sources.list

This file was deleted.

14 changes: 14 additions & 0 deletions .github/ubuntu-source.list
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
deb http://archive.ubuntu.com/ubuntu/ jammy main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy main restricted universe multiverse

deb http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted universe multiverse

deb http://archive.ubuntu.com/ubuntu/ jammy-security main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy-security main restricted universe multiverse

deb http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse

deb http://archive.canonical.com/ubuntu/ jammy partner
# deb-src http://archive.canonical.com/ubuntu/ jammy partner
1 change: 1 addition & 0 deletions scripts/suppress_output
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/bin/bash
# Run a command, suppressing output unless it hangs or crashes.
CURRENT_DIR=$(dirname "${BASH_SOURCE:-$0}")

TMPFILE="$(mktemp)"
PID=$$
Expand Down
96 changes: 49 additions & 47 deletions streaming/python/raystreaming/runtime/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def __init__(self, channel_id_str: str):
channel_id_str: string representation of channel id
"""
self.channel_id_str = channel_id_str
self.object_qid = ray.ObjectRef(
channel_id_str_to_bytes(channel_id_str))
self.object_qid = ray.ObjectRef(channel_id_str_to_bytes(channel_id_str))

def __eq__(self, other):
if other is None:
Expand Down Expand Up @@ -100,7 +99,6 @@ def channel_bytes_to_str(id_bytes):


class Message(ABC):

@property
@abstractmethod
def body(self):
Expand Down Expand Up @@ -131,12 +129,7 @@ class DataMessage(Message):
DataMessage represents data between upstream and downstream operator.
"""

def __init__(self,
body,
timestamp,
message_id,
channel_id,
is_empty_message=False):
def __init__(self, body, timestamp, message_id, channel_id, is_empty_message=False):
self.__body = body
self.__timestamp = timestamp
self.__channel_id = channel_id
Expand Down Expand Up @@ -226,25 +219,29 @@ class ChannelCreationParametersBuilder:
"""

_java_reader_async_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessage",
"([B)V")
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessage", "([B)V"
)
_java_reader_sync_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessageSync",
"([B)[B")
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessageSync", "([B)[B"
)
_java_writer_async_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessage",
"([B)V")
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessage", "([B)V"
)
_java_writer_sync_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessageSync",
"([B)[B")
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessageSync", "([B)[B"
)
_python_reader_async_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_reader_message", "JobWorker")
"raystreaming.runtime.worker", "on_reader_message", "JobWorker"
)
_python_reader_sync_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_reader_message_sync", "JobWorker")
"raystreaming.runtime.worker", "on_reader_message_sync", "JobWorker"
)
_python_writer_async_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_writer_message", "JobWorker")
"raystreaming.runtime.worker", "on_writer_message", "JobWorker"
)
_python_writer_sync_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_writer_message_sync", "JobWorker")
"raystreaming.runtime.worker", "on_writer_message_sync", "JobWorker"
)

def get_parameters(self):
return self._parameters
Expand Down Expand Up @@ -272,41 +269,47 @@ def build_output_queue_parameters(self, to_actors):
)
return self

def build_parameters(
    self, actors, java_async_func, java_sync_func, py_async_func, py_sync_func
):
    """Append one ChannelCreationParameter per actor handle.

    For each actor, the Python descriptors are used when the actor's
    language is Python, otherwise the Java descriptors are used.

    Returns:
        self, to allow call chaining.
    """
    for actor in actors:
        if actor._ray_actor_language == Language.PYTHON:
            async_func, sync_func = py_async_func, py_sync_func
        else:
            async_func, sync_func = java_async_func, java_sync_func
        self._parameters.append(
            _streaming.ChannelCreationParameter(
                actor._ray_actor_id, async_func, sync_func
            )
        )
    return self

@staticmethod
def set_python_writer_function_descriptor(async_function, sync_function):
    """Override the class-level Python writer function descriptors."""
    builder_cls = ChannelCreationParametersBuilder
    builder_cls._python_writer_async_function_descriptor = async_function
    builder_cls._python_writer_sync_function_descriptor = sync_function

@staticmethod
def set_python_reader_function_descriptor(async_function, sync_function):
    """Override the class-level Python reader function descriptors."""
    builder_cls = ChannelCreationParametersBuilder
    builder_cls._python_reader_async_function_descriptor = async_function
    builder_cls._python_reader_sync_function_descriptor = sync_function


class DataWriter:
"""Data Writer is a wrapper of streaming c++ DataWriter, which sends data
to downstream workers
"""

def __init__(self, output_channels, to_actors: List[ActorHandle],
conf: dict):
def __init__(self, output_channels, to_actors: List[ActorHandle], conf: dict):
"""Get DataWriter of output channels
Args:
output_channels: output channels ids
Expand All @@ -320,8 +323,7 @@ def __init__(self, output_channels, to_actors: List[ActorHandle],
]
creation_parameters = ChannelCreationParametersBuilder()
creation_parameters.build_output_queue_parameters(to_actors)
channel_size = conf.get(Config.CHANNEL_SIZE,
Config.CHANNEL_SIZE_DEFAULT)
channel_size = conf.get(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT)
py_msg_ids = [0 for _ in range(len(output_channels))]
config_bytes = _to_native_conf(conf)
is_mock = conf[Config.CHANNEL_TYPE] == Config.MEMORY_CHANNEL
Expand Down Expand Up @@ -365,8 +367,8 @@ def get_output_checkpoints(self) -> List[int]:

def clear_checkpoint(self, checkpoint_id):
    """Ask the native writer to clear state for a finished checkpoint.

    Args:
        checkpoint_id: id of the checkpoint whose state should be dropped.
    """
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info(
        "producer start to clear checkpoint, checkpoint_id=%s", checkpoint_id
    )
    self.writer.clear_checkpoint(checkpoint_id)

def stop(self):
Expand All @@ -384,8 +386,9 @@ class DataReader:
from channels of upstream workers
"""

def __init__(self, input_channels: List, from_actors: List[ActorHandle],
conf: dict):
def __init__(
self, input_channels: List, from_actors: List[ActorHandle], conf: dict
):
"""Get DataReader of input channels
Args:
input_channels: input channels
Expand Down Expand Up @@ -416,8 +419,11 @@ def __init__(self, input_channels: List, from_actors: List[ActorHandle],
self.__creation_status = {}
for q, status in queues_creation_status.items():
self.__creation_status[q] = ChannelCreationStatus(status)
logger.info("create DataReader succeed, creation_status={}".format(
self.__creation_status))
logger.info(
"create DataReader succeed, creation_status={}".format(
self.__creation_status
)
)

def read(self, timeout_millis):
"""Read data from channel
Expand Down Expand Up @@ -459,11 +465,11 @@ def _to_native_conf(conf):
config.op_name = conf[Config.STREAMING_OP_NAME]
# TODO set operator type
if Config.STREAMING_RING_BUFFER_CAPACITY in conf:
config.ring_buffer_capacity = int(
conf[Config.STREAMING_RING_BUFFER_CAPACITY])
config.ring_buffer_capacity = int(conf[Config.STREAMING_RING_BUFFER_CAPACITY])
if Config.STREAMING_EMPTY_MESSAGE_INTERVAL in conf:
config.empty_message_interval = int(
conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL])
conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL]
)
if Config.FLOW_CONTROL_TYPE in conf:
config.flow_control_type = int(conf[Config.FLOW_CONTROL_TYPE])
if Config.WRITER_CONSUMED_STEP in conf:
Expand All @@ -475,20 +481,17 @@ def _to_native_conf(conf):


class ChannelInitException(Exception):
    """Raised when one or more streaming channels fail to initialize.

    Attributes:
        msg: human-readable description of the failure.
        abnormal_channels: the channels that failed to initialize.
    """

    def __init__(self, msg, abnormal_channels):
        # Forward msg to Exception so str(e) and e.args carry the message
        # (the original left args empty, making str(e) == "").
        super().__init__(msg)
        self.abnormal_channels = abnormal_channels
        self.msg = msg


class ChannelInterruptException(Exception):
    """Raised when a blocking channel operation is interrupted.

    Attributes:
        msg: optional human-readable description; may be None.
    """

    def __init__(self, msg=None):
        # Forward msg to Exception so str(e) and e.args carry the message;
        # keep args empty when no message is given (preserves str(e) == "").
        if msg is None:
            super().__init__()
        else:
            super().__init__(msg)
        self.msg = msg


class ChannelRecoverInfo:

def __init__(self, queue_creation_status_map=None):
if queue_creation_status_map is None:
queue_creation_status_map = {}
Expand All @@ -505,8 +508,7 @@ def get_data_lost_queues(self):
return data_lost_queues

def __str__(self):
    """Render the recover info with the queues that lost data."""
    data_lost_queues = self.get_data_lost_queues()
    return "QueueRecoverInfo [dataLostQueues=%s]" % data_lost_queues


class ChannelCreationStatus(Enum):
Expand Down
Loading
Loading