use crate::protocol::*;
use crate::protocol::ast::DefinitionId;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt, ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};
use crate::runtime2::runtime::*;
use crate::runtime2::scheduler::SchedulerCtx;
use crate::runtime2::communication::*;

use super::*;
use super::control_layer::*;
use super::consensus::Consensus;

pub enum CompScheduling {
    Immediate,
    Requeue,
    Sleep,
    Exit,
}

pub struct CompCtx {
    pub id: CompId,
    pub ports: Vec<Port>,
    pub peers: Vec<Peer>,
    pub messages: Vec<Option<DataMessage>>, // same size as "ports"
    pub port_id_counter: u32,
}

impl CompCtx {
    pub(crate) fn new(reservation: &CompReserved) -> Self {
        return Self{
            id: reservation.id(),
            ports: Vec::new(),
            peers: Vec::new(),
            messages: Vec::new(),
            port_id_counter: 0,
        }
    }
}

struct MessageView<'a> {
    index: usize,
    pub message: &'a DataMessage,
}

impl CompCtx {
    /// Creates a new channel that is fully owned by the component associated
    /// with this context.
    fn create_channel(&mut self) -> Channel {
        let putter_id = PortId(self.take_port_id());
        let getter_id = PortId(self.take_port_id());
        self.ports.push(Port{
            self_id: putter_id,
            peer_id: getter_id,
            kind: PortKind::Putter,
            state: PortState::Open,
            peer_comp_id: self.id,
        });
        self.ports.push(Port{
            self_id: getter_id,
            peer_id: putter_id,
            kind: PortKind::Getter,
            state: PortState::Open,
            peer_comp_id: self.id,
        });

        return Channel{ putter_id, getter_id };
    }

    /// Adopts a port transferred by another component. Essentially copies all
    /// port data but creates a new ID. Caller should ensure that the other
    /// endpoint becomes aware of this ID.
    fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port {
        let port_id = PortId(self.take_port_id());
        let port_index = self.ports.len();
        self.ports.push(Port{
            self_id: port_id,
            peer_id: to_transfer.peer_id,
            kind: to_transfer.kind,
            state: to_transfer.state,
            peer_comp_id: to_transfer.peer_comp_id,
        });
        return &mut self.ports[port_index];
    }

    /// Adds a peer (or increments the "associated port" counter). Hence the
    /// caller must make sure that this makes sense.
    fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) {
        match self.get_peer_index(peer_id) {
            Some(peer_index) => {
                let peer_info = &mut self.peers[peer_index];
                peer_info.num_associated_ports += 1;
            },
            None => {
                let handle = if let Some(handle) = peer_handle {
                    handle.clone()
                } else {
                    sched_ctx.runtime.get_component_public(peer_id)
                };
                self.peers.push(Peer{
                    id: peer_id,
                    num_associated_ports: 1,
                    handle,
                })
            }
        }
    }

    /// Removes a peer (or decrements the "associated port" counter). If there
    /// are no more references to the peer then the handle will be destroyed.
    fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) {
        let peer_index = self.get_peer_index(peer_id).unwrap();
        let peer_info = &mut self.peers[peer_index];
        peer_info.num_associated_ports -= 1;
        if peer_info.num_associated_ports == 0 {
            let mut peer = self.peers.remove(peer_index);
            let should_remove = peer.handle.decrement_users();
            if should_remove {
                let key = unsafe{ peer.id.upgrade() };
                sched_ctx.runtime.destroy_component(key);
            }
        }
    }

    pub(crate) fn get_port(&self, port_id: PortId) -> &Port {
        let index = self.get_port_index(port_id).unwrap();
        return &self.ports[index];
    }

    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
        let index = self.get_port_index(port_id).unwrap();
        return &mut self.ports[index];
    }

    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
        for (index, port) in self.ports.iter().enumerate() {
            if port.self_id == port_id {
                return Some(index);
            }
        }
        return None;
    }

    pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer {
        let index = self.get_peer_index(peer_id).unwrap();
        return &self.peers[index];
    }

    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
        let index = self.get_peer_index(peer_id).unwrap();
        return &mut self.peers[index];
    }

    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
        for (index, peer) in self.peers.iter().enumerate() {
            if peer.id == peer_id {
                return Some(index);
            }
        }
        return None;
    }

    fn take_port_id(&mut self) -> u32 {
        let port_id = self.port_id_counter;
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
        return port_id;
    }
}

pub enum ExecStmt {
    CreatedChannel((Value, Value)),
    PerformedPut,
    PerformedGet(ValueGroup),
    None,
}

impl ExecStmt {
    fn take(&mut self) -> ExecStmt {
        let mut value = ExecStmt::None;
        std::mem::swap(self, &mut value);
        return value;
    }

    fn is_none(&self) -> bool {
        match self {
            ExecStmt::None => return true,
            _ => return false,
        }
    }
}

pub struct ExecCtx {
    stmt: ExecStmt,
}

impl RunContext for ExecCtx {
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
        match self.stmt.take() {
            ExecStmt::None => return false,
            ExecStmt::PerformedPut => return true,
            _ => unreachable!(),
        }
    }

    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::PerformedGet(value) => return Some(value),
            _ => unreachable!(),
        }
    }

    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
        todo!("remove fires")
    }

    fn performed_fork(&mut self) -> Option<bool> {
        todo!("remove fork")
    }

    fn created_channel(&mut self) -> Option<(Value, Value)> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::CreatedChannel(ports) => return Some(ports),
            _ => unreachable!(),
        }
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Mode {
    NonSync,  // not in sync mode
    Sync,     // in sync mode, can interact with other components
    SyncFail, // something went wrong during sync mode (deadlocked, error, whatever)
    SyncEnd,  // awaiting a solution, i.e. encountered the end of the sync block
    BlockedGet,
    BlockedPut,
    Exit,
}

impl Mode {
    fn can_run(&self) -> bool {
        match self {
            Mode::NonSync | Mode::Sync =>
                return true,
            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
                return false,
        }
    }
}

pub(crate) struct CompPDL {
    pub mode: Mode,
    pub mode_port: PortId,      // when blocked on a port
    pub mode_value: ValueGroup, // when blocked on a put
    pub prompt: Prompt,
    pub control: ControlLayer,
    pub consensus: Consensus,
    pub sync_counter: u32,
    pub exec_ctx: ExecCtx,
    // TODO: Temporary field, simulates future plans of having one storage place
    //  reserved per port.
    // Should be same length as the number of ports. Corresponding indices imply
    // message is intended for that port.
    pub inbox_main: Vec<Option<DataMessage>>,
    pub inbox_backup: Vec<DataMessage>,
}

impl CompPDL {
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
        let mut inbox_main = Vec::new();
        inbox_main.reserve(num_ports);
        for _ in 0..num_ports {
            inbox_main.push(None);
        }

        return Self{
            mode: Mode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
            prompt: initial_state,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            sync_counter: 0,
            exec_ctx: ExecCtx{
                stmt: ExecStmt::None,
            },
            inbox_main,
            inbox_backup: Vec::new(),
        }
    }

    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
        sched_ctx.log(&format!("handling message: {:#?}", message));
        if let Some(new_target) = self.control.should_reroute(&mut message) {
            let mut target = sched_ctx.runtime.get_component_public(new_target);
            // not waking up: we schedule once we've received all PortPeerChanged Acks
            target.send_message(sched_ctx, message, false);
            let _should_remove = target.decrement_users();
            debug_assert!(!_should_remove);
            return;
        }

        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
            },
            Message::Sync(message) => {
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
            }
        }
    }

    // -------------------------------------------------------------------------
    // Running component and handling changes in global component state
    // -------------------------------------------------------------------------

    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        let can_run = self.mode.can_run();
        sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
        if !can_run {
            return Ok(CompScheduling::Sleep);
        }

        let run_result = self.execute_prompt(&sched_ctx)?;

        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.mode, Mode::Sync);
                let scheduling = self.handle_sync_end(sched_ctx, comp_ctx);
                return Ok(scheduling.unwrap_or(CompScheduling::Immediate));
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.mode, Mode::Sync);
                debug_assert!(self.exec_ctx.stmt.is_none());

                let port_id = port_id_from_eval(port_id);
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
                if let Some(message) = &self.inbox_main[port_index] {
                    // Check if we can actually receive the message
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                        // Message was received. Make sure any blocked peers and
                        // pending messages are handled.
                        let message = self.inbox_main[port_index].take().unwrap();
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                        return Ok(CompScheduling::Immediate);
                    } else {
                        self.mode = Mode::SyncFail;
                        return Ok(CompScheduling::Sleep);
                    }
                } else {
                    // We need to wait
                    self.mode = Mode::BlockedGet;
                    self.mode_port = port_id;
                    return Ok(CompScheduling::Sleep);
                }
            },
            EC::Put(port_id, value) => {
                debug_assert_eq!(self.mode, Mode::Sync);
                let port_id = port_id_from_eval(port_id);
                let port_info = comp_ctx.get_port(port_id);
                if port_info.state == PortState::Blocked {
                    todo!("handle blocked port");
                }
                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
                self.exec_ctx.stmt = ExecStmt::PerformedPut;
                return Ok(CompScheduling::Immediate);
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
                self.handle_component_exit(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Exit);
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.create_component_and_transfer_ports2(
                    sched_ctx, comp_ctx,
                    definition_id, monomorph_idx, arguments
                );
                return Ok(CompScheduling::Requeue);
            },
            EC::NewChannel => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                debug_assert!(self.exec_ctx.stmt.is_none());
                let channel = comp_ctx.create_channel();
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
                    Value::Output(port_id_to_eval(channel.putter_id)),
                    Value::Input(port_id_to_eval(channel.getter_id))
                ));
                self.inbox_main.push(None);
                self.inbox_main.push(None);
                return Ok(CompScheduling::Immediate);
            }
        }
    }

    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        let mut step_result = EvalContinuation::Stepping;
        while let EvalContinuation::Stepping = step_result {
            step_result = self.prompt.step(
                &sched_ctx.runtime.protocol.types,
                &sched_ctx.runtime.protocol.heap,
                &sched_ctx.runtime.protocol.modules,
                &mut self.exec_ctx,
            )?;
        }

        return Ok(step_result)
    }

    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component starting sync mode");
        self.consensus.notify_sync_start(comp_ctx);
        debug_assert_eq!(self.mode, Mode::NonSync);
        self.mode = Mode::Sync;
    }

    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option<CompScheduling> {
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
        self.handle_sync_decision(sched_ctx, comp_ctx, decision)
    }

    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option<CompScheduling> {
        debug_assert_eq!(self.mode, Mode::Sync);
        let is_success = match decision {
            SyncRoundDecision::None => {
                // No decision yet
                return None;
            },
            SyncRoundDecision::Solution => true,
            SyncRoundDecision::Failure => false,
        };

        // If here then we've reached a decision
        if is_success {
            self.mode = Mode::NonSync;
            self.consensus.notify_sync_decision(decision);
            return None;
        } else {
            todo!("handle this better, show some kind of error");
            self.mode = Mode::Exit;
            self.handle_component_exit(sched_ctx, comp_ctx);
            return Some(CompScheduling::Exit);
        }
    }

    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component exiting");
        // Not a perfect assert, but just to remind myself: cannot exit while in sync
        debug_assert_eq!(self.mode, Mode::NonSync);
        // Note: for now we have that the scheduler handles exiting. I don't
        // know if that is a good idea, we'll see
        self.mode = Mode::Exit;
    }

    // -------------------------------------------------------------------------
    // Handling messages
    // -------------------------------------------------------------------------

    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
        let port_info = comp_ctx.get_port(source_port_id);
        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
    }

    /// Handles a message that came in through the public inbox. This function
    /// will handle putting it in the correct place, and potentially blocking
    /// the port in case too many messages are being received.
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        // Check if we can insert it directly into the storage associated with
        // the port
        let target_port_id = message.data_header.target_port;
        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);

            // After direct insertion, check if this component's execution is
            // blocked on receiving a message on that port
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
                // We were indeed blocked
                self.mode = Mode::Sync;
                self.mode_port = PortId::new_invalid();
            }

            return;
        }

        // The direct inbox is full, so the port will become (or was already) blocked
        let port_info = &mut comp_ctx.ports[port_index];
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
        let _peer_comp_id = port_info.peer_comp_id;

        if port_info.state == PortState::Open {
            let (target_comp_id, block_message) =
                self.control.set_port_and_peer_blocked(target_port_id, comp_ctx);
            debug_assert_eq!(_peer_comp_id, target_comp_id);
            let peer = comp_ctx.get_peer(target_comp_id);
            peer.handle.send_message(sched_ctx, Message::Control(block_message), true);
        }

        // But we still need to remember the message, so:
        self.inbox_backup.push(message);
    }

    /// Handles when a message has been handed off from the inbox to the PDL
    /// code. We check to see if there are more messages waiting and, if not,
    /// then we handle the case where the port might have been blocked
    /// previously.
    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
        debug_assert!(self.inbox_main[port_index].is_none()); // because we just received it

        // Check for any more messages
        for message_index in 0..self.inbox_backup.len() {
            let message = &self.inbox_backup[message_index];
            if message.data_header.target_port == port_id {
                // One more message for this port
                let message = self.inbox_backup.remove(message_index);
                debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port
                self.inbox_main[port_index] = Some(message);

                return;
            }
        }

        // Did not have any more messages. So if we were blocked, then we need
        // to send the "unblock" message.
        let port_info = &comp_ctx.ports[port_index];
        if port_info.state == PortState::Blocked {
            let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
            let peer_info = comp_ctx.get_peer(peer_comp_id);
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
        }
    }

    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
        // Little local utility to send an Ack
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) {
            let peer_info = comp_ctx.get_peer(peer_comp_id);
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
                id: causer_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: None,
                content: ControlMessageContent::Ack,
            }), true);
        }

        // Handle the content of the control message, and optionally Ack it
        match message.content {
            ControlMessageContent::Ack => {
                let mut to_ack = message.id;
                loop {
                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
                    match action {
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
                            // FIX @NoDirectHandle
                            let mut handle = sched_ctx.runtime.get_component_public(target_comp);
                            handle.send_message(sched_ctx, Message::Control(message), true);
                            let _should_remove = handle.decrement_users();
                            debug_assert!(!_should_remove);
                            to_ack = new_to_ack;
                        },
                        AckAction::ScheduleComponent(to_schedule) => {
                            // FIX @NoDirectHandle
                            let mut handle = sched_ctx.runtime.get_component_public(to_schedule);

                            // Note that the component is intentionally not
                            // sleeping, so we just wake it up
                            debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
                            let key = unsafe{ to_schedule.upgrade() };
                            sched_ctx.runtime.enqueue_work(key);
                            let _should_remove = handle.decrement_users();
                            debug_assert!(!_should_remove);
                            break;
                        },
                        AckAction::None => {
                            break;
                        }
                    }
                }
            },
            ControlMessageContent::BlockPort(port_id) => {
                // One of our messages was accepted, but the port should be
                // blocked.
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
                if port_info.state != PortState::Closed {
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
                    port_info.state = PortState::Blocked;
                }
            },
            ControlMessageContent::ClosePort(port_id) => {
                // Request to close the port. We immediately comply and remove
                // the component handle as well
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
                let port_info = &mut comp_ctx.ports[port_index];
                let peer_port_id = port_info.peer_id;
                let peer_comp_id = port_info.peer_comp_id;
                port_info.state = PortState::Closed;

                let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap();
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports -= 1;
                if peer_info.num_associated_ports == 0 {
                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
                    let should_remove = peer_info.handle.decrement_users();
                    if should_remove {
                        let comp_key = unsafe{ peer_info.id.upgrade() };
                        sched_ctx.runtime.destroy_component(comp_key);
                    }
                    comp_ctx.peers.remove(peer_index);
                }

                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
            }
            ControlMessageContent::UnblockPort(port_id) => {
                // We were previously blocked (or already closed)
                let port_info = comp_ctx.get_port(port_id);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
                if port_info.state == PortState::Blocked {
                    self.unblock_local_port(sched_ctx, comp_ctx, port_id);
                }
            },
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
                // The peer of our port has just changed. So we are asked to
                // temporarily block the port (while our original recipient is
                // potentially rerouting some of the in-flight messages) and
                // Ack. Then we wait for the `unblock` call.
                debug_assert_eq!(message.target_port_id, Some(port_id));
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
                if port_info.state == PortState::Open {
                    port_info.state = PortState::Blocked;
                }

                let peer_port_id = port_info.peer_id;
                let peer_comp_id = port_info.peer_comp_id;
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
            },
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
                debug_assert_eq!(message.target_port_id, Some(port_id));
                let port_info = comp_ctx.get_port_mut(port_id);
                let old_peer_comp_id = port_info.peer_comp_id;
                debug_assert!(port_info.state == PortState::Blocked);
                port_info.peer_comp_id = new_comp_id;

                comp_ctx.add_peer(sched_ctx, new_comp_id, None);
                comp_ctx.remove_peer(sched_ctx, old_peer_comp_id);
                self.unblock_local_port(sched_ctx, comp_ctx, port_id);
            }
        }
    }

    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
        return self.handle_sync_decision(sched_ctx, comp_ctx, decision);
    }

    // -------------------------------------------------------------------------
    // Handling ports
    // -------------------------------------------------------------------------

    /// Marks the local port as being unblocked. If the execution was blocked on
    /// sending a message over this port, then execution will continue and the
    /// message will be sent.
    fn unblock_local_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
        let port_info = comp_ctx.get_port_mut(port_id);
        debug_assert_eq!(port_info.state, PortState::Blocked);
        port_info.state = PortState::Open;

        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
            // We were blocked on the port that just became unblocked, so
            // send the message.
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            let mut replacement = ValueGroup::default();
            std::mem::swap(&mut replacement, &mut self.mode_value);
            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
            self.mode = Mode::Sync;
            self.mode_port = PortId::new_invalid();
        }
    }

    fn create_component_and_transfer_ports2(
        &mut self,
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
        definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
    ) {
        struct PortPair{
            creator: PortId,
            created: PortId,
        }
        let mut port_id_pairs = Vec::new();

        let reservation = sched_ctx.runtime.start_create_pdl_component();
        let mut created_ctx = CompCtx::new(&reservation);

        // Take all the port IDs that are in the `args` (and currently belong to
        // the creator component) and translate them into new IDs that are
        // associated with the component we're about to create
        let mut arg_iter = ValueGroupIter::new(&mut arguments);
        while let Some(port_reference) = arg_iter.next() {
            // Create port entry for new component
            let creator_port_id = port_reference.id;
            let creator_port = creator_ctx.get_port(creator_port_id);
            let created_port = created_ctx.adopt_port(creator_port);
            let created_port_id = created_port.self_id;
            port_id_pairs.push(PortPair{
                creator: creator_port_id,
                created: created_port_id,
            });

            // Modify value in arguments (bit dirty, but double vec in ValueGroup
            // causes lifetime issues)
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
            } else {
                &mut arg_iter.group.values[port_reference.index]
            };
            match arg_value {
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
                _ => unreachable!(),
            }
        }

        // For each transferred port pair set their peer components to the
        // correct values. This will only change the values for the ports of
        // the new component.
        let mut created_component_has_remote_peers = false;

        for pair in port_id_pairs.iter() {
            let creator_port_info = creator_ctx.get_port(pair.creator);
            let created_port_info = created_ctx.get_port_mut(pair.created);

            if created_port_info.peer_comp_id == creator_ctx.id {
                // Port peer is owned by the creator as well
                let created_peer_port_index = port_id_pairs
                    .iter()
                    .position(|v| v.creator == creator_port_info.peer_id);
                match created_peer_port_index {
                    Some(created_peer_port_index) => {
                        // Peer port moved to the new component as well
                        let peer_pair = &port_id_pairs[created_peer_port_index];
                        created_port_info.peer_id = peer_pair.created;
                        created_port_info.peer_comp_id = reservation.id();
                    },
                    None => {
                        // Peer port remains with creator component.
                        created_port_info.peer_comp_id = creator_ctx.id;
                        created_ctx.add_peer(sched_ctx, creator_ctx.id, None);
                    }
                }
            } else {
                // Peer is a different component
                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
                created_component_has_remote_peers = true;
            }
        }

        // We'll now actually turn our reservation for a new component into an
        // actual component. Note that we initialize it as "not sleeping" as
        // its initial scheduling might be performed based on `Ack`s in response
        // to message exchanges between remote peers.
        let prompt = Prompt::new(
            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
            definition_id, monomorph_index, arguments,
        );
        let component = CompPDL::new(prompt, port_id_pairs.len());
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
            reservation, component, created_ctx, false,
        );
        let created_ctx = &component.ctx;

        // Now modify the creator's ports: remove every transferred port and
        // potentially remove the peer component. Here is also where we will
        // transfer messages in the main inbox.
        for pair in port_id_pairs.iter() {
            // Remove peer if appropriate
            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
            if creator_port_info.peer_comp_id != creator_ctx.id {
                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
            }

            // Transfer any messages
            let created_port_index = created_ctx.get_port_index(pair.created).unwrap();
            let created_port_info = &created_ctx.ports[created_port_index];
            debug_assert!(component.code.inbox_main[created_port_index].is_none());
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
                message.data_header.target_port = pair.created;
                component.code.inbox_main[created_port_index] = Some(message);
            }

            let mut message_index = 0;
            while message_index < self.inbox_backup.len() {
                let message = &self.inbox_backup[message_index];
                if message.data_header.target_port == pair.creator {
                    // Transfer message
                    let mut message = self.inbox_backup.remove(message_index);
                    message.data_header.target_port = pair.created;
                    component.code.inbox_backup.push(message);
                } else {
                    message_index += 1;
                }
            }

            // Handle potential channel between creator and created component
            if created_port_info.peer_comp_id == creator_ctx.id {
                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
                peer_port_info.peer_comp_id = created_ctx.id;
                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
            }
        }

        // By now all ports have been transferred. We'll now do any of the setup
        // for rerouting/messaging
        if created_component_has_remote_peers {
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
            for pair in port_id_pairs.iter() {
                let port_info = created_ctx.get_port(pair.created);
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
                    let message = self.control.add_reroute_entry(
                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
                        pair.creator, pair.created, created_ctx.id,
                        schedule_entry_id
                    );
                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
                    peer_info.handle.send_message(sched_ctx, message, true);
                }
            }
        } else {
            // The newly created component can be scheduled immediately
            sched_ctx.runtime.enqueue_work(created_key);
        }
    }

    /// Removes a port from a component. Also decrements the port counter in
    /// the peer component's entry. If that hits 0 then it will be removed and
    /// returned. If returned then the caller is responsible for decrementing
    /// the atomic counters of the peer component's handle.
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
        let port_info = comp_ctx.ports.remove(port_index);

        // If the component owns the peer, then we don't have to decrement the
        // number of peers (because we don't have an entry for ourselves)
        if port_info.peer_comp_id == comp_ctx.id {
            return (port_info, None);
        }

        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
        let peer_info = &mut comp_ctx.peers[peer_index];
        peer_info.num_associated_ports -= 1;

        // Check if we still have other ports referencing this peer
        if peer_info.num_associated_ports != 0 {
            return (port_info, None);
        }

        let peer_info = comp_ctx.peers.remove(peer_index);
        return (port_info, Some(peer_info));
    }

    /// Only adds/updates a peer for a given port. This function assumes (but
    /// does not check!) that the port was not considered to belong to that peer
    /// before calling this function.
    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
        match comp_ctx.get_peer_index(peer_id) {
            Some(peer_index) => {
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports += 1;
            },
            None => {
                let handle = sched_ctx.runtime.get_component_public(peer_id);
                comp_ctx.peers.push(Peer{
                    id: peer_id,
                    num_associated_ports: 1,
                    handle,
                });
            }
        }
    }

    fn change_port_peer_component(
        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
        port_id: PortId, new_peer_comp_id: CompId
    ) {
        let port_info = comp_ctx.get_port_mut(port_id);
        let cur_peer_comp_id = port_info.peer_comp_id;
        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
        cur_peer_info.num_associated_ports -= 1;
        if cur_peer_info.num_associated_ports == 0 {
            let should_remove = cur_peer_info.handle.decrement_users();
            if should_remove {
                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
            }
        }
    }
}

#[inline]
fn port_id_from_eval(port_id: EvalPortId) -> PortId {
    return PortId(port_id.id);
}

#[inline]
fn port_id_to_eval(port_id: PortId) -> EvalPortId {
    return EvalPortId{ id: port_id.0 };
}

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
    // Helper to check a value for a port and recurse if needed.
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortId(port_id.id);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }

                ports.push(cur_port);
            },
            Value::Array(heap_pos) |
            Value::Message(heap_pos) |
            Value::String(heap_pos) |
            Value::Struct(heap_pos) |
            Value::Union(_, heap_pos) => {
                // Reference to some dynamic thing which might contain ports,
                // so recurse
                let heap_region = &group.regions[*heap_pos as usize];
                for embedded_value in heap_region {
                    find_port_in_value(group, embedded_value, ports);
                }
            },
            _ => {}, // values we don't care about
        }
    }

    // Clear the ports, then scan all the available values
    ports.clear();
    for value in &value_group.values {
        find_port_in_value(value_group, value, ports);
    }
}

struct ValueGroupIter<'a> {
    group: &'a mut ValueGroup,
    heap_stack: Vec<(usize, usize)>,
    index: usize,
}

impl<'a> ValueGroupIter<'a> {
    fn new(group: &'a mut ValueGroup) -> Self {
        return Self{ group, heap_stack: Vec::new(), index: 0 }
    }
}

struct ValueGroupPortRef {
    id: PortId,
    heap_pos: Option<usize>, // otherwise: on stack
    index: usize,
}

impl<'a> Iterator for ValueGroupIter<'a> {
    type Item = ValueGroupPortRef;

    fn next(&mut self) -> Option<Self::Item> {
        // Enter loop that keeps iterating until a port is found
        loop {
            if let Some(pos) = self.heap_stack.last() {
                let (heap_pos, region_index) = *pos;
                if region_index >= self.group.regions[heap_pos].len() {
                    self.heap_stack.pop();
                    continue;
                }

                let value = &self.group.regions[heap_pos][region_index];
                self.heap_stack.last_mut().unwrap().1 += 1;

                match value {
                    Value::Input(id) | Value::Output(id) => {
                        let id = PortId(id.id);
                        return Some(ValueGroupPortRef{
                            id,
                            heap_pos: Some(heap_pos),
                            index: region_index,
                        });
                    },
                    _ => {},
                }

                if let Some(heap_pos) = value.get_heap_pos() {
                    self.heap_stack.push((heap_pos as usize, 0));
                }
            } else {
                if self.index >= self.group.values.len() {
                    return None;
                }

                let value = &mut self.group.values[self.index];
                self.index += 1;

                match value {
                    Value::Input(id) | Value::Output(id) => {
                        let id = PortId(id.id);
                        return Some(ValueGroupPortRef{
                            id,
                            heap_pos: None,
                            index: self.index - 1
                        });
                    },
                    _ => {},
                }

                // Not a port, check if we need to enter a heap region
                if let Some(heap_pos) = value.get_heap_pos() {
                    self.heap_stack.push((heap_pos as usize, 0));
                }
                // else: just consider the next value
            }
        }
    }
}
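
// A minimal, illustrative sketch (not part of the runtime): it exercises
// `find_ports_in_value_group` on a hand-built `ValueGroup`, assuming that
// `ValueGroup::default()` yields an empty group and that its `values` field is
// writable from within this crate (both are used that way above). The concrete
// port number (5) is arbitrary.
#[cfg(test)]
mod find_ports_sketch {
    use super::*;

    #[test]
    fn duplicate_ports_are_reported_once() {
        // Two stack values referring to the same output port ...
        let mut group = ValueGroup::default();
        group.values.push(Value::Output(EvalPortId{ id: 5 }));
        group.values.push(Value::Output(EvalPortId{ id: 5 }));

        // ... should yield that port exactly once.
        let mut ports = Vec::new();
        find_ports_in_value_group(&group, &mut ports);
        assert_eq!(ports.len(), 1);
        assert!(ports[0] == PortId(5));
    }
}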