Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jun 28, 2024
1 parent 2eefa75 commit d742cf3
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 203 deletions.
12 changes: 6 additions & 6 deletions nativelink-scheduler/src/operation_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ pub struct OrderBy {
pub type ActionStateResultStream = Pin<Box<dyn Stream<Item = Arc<dyn ActionStateResult>> + Send>>;

#[async_trait]
pub trait ClientStateManager {
pub trait ClientStateManager: Sync + Send {
/// Add a new action to the queue or joins an existing action.
async fn add_action(
&mut self,
&self,
action_info: ActionInfo,
) -> Result<Arc<dyn ActionStateResult>, Error>;

Expand All @@ -113,21 +113,21 @@ pub trait ClientStateManager {
}

#[async_trait]
pub trait WorkerStateManager {
pub trait WorkerStateManager: Sync + Send {
/// Update that state of an operation.
/// The worker must also send periodic updates even if the state
/// did not change with a modified timestamp in order to prevent
/// the operation from being considered stale and being rescheduled.
async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error>;
}

#[async_trait]
pub trait MatchingEngineStateManager {
pub trait MatchingEngineStateManager: Sync + Send {
/// Returns a stream of operations that match the filter.
async fn filter_operations(
&self,
Expand All @@ -136,7 +136,7 @@ pub trait MatchingEngineStateManager {

/// Update that state of an operation.
async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: Option<WorkerId>,
action_stage: Result<ActionStage, Error>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@ use tokio::sync::watch;
use crate::operation_state_manager::ActionStateResult;

pub struct MatchingEngineActionStateResult {
pub action_info: Arc<ActionInfo>,
action_info: Arc<ActionInfo>,
action_state: watch::Receiver<Arc<ActionState>>,
}
impl MatchingEngineActionStateResult {
pub(crate) fn new(action_info: Arc<ActionInfo>) -> Self {
Self { action_info }
pub(crate) fn new(action_info: Arc<ActionInfo>, action_state: watch::Receiver<Arc<ActionState>>) -> Self {
Self {
action_info,
action_state,
}
}
}

#[async_trait]
impl ActionStateResult for MatchingEngineActionStateResult {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
unimplemented!()
Ok(self.action_state.borrow().clone())
}

async fn as_receiver(&self) -> Result<&'_ watch::Receiver<Arc<ActionState>>, Error> {
unimplemented!()
Ok(&self.action_state)
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
Expand Down
28 changes: 9 additions & 19 deletions nativelink-scheduler/src/scheduler_state/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::scheduler_state::client_action_state_result::ClientActionStateResult;
use crate::scheduler_state::completed_action::CompletedAction;
use crate::scheduler_state::matching_engine_action_state_result::MatchingEngineActionStateResult;
use crate::scheduler_state::metrics::Metrics;
use crate::scheduler_state::workers::Workers;
use crate::worker::WorkerUpdate;

#[repr(transparent)]
Expand All @@ -53,7 +52,6 @@ impl StateManager {
pub(crate) fn new(
queued_actions_set: HashSet<Arc<ActionInfo>>,
queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,
workers: Workers,
active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,
recently_completed_actions: HashSet<CompletedAction>,
metrics: Arc<Metrics>,
Expand All @@ -64,7 +62,6 @@ impl StateManager {
inner: StateManagerImpl {
queued_actions_set,
queued_actions,
workers,
active_actions,
recently_completed_actions,
metrics,
Expand Down Expand Up @@ -179,10 +176,6 @@ pub(crate) struct StateManagerImpl {
/// Important: `queued_actions_set` and `queued_actions` must be kept in sync.
pub(crate) queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
pub(crate) workers: Workers,

/// A map of all actions that are active. A hashmap is used to find actions that are active in
/// O(1) time. The key is the `ActionInfo` struct. The value is the `AwaitedAction` struct.
pub(crate) active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,
Expand Down Expand Up @@ -561,7 +554,7 @@ impl ClientStateManager for StateManager {
#[async_trait]
impl WorkerStateManager for StateManager {
async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
Expand Down Expand Up @@ -684,23 +677,20 @@ impl MatchingEngineStateManager for StateManager {
_filter: OperationFilter, // TODO(adam): reference filter
) -> Result<ActionStateResultStream, Error> {
// TODO(adams): use OperationFilter vs directly encoding it.
let action_infos: Map<
Cloned<Rev<Keys<Arc<ActionInfo>, AwaitedAction>>>,
fn(Arc<ActionInfo>) -> Arc<dyn ActionStateResult>,
> = self
let action_infos = self
.inner
.queued_actions
.keys()
.iter()
.rev()
.cloned()
.map(|action_info| {
// TODO(adam): ActionState is always available and can be returned from here.
// later we might want to rewrite this to return ActionState.
.map(|(action_info, awaited_action)| {
let cloned_action_info = action_info.clone();
Arc::new(MatchingEngineActionStateResult::new(cloned_action_info))
Arc::new(
MatchingEngineActionStateResult::new(cloned_action_info, awaited_action.notify_channel.subscribe())

) as Arc<dyn ActionStateResult>
});

let action_infos: Vec<Arc<dyn ActionStateResult>> = action_infos.collect();
let action_infos: Vec<Arc<(dyn ActionStateResult)>> = action_infos.collect();
Ok(Box::pin(stream::iter(action_infos)))
}

Expand Down
8 changes: 2 additions & 6 deletions nativelink-scheduler/src/scheduler_state/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
use nativelink_util::action_messages::{ActionStage, WorkerId};
use nativelink_util::platform_properties::{self, PlatformProperties};
use tracing::{event, Level};

use crate::scheduler_state::awaited_action::AwaitedAction;
Expand Down Expand Up @@ -96,13 +97,8 @@ impl Workers {
// simulation of worst cases in a single threaded environment.
pub(crate) fn find_worker_for_action(
&self,
awaited_action: &AwaitedAction,
action_properties: &PlatformProperties,
) -> Option<WorkerId> {
assert!(matches!(
awaited_action.current_state.stage,
ActionStage::Queued
));
let action_properties = &awaited_action.action_info.platform_properties;
let mut workers_iter = self.workers.iter();
let workers_iter = match self.allocation_strategy {
// Use rfind to get the least recently used that satisfies the properties.
Expand Down
Loading

0 comments on commit d742cf3

Please sign in to comment.