diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 011ce9823aeb18f74da57befad141de61da4ef0d..14fd2fb0dd6670acda349042897d5b70743c1c47 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -1,3 +1,4 @@ +use crate::random::Random; use crate::protocol::*; use crate::protocol::ast::ProcedureDefinitionId; use crate::protocol::eval::{ @@ -24,8 +25,6 @@ pub enum ExecStmt { CreatedChannel((Value, Value)), PerformedPut, PerformedGet(ValueGroup), - PerformedSelectStart, - PerformedSelectRegister, PerformedSelectWait(u32), None, } @@ -82,27 +81,11 @@ impl RunContext for ExecCtx { } } - fn performed_select_start(&mut self) -> bool { - match self.stmt.take() { - ExecStmt::None => return false, - ExecStmt::PerformedSelectStart => return true, - _ => unreachable!(), - } - } - - fn performed_select_register_port(&mut self) -> bool { - match self.stmt.take() { - ExecStmt::None => return false, - ExecStmt::PerformedSelectRegister => return true, - _ => unreachable!(), - } - } - fn performed_select_wait(&mut self) -> Option { match self.stmt.take() { ExecStmt::None => return None, ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case), - _ => unreachable!(), + _v => unreachable!(), } } } @@ -112,17 +95,136 @@ pub(crate) enum Mode { NonSync, // not in sync mode Sync, // in sync mode, can interact with other components SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block - BlockedGet, - BlockedPut, + BlockedGet, // blocked because we need to receive a message on a particular port + BlockedPut, // component is blocked because the port is blocked + BlockedSelect, // waiting on message to complete the select statement StartExit, // temporary state: if encountered then we start the shutdown process BusyExit, // temporary state: waiting for Acks for all the closed ports Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } +struct SelectCase { + involved_ports: Vec, +} + +// TODO: @Optimize, flatten cases into single array, have index-pointers to next case +struct SelectState { + cases: Vec, + next_case: u32, + num_cases: u32, + random: Random, + candidates_workspace: Vec, +} + +enum SelectDecision { + None, + Case(u32), // contains case index, should be passed along to PDL code +} + +type InboxMain = Vec>; + +impl SelectState { + fn new() -> Self { + return Self{ + cases: Vec::new(), + next_case: 0, + num_cases: 0, + random: Random::new(), + candidates_workspace: Vec::new(), + } + } + + fn handle_select_start(&mut self, num_cases: u32) { + self.cases.clear(); + self.next_case = 0; + self.num_cases = num_cases; + } + + /// Register a port as belonging to a particular case. As for correctness of + /// PDL code one cannot register the same port twice, this function might + /// return an error + fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> { + // Retrieve case and port handle + self.ensure_at_case(case_index); + let cur_case = &mut self.cases[case_index as usize]; + let port_handle = comp_ctx.get_port_handle(port_id); + debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize); + + // Make sure port wasn't added before, we disallow having the same port + // in the same select guard twice. + if cur_case.involved_ports.contains(&port_handle) { + return Err(port_id); + } + + cur_case.involved_ports.push(port_handle); + return Ok(()); + } + + /// Notification that all ports have been registered and we should now wait + /// until the appropriate messages have come in. + fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + if self.num_cases != self.next_case { + // This happens when there are >=1 select cases written at the end + // of the select block. + self.ensure_at_case(self.num_cases - 1); + } + + return self.has_decision(inbox, comp_ctx); + } + + fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + return self.has_decision(inbox, comp_ctx); + } + + /// Internal helper, pushes empty cases inbetween last case and provided new + /// case index. + fn ensure_at_case(&mut self, new_case_index: u32) { + // Push an empty case for all intermediate cases that were not + // registered with a port. + debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases); + for _ in self.next_case..new_case_index + 1 { + self.cases.push(SelectCase{ involved_ports: Vec::new() }); + } + self.next_case = new_case_index + 1; + } + + /// Checks if a decision can be reached + fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + self.candidates_workspace.clear(); + if self.cases.is_empty() { + // If there are no cases then we can immediately reach a "bogus + // decision". + return SelectDecision::Case(0); + } + + // Need to check for valid case + 'case_loop: for (case_index, case) in self.cases.iter().enumerate() { + for port_handle in case.involved_ports.iter().copied() { + let port_index = comp_ctx.get_port_index(port_handle); + if inbox[port_index].is_none() { + // Condition not satisfied + continue 'case_loop; + } + } + + // If here then the case guard is satisfied + self.candidates_workspace.push(case_index); + } + + if self.candidates_workspace.is_empty() { + return SelectDecision::None; + } else { + let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len(); + return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32); + } + } +} + pub(crate) struct CompPDL { pub mode: Mode, pub mode_port: PortId, // when blocked on a port pub mode_value: ValueGroup, // when blocked on a put + select: SelectState, pub prompt: Prompt, pub control: ControlLayer, pub consensus: Consensus, @@ -132,7 +234,7 @@ pub(crate) struct CompPDL { // reserved per port. // Should be same length as the number of ports. Corresponding indices imply // message is intended for that port. - pub inbox_main: Vec>, + pub inbox_main: InboxMain, pub inbox_backup: Vec, } @@ -148,6 +250,7 @@ impl CompPDL { mode: Mode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + select: SelectState::new(), prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), @@ -195,7 +298,9 @@ impl CompPDL { // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. match self.mode { - Mode::NonSync | Mode::Sync => {}, + Mode::NonSync | Mode::Sync | Mode::BlockedSelect => { + // continue and run PDL code + }, Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => { return Ok(CompScheduling::Sleep); } @@ -257,6 +362,7 @@ impl CompPDL { }, EC::Put(port_id, value) => { debug_assert_eq!(self.mode, Mode::Sync); + sched_ctx.log(&format!("Putting value {:?}", value)); let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); @@ -272,18 +378,32 @@ impl CompPDL { return Ok(CompScheduling::Immediate); } }, - EC::SelectStart(num_cases, num_ports) => { + EC::SelectStart(num_cases, _num_ports) => { debug_assert_eq!(self.mode, Mode::Sync); - todo!("finish handling select start") + self.select.handle_select_start(num_cases); + return Ok(CompScheduling::Requeue); }, EC::SelectRegisterPort(case_index, port_index, port_id) => { debug_assert_eq!(self.mode, Mode::Sync); - todo!("finish handling register port") + let port_id = port_id_from_eval(port_id); + if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) { + todo!("handle registering a port multiple times"); + } + return Ok(CompScheduling::Immediate); }, EC::SelectWait => { debug_assert_eq!(self.mode, Mode::Sync); - self.handle_select_wait(sched_ctx, comp_ctx); - todo!("finish handling select wait") + let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + // Reached a conclusion, so we can continue immediately + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.mode = Mode::Sync; + return Ok(CompScheduling::Immediate); + } else { + // No decision yet + self.mode = Mode::BlockedSelect; + return Ok(CompScheduling::Sleep); + } }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { @@ -372,13 +492,6 @@ impl CompPDL { } } - /// Handles the moment where the PDL code has notified the runtime of all - /// the ports it is waiting on. - fn handle_select_wait(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log("Component waiting for select conclusion"); - - } - fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); debug_assert_eq!(self.mode, Mode::StartExit); @@ -436,6 +549,12 @@ impl CompPDL { // We were indeed blocked self.mode = Mode::Sync; self.mode_port = PortId::new_invalid(); + } else if self.mode == Mode::BlockedSelect { + let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.mode = Mode::Sync; + } } return;