Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change generic variable for containers from D to C #552

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,39 @@ pub mod pullers;
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type Bundle<T, D> = crate::communication::Message<Message<T, D>>;
pub type Bundle<T, C> = crate::communication::Message<Message<T, C>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Abomonation, Serialize, Deserialize)]
pub struct Message<T, D> {
pub struct Message<T, C> {
/// The timestamp associated with the message.
pub time: T,
/// The data in the message.
pub data: D,
pub data: C,
/// The source worker.
pub from: usize,
/// A sequence number for this worker-to-worker stream.
pub seq: usize,
}

impl<T, D> Message<T, D> {
impl<T, C> Message<T, C> {
/// Default buffer size.
#[deprecated = "Use timely::buffer::default_capacity instead"]
pub fn default_length() -> usize {
crate::container::buffer::default_capacity::<D>()
crate::container::buffer::default_capacity::<C>()
}
}

impl<T, D: Container> Message<T, D> {
impl<T, C: Container> Message<T, C> {
/// Creates a new message instance from arguments.
pub fn new(time: T, data: D, from: usize, seq: usize) -> Self {
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
Message { time, data, from, seq }
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
#[inline]
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
Expand Down
36 changes: 17 additions & 19 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use crate::progress::Timestamp;
use crate::worker::AsWorker;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, D> {
pub trait ParallelizationContract<T, C> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<Bundle<T, D>>+'static;
type Pusher: Push<Bundle<T, C>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, D>>+'static;
type Puller: Pull<Bundle<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
Expand All @@ -33,13 +33,11 @@ pub trait ParallelizationContract<T, D> {
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, D: Container> ParallelizationContract<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
// // ignore `&mut A` and use thread allocator
// let (pusher, puller) = Thread::new::<Bundle<T, Vec<D>>>();
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
Expand Down Expand Up @@ -89,17 +87,17 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
pusher: P,
channel: usize,
counter: usize,
source: usize,
target: usize,
phantom: PhantomData<(T, D)>,
phantom: PhantomData<(T, C)>,
logging: Option<Logger>,
}

impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -114,9 +112,9 @@ impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
}
}

impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T, C, P> {
#[inline]
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
fn push(&mut self, pair: &mut Option<Bundle<T, C>>) {
if let Some(bundle) = pair {
self.counter += 1;

Expand Down Expand Up @@ -145,15 +143,15 @@ impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T,

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
pub struct LogPuller<T, C, P: Pull<Bundle<T, C>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, D)>,
phantom: PhantomData<(T, C)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -166,9 +164,9 @@ impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
}
}

impl<T, D: Container, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
impl<T, C: Container, P: Pull<Bundle<T, C>>> Pull<Bundle<T, C>> for LogPuller<T, C, P> {
#[inline]
fn pull(&mut self) -> &mut Option<Bundle<T,D>> {
fn pull(&mut self) -> &mut Option<Bundle<T, C>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts records pulled past in a shared count map.
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> {
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
phantom: ::std::marker::PhantomData<C>,
}

/// A guard type that updates the change batch counts on drop
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut Bundle<T, D>> {
pub fn next(&mut self) -> Option<&mut Bundle<T, C>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, D>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, C>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P>
}
}

impl<T:Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use crate::{Container, Data};
/// The `Buffer` type should be used by calling `session` with a time, which checks whether
/// data must be flushed and creates a `Session` object which allows sending at the given time.
#[derive(Debug)]
pub struct Buffer<T, D: Container, P: Push<Bundle<T, D>>> {
pub struct Buffer<T, C: Container, P: Push<Bundle<T, C>>> {
/// the currently open time, if it is open
time: Option<T>,
/// a buffer for records, to send at self.time
buffer: D,
buffer: C,
pusher: P,
}

Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use crate::Container;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct Counter<T, D, P: Push<Bundle<T, D>>> {
pub struct Counter<T, C, P: Push<Bundle<T, C>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<D>,
phantom: PhantomData<C>,
}

impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> where P: Push<Bundle<T, D>> {
impl<T: Timestamp, C: Container, P> Push<Bundle<T, C>> for Counter<T, C, P> where P: Push<Bundle<T, C>> {
#[inline]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
if let Some(message) = message {
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
}
Expand All @@ -31,9 +31,9 @@ impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> wher
}
}

impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
impl<T, C, P: Push<Bundle<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> Counter<T, D, P> {
pub fn new(pushee: P) -> Counter<T, C, P> {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
Expand Down
34 changes: 17 additions & 17 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use crate::dataflow::channels::{Bundle, Message};
use crate::communication::Push;
use crate::{Container, Data};

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>;
type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, C>>>>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T, D> {
buffer: D,
shared: PushList<T, D>,
pub struct Tee<T, C> {
buffer: C,
shared: PushList<T, C>,
}

impl<T: Data, D: Container> Push<Bundle<T, D>> for Tee<T, D> {
impl<T: Data, C: Container> Push<Bundle<T, C>> for Tee<T, C> {
#[inline]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
let mut pushers = self.shared.borrow_mut();
if let Some(message) = message {
for index in 1..pushers.len() {
Expand All @@ -39,9 +39,9 @@ impl<T: Data, D: Container> Push<Bundle<T, D>> for Tee<T, D> {
}
}

impl<T, D: Container> Tee<T, D> {
impl<T, C: Container> Tee<T, C> {
/// Allocates a new pair of `Tee` and `TeeHelper`.
pub fn new() -> (Tee<T, D>, TeeHelper<T, D>) {
pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
let shared = Rc::new(RefCell::new(Vec::new()));
let port = Tee {
buffer: Default::default(),
Expand All @@ -52,7 +52,7 @@ impl<T, D: Container> Tee<T, D> {
}
}

impl<T, D: Container> Clone for Tee<T, D> {
impl<T, C: Container> Clone for Tee<T, C> {
fn clone(&self) -> Self {
Self {
buffer: Default::default(),
Expand All @@ -61,9 +61,9 @@ impl<T, D: Container> Clone for Tee<T, D> {
}
}

impl<T, D> Debug for Tee<T, D>
impl<T, C> Debug for Tee<T, C>
where
D: Debug,
C: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("Tee");
Expand All @@ -80,26 +80,26 @@ where
}

/// A shared list of `Box<Push>` used to add `Push` implementors.
pub struct TeeHelper<T, D> {
shared: PushList<T, D>,
pub struct TeeHelper<T, C> {
shared: PushList<T, C>,
}

impl<T, D> TeeHelper<T, D> {
impl<T, C> TeeHelper<T, C> {
/// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
pub fn add_pusher<P: Push<Bundle<T, D>>+'static>(&self, pusher: P) {
pub fn add_pusher<P: Push<Bundle<T, C>>+'static>(&self, pusher: P) {
self.shared.borrow_mut().push(Box::new(pusher));
}
}

impl<T, D> Clone for TeeHelper<T, D> {
impl<T, C> Clone for TeeHelper<T, C> {
fn clone(&self) -> Self {
TeeHelper {
shared: self.shared.clone(),
}
}
}

impl<T, D> Debug for TeeHelper<T, D> {
impl<T, C> Debug for TeeHelper<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("TeeHelper");

Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::progress::Timestamp;
use super::{EventCore, EventPusherCore};

/// Capture a stream of timestamped data for later replay.
pub trait Capture<T: Timestamp, D: Container> {
pub trait Capture<T: Timestamp, C: Container> {
/// Captures a stream of timestamped data for later replay.
///
/// # Examples
Expand Down Expand Up @@ -103,18 +103,18 @@ pub trait Capture<T: Timestamp, D: Container> {
///
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
/// ```
fn capture_into<P: EventPusherCore<T, D>+'static>(&self, pusher: P);
fn capture_into<P: EventPusherCore<T, C>+'static>(&self, pusher: P);

/// Captures a stream using Rust's MPSC channels.
fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, D>> {
fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, C>> {
let (send, recv) = ::std::sync::mpsc::channel();
self.capture_into(send);
recv
}
}

impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(&self, mut event_pusher: P) {
impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
fn capture_into<P: EventPusherCore<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {

let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
Expand Down
Loading
Loading