diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 2a6a9283b31b053a5aef528984a648f10cdddac5..c2c4b13b725990310eb9c7d7454f5e5dbac4f7bb 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -11,6 +11,7 @@ use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; use super::*; +use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -21,159 +22,6 @@ pub enum CompScheduling { Exit, } -pub struct CompCtx { - pub id: CompId, - pub ports: Vec, - pub peers: Vec, - pub messages: Vec>, // same size as "ports" - pub port_id_counter: u32, -} - -impl CompCtx { - pub(crate) fn new(reservation: &CompReserved) -> Self { - return Self{ - id: reservation.id(), - ports: Vec::new(), - peers: Vec::new(), - messages: Vec::new(), - port_id_counter: 0, - } - } -} - -struct MessageView<'a> { - index: usize, - pub message: &'a DataMessage, -} - -impl CompCtx { - /// Creates a new channel that is fully owned by the component associated - /// with this context. - fn create_channel(&mut self) -> Channel { - let putter_id = PortId(self.take_port_id()); - let getter_id = PortId(self.take_port_id()); - self.ports.push(Port{ - self_id: putter_id, - peer_id: getter_id, - kind: PortKind::Putter, - state: PortState::Open, - peer_comp_id: self.id, - }); - self.ports.push(Port{ - self_id: getter_id, - peer_id: putter_id, - kind: PortKind::Getter, - state: PortState::Open, - peer_comp_id: self.id, - }); - - return Channel{ putter_id, getter_id }; - } - - /// Adopts a port transferred by another component. Essentially copies all - /// port data but creates a new ID. Caller should ensure that the other - /// endpoint becomes aware of this ID. - fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port { - let port_id = PortId(self.take_port_id()); - let port_index = self.ports.len(); - self.ports.push(Port{ - self_id: port_id, - peer_id: to_transfer.peer_id, - kind: to_transfer.kind, - state: to_transfer.state, - peer_comp_id: to_transfer.peer_comp_id, - }); - return &mut self.ports[port_index]; - } - - /// Adds a peer (or increments the "associated port" counter). Hence caller - /// must make sure that this makes sense. - fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) { - match self.get_peer_index(peer_id) { - Some(peer_index) => { - let peer_info = &mut self.peers[peer_index]; - peer_info.num_associated_ports += 1; - }, - None => { - let handle = if let Some(handle) = peer_handle { - handle.clone() - } else { - sched_ctx.runtime.get_component_public(peer_id) - }; - - self.peers.push(Peer{ - id: peer_id, - num_associated_ports: 1, - handle, - }) - } - } - } - - /// Removes a peer (or decrements the "associated port" counter). If there - /// are no more references to the peer then the handle will be destroyed. - fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) { - let peer_index = self.get_peer_index(peer_id).unwrap(); - let peer_info = &mut self.peers[peer_index]; - peer_info.num_associated_ports -= 1; - - if peer_info.num_associated_ports == 0 { - let mut peer = self.peers.remove(peer_index); - let should_remove = peer.handle.decrement_users(); - if should_remove { - let key = unsafe{ peer.id.upgrade() }; - sched_ctx.runtime.destroy_component(key); - } - } - } - - pub(crate) fn get_port(&self, port_id: PortId) -> &Port { - let index = self.get_port_index(port_id).unwrap(); - return &self.ports[index]; - } - - pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port { - let index = self.get_port_index(port_id).unwrap(); - return &mut self.ports[index]; - } - - pub(crate) fn get_port_index(&self, port_id: PortId) -> Option { - for (index, port) in self.ports.iter().enumerate() { - if port.self_id == port_id { - return Some(index); - } - } - - return None; - } - - pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer { - let index = self.get_peer_index(peer_id).unwrap(); - return &self.peers[index]; - } - - fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer { - let index = self.get_peer_index(peer_id).unwrap(); - return &mut self.peers[index]; - } - - pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option { - for (index, peer) in self.peers.iter().enumerate() { - if peer.id == peer_id { - return Some(index); - } - } - - return None; - } - - fn take_port_id(&mut self) -> u32 { - let port_id = self.port_id_counter; - self.port_id_counter = self.port_id_counter.wrapping_add(1); - return port_id; - } -} - pub enum ExecStmt { CreatedChannel((Value, Value)), PerformedPut, @@ -242,6 +90,7 @@ pub(crate) enum Mode { SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block BlockedGet, BlockedPut, + StartExit, // temp state Exit, } @@ -250,7 +99,7 @@ impl Mode { match self { Mode::NonSync | Mode::Sync => return true, - Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit => + Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit => return false, } } @@ -303,7 +152,7 @@ impl CompPDL { let mut target = sched_ctx.runtime.get_component_public(new_target); target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); - debug_assert!(!_should_remove); + debug_assert!(_should_remove.is_none()); return; } @@ -327,6 +176,11 @@ impl CompPDL { pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; + if self.mode == Mode::StartExit { + self.mode = Mode::Exit; + return Ok(CompScheduling::Exit); + } + let can_run = self.mode.can_run(); sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run)); if !can_run { @@ -349,13 +203,15 @@ impl CompPDL { debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); - let port_index = comp_ctx.get_port_index(port_id).unwrap(); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_index = comp_ctx.get_port_index(port_handle); if let Some(message) = &self.inbox_main[port_index] { // Check if we can actually receive the message if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // Message was received. Make sure any blocked peers and // pending messages are handled. let message = self.inbox_main[port_index].take().unwrap(); + self.handle_received_data_message(sched_ctx, comp_ctx, port_handle); self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); return Ok(CompScheduling::Immediate); @@ -373,11 +229,12 @@ impl CompPDL { EC::Put(port_id, value) => { debug_assert_eq!(self.mode, Mode::Sync); let port_id = port_id_from_eval(port_id); - let port_info = comp_ctx.get_port(port_id); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); if port_info.state == PortState::Blocked { todo!("handle blocked port"); } - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); + self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); self.exec_ctx.stmt = ExecStmt::PerformedPut; return Ok(CompScheduling::Immediate); }, @@ -393,7 +250,7 @@ impl CompPDL { }, EC::NewComponent(definition_id, monomorph_idx, arguments) => { debug_assert_eq!(self.mode, Mode::NonSync); - self.create_component_and_transfer_ports2( + self.create_component_and_transfer_ports( sched_ctx, comp_ctx, definition_id, monomorph_idx, arguments ); @@ -436,11 +293,13 @@ impl CompPDL { fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.mode = Mode::SyncEnd; self.handle_sync_decision(sched_ctx, comp_ctx, decision) } fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.mode, Mode::SyncEnd); + sched_ctx.log(&format!("Handling sync decision: {:?}", decision)); let is_success = match decision { SyncRoundDecision::None => { // No decision yet @@ -451,14 +310,14 @@ impl CompPDL { }; // If here then we've reached a decision + self.mode = Mode::NonSync; if is_success { - self.mode = Mode::NonSync; self.consensus.notify_sync_decision(decision); return None; } else { todo!("handle this better, show some kind of error"); - self.mode = Mode::Exit; self.handle_component_exit(sched_ctx, comp_ctx); + self.mode = Mode::Exit; return Some(CompScheduling::Exit); } } @@ -476,9 +335,10 @@ impl CompPDL { // Handling messages // ------------------------------------------------------------------------- - fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) { - let port_info = comp_ctx.get_port(source_port_id); - let peer_info = comp_ctx.get_peer(port_info.peer_comp_id); + fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) { + let port_info = comp_ctx.get_port(source_port_handle); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value); peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true); } @@ -490,13 +350,14 @@ impl CompPDL { // Check if we can insert it directly into the storage associated with // the port let target_port_id = message.data_header.target_port; - let port_index = comp_ctx.get_port_index(target_port_id).unwrap(); + let port_handle = comp_ctx.get_port_handle(target_port_id); + let port_index = comp_ctx.get_port_index(port_handle); if self.inbox_main[port_index].is_none() { self.inbox_main[port_index] = Some(message); // After direct insertion, check if this component's execution is // blocked on receiving a message on that port - debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly + debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { // We were indeed blocked self.mode = Mode::Sync; @@ -507,17 +368,16 @@ impl CompPDL { } // The direct inbox is full, so the port will become (or was already) blocked - let port_info = &mut comp_ctx.ports[port_index]; + let port_info = comp_ctx.get_port_mut(port_handle); debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); - let _peer_comp_id = port_info.peer_comp_id; if port_info.state == PortState::Open { - let (target_comp_id, block_message) = - self.control.set_port_and_peer_blocked(target_port_id, comp_ctx); - debug_assert_eq!(_peer_comp_id, target_comp_id); + comp_ctx.set_port_state(port_handle, PortState::Blocked); + let (peer_handle, message) = + self.control.initiate_port_blocking(comp_ctx, port_handle); - let peer = comp_ctx.get_peer(target_comp_id); - peer.handle.send_message(sched_ctx, Message::Control(block_message), true); + let peer = comp_ctx.get_peer(peer_handle); + peer.handle.send_message(sched_ctx, Message::Control(message), true); } // But we still need to remember the message, so: @@ -528,36 +388,38 @@ impl CompPDL { /// code. We check to see if there are more messages waiting and, if not, /// then we handle the case where the port might have been blocked /// previously. - fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) { - let port_index = comp_ctx.get_port_index(port_id).unwrap(); - debug_assert!(self.inbox_main[port_index].is_none()); // because we just received it + fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { + let port_index = comp_ctx.get_port_index(port_handle); + debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out // Check for any more messages + let port_info = comp_ctx.get_port(port_handle); for message_index in 0..self.inbox_backup.len() { let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == port_id { + if message.data_header.target_port == port_info.self_id { // One more message for this port let message = self.inbox_backup.remove(message_index); - debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port + debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port self.inbox_main[port_index] = Some(message); + return; } } // Did not have any more messages. So if we were blocked, then we need // to send the "unblock" message. - let port_info = &comp_ctx.ports[port_index]; if port_info.state == PortState::Blocked { - let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx); - let peer_info = comp_ctx.get_peer(peer_comp_id); + comp_ctx.set_port_state(port_handle, PortState::Open); + let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle); + let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(sched_ctx, Message::Control(message), true); } } fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { // Little local utility to send an Ack - fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) { - let peer_info = comp_ctx.get_peer(peer_comp_id); + fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) { + let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ id: causer_id, sender_comp_id: comp_ctx.id, @@ -578,7 +440,7 @@ impl CompPDL { let mut handle = sched_ctx.runtime.get_component_public(target_comp); handle.send_message(sched_ctx, Message::Control(message), true); let _should_remove = handle.decrement_users(); - debug_assert!(!_should_remove); + debug_assert!(_should_remove.is_none()); to_ack = new_to_ack; }, AckAction::ScheduleComponent(to_schedule) => { @@ -591,7 +453,7 @@ impl CompPDL { let key = unsafe{ to_schedule.upgrade() }; sched_ctx.runtime.enqueue_work(key); let _should_remove = handle.decrement_users(); - debug_assert!(!_should_remove); + debug_assert!(_should_remove.is_none()); break; }, AckAction::None => { @@ -603,45 +465,32 @@ impl CompPDL { ControlMessageContent::BlockPort(port_id) => { // On of our messages was accepted, but the port should be // blocked. - let port_info = comp_ctx.get_port_mut(port_id); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); if port_info.state != PortState::Closed { - debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages - port_info.state = PortState::Blocked; + comp_ctx.set_port_state(port_handle, PortState::Blocked); } }, ControlMessageContent::ClosePort(port_id) => { // Request to close the port. We immediately comply and remove // the component handle as well - let port_index = comp_ctx.get_port_index(port_id).unwrap(); - let port_info = &mut comp_ctx.ports[port_index]; - let peer_port_id = port_info.peer_id; - let peer_comp_id = port_info.peer_comp_id; - port_info.state = PortState::Closed; - - let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap(); - let peer_info = &mut comp_ctx.peers[peer_index]; - peer_info.num_associated_ports -= 1; - if peer_info.num_associated_ports == 0 { - // TODO: @Refactor clean up all these uses of "num_associated_ports" - let should_remove = peer_info.handle.decrement_users(); - if should_remove { - let comp_key = unsafe{ peer_info.id.upgrade() }; - sched_ctx.runtime.destroy_component(comp_key); - } + let port_handle = comp_ctx.get_port_handle(port_id); + let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - comp_ctx.peers.remove(peer_index); - } - - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id); - } + comp_ctx.set_port_state(port_handle, PortState::Closed); + send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); + comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id); + }, ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) - let port_info = comp_ctx.get_port(port_id); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed); if port_info.state == PortState::Blocked { - self.unblock_local_port(sched_ctx, comp_ctx, port_id); + self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); } }, ControlMessageContent::PortPeerChangedBlock(port_id) => { @@ -650,43 +499,50 @@ impl CompPDL { // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_info = comp_ctx.get_port_mut(port_id); - debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); - if port_info.state == PortState::Open { - port_info.state = PortState::Blocked; - } + let port_handle = comp_ctx.get_port_handle(port_id); + comp_ctx.set_port_state(port_handle, PortState::Blocked); + + let port_info = comp_ctx.get_port(port_handle); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_port_id = port_info.peer_id; - let peer_comp_id = port_info.peer_comp_id; - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id); + send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); }, - ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_info = comp_ctx.get_port_mut(port_id); - let old_peer_comp_id = port_info.peer_comp_id; + ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { + let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); + let port_info = comp_ctx.get_port(port_handle); debug_assert!(port_info.state == PortState::Blocked); + let old_peer_id = port_info.peer_comp_id; + + comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id); + + let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = new_comp_id; - comp_ctx.add_peer(sched_ctx, new_comp_id, None); - comp_ctx.remove_peer(sched_ctx, old_peer_comp_id); - self.unblock_local_port(sched_ctx, comp_ctx, port_id); + port_info.peer_port_id = new_port_id; + comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); + self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); } } } - fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option { + fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - return self.handle_sync_decision(sched_ctx, comp_ctx, decision); + debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd); + self.handle_sync_decision(sched_ctx, comp_ctx, decision); + if self.mode == Mode::Exit { + // TODO: Bit hacky, move this around + self.mode = Mode::StartExit; + } } // ------------------------------------------------------------------------- // Handling ports // ------------------------------------------------------------------------- - /// Marks the local port as being unblocked. If the execution was blocked on - /// sending a message over this port, then execution will continue and the - /// message will be sent. - fn unblock_local_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) { - let port_info = comp_ctx.get_port_mut(port_id); + /// Unblocks a port, potentially continuing execution of the component, in + /// response to a message that told us to unblock a previously blocked + fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { + let port_info = comp_ctx.get_port_mut(port_handle); + let port_id = port_info.self_id; debug_assert_eq!(port_info.state, PortState::Blocked); port_info.state = PortState::Open; @@ -696,19 +552,24 @@ impl CompPDL { debug_assert_eq!(port_info.kind, PortKind::Putter); let mut replacement = ValueGroup::default(); std::mem::swap(&mut replacement, &mut self.mode_value); - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement); + self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement); self.mode = Mode::Sync; self.mode_port = PortId::new_invalid(); } } - fn create_component_and_transfer_ports2( + fn create_component_and_transfer_ports( &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup ) { - struct PortPair{ creator: PortId, created: PortId } + struct PortPair{ + creator_handle: LocalPortHandle, + creator_id: PortId, + created_handle: LocalPortHandle, + created_id: PortId, + } let mut port_id_pairs = Vec::new(); let reservation = sched_ctx.runtime.start_create_pdl_component(); @@ -721,13 +582,20 @@ impl CompPDL { while let Some(port_reference) = arg_iter.next() { // Create port entry for new component let creator_port_id = port_reference.id; - let creator_port = creator_ctx.get_port(creator_port_id); - let created_port = created_ctx.adopt_port(creator_port); + let creator_port_handle = creator_ctx.get_port_handle(creator_port_id); + let creator_port = creator_ctx.get_port(creator_port_handle); + let created_port_handle = created_ctx.add_port( + creator_port.peer_comp_id, creator_port.peer_port_id, + creator_port.kind, creator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); let created_port_id = created_port.self_id; port_id_pairs.push(PortPair{ - creator: creator_port_id, - created: created_port_id, + creator_handle: creator_port_handle, + creator_id: creator_port_id, + created_handle: created_port_handle, + created_id: created_port_id, }); // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) @@ -749,31 +617,34 @@ impl CompPDL { let mut created_component_has_remote_peers = false; for pair in port_id_pairs.iter() { - let creator_port_info = creator_ctx.get_port(pair.creator); - let created_port_info = created_ctx.get_port_mut(pair.created); + let creator_port_info = creator_ctx.get_port(pair.creator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); if created_port_info.peer_comp_id == creator_ctx.id { // Port peer is owned by the creator as well let created_peer_port_index = port_id_pairs .iter() - .position(|v| v.creator == creator_port_info.peer_id); + .position(|v| v.creator_id == creator_port_info.peer_port_id); match created_peer_port_index { Some(created_peer_port_index) => { - // Peer port moved to the new component as well + // Peer port moved to the new component as well. So + // adjust IDs appropriately. let peer_pair = &port_id_pairs[created_peer_port_index]; - created_port_info.peer_id = peer_pair.created; + created_port_info.peer_port_id = peer_pair.created_id; created_port_info.peer_comp_id = reservation.id(); + todo!("either add 'self peer', or remove that idea from Ctx altogether") }, None => { // Peer port remains with creator component. created_port_info.peer_comp_id = creator_ctx.id; - created_ctx.add_peer(sched_ctx, creator_ctx.id, None); + created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None); } } } else { // Peer is a different component - let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id); - created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle)); + let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id); + let peer_info = creator_ctx.get_peer(peer_handle); + created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle)); created_component_has_remote_peers = true; } } @@ -797,28 +668,27 @@ impl CompPDL { // transfer messages in the main inbox. for pair in port_id_pairs.iter() { // Remove peer if appropriate - let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap(); - let creator_port_info = creator_ctx.ports.remove(creator_port_index); - if creator_port_info.peer_comp_id != creator_ctx.id { - creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id); - } + let creator_port_info = creator_ctx.get_port(pair.creator_handle); + let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); + creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id); + creator_ctx.remove_port(pair.creator_handle); // Transfer any messages - let created_port_index = created_ctx.get_port_index(pair.created).unwrap(); - let created_port_info = &created_ctx.ports[created_port_index]; + let created_port_index = created_ctx.get_port_index(pair.created_handle); + let created_port_info = created_ctx.get_port(pair.created_handle); debug_assert!(component.code.inbox_main[created_port_index].is_none()); if let Some(mut message) = self.inbox_main.remove(creator_port_index) { - message.data_header.target_port = pair.created; + message.data_header.target_port = pair.created_id; component.code.inbox_main[created_port_index] = Some(message); } let mut message_index = 0; while message_index < self.inbox_backup.len() { let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.creator { + if message.data_header.target_port == pair.creator_id { // transfer message let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created; + message.data_header.target_port = pair.created_id; component.code.inbox_backup.push(message); } else { message_index += 1; @@ -827,9 +697,10 @@ impl CompPDL { // Handle potential channel between creator and created component if created_port_info.peer_comp_id == creator_ctx.id { - let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id); + let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); + let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); peer_port_info.peer_comp_id = created_ctx.id; - creator_ctx.add_peer(sched_ctx, created_ctx.id, None); + creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None); } } @@ -838,14 +709,15 @@ impl CompPDL { if created_component_has_remote_peers { let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); for pair in port_id_pairs.iter() { - let port_info = created_ctx.get_port(pair.created); + let port_info = created_ctx.get_port(pair.created_handle); if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { let message = self.control.add_reroute_entry( - creator_ctx.id, port_info.peer_id, port_info.peer_comp_id, - pair.creator, pair.created, created_ctx.id, + creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, + pair.creator_id, pair.created_id, created_ctx.id, schedule_entry_id ); - let peer_info = created_ctx.get_peer(port_info.peer_comp_id); + let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = created_ctx.get_peer(peer_handle); peer_info.handle.send_message(sched_ctx, message, true); } } @@ -854,72 +726,6 @@ impl CompPDL { sched_ctx.runtime.enqueue_work(created_key); } } - - /// Removes a port from a component. Also decrements the port counter in - /// the peer component's entry. If that hits 0 then it will be removed and - /// returned. If returned then the caller is responsible for decrementing - /// the atomic counters of the peer component's handle. - fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option) { - let port_index = comp_ctx.get_port_index(port_id).unwrap(); - let port_info = comp_ctx.ports.remove(port_index); - - // If the component owns the peer, then we don't have to decrement the - // number of peers (because we don't have an entry for ourselves) - if port_info.peer_comp_id == comp_ctx.id { - return (port_info, None); - } - - let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap(); - let peer_info = &mut comp_ctx.peers[peer_index]; - peer_info.num_associated_ports -= 1; - - // Check if we still have other ports referencing this peer - if peer_info.num_associated_ports != 0 { - return (port_info, None); - } - - let peer_info = comp_ctx.peers.remove(peer_index); - return (port_info, Some(peer_info)); - } - - /// Only adds/updates a peer for a given port. This function assumes (but - /// does not check!) that the port was not considered to belong to that peer - /// before calling this function. - fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) { - match comp_ctx.get_peer_index(peer_id) { - Some(peer_index) => { - let peer_info = &mut comp_ctx.peers[peer_index]; - peer_info.num_associated_ports += 1; - }, - None => { - let handle = sched_ctx.runtime.get_component_public(peer_id); - comp_ctx.peers.push(Peer{ - id: peer_id, - num_associated_ports: 1, - handle, - }); - } - } - } - - fn change_port_peer_component( - &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, - port_id: PortId, new_peer_comp_id: CompId - ) { - let port_info = comp_ctx.get_port_mut(port_id); - let cur_peer_comp_id = port_info.peer_comp_id; - let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id); - cur_peer_info.num_associated_ports -= 1; - - if cur_peer_info.num_associated_ports == 0 { - let should_remove = cur_peer_info.handle.decrement_users(); - if should_remove { - let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() }; - sched_ctx.runtime.destroy_component(cur_peer_comp_key); - - } - } - } } #[inline]