diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 05119e155abc9827abad7eaa7cbe7f6222af6a50..2a6a9283b31b053a5aef528984a648f10cdddac5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -63,7 +63,7 @@ impl CompCtx { self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, - state: PortState::Closed, + state: PortState::Open, peer_comp_id: self.id, }); @@ -341,8 +341,8 @@ impl CompPDL { // Results that can be returned in sync mode EC::SyncBlockEnd => { debug_assert_eq!(self.mode, Mode::Sync); - self.handle_sync_end(sched_ctx, comp_ctx); - return Ok(CompScheduling::Immediate); + let scheduling = self.handle_sync_end(sched_ctx, comp_ctx); + return Ok(scheduling.unwrap_or(CompScheduling::Immediate)); }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); @@ -427,18 +427,44 @@ impl CompPDL { } fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + sched_ctx.log("Component starting sync mode"); self.consensus.notify_sync_start(comp_ctx); debug_assert_eq!(self.mode, Mode::NonSync); self.mode = Mode::Sync; } - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - self.consensus.notify_sync_end(sched_ctx, comp_ctx); + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option { + sched_ctx.log("Component ending sync mode (now waiting for solution)"); + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.handle_sync_decision(sched_ctx, comp_ctx, decision) + } + + fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option { debug_assert_eq!(self.mode, Mode::Sync); - self.mode = Mode::SyncEnd; + let is_success = match decision { + SyncRoundDecision::None => { + // No decision yet + return None; + }, + SyncRoundDecision::Solution => true, + SyncRoundDecision::Failure => false, + }; + + // If here then we've reached a decision + if is_success { + self.mode = Mode::NonSync; + self.consensus.notify_sync_decision(decision); + return None; + } else { + todo!("handle this better, show some kind of error"); + self.mode = Mode::Exit; + self.handle_component_exit(sched_ctx, comp_ctx); + return Some(CompScheduling::Exit); + } } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + sched_ctx.log("Component exiting"); debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync // Note: for now we have that the scheduler handles exiting. I don't @@ -637,8 +663,11 @@ impl CompPDL { ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { debug_assert_eq!(message.target_port_id, Some(port_id)); let port_info = comp_ctx.get_port_mut(port_id); + let old_peer_comp_id = port_info.peer_comp_id; debug_assert!(port_info.state == PortState::Blocked); port_info.peer_comp_id = new_comp_id; + comp_ctx.add_peer(sched_ctx, new_comp_id, None); + comp_ctx.remove_peer(sched_ctx, old_peer_comp_id); self.unblock_local_port(sched_ctx, comp_ctx, port_id); } } @@ -646,28 +675,7 @@ impl CompPDL { fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - let is_success = match decision { - SyncRoundDecision::None => { - // No decision yet - return None; - }, - SyncRoundDecision::Solution => true, - SyncRoundDecision::Failure => false, - }; - - // If here then we've reached a conclusion - debug_assert_eq!(self.mode, Mode::SyncEnd); - self.mode = Mode::NonSync; - - if is_success { - // We can simply continue executing. So we do nothing extra! - } else { - todo!("handle this better, show some kind of error"); - self.handle_component_exit(sched_ctx, comp_ctx); - return Some(CompScheduling::Exit); - } - - return None; + return self.handle_sync_decision(sched_ctx, comp_ctx, decision); } // -------------------------------------------------------------------------