diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 8fdb556af27fe536f23f6d19a592551b097e4450..14fd2fb0dd6670acda349042897d5b70743c1c47 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -1,5 +1,6 @@ +use crate::random::Random; use crate::protocol::*; -use crate::protocol::ast::DefinitionId; +use crate::protocol::ast::ProcedureDefinitionId; use crate::protocol::eval::{ PortId as EvalPortId, Prompt, ValueGroup, Value, @@ -24,6 +25,7 @@ pub enum ExecStmt { CreatedChannel((Value, Value)), PerformedPut, PerformedGet(ValueGroup), + PerformedSelectWait(u32), None, } @@ -78,6 +80,14 @@ impl RunContext for ExecCtx { _ => unreachable!(), } } + + fn performed_select_wait(&mut self) -> Option { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case), + _v => unreachable!(), + } + } } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -85,17 +95,136 @@ pub(crate) enum Mode { NonSync, // not in sync mode Sync, // in sync mode, can interact with other components SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block - BlockedGet, - BlockedPut, + BlockedGet, // blocked because we need to receive a message on a particular port + BlockedPut, // component is blocked because the port is blocked + BlockedSelect, // waiting on message to complete the select statement StartExit, // temporary state: if encountered then we start the shutdown process BusyExit, // temporary state: waiting for Acks for all the closed ports Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } +struct SelectCase { + involved_ports: Vec, +} + +// TODO: @Optimize, flatten cases into single array, have index-pointers to next case +struct SelectState { + cases: Vec, + next_case: u32, + num_cases: u32, + random: Random, + candidates_workspace: Vec, +} + +enum SelectDecision { + None, + Case(u32), // contains case index, should be passed along to PDL code +} + +type InboxMain = Vec>; + +impl SelectState { + fn new() -> Self { + return Self{ + cases: Vec::new(), + next_case: 0, + num_cases: 0, + random: Random::new(), + candidates_workspace: Vec::new(), + } + } + + fn handle_select_start(&mut self, num_cases: u32) { + self.cases.clear(); + self.next_case = 0; + self.num_cases = num_cases; + } + + /// Register a port as belonging to a particular case. As for correctness of + /// PDL code one cannot register the same port twice, this function might + /// return an error + fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> { + // Retrieve case and port handle + self.ensure_at_case(case_index); + let cur_case = &mut self.cases[case_index as usize]; + let port_handle = comp_ctx.get_port_handle(port_id); + debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize); + + // Make sure port wasn't added before, we disallow having the same port + // in the same select guard twice. + if cur_case.involved_ports.contains(&port_handle) { + return Err(port_id); + } + + cur_case.involved_ports.push(port_handle); + return Ok(()); + } + + /// Notification that all ports have been registered and we should now wait + /// until the appropriate messages have come in. + fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + if self.num_cases != self.next_case { + // This happens when there are >=1 select cases written at the end + // of the select block. + self.ensure_at_case(self.num_cases - 1); + } + + return self.has_decision(inbox, comp_ctx); + } + + fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + return self.has_decision(inbox, comp_ctx); + } + + /// Internal helper, pushes empty cases inbetween last case and provided new + /// case index. + fn ensure_at_case(&mut self, new_case_index: u32) { + // Push an empty case for all intermediate cases that were not + // registered with a port. + debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases); + for _ in self.next_case..new_case_index + 1 { + self.cases.push(SelectCase{ involved_ports: Vec::new() }); + } + self.next_case = new_case_index + 1; + } + + /// Checks if a decision can be reached + fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + self.candidates_workspace.clear(); + if self.cases.is_empty() { + // If there are no cases then we can immediately reach a "bogus + // decision". + return SelectDecision::Case(0); + } + + // Need to check for valid case + 'case_loop: for (case_index, case) in self.cases.iter().enumerate() { + for port_handle in case.involved_ports.iter().copied() { + let port_index = comp_ctx.get_port_index(port_handle); + if inbox[port_index].is_none() { + // Condition not satisfied + continue 'case_loop; + } + } + + // If here then the case guard is satisfied + self.candidates_workspace.push(case_index); + } + + if self.candidates_workspace.is_empty() { + return SelectDecision::None; + } else { + let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len(); + return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32); + } + } +} + pub(crate) struct CompPDL { pub mode: Mode, pub mode_port: PortId, // when blocked on a port pub mode_value: ValueGroup, // when blocked on a put + select: SelectState, pub prompt: Prompt, pub control: ControlLayer, pub consensus: Consensus, @@ -105,7 +234,7 @@ pub(crate) struct CompPDL { // reserved per port. // Should be same length as the number of ports. Corresponding indices imply // message is intended for that port. - pub inbox_main: Vec>, + pub inbox_main: InboxMain, pub inbox_backup: Vec, } @@ -121,6 +250,7 @@ impl CompPDL { mode: Mode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + select: SelectState::new(), prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), @@ -168,7 +298,9 @@ impl CompPDL { // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. match self.mode { - Mode::NonSync | Mode::Sync => {}, + Mode::NonSync | Mode::Sync | Mode::BlockedSelect => { + // continue and run PDL code + }, Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => { return Ok(CompScheduling::Sleep); } @@ -230,6 +362,7 @@ impl CompPDL { }, EC::Put(port_id, value) => { debug_assert_eq!(self.mode, Mode::Sync); + sched_ctx.log(&format!("Putting value {:?}", value)); let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); @@ -245,6 +378,33 @@ impl CompPDL { return Ok(CompScheduling::Immediate); } }, + EC::SelectStart(num_cases, _num_ports) => { + debug_assert_eq!(self.mode, Mode::Sync); + self.select.handle_select_start(num_cases); + return Ok(CompScheduling::Requeue); + }, + EC::SelectRegisterPort(case_index, port_index, port_id) => { + debug_assert_eq!(self.mode, Mode::Sync); + let port_id = port_id_from_eval(port_id); + if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) { + todo!("handle registering a port multiple times"); + } + return Ok(CompScheduling::Immediate); + }, + EC::SelectWait => { + debug_assert_eq!(self.mode, Mode::Sync); + let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + // Reached a conclusion, so we can continue immediately + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.mode = Mode::Sync; + return Ok(CompScheduling::Immediate); + } else { + // No decision yet + self.mode = Mode::BlockedSelect; + return Ok(CompScheduling::Sleep); + } + }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { self.mode = Mode::StartExit; // next call we'll take care of the exit @@ -255,11 +415,11 @@ impl CompPDL { self.handle_sync_start(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, - EC::NewComponent(definition_id, monomorph_idx, arguments) => { + EC::NewComponent(definition_id, type_id, arguments) => { debug_assert_eq!(self.mode, Mode::NonSync); self.create_component_and_transfer_ports( sched_ctx, comp_ctx, - definition_id, monomorph_idx, arguments + definition_id, type_id, arguments ); return Ok(CompScheduling::Requeue); }, @@ -311,7 +471,7 @@ impl CompPDL { /// Handles decision from the consensus round. This will cause a change in /// the internal `Mode`, such that the next call to `run` can take the /// appropriate next steps. - fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { + fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode)); let is_success = match decision { SyncRoundDecision::None => { @@ -389,6 +549,12 @@ impl CompPDL { // We were indeed blocked self.mode = Mode::Sync; self.mode_port = PortId::new_invalid(); + } else if self.mode == Mode::BlockedSelect { + let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.mode = Mode::Sync; + } } return; @@ -598,7 +764,7 @@ impl CompPDL { fn create_component_and_transfer_ports( &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, - definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup + definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup ) { struct PortPair{ creator_handle: LocalPortHandle, @@ -692,7 +858,7 @@ impl CompPDL { // to message exchanges between remote peers. let prompt = Prompt::new( &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, - definition_id, monomorph_index, arguments, + definition_id, type_id, arguments, ); let component = CompPDL::new(prompt, port_id_pairs.len()); let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(