Skip to content

Commit

Permalink
Request batches from `GeneratorStep` if it is the only step in the pipeline (#828)
Browse files Browse the repository at this point in the history
* Request new batches from the `GeneratorStep` when it is the only step in the pipeline

* Remove print
  • Loading branch information
gabrielmbmb authored Jul 24, 2024
1 parent e99ecdf commit 90909ab
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions src/distilabel/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,9 @@ def _manage_batch_flow(self, batch: "_Batch") -> None:
self._send_batch_to_step(new_batch)
else:
self._request_more_batches_if_needed(step)
else:
if len(self.dag) == 1:
self._request_batch_from_generator(step.name) # type: ignore

self._cache()

Expand Down Expand Up @@ -1225,6 +1228,19 @@ def _request_initial_batches(self) -> None:
)
self._send_batch_to_step(batch)

def _request_batch_from_generator(self, step_name: str) -> None:
    """Ask a `GeneratorStep` for its next batch.

    Looks up the last batch that was sent to the step called `step_name` and, if
    there is one, sends the batch returned by its `next_batch()` to the step.
    Nothing is sent when the batch manager has no previous batch for the step.

    Args:
        step_name: the name of the `GeneratorStep` from which a batch is requested.
    """
    # The follow-up batch is derived from the most recent batch sent to the
    # generator, so that one is fetched first.
    previous_batch = self._batch_manager.get_last_batch_sent(step_name)  # type: ignore

    # Without a previously sent batch there is nothing to derive the next one from.
    if previous_batch is not None:
        self._send_batch_to_step(previous_batch.next_batch())

def _request_more_batches_if_needed(self, step: "Step") -> None:
"""Request more batches to the predecessors steps of `step` if needed.
Expand All @@ -1239,17 +1255,7 @@ def _request_more_batches_if_needed(self, step: "Step") -> None:
if previous_step_name not in self.dag.root_steps:
continue

# Get the last batch that the previous step sent to generate the next batch
# (next `seq_no`).
last_batch = self._batch_manager.get_last_batch_sent(previous_step_name) # type: ignore
if last_batch is None:
continue

self._logger.debug(
f"Step '{step.name}' input buffer for step '{previous_step_name}' is"
" empty. Requesting new batch..."
)
self._send_batch_to_step(last_batch.next_batch())
self._request_batch_from_generator(previous_step_name)

def _handle_batch_on_stop(self, batch: "_Batch") -> None:
"""Handles a batch that was received from the output queue when the pipeline was
Expand Down

0 comments on commit 90909ab

Please sign in to comment.