diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index c2c4b13b725990310eb9c7d7454f5e5dbac4f7bb..ac1c7b6062fae6ccf56116ff3030f491d56068c5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -6,11 +6,9 @@ use crate::protocol::eval::{ EvalContinuation, EvalResult, EvalError }; -use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; -use super::*; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -86,23 +84,12 @@ impl RunContext for ExecCtx { pub(crate) enum Mode { NonSync, // not in sync mode Sync, // in sync mode, can interact with other components - SyncFail, // something went wrong during sync mode (deadlocked, error, whatever) SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block BlockedGet, BlockedPut, - StartExit, // temp state - Exit, -} - -impl Mode { - fn can_run(&self) -> bool { - match self { - Mode::NonSync | Mode::Sync => - return true, - Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit => - return false, - } - } + StartExit, // temporary state: if encountered then we start the shutdown process + BusyExit, // temporary state: waiting for Acks for all the closed ports + Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } pub(crate) struct CompPDL { @@ -176,15 +163,30 @@ impl CompPDL { pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; - if self.mode == Mode::StartExit { - self.mode = Mode::Exit; - return Ok(CompScheduling::Exit); - } + sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); - let can_run = self.mode.can_run(); - sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run)); - if !can_run { - return 
Ok(CompScheduling::Sleep); + // Depending on the mode don't do anything at all, take some special + // actions, or fall through and run the PDL code. + match self.mode { + Mode::NonSync | Mode::Sync => {}, + Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => { + return Ok(CompScheduling::Sleep); + } + Mode::StartExit => { + self.handle_component_exit(sched_ctx, comp_ctx); + return Ok(CompScheduling::Immediate); + }, + Mode::BusyExit => { + if self.control.has_acks_remaining() { + return Ok(CompScheduling::Sleep); + } else { + self.mode = Mode::Exit; + return Ok(CompScheduling::Exit); + } + }, + Mode::Exit => { + return Ok(CompScheduling::Exit); + } } let run_result = self.execute_prompt(&sched_ctx)?; @@ -195,8 +197,8 @@ impl CompPDL { // Results that can be returned in sync mode EC::SyncBlockEnd => { debug_assert_eq!(self.mode, Mode::Sync); - let scheduling = self.handle_sync_end(sched_ctx, comp_ctx); - return Ok(scheduling.unwrap_or(CompScheduling::Immediate)); + self.handle_sync_end(sched_ctx, comp_ctx); + return Ok(CompScheduling::Immediate); }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); @@ -216,7 +218,7 @@ impl CompPDL { self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); return Ok(CompScheduling::Immediate); } else { - self.mode = Mode::SyncFail; + todo!("handle sync failure due to message deadlock"); return Ok(CompScheduling::Sleep); } } else { @@ -240,8 +242,8 @@ impl CompPDL { }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { - self.handle_component_exit(sched_ctx, comp_ctx); - return Ok(CompScheduling::Exit); + self.mode = Mode::StartExit; // next call we'll take care of the exit + return Ok(CompScheduling::Immediate); }, EC::SyncBlockStart => { debug_assert_eq!(self.mode, Mode::NonSync); @@ -290,45 +292,65 @@ impl CompPDL { self.mode = Mode::Sync; } - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option { + /// Handles end of sync. 
The conclusion to the sync round might arise + /// immediately (and be handled immediately), or might come later through + /// messaging. In any case the component should be scheduled again + /// immediately + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); self.mode = Mode::SyncEnd; - self.handle_sync_decision(sched_ctx, comp_ctx, decision) + self.handle_sync_decision(sched_ctx, comp_ctx, decision); } - fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option { + /// Handles decision from the consensus round. This will cause a change in + /// the internal `Mode`, such that the next call to `run` can take the + /// appropriate next steps. + fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { debug_assert_eq!(self.mode, Mode::SyncEnd); sched_ctx.log(&format!("Handling sync decision: {:?}", decision)); let is_success = match decision { SyncRoundDecision::None => { // No decision yet - return None; + return; }, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; // If here then we've reached a decision - self.mode = Mode::NonSync; if is_success { + self.mode = Mode::NonSync; self.consensus.notify_sync_decision(decision); - return None; } else { - todo!("handle this better, show some kind of error"); - self.handle_component_exit(sched_ctx, comp_ctx); - self.mode = Mode::Exit; - return Some(CompScheduling::Exit); + self.mode = Mode::StartExit; } } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); - debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync + debug_assert_eq!(self.mode, Mode::StartExit); + 
self.mode = Mode::BusyExit; + + // Doing this by index, then retrieving the handle is a bit ridiculous, + // but Rust is being Rust with its borrowing rules. + for port_index in 0..comp_ctx.num_ports() { + let port = comp_ctx.get_port_by_index_mut(port_index); + if port.state == PortState::Closed { + // Already closed, or in the process of being closed + continue; + } + + // Mark as closed + let port_id = port.self_id; + port.state = PortState::Closed; - // Note: for now we have that the scheduler handles exiting. I don't - // know if that is a good idea, we'll see - self.mode = Mode::Exit; + // Notify peer of closing + let port_handle = comp_ctx.get_port_handle(port_id); + let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx); + let peer_info = comp_ctx.get_peer(peer); + peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + } } // ------------------------------------------------------------------------- @@ -433,15 +455,14 @@ impl CompPDL { ControlMessageContent::Ack => { let mut to_ack = message.id; loop { - let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); + let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { - AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => { + AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); handle.send_message(sched_ctx, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); - to_ack = new_to_ack; }, AckAction::ScheduleComponent(to_schedule) => { // FIX @NoDirectHandle @@ -454,11 +475,13 @@ impl CompPDL { sched_ctx.runtime.enqueue_work(key); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); - break; }, - AckAction::None => { - break; - } + AckAction::None => {} + } + + match new_to_ack { + Some(new_to_ack) => to_ack = 
new_to_ack, + None => break, } } }, @@ -479,9 +502,9 @@ impl CompPDL { let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - comp_ctx.set_port_state(port_handle, PortState::Closed); send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id); + comp_ctx.set_port_state(port_handle, PortState::Closed); }, ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) @@ -636,6 +659,7 @@ impl CompPDL { }, None => { // Peer port remains with creator component. + println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id); created_port_info.peer_comp_id = creator_ctx.id; created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None); } @@ -670,7 +694,8 @@ impl CompPDL { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id); + let creator_peer_comp_id = creator_port_info.peer_comp_id; + creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id); creator_ctx.remove_port(pair.creator_handle); // Transfer any messages @@ -700,7 +725,7 @@ impl CompPDL { let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); peer_port_info.peer_comp_id = created_ctx.id; - creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None); + creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None); } }