use crate::random::Random;
use crate::protocol::*;
use crate::protocol::ast::ProcedureDefinitionId;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt, ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};
use crate::runtime2::runtime::CompId;
use crate::runtime2::scheduler::SchedulerCtx;
use crate::runtime2::communication::*;

use super::component::{
    self, InboxMain, InboxBackup, GetResult,
    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
    port_id_from_eval, port_id_to_eval
};
use super::component_context::*;
use super::control_layer::*;
use super::consensus::Consensus;

pub enum ExecStmt {
    CreatedChannel((Value, Value)),
    PerformedPut,
    PerformedGet(ValueGroup),
    PerformedSelectWait(u32),
    None,
}

impl ExecStmt {
    fn take(&mut self) -> ExecStmt {
        let mut value = ExecStmt::None;
        std::mem::swap(self, &mut value);
        return value;
    }

    fn is_none(&self) -> bool {
        match self {
            ExecStmt::None => return true,
            _ => return false,
        }
    }
}

pub struct ExecCtx {
    stmt: ExecStmt,
}

impl RunContext for ExecCtx {
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
        match self.stmt.take() {
            ExecStmt::None => return false,
            ExecStmt::PerformedPut => return true,
            _ => unreachable!(),
        }
    }

    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::PerformedGet(value) => return Some(value),
            _ => unreachable!(),
        }
    }

    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
        todo!("remove fires")
    }

    fn performed_fork(&mut self) -> Option<bool> {
        todo!("remove fork")
    }

    fn created_channel(&mut self) -> Option<(Value, Value)> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::CreatedChannel(ports) => return Some(ports),
            _ => unreachable!(),
        }
    }

    fn performed_select_wait(&mut self) -> Option<u32> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case),
            _v => unreachable!(),
        }
    }
}

struct SelectCase {
    involved_ports: Vec<LocalPortHandle>,
}

// TODO: @Optimize, flatten cases into single array, have index-pointers to next case
struct SelectState {
    cases: Vec<SelectCase>,
    next_case: u32,
    num_cases: u32,
    random: Random,
    candidates_workspace: Vec<usize>,
}

enum SelectDecision {
    None,
    Case(u32), // contains case index, should be passed along to PDL code
}

impl SelectState {
    fn new() -> Self {
        return Self{
            cases: Vec::new(),
            next_case: 0,
            num_cases: 0,
            random: Random::new(),
            candidates_workspace: Vec::new(),
        }
    }

    fn handle_select_start(&mut self, num_cases: u32) {
        self.cases.clear();
        self.next_case = 0;
        self.num_cases = num_cases;
    }

    /// Registers a port as belonging to a particular case. Because correct PDL
    /// code cannot register the same port twice within a guard, this function
    /// may return an error.
    fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> {
        // Retrieve case and port handle
        self.ensure_at_case(case_index);
        let cur_case = &mut self.cases[case_index as usize];
        let port_handle = comp_ctx.get_port_handle(port_id);
        debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize);

        // Make sure the port wasn't added before: we disallow having the same
        // port in the same select guard twice.
        if cur_case.involved_ports.contains(&port_handle) {
            return Err(port_id);
        }

        cur_case.involved_ports.push(port_handle);
        return Ok(());
    }

    /// Notification that all ports have been registered and we should now wait
    /// until the appropriate messages have come in.
    fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
        if self.num_cases != self.next_case {
            // This happens when one or more cases at the end of the select
            // block did not register any ports.
            self.ensure_at_case(self.num_cases - 1);
        }

        return self.has_decision(inbox, comp_ctx);
    }

    fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
        return self.has_decision(inbox, comp_ctx);
    }

    /// Internal helper, pushes empty cases in between the last case and the
    /// provided new case index.
    fn ensure_at_case(&mut self, new_case_index: u32) {
        // Push an empty case for all intermediate cases that were not
        // registered with a port.
        debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases);
        for _ in self.next_case..new_case_index + 1 {
            self.cases.push(SelectCase{ involved_ports: Vec::new() });
        }
        self.next_case = new_case_index + 1;
    }

    /// Checks if a decision can be reached
    fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
        self.candidates_workspace.clear();

        if self.cases.is_empty() {
            // If there are no cases then we can immediately reach a "bogus
            // decision".
            return SelectDecision::Case(0);
        }

        // Need to check for a valid case
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
            for port_handle in case.involved_ports.iter().copied() {
                let port_index = comp_ctx.get_port_index(port_handle);
                if inbox[port_index].is_none() {
                    // Condition not satisfied
                    continue 'case_loop;
                }
            }

            // If we get here then the case guard is satisfied
            self.candidates_workspace.push(case_index);
        }

        if self.candidates_workspace.is_empty() {
            return SelectDecision::None;
        } else {
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
        }
    }
}

pub(crate) struct CompPDL {
    pub exec_state: CompExecState,
    select_state: SelectState,

    pub prompt: Prompt,
    pub control: ControlLayer,
    pub consensus: Consensus,
    pub sync_counter: u32,
    pub exec_ctx: ExecCtx,
    // TODO: Temporary field, simulates future plans of having one storage place
    // reserved per port.
    // Should be same length as the number of ports. Corresponding indices imply
    // message is intended for that port.
    pub inbox_main: InboxMain,
    pub inbox_backup: InboxBackup,
}

impl Component for CompPDL {
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
        // Intentionally empty
    }

    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
        // Intentionally empty
    }

    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
        let port_index = comp_ctx.get_port_index(port_handle);
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);
        } else {
            self.inbox_backup.push(message);
        }
    }

    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
        sched_ctx.debug(&format!("handling message: {:?}", message));

        if let Some(new_target) = self.control.should_reroute(&mut message) {
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
            sched_ctx.debug(&format!("rerouting to: {:?}", new_target));
            target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
            let _should_remove = target.decrement_users();
            debug_assert!(_should_remove.is_none());
            return;
        }

        sched_ctx.debug("handling message myself");

        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
                if let Err(location_and_message) = component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx,
                    &mut self.inbox_main, &mut self.inbox_backup
                ) {
                    self.handle_generic_component_error(sched_ctx, location_and_message);
                }
            },
            Message::Sync(message) => {
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
            },
            Message::Poll => {
                unreachable!(); // because we never register at the polling thread
            }
        }
    }

    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
        use EvalContinuation as EC;

        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));

        // Depending on the mode don't do anything at all, take some special
        // actions, or fall through and run the PDL code.
        match self.exec_state.mode {
            CompMode::NonSync | CompMode::Sync => {
                // continue and run PDL code
            },
            CompMode::SyncEnd | CompMode::BlockedGet |
            CompMode::BlockedPut | CompMode::BlockedSelect |
            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady |
            CompMode::NewComponentBlocked => {
                return CompScheduling::Sleep;
            }
            CompMode::StartExit => return component::default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
            ),
            CompMode::BusyExit => return component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
            ),
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
        }

        let run_result = self.execute_prompt(&sched_ctx);

        if let Err(error) = run_result {
            self.handle_component_error(sched_ctx, CompError::Executor(error));
            return CompScheduling::Immediate;
        }

        let run_result = run_result.unwrap();

        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
                component::default_handle_sync_end(
                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus
                );
                return CompScheduling::Immediate;
            },
            EC::BlockGet(expr_id, port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                debug_assert!(self.exec_ctx.stmt.is_none());

                let port_id = port_id_from_eval(port_id);
                match component::default_attempt_get(
                    &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id),
                    &mut self.inbox_main, &mut self.inbox_backup,
                    sched_ctx, comp_ctx, &mut self.control, &mut self.consensus
                ) {
                    GetResult::Received(message) => {
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                        return CompScheduling::Immediate;
                    },
                    GetResult::NoMessage => {
                        return CompScheduling::Sleep;
                    },
                    GetResult::Error(location_and_message) => {
                        self.handle_generic_component_error(sched_ctx, location_and_message);
                        return CompScheduling::Immediate;
                    }
                }
            },
            EC::Put(expr_id, port_id, value) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                sched_ctx.info(&format!("Putting value {:?}", value));

                // Send the message
                let target_port_id = port_id_from_eval(port_id);
                let send_result = component::default_send_data_message(
                    &mut self.exec_state, target_port_id, PortInstruction::SourceLocation(expr_id), value,
                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
                );
                if let Err(location_and_message) = send_result {
                    self.handle_generic_component_error(sched_ctx, location_and_message);
                    return CompScheduling::Immediate;
                } else {
                    // When `run` is called again (potentially after becoming
                    // unblocked) we need to instruct the executor that we
                    // performed the `put`.
                    let scheduling = send_result.unwrap();
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
                    return scheduling;
                }
            },
            EC::SelectStart(num_cases, _num_ports) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                self.select_state.handle_select_start(num_cases);
                return CompScheduling::Requeue;
            },
            EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                let port_id = port_id_from_eval(port_id);
                let port_handle = comp_ctx.get_port_handle(port_id);

                // Note: we register the "last_instruction" here already. This
                // way if we get a `ClosePort` message, the condition to fail
                // the synchronous round is satisfied.
                let port_info = comp_ctx.get_port_mut(port_handle);
                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
                let port_is_closed = port_info.state.is_closed();

                // Register port as part of select guard
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
                    // Failure occurs if a port is used twice in the same guard
                    let protocol = &sched_ctx.runtime.protocol;
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
                        String::from("Cannot have the same port appear in the same guard twice")
                    )));
                } else if port_is_closed {
                    // Port is closed
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
                    let protocol = &sched_ctx.runtime.protocol;
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
                        format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0)
                    )));
                }

                return CompScheduling::Immediate;
            },
            EC::SelectWait => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
                if let SelectDecision::Case(case_index) = select_decision {
                    // Reached a conclusion, so we can continue immediately
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
                    self.exec_state.mode = CompMode::Sync;
                    return CompScheduling::Immediate;
                } else {
                    // No decision yet
                    self.exec_state.mode = CompMode::BlockedSelect;
                    return CompScheduling::Sleep;
                }
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
                self.exec_state.set_as_start_exit(ExitReason::Termination);
                return CompScheduling::Immediate;
            },
            EC::SyncBlockStart => {
                component::default_handle_sync_start(
                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
                );
                return CompScheduling::Immediate;
            },
            EC::NewComponent(definition_id, type_id, arguments) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                component::default_start_create_component(
                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
                    &mut self.inbox_main, &mut self.inbox_backup,
                    definition_id, type_id, arguments
                );
                return CompScheduling::Requeue;
            },
            EC::NewChannel => {
                debug_assert!(self.exec_ctx.stmt.is_none());
                let channel = comp_ctx.create_channel();
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
                    Value::Output(port_id_to_eval(channel.putter_id)),
                    Value::Input(port_id_to_eval(channel.getter_id))
                ));
                self.inbox_main.push(None);
                self.inbox_main.push(None);
                return CompScheduling::Immediate;
            }
        }
    }
}

impl CompPDL {
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
        let mut inbox_main = Vec::new();
        inbox_main.reserve(num_ports);
        for _ in 0..num_ports {
            inbox_main.push(None);
        }

        return Self{
            exec_state: CompExecState::new(),
            select_state: SelectState::new(),
            prompt: initial_state,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            sync_counter: 0,
            exec_ctx: ExecCtx{
                stmt: ExecStmt::None,
            },
            inbox_main,
            inbox_backup: Vec::new(),
        }
    }

    // -------------------------------------------------------------------------
    // Running component and handling changes in global component state
    // -------------------------------------------------------------------------

    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        let mut step_result = EvalContinuation::Stepping;
        while let EvalContinuation::Stepping = step_result {
            step_result =
                self.prompt.step(
                    &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
                    &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
                )?;
        }

        return Ok(step_result)
    }

    // -------------------------------------------------------------------------
    // Handling messages
    // -------------------------------------------------------------------------

    /// Handles a message that came in through the public inbox. This function
    /// will handle putting it in the correct place, and potentially blocking
    /// the port in case too many messages are being received.
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        use component::IncomingData;

        // Whatever we do, glean information from the headers in the message
        if self.exec_state.mode.is_in_sync_block() {
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
        }

        match component::default_handle_incoming_data_message(
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
        ) {
            IncomingData::PlacedInSlot => {
                if self.exec_state.mode == CompMode::BlockedSelect {
                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
                    if let SelectDecision::Case(case_index) = select_decision {
                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
                        self.exec_state.mode = CompMode::Sync;
                    }
                }
            },
            IncomingData::SlotFull(message) => {
                self.inbox_backup.push(message);
            }
        }
    }

    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
    }

    /// Handles an error coming from the generic `component::handle_xxx`
    /// functions, which is why it accepts its argument as a (location, message)
    /// tuple.
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
        // Retrieve location and message, display in terminal
        let (location, message) = location_and_message;
        let error = match location {
            PortInstruction::None => CompError::Component(message),
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
            PortInstruction::SourceLocation(expression_id) => {
                let protocol = &sched_ctx.runtime.protocol;
                CompError::Executor(EvalError::new_error_at_expr(
                    &self.prompt, &protocol.modules, &protocol.heap, expression_id, message
                ))
            }
        };

        self.handle_component_error(sched_ctx, error);
    }

    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
        sched_ctx.error(&format!("{}", error));

        // Set state to handle subsequent error
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
            ExitReason::ErrorInSync
        } else {
            ExitReason::ErrorNonSync
        };

        self.exec_state.set_as_start_exit(exit_reason);
    }
}
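
// A minimal sketch of how the `ExecStmt` slot hands a result from the scheduler back to
// the evaluator: a value is stored once, `take` yields it exactly once, and the slot
// reverts to `None`. This test module and its assertions are illustrative additions
// (not part of the original file) and assume the surrounding crate compiles as-is.
#[cfg(test)]
mod exec_stmt_tests {
    use super::*;

    #[test]
    fn take_yields_stored_result_once() {
        let mut stmt = ExecStmt::PerformedPut;
        assert!(!stmt.is_none());

        // First `take` returns the stored result and leaves `ExecStmt::None` behind,
        // mirroring what `ExecCtx::performed_put` relies upon.
        assert!(matches!(stmt.take(), ExecStmt::PerformedPut));
        assert!(stmt.is_none());

        // A second `take` only yields the empty variant.
        assert!(matches!(stmt.take(), ExecStmt::None));
    }
}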