use crate::random::Random;
use crate::protocol::*;
use crate::protocol::ast::ProcedureDefinitionId;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt, ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};
use crate::runtime2::runtime::CompId;
use crate::runtime2::scheduler::SchedulerCtx;
use crate::runtime2::communication::*;

use super::component::{
    self, CompExecState, Component, CompScheduling, CompMode,
    port_id_from_eval, port_id_to_eval
};
use super::component_context::*;
use super::control_layer::*;
use super::consensus::Consensus;

pub enum ExecStmt {
    CreatedChannel((Value, Value)),
    PerformedPut,
    PerformedGet(ValueGroup),
    PerformedSelectWait(u32),
    None,
}

impl ExecStmt {
    fn take(&mut self) -> ExecStmt {
        let mut value = ExecStmt::None;
        std::mem::swap(self, &mut value);
        return value;
    }

    fn is_none(&self) -> bool {
        match self {
            ExecStmt::None => return true,
            _ => return false,
        }
    }
}

pub struct ExecCtx {
    stmt: ExecStmt,
}

impl RunContext for ExecCtx {
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
        match self.stmt.take() {
            ExecStmt::None => return false,
            ExecStmt::PerformedPut => return true,
            _ => unreachable!(),
        }
    }

    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::PerformedGet(value) => return Some(value),
            _ => unreachable!(),
        }
    }

    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
        todo!("remove fires")
    }

    fn performed_fork(&mut self) -> Option<bool> {
        todo!("remove fork")
    }

    fn created_channel(&mut self) -> Option<(Value, Value)> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::CreatedChannel(ports) => return Some(ports),
            _ => unreachable!(),
        }
    }

    fn performed_select_wait(&mut self) -> Option<u32> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case),
            _v => unreachable!(),
        }
    }
}

struct SelectCase {
    involved_ports: Vec<LocalPortHandle>,
}

// TODO: @Optimize, flatten cases into single array, have index-pointers to next case
struct SelectState {
    cases: Vec<SelectCase>,
    next_case: u32,
    num_cases: u32,
    random: Random,
    candidates_workspace: Vec<usize>,
}

enum SelectDecision {
    None,
    Case(u32), // contains case index, should be passed along to PDL code
}

type InboxMain = Vec<Option<DataMessage>>;

impl SelectState {
    fn new() -> Self {
        return Self{
            cases: Vec::new(),
            next_case: 0,
            num_cases: 0,
            random: Random::new(),
            candidates_workspace: Vec::new(),
        }
    }

    fn handle_select_start(&mut self, num_cases: u32) {
        self.cases.clear();
        self.next_case = 0;
        self.num_cases = num_cases;
    }

    /// Registers a port as belonging to a particular case. Because correct PDL
    /// code cannot register the same port twice within a single select guard,
    /// this function returns an error (carrying the offending port ID) when
    /// that happens.
    fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> {
        // Retrieve case and port handle
        self.ensure_at_case(case_index);
        let cur_case = &mut self.cases[case_index as usize];
        let port_handle = comp_ctx.get_port_handle(port_id);
        debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize);

        // Make sure port wasn't added before, we disallow having the same port
        // in the same select guard twice.
        if cur_case.involved_ports.contains(&port_handle) {
            return Err(port_id);
        }

        cur_case.involved_ports.push(port_handle);
        return Ok(());
    }

    /// Notification that all ports have been registered and we should now wait
    /// until the appropriate messages have come in.
    fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
        if self.num_cases != self.next_case {
            // This happens when one or more select cases at the end of the
            // select block did not register any ports.
            self.ensure_at_case(self.num_cases - 1);
        }

        return self.has_decision(inbox, comp_ctx);
    }

    fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
        return self.has_decision(inbox, comp_ctx);
    }

    /// Internal helper, pushes empty cases in between the last registered case
    /// and the provided new case index (inclusive).
    fn ensure_at_case(&mut self, new_case_index: u32) {
        // Push an empty case for all intermediate cases that were not
        // registered with a port.
        debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases);
        for _ in self.next_case..new_case_index + 1 {
            self.cases.push(SelectCase{ involved_ports: Vec::new() });
        }
        self.next_case = new_case_index + 1;
    }

    /// Checks if a decision can be reached
    fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
        self.candidates_workspace.clear();

        if self.cases.is_empty() {
            // If there are no cases then we can immediately reach a "bogus
            // decision".
            return SelectDecision::Case(0);
        }

        // Need to check for valid case
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
            for port_handle in case.involved_ports.iter().copied() {
                let port_index = comp_ctx.get_port_index(port_handle);
                if inbox[port_index].is_none() {
                    // Condition not satisfied
                    continue 'case_loop;
                }
            }

            // If here then the case guard is satisfied
            self.candidates_workspace.push(case_index);
        }

        if self.candidates_workspace.is_empty() {
            return SelectDecision::None;
        } else {
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
        }
    }
}

pub(crate) struct CompPDL {
    pub exec_state: CompExecState,
    select_state: SelectState,
    pub prompt: Prompt,
    pub control: ControlLayer,
    pub consensus: Consensus,
    pub sync_counter: u32,
    pub exec_ctx: ExecCtx,
    // TODO: Temporary field, simulates future plans of having one storage place
    // reserved per port.
    // Should be same length as the number of ports. Corresponding indices imply
    // message is intended for that port.
    pub inbox_main: InboxMain,
    pub inbox_backup: Vec<DataMessage>,
}

impl Component for CompPDL {
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
        // Intentionally empty
    }

    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
        // Intentionally empty
    }

    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
        let port_index = comp_ctx.get_port_index(port_handle);
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);
        } else {
            self.inbox_backup.push(message);
        }
    }

    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
        // sched_ctx.log(&format!("handling message: {:?}", message));
        if let Some(new_target) = self.control.should_reroute(&mut message) {
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
            let _should_remove = target.decrement_users();
            debug_assert!(_should_remove.is_none());
            return;
        }

        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
                component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                );
            },
            Message::Sync(message) => {
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
            },
            Message::Poll => {
                unreachable!(); // because we never register at the polling thread
            }
        }
    }

    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));

        // Depending on the mode don't do anything at all, take some special
        // actions, or fall through and run the PDL code.
        match self.exec_state.mode {
            CompMode::NonSync | CompMode::Sync => {
                // continue and run PDL code
            },
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
                return Ok(CompScheduling::Sleep);
            }
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
            )),
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
            )),
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
        }

        let run_result = self.execute_prompt(&sched_ctx)?;

        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                self.handle_sync_end(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                debug_assert!(self.exec_ctx.stmt.is_none());

                let port_id = port_id_from_eval(port_id);
                let port_handle = comp_ctx.get_port_handle(port_id);
                let port_index = comp_ctx.get_port_index(port_handle);
                if let Some(message) = &self.inbox_main[port_index] {
                    // Check if we can actually receive the message
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                        // Message was received. Make sure any blocked peers and
                        // pending messages are handled.
                        let message = self.inbox_main[port_index].take().unwrap();
                        component::default_handle_received_data_message(
                            port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup,
                            comp_ctx, sched_ctx, &mut self.control
                        );

                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                        return Ok(CompScheduling::Immediate);
                    } else {
                        todo!("handle sync failure due to message deadlock");
                        return Ok(CompScheduling::Sleep);
                    }
                } else {
                    // We need to wait
                    self.exec_state.set_as_blocked_get(port_id);
                    return Ok(CompScheduling::Sleep);
                }
            },
            EC::Put(port_id, value) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                sched_ctx.log(&format!("Putting value {:?}", value));

                // Send the message
                let target_port_id = port_id_from_eval(port_id);
                let scheduling = component::default_send_data_message(
                    &mut self.exec_state, target_port_id, value, sched_ctx,
                    &mut self.consensus, comp_ctx
                );

                // When `run` is called again (potentially after becoming
                // unblocked) we need to instruct the executor that we performed
                // the `put`
                self.exec_ctx.stmt = ExecStmt::PerformedPut;
                return Ok(scheduling);
            },
            EC::SelectStart(num_cases, _num_ports) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                self.select_state.handle_select_start(num_cases);
                return Ok(CompScheduling::Requeue);
            },
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                let port_id = port_id_from_eval(port_id);
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
                    todo!("handle registering a port multiple times");
                }
                return Ok(CompScheduling::Immediate);
            },
            EC::SelectWait => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
                if let SelectDecision::Case(case_index) = select_decision {
                    // Reached a conclusion, so we can continue immediately
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
                    self.exec_state.mode = CompMode::Sync;
                    return Ok(CompScheduling::Immediate);
                } else {
                    // No decision yet
                    self.exec_state.mode = CompMode::BlockedSelect;
                    return Ok(CompScheduling::Sleep);
                }
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
                return Ok(CompScheduling::Immediate);
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::NewComponent(definition_id, type_id, arguments) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                self.create_component_and_transfer_ports(
                    sched_ctx, comp_ctx,
                    definition_id, type_id, arguments
                );
                return Ok(CompScheduling::Requeue);
            },
            EC::NewChannel => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                debug_assert!(self.exec_ctx.stmt.is_none());
                let channel = comp_ctx.create_channel();
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
                    Value::Output(port_id_to_eval(channel.putter_id)),
                    Value::Input(port_id_to_eval(channel.getter_id))
                ));
                self.inbox_main.push(None);
                self.inbox_main.push(None);
                return Ok(CompScheduling::Immediate);
            }
        }
    }
}

impl CompPDL {
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
        let mut inbox_main = Vec::new();
        inbox_main.reserve(num_ports);
        for _ in 0..num_ports {
            inbox_main.push(None);
        }

        return Self{
            exec_state: CompExecState::new(),
            select_state: SelectState::new(),
            prompt: initial_state,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            sync_counter: 0,
            exec_ctx: ExecCtx{
                stmt: ExecStmt::None,
            },
            inbox_main,
            inbox_backup: Vec::new(),
        }
    }

    // -------------------------------------------------------------------------
    // Running component and handling changes in global component state
    // -------------------------------------------------------------------------

    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        let mut step_result = EvalContinuation::Stepping;
        while let EvalContinuation::Stepping = step_result {
            step_result = self.prompt.step(
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
            )?;
        }

        return Ok(step_result)
    }

    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component starting sync mode");

        self.consensus.notify_sync_start(comp_ctx);
        for message in self.inbox_main.iter() {
            if let Some(message) = message {
                self.consensus.handle_incoming_data_message(comp_ctx, message);
            }
        }

        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
        self.exec_state.mode = CompMode::Sync;
    }

    /// Handles the end of sync. The conclusion to the sync round might arise
    /// immediately (and be handled immediately), or might come later through
    /// messaging. In any case the component should be scheduled again
    /// immediately.
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
        self.exec_state.mode = CompMode::SyncEnd;
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
    }

    /// Handles the decision from the consensus round. This will cause a change
    /// in the internal `Mode`, such that the next call to `run` can take the
    /// appropriate next steps.
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
        match decision {
            SyncRoundDecision::None => {
                // No decision yet
                return;
            },
            SyncRoundDecision::Solution => {
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
                self.exec_state.mode = CompMode::NonSync;
                self.consensus.notify_sync_decision(decision);
            },
            SyncRoundDecision::Failure => {
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
                self.exec_state.mode = CompMode::StartExit;
            },
        }
    }

    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component exiting");
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
        self.exec_state.mode = CompMode::BusyExit;

        // Doing this by index, then retrieving the handle, is a bit ridiculous,
        // but Rust is being Rust with its borrowing rules.
        for port_index in 0..comp_ctx.num_ports() {
            let port = comp_ctx.get_port_by_index_mut(port_index);
            if port.state == PortState::Closed {
                // Already closed, or in the process of being closed
                continue;
            }

            // Mark as closed
            let port_id = port.self_id;
            port.state = PortState::Closed;

            // Notify peer of closing
            let port_handle = comp_ctx.get_port_handle(port_id);
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
            let peer_info = comp_ctx.get_peer(peer);
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
        }
    }

    // -------------------------------------------------------------------------
    // Handling messages
    // -------------------------------------------------------------------------

    /// Handles a message that came in through the public inbox. This function
    /// will handle putting it in the correct place, and potentially blocking
    /// the port in case too many messages are being received.
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        use component::IncomingData;

        // Whatever we do, glean information from headers in message
        if self.exec_state.mode.is_in_sync_block() {
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
        }

        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
        let port_index = comp_ctx.get_port_index(port_handle);
        match component::default_handle_incoming_data_message(
            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx,
            message, sched_ctx, &mut self.control
        ) {
            IncomingData::PlacedInSlot => {
                if self.exec_state.mode == CompMode::BlockedSelect {
                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
                    if let SelectDecision::Case(case_index) = select_decision {
                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
                        self.exec_state.mode = CompMode::Sync;
                    }
                }
            },
            IncomingData::SlotFull(message) => {
                self.inbox_backup.push(message);
            }
        }
    }

    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
    }

    // -------------------------------------------------------------------------
    // Handling ports
    // -------------------------------------------------------------------------

    /// Creates a new component and transfers ports. Because of the stepwise
    /// process in which memory is allocated, ports are transferred, messages
    /// are exchanged, and component lifecycle methods are called, this function
    /// embodies a lot of implicit assumptions (e.g. when the
    /// `Component::on_creation` method is called, the component is already
    /// registered at the runtime).
    fn create_component_and_transfer_ports(
        &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
    ) {
        struct PortPair{
            creator_handle: LocalPortHandle,
            creator_id: PortId,
            created_handle: LocalPortHandle,
            created_id: PortId,
        }
        let mut opened_port_id_pairs = Vec::new();
        let mut closed_port_id_pairs = Vec::new();

        let reservation = sched_ctx.runtime.start_create_pdl_component();
        let mut created_ctx = CompCtx::new(&reservation);

        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];

        // dbg_code!({
        //     sched_ctx.log(&format!(
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
        //         self_proc.identifier.value.as_str(), creator_ctx.id,
        //         other_proc.identifier.value.as_str(), reservation.id()
        //     ));
        // });

        // Take all the port IDs that are in the `arguments` (and currently
        // belong to the creator component) and translate them into new IDs
        // that are associated with the component we're about to create
        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
        while let Some(port_reference) = arg_iter.next() {
            // Create port entry for new component
            let creator_port_id = port_reference.id;
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
            let creator_port = creator_ctx.get_port(creator_port_handle);
            let created_port_handle = created_ctx.add_port(
                creator_port.peer_comp_id, creator_port.peer_port_id,
                creator_port.kind, creator_port.state
            );
            let created_port = created_ctx.get_port(created_port_handle);
            let created_port_id = created_port.self_id;

            let port_id_pair = PortPair {
                creator_handle: creator_port_handle,
                creator_id: creator_port_id,
                created_handle: created_port_handle,
                created_id: created_port_id,
            };
            if creator_port.state == PortState::Closed {
                closed_port_id_pairs.push(port_id_pair)
            } else {
                opened_port_id_pairs.push(port_id_pair);
            }

            // Modify value in arguments (bit dirty, but double vec in
            // ValueGroup causes lifetime issues)
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
            } else {
                &mut arg_iter.group.values[port_reference.index]
            };
            match arg_value {
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
                _ => unreachable!(),
            }
        }

        // For each transferred port pair set their peer components to the
        // correct values. This will only change the values for the ports of
        // the new component.
        let mut created_component_has_remote_peers = false;

        for pair in opened_port_id_pairs.iter() {
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);

            if created_port_info.peer_comp_id == creator_ctx.id {
                // Port peer is owned by the creator as well
                let created_peer_port_index = opened_port_id_pairs
                    .iter()
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
                match created_peer_port_index {
                    Some(created_peer_port_index) => {
                        // Peer port moved to the new component as well. So
                        // adjust IDs appropriately.
                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
                        created_port_info.peer_port_id = peer_pair.created_id;
                        created_port_info.peer_comp_id = reservation.id();
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
                    },
                    None => {
                        // Peer port remains with creator component.
                        created_port_info.peer_comp_id = creator_ctx.id;
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
                    }
                }
            } else {
                // Peer is a different component. We'll deal with sending the
                // appropriate messages later
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
                let peer_info = creator_ctx.get_peer(peer_handle);
                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
                created_component_has_remote_peers = true;
            }
        }

        // We'll now actually turn our reservation for a new component into an
        // actual component. Note that we initialize it as "not sleeping" as
        // its initial scheduling might be performed based on `Ack`s in response
        // to message exchanges between remote peers.
        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
            reservation, component, created_ctx, false,
        );
        component.component.on_creation(created_key.downgrade(), sched_ctx);

        // Now modify the creator's ports: remove every transferred port and
        // potentially remove the peer component.
        for pair in opened_port_id_pairs.iter() {
            // Remove peer if appropriate
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
            let creator_peer_comp_id = creator_port_info.peer_comp_id;
            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id, false);
            creator_ctx.remove_port(pair.creator_handle);

            // Transfer any messages
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
                message.data_header.target_port = pair.created_id;
                component.component.adopt_message(&mut component.ctx, message)
            }

            let mut message_index = 0;
            while message_index < self.inbox_backup.len() {
                let message = &self.inbox_backup[message_index];
                if message.data_header.target_port == pair.creator_id {
                    // transfer message
                    let mut message = self.inbox_backup.remove(message_index);
                    message.data_header.target_port = pair.created_id;
                    component.component.adopt_message(&mut component.ctx, message);
                } else {
                    message_index += 1;
                }
            }

            // Handle potential channel between creator and created component
            let created_port_info = component.ctx.get_port(pair.created_handle);
            if created_port_info.peer_comp_id == creator_ctx.id {
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
                peer_port_info.peer_comp_id = component.ctx.id;
                peer_port_info.peer_port_id = created_port_info.self_id;
                creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None);
            }
        }

        // Do the same for the closed ports
        for pair in closed_port_id_pairs.iter() {
            let port_index = creator_ctx.get_port_index(pair.creator_handle);
            creator_ctx.remove_port(pair.creator_handle);
            let _removed_message = self.inbox_main.remove(port_index);

            // In debug mode: since we've closed the port we shouldn't have any
            // messages for that port.
            debug_assert!(_removed_message.is_none());
            debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id));
        }

        // By now all ports and messages have been transferred. If there are any
        // peers that need to be notified about this new component, then we
        // initiate the protocol that will notify everyone here.
        if created_component_has_remote_peers {
            let created_ctx = &component.ctx;
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);

            for pair in opened_port_id_pairs.iter() {
                let port_info = created_ctx.get_port(pair.created_handle);
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
                    let message = self.control.add_reroute_entry(
                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
                        pair.creator_id, pair.created_id, created_ctx.id,
                        schedule_entry_id
                    );
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
                    let peer_info = created_ctx.get_peer(peer_handle);
                    peer_info.handle.send_message(&sched_ctx.runtime, message, true);
                }
            }
        } else {
            // The new component can be scheduled immediately
            sched_ctx.runtime.enqueue_work(created_key);
        }
    }
}

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
    // Helper to check a value for a port and recurse if needed.
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortId(port_id.id);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }

                ports.push(cur_port);
            },
            Value::Array(heap_pos) |
            Value::Message(heap_pos) |
            Value::String(heap_pos) |
            Value::Struct(heap_pos) |
            Value::Union(_, heap_pos) => {
                // Reference to some dynamic thing which might contain ports,
                // so recurse
                let heap_region = &group.regions[*heap_pos as usize];
                for embedded_value in heap_region {
                    find_port_in_value(group, embedded_value, ports);
                }
            },
            _ => {}, // values we don't care about
        }
    }

    // Clear the ports, then scan all the available values
    ports.clear();
    for value in &value_group.values {
        find_port_in_value(value_group, value, ports);
    }
}

struct ValueGroupPortIter<'a> {
    group: &'a mut ValueGroup,
    heap_stack: Vec<(usize, usize)>,
    index: usize,
}

impl<'a> ValueGroupPortIter<'a> {
    fn new(group: &'a mut ValueGroup) -> Self {
        return Self{ group, heap_stack: Vec::new(), index: 0 }
    }
}

struct ValueGroupPortRef {
    id: PortId,
    heap_pos: Option<usize>, // otherwise: on stack
    index: usize,
}

impl<'a> Iterator for ValueGroupPortIter<'a> {
    type Item = ValueGroupPortRef;

    fn next(&mut self) -> Option<Self::Item> {
        // Enter loop that keeps iterating until a port is found
        loop {
            if let Some(pos) = self.heap_stack.last() {
                let (heap_pos, region_index) = *pos;
                if region_index >= self.group.regions[heap_pos].len() {
                    self.heap_stack.pop();
                    continue;
                }

                let value = &self.group.regions[heap_pos][region_index];
                self.heap_stack.last_mut().unwrap().1 += 1;

                match value {
                    Value::Input(id) | Value::Output(id) => {
                        let id = PortId(id.id);
                        return Some(ValueGroupPortRef{
                            id,
                            heap_pos: Some(heap_pos),
                            index: region_index,
                        });
                    },
                    _ => {},
                }

                if let Some(heap_pos) = value.get_heap_pos() {
                    self.heap_stack.push((heap_pos as usize, 0));
                }
            } else {
                if self.index >= self.group.values.len() {
                    return None;
                }

                let value = &mut self.group.values[self.index];
                self.index += 1;

                match value {
                    Value::Input(id) | Value::Output(id) => {
                        let id = PortId(id.id);
                        return Some(ValueGroupPortRef{ id, heap_pos: None, index: self.index - 1 });
                    },
                    _ => {},
                }

                // Not a port, check if we need to enter a heap region
                if let Some(heap_pos) = value.get_heap_pos() {
                    self.heap_stack.push((heap_pos as usize, 0));
                }
                // else: just consider the next value
            }
        }
    }
}
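
// Illustrative sketch only: the explicit-stack traversal idea used by
// `ValueGroupPortIter` above, demonstrated on simplified stand-in types so the
// control flow can be followed (and tested) in isolation. `MiniValue` and
// `MiniGroup` are hypothetical types invented for this sketch; they do not
// mirror the real `Value`/`ValueGroup` layout of the interpreter.
#[cfg(test)]
mod port_traversal_sketch {
    // A value is either a port id or a reference to a heap region that may
    // contain further values (loosely mimicking arrays/structs/unions).
    enum MiniValue {
        Port(u32),
        HeapRef(usize),
        Other,
    }

    struct MiniGroup {
        values: Vec<MiniValue>,
        regions: Vec<Vec<MiniValue>>,
    }

    // Collects all port ids by walking heap regions with an explicit stack of
    // (region index, position within region) pairs, the same shape of loop as
    // `ValueGroupPortIter::next`.
    fn collect_ports(group: &MiniGroup) -> Vec<u32> {
        let mut found = Vec::new();
        let mut stack: Vec<(usize, usize)> = Vec::new();
        let mut index = 0;

        loop {
            let value = if let Some(top) = stack.last_mut() {
                let (region, pos) = *top;
                if pos >= group.regions[region].len() {
                    stack.pop(); // region exhausted, resume the parent region
                    continue;
                }
                top.1 += 1; // advance within this region
                &group.regions[region][pos]
            } else {
                if index >= group.values.len() {
                    return found; // all stack values visited
                }
                index += 1;
                &group.values[index - 1]
            };

            match value {
                MiniValue::Port(id) => found.push(*id),
                MiniValue::HeapRef(region) => stack.push((*region, 0)),
                MiniValue::Other => {},
            }
        }
    }

    #[test]
    fn finds_ports_in_nested_regions() {
        let group = MiniGroup {
            values: vec![MiniValue::Port(1), MiniValue::HeapRef(0), MiniValue::Other],
            regions: vec![
                vec![MiniValue::Port(2), MiniValue::HeapRef(1)],
                vec![MiniValue::Port(3)],
            ],
        };
        assert_eq!(collect_ports(&group), vec![1, 2, 3]);
    }
}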