diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index 27cd90aa4cbd3e9b6bd6740aa221d885b378eef5..2d0aee324b8730312c3a22e5c81f677fd31e7e5c 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -17,12 +17,6 @@ impl PortId {
     }
 }
 
-pub struct Peer {
-    pub id: CompId,
-    pub num_associated_ports: u32,
-    pub(crate) handle: CompHandle,
-}
-
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub enum PortKind {
     Putter,
@@ -36,15 +30,6 @@ pub enum PortState {
     Closed,
 }
 
-#[derive(Debug)]
-pub struct Port {
-    pub self_id: PortId,
-    pub peer_id: PortId, // eventually consistent
-    pub kind: PortKind,
-    pub state: PortState,
-    pub peer_comp_id: CompId, // eventually consistent
-}
-
 pub struct Channel {
     pub putter_id: PortId,
     pub getter_id: PortId,
diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs
new file mode 100644
index 0000000000000000000000000000000000000000..84c5594683bb234fe23c173307648b1b895590cc
--- /dev/null
+++ b/src/runtime2/component/component_context.rs
@@ -0,0 +1,245 @@
+use crate::runtime2::scheduler::*;
+use crate::runtime2::runtime::*;
+use crate::runtime2::communication::*;
+
+#[derive(Debug)]
+pub struct Port {
+    pub self_id: PortId,
+    pub peer_comp_id: CompId, // eventually consistent
+    pub peer_port_id: PortId, // eventually consistent
+    pub kind: PortKind,
+    pub state: PortState,
+    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
+}
+
+pub struct Peer {
+    pub id: CompId,
+    pub num_associated_ports: u32,
+    pub(crate) handle: CompHandle,
+}
+
+/// Port and peer management structure. Keeps a local reference count of the
+/// ports associated with each peer, and additionally manages the atomic
+/// reference count associated with the peers' component handles.
+pub struct CompCtx {
+    pub id: CompId,
+    ports: Vec<Port>,
+    peers: Vec<Peer>,
+    port_id_counter: u32,
+}
+
+#[derive(Copy, Clone)]
+pub struct LocalPortHandle(PortId);
+
+#[derive(Copy, Clone)]
+pub struct LocalPeerHandle(CompId);
+
+impl CompCtx {
+    /// Creates a new component context based on a reserved entry in the
+    /// component store. This reservation is used such that we already know
+    /// our assigned ID.
+    pub(crate) fn new(reservation: &CompReserved) -> Self {
+        return Self{
+            id: reservation.id(),
+            ports: Vec::new(),
+            peers: Vec::new(),
+            port_id_counter: 0,
+        }
+    }
+
+    /// Creates a new channel that is fully owned by the component associated
+    /// with this context.
+    pub(crate) fn create_channel(&mut self) -> Channel {
+        let putter_id = PortId(self.take_port_id());
+        let getter_id = PortId(self.take_port_id());
+        self.ports.push(Port{
+            self_id: putter_id,
+            peer_port_id: getter_id,
+            kind: PortKind::Putter,
+            state: PortState::Open,
+            peer_comp_id: self.id,
+            #[cfg(debug_assertions)] associated_with_peer: false,
+        });
+        self.ports.push(Port{
+            self_id: getter_id,
+            peer_port_id: putter_id,
+            kind: PortKind::Getter,
+            state: PortState::Open,
+            peer_comp_id: self.id,
+            #[cfg(debug_assertions)] associated_with_peer: false,
+        });
+
+        return Channel{ putter_id, getter_id };
+    }
+
+    /// Adds a new port. Make sure to call `add_peer` afterwards.
+    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
+        let self_id = PortId(self.take_port_id());
+        self.ports.push(Port{
+            self_id, peer_comp_id, peer_port_id, kind, state,
+            #[cfg(debug_assertions)] associated_with_peer: false,
+        });
+        return LocalPortHandle(self_id);
+    }
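Aside: `LocalPortHandle` deliberately wraps the stable `PortId` rather than a vector index, and `must_get_port_index` (further down) resolves it with a linear scan. A self-contained sketch of that lookup pattern, with toy types that only mirror the patch:

    // Toy model of the opaque-handle lookup used by CompCtx above.
    #[derive(Clone, Copy, PartialEq, Debug)]
    struct PortId(u32);
    #[derive(Clone, Copy)]
    struct LocalPortHandle(PortId);

    struct Port { self_id: PortId, peer_port_id: PortId }
    struct Ctx { ports: Vec<Port> }

    impl Ctx {
        fn get_port_handle(&self, id: PortId) -> LocalPortHandle { LocalPortHandle(id) }
        fn get_port(&self, handle: LocalPortHandle) -> &Port {
            // Linear scan: port sets are small, and the ID stays valid even
            // when other ports are removed (unlike a raw Vec index).
            self.ports.iter().find(|p| p.self_id == handle.0).unwrap()
        }
    }

    fn main() {
        let ctx = Ctx{ ports: vec![
            Port{ self_id: PortId(0), peer_port_id: PortId(1) }, // putter
            Port{ self_id: PortId(1), peer_port_id: PortId(0) }, // getter
        ]};
        let handle = ctx.get_port_handle(PortId(0));
        assert_eq!(ctx.get_port(handle).peer_port_id, PortId(1));
    }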
+
+    /// Removes a port. Make sure you called `remove_peer` first.
+    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
+        let port_index = self.must_get_port_index(port_handle);
+        let port = self.ports.remove(port_index);
+        debug_assert!(!port.associated_with_peer);
+        return port;
+    }
+
+    /// Adds a new peer. This must be called for every port, no matter which
+    /// component the channel is connected to. If a `CompHandle` is supplied,
+    /// then it will be used to add the peer. Otherwise it will be retrieved
+    /// from the runtime using its ID.
+    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
+        let port = self.get_port_mut(port_handle);
+        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
+        debug_assert!(!port.associated_with_peer);
+        if !self.requires_peer_reference(port) {
+            return;
+        }
+
+        dbg_code!(port.associated_with_peer = true);
+        match self.get_peer_index_by_id(peer_comp_id) {
+            Some(peer_index) => {
+                let peer = &mut self.peers[peer_index];
+                peer.num_associated_ports += 1;
+            },
+            None => {
+                let handle = match handle {
+                    Some(handle) => handle.clone(),
+                    None => sched_ctx.runtime.get_component_public(peer_comp_id)
+                };
+                self.peers.push(Peer{
+                    id: peer_comp_id,
+                    num_associated_ports: 1,
+                    handle,
+                });
+            }
+        }
+    }
+
+    /// Removes a peer associated with a port.
+    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) {
+        let port = self.get_port_mut(port_handle);
+        debug_assert_eq!(port.peer_comp_id, peer_id);
+        if !self.requires_peer_reference(port) {
+            return;
+        }
+
+        debug_assert!(port.associated_with_peer);
+        dbg_code!(port.associated_with_peer = false);
+        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
+        let peer = &mut self.peers[peer_index];
+        peer.num_associated_ports -= 1;
+        if peer.num_associated_ports == 0 {
+            let mut peer = self.peers.remove(peer_index);
+            if let Some(key) = peer.handle.decrement_users() {
+                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
+                sched_ctx.runtime.destroy_component(key);
+            }
+        }
+    }
+
+    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
+        let port_info = self.get_port_mut(port_handle);
+        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
+        port_info.state = new_state;
+    }
+
+    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
+        return LocalPortHandle(port_id);
+    }
+
+    // should perhaps be revised, used in main inbox
+    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
+        return self.must_get_port_index(port_handle);
+    }
+
+    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
+        return LocalPeerHandle(peer_id);
+    }
+
+    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
+        let index = self.must_get_port_index(port_handle);
+        return &self.ports[index];
+    }
+
+    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
+        let index = self.must_get_port_index(port_handle);
+        return &mut self.ports[index];
+    }
+
+    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
+        let index = self.must_get_peer_index(peer_handle);
+        return &self.peers[index];
+    }
+
+    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
+        let index = self.must_get_peer_index(peer_handle);
+        return &mut self.peers[index];
+    }
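`add_peer`/`remove_peer` keep a per-peer count of associated ports on top of the atomic count inside `CompHandle`; the handle is only released when the local count reaches zero. A minimal self-contained model of that two-level counting (toy types, not the patch's code):

    // Toy model of the per-peer port refcount maintained by add_peer/remove_peer.
    use std::collections::HashMap;

    struct Peers { counts: HashMap<u32, u32> } // peer component id -> associated ports

    impl Peers {
        fn add_peer(&mut self, peer: u32) { *self.counts.entry(peer).or_insert(0) += 1; }
        /// Returns true when the last port for this peer is gone, i.e. the
        /// moment the component handle's atomic count may be decremented.
        fn remove_peer(&mut self, peer: u32) -> bool {
            let count = self.counts.get_mut(&peer).unwrap();
            *count -= 1;
            if *count == 0 { self.counts.remove(&peer); true } else { false }
        }
    }

    fn main() {
        let mut peers = Peers{ counts: HashMap::new() };
        peers.add_peer(3); peers.add_peer(3); // two ports connected to component 3
        assert!(!peers.remove_peer(3));       // one port left: keep the handle
        assert!(peers.remove_peer(3));        // last one: drop the CompHandle now
    }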
+
+    #[inline]
+    pub(crate) fn iter_ports(&self) -> impl Iterator<Item = &Port> {
+        return self.ports.iter();
+    }
+
+    #[inline]
+    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = &Peer> {
+        return self.peers.iter();
+    }
+
+    #[inline]
+    pub(crate) fn num_ports(&self) -> usize {
+        return self.ports.len();
+    }
+
+    // -------------------------------------------------------------------------
+    // Local utilities
+    // -------------------------------------------------------------------------
+
+    #[inline]
+    fn requires_peer_reference(&self, port: &Port) -> bool {
+        return port.state == PortState::Closed;
+    }
+
+    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
+        for (index, port) in self.ports.iter().enumerate() {
+            if port.self_id == handle.0 {
+                return index;
+            }
+        }
+
+        unreachable!()
+    }
+
+    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
+        for (index, peer) in self.peers.iter().enumerate() {
+            if peer.id == handle.0 {
+                return index;
+            }
+        }
+
+        unreachable!()
+    }
+
+    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
+        for (index, peer) in self.peers.iter().enumerate() {
+            if peer.id == comp_id {
+                return Some(index);
+            }
+        }
+
+        return None;
+    }
+
+    fn take_port_id(&mut self) -> u32 {
+        let port_id = self.port_id_counter;
+        self.port_id_counter = self.port_id_counter.wrapping_add(1);
+        return port_id;
+    }
+}
\ No newline at end of file
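The `dbg_code!` invocations in this file pair with the `#[cfg(debug_assertions)]` field on `Port`: both the field and the writes to it exist only in debug builds. A plausible shape for such a macro, assuming the crate's real definition (which lives elsewhere) may differ:

    // Hypothetical sketch of a dbg_code!-style macro: the statement only
    // exists in builds with debug_assertions, matching the cfg'd struct field.
    macro_rules! dbg_code {
        ($code:stmt) => {
            #[cfg(debug_assertions)] { $code }
        };
    }

    struct Flagged { #[cfg(debug_assertions)] touched: bool }

    fn main() {
        #[allow(unused_mut, unused_variables)]
        let mut f = Flagged{ #[cfg(debug_assertions)] touched: false };
        dbg_code!(f.touched = true); // compiled away entirely in release builds
        #[cfg(debug_assertions)]
        assert!(f.touched);
    }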
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index 2a6a9283b31b053a5aef528984a648f10cdddac5..c2c4b13b725990310eb9c7d7454f5e5dbac4f7bb 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -11,6 +11,7 @@ use crate::runtime2::scheduler::SchedulerCtx;
 use crate::runtime2::communication::*;
 
 use super::*;
+use super::component_context::*;
 use super::control_layer::*;
 use super::consensus::Consensus;
 
@@ -21,159 +22,6 @@ pub enum CompScheduling {
     Exit,
 }
 
-pub struct CompCtx {
-    pub id: CompId,
-    pub ports: Vec<Port>,
-    pub peers: Vec<Peer>,
-    pub messages: Vec<Option<DataMessage>>, // same size as "ports"
-    pub port_id_counter: u32,
-}
-
-impl CompCtx {
-    pub(crate) fn new(reservation: &CompReserved) -> Self {
-        return Self{
-            id: reservation.id(),
-            ports: Vec::new(),
-            peers: Vec::new(),
-            messages: Vec::new(),
-            port_id_counter: 0,
-        }
-    }
-}
-
-struct MessageView<'a> {
-    index: usize,
-    pub message: &'a DataMessage,
-}
-
-impl CompCtx {
-    /// Creates a new channel that is fully owned by the component associated
-    /// with this context.
-    fn create_channel(&mut self) -> Channel {
-        let putter_id = PortId(self.take_port_id());
-        let getter_id = PortId(self.take_port_id());
-        self.ports.push(Port{
-            self_id: putter_id,
-            peer_id: getter_id,
-            kind: PortKind::Putter,
-            state: PortState::Open,
-            peer_comp_id: self.id,
-        });
-        self.ports.push(Port{
-            self_id: getter_id,
-            peer_id: putter_id,
-            kind: PortKind::Getter,
-            state: PortState::Open,
-            peer_comp_id: self.id,
-        });
-
-        return Channel{ putter_id, getter_id };
-    }
-
-    /// Adopts a port transferred by another component. Essentially copies all
-    /// port data but creates a new ID. Caller should ensure that the other
-    /// endpoint becomes aware of this ID.
-    fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port {
-        let port_id = PortId(self.take_port_id());
-        let port_index = self.ports.len();
-        self.ports.push(Port{
-            self_id: port_id,
-            peer_id: to_transfer.peer_id,
-            kind: to_transfer.kind,
-            state: to_transfer.state,
-            peer_comp_id: to_transfer.peer_comp_id,
-        });
-        return &mut self.ports[port_index];
-    }
-
-    /// Adds a peer (or increments the "associated port" counter). Hence the
-    /// caller must make sure that this makes sense.
-    fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) {
-        match self.get_peer_index(peer_id) {
-            Some(peer_index) => {
-                let peer_info = &mut self.peers[peer_index];
-                peer_info.num_associated_ports += 1;
-            },
-            None => {
-                let handle = if let Some(handle) = peer_handle {
-                    handle.clone()
-                } else {
-                    sched_ctx.runtime.get_component_public(peer_id)
-                };
-
-                self.peers.push(Peer{
-                    id: peer_id,
-                    num_associated_ports: 1,
-                    handle,
-                })
-            }
-        }
-    }
-
-    /// Removes a peer (or decrements the "associated port" counter). If there
-    /// are no more references to the peer then the handle will be destroyed.
-    fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) {
-        let peer_index = self.get_peer_index(peer_id).unwrap();
-        let peer_info = &mut self.peers[peer_index];
-        peer_info.num_associated_ports -= 1;
-
-        if peer_info.num_associated_ports == 0 {
-            let mut peer = self.peers.remove(peer_index);
-            let should_remove = peer.handle.decrement_users();
-            if should_remove {
-                let key = unsafe{ peer.id.upgrade() };
-                sched_ctx.runtime.destroy_component(key);
-            }
-        }
-    }
-
-    pub(crate) fn get_port(&self, port_id: PortId) -> &Port {
-        let index = self.get_port_index(port_id).unwrap();
-        return &self.ports[index];
-    }
-
-    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
-        let index = self.get_port_index(port_id).unwrap();
-        return &mut self.ports[index];
-    }
-
-    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
-        for (index, port) in self.ports.iter().enumerate() {
-            if port.self_id == port_id {
-                return Some(index);
-            }
-        }
-
-        return None;
-    }
-
-    pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer {
-        let index = self.get_peer_index(peer_id).unwrap();
-        return &self.peers[index];
-    }
-
-    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
-        let index = self.get_peer_index(peer_id).unwrap();
-        return &mut self.peers[index];
-    }
-
-    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
-        for (index, peer) in self.peers.iter().enumerate() {
-            if peer.id == peer_id {
-                return Some(index);
-            }
-        }
-
-        return None;
-    }
-
-    fn take_port_id(&mut self) -> u32 {
-        let port_id = self.port_id_counter;
-        self.port_id_counter = self.port_id_counter.wrapping_add(1);
-        return port_id;
-    }
-}
-
 pub enum ExecStmt {
     CreatedChannel((Value, Value)),
     PerformedPut,
@@ -242,6 +90,7 @@ pub(crate) enum Mode {
     SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
     BlockedGet,
     BlockedPut,
+    StartExit, // temp state
     Exit,
 }
 
@@ -250,7 +99,7 @@ impl Mode {
         match self {
             Mode::NonSync | Mode::Sync =>
                 return true,
-            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
+            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit =>
                 return false,
         }
     }
@@ -303,7 +152,7 @@ impl CompPDL {
         let mut target = sched_ctx.runtime.get_component_public(new_target);
         target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
         let _should_remove = target.decrement_users();
-        debug_assert!(!_should_remove);
+        debug_assert!(_should_remove.is_none());
         return;
     }
 
@@ -327,6 +176,11 @@ impl CompPDL {
     pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
 
+        if self.mode == Mode::StartExit {
+            self.mode = Mode::Exit;
+            return Ok(CompScheduling::Exit);
+        }
+
         let can_run = self.mode.can_run();
         sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
         if !can_run {
@@ -349,13 +203,15 @@
                 debug_assert!(self.exec_ctx.stmt.is_none());
 
                 let port_id = port_id_from_eval(port_id);
-                let port_index = comp_ctx.get_port_index(port_id).unwrap();
+                let port_handle = comp_ctx.get_port_handle(port_id);
+                let port_index = comp_ctx.get_port_index(port_handle);
                 if let Some(message) = &self.inbox_main[port_index] {
                     // Check if we can actually receive the message
                     if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                         // Message was received. Make sure any blocked peers and
                         // pending messages are handled.
                         let message = self.inbox_main[port_index].take().unwrap();
+                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 
                         self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                         return Ok(CompScheduling::Immediate);
@@ -373,11 +229,12 @@
             EC::Put(port_id, value) => {
                 debug_assert_eq!(self.mode, Mode::Sync);
                 let port_id = port_id_from_eval(port_id);
-                let port_info = comp_ctx.get_port(port_id);
+                let port_handle = comp_ctx.get_port_handle(port_id);
+                let port_info = comp_ctx.get_port(port_handle);
                 if port_info.state == PortState::Blocked {
                     todo!("handle blocked port");
                 }
-                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
+                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
                 self.exec_ctx.stmt = ExecStmt::PerformedPut;
                 return Ok(CompScheduling::Immediate);
             },
@@ -393,7 +250,7 @@
             },
             EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
-                self.create_component_and_transfer_ports2(
+                self.create_component_and_transfer_ports(
                     sched_ctx, comp_ctx,
                     definition_id, monomorph_idx, arguments
                 );
@@ -436,11 +293,13 @@
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option<CompScheduling> {
         sched_ctx.log("Component ending sync mode (now waiting for solution)");
         let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+        self.mode = Mode::SyncEnd;
         self.handle_sync_decision(sched_ctx, comp_ctx, decision)
     }
 
     fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option<CompScheduling> {
-        debug_assert_eq!(self.mode, Mode::Sync);
+        debug_assert_eq!(self.mode, Mode::SyncEnd);
+        sched_ctx.log(&format!("Handling sync decision: {:?}", decision));
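The new `StartExit` mode splits shutdown into two scheduler visits: the first `run()` after the decision returns `CompScheduling::Exit` exactly once, flipping to `Exit` so repeated wake-ups are harmless. A self-contained model of that flip (toy enums that only mirror the patch's names):

    #[derive(Debug, PartialEq)]
    enum Mode { StartExit, Exit }
    #[derive(Debug, PartialEq)]
    enum CompScheduling { Sleep, Exit }

    fn run_once(mode: &mut Mode) -> CompScheduling {
        if *mode == Mode::StartExit {
            *mode = Mode::Exit; // flip first, so a second run() cannot exit twice
            return CompScheduling::Exit;
        }
        CompScheduling::Sleep // Mode::Exit: nothing left to do
    }

    fn main() {
        let mut mode = Mode::StartExit;
        assert_eq!(run_once(&mut mode), CompScheduling::Exit);  // scheduler tears down
        assert_eq!(run_once(&mut mode), CompScheduling::Sleep); // idempotent afterwards
    }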
         let is_success = match decision {
             SyncRoundDecision::None => {
                 // No decision yet
@@ -451,14 +310,14 @@
         };
 
         // If here then we've reached a decision
+        self.mode = Mode::NonSync;
         if is_success {
-            self.mode = Mode::NonSync;
             self.consensus.notify_sync_decision(decision);
             return None;
         } else {
             todo!("handle this better, show some kind of error");
-            self.mode = Mode::Exit;
             self.handle_component_exit(sched_ctx, comp_ctx);
+            self.mode = Mode::Exit;
             return Some(CompScheduling::Exit);
         }
     }
@@ -476,9 +335,10 @@
     // Handling messages
     // -------------------------------------------------------------------------
 
-    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
-        let port_info = comp_ctx.get_port(source_port_id);
-        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
+    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
+        let port_info = comp_ctx.get_port(source_port_handle);
+        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
+        let peer_info = comp_ctx.get_peer(peer_handle);
 
         let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
         peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
     }
@@ -490,13 +350,14 @@
         // Check if we can insert it directly into the storage associated with
         // the port
         let target_port_id = message.data_header.target_port;
-        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
+        let port_handle = comp_ctx.get_port_handle(target_port_id);
+        let port_index = comp_ctx.get_port_index(port_handle);
         if self.inbox_main[port_index].is_none() {
             self.inbox_main[port_index] = Some(message);
 
             // After direct insertion, check if this component's execution is
             // blocked on receiving a message on that port
-            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
+            debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly
             if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
                 // We were indeed blocked
                 self.mode = Mode::Sync;
@@ -507,17 +368,16 @@
         }
 
         // The direct inbox is full, so the port will become (or was already) blocked
-        let port_info = &mut comp_ctx.ports[port_index];
+        let port_info = comp_ctx.get_port_mut(port_handle);
         debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
-        let _peer_comp_id = port_info.peer_comp_id;
 
         if port_info.state == PortState::Open {
-            let (target_comp_id, block_message) =
-                self.control.set_port_and_peer_blocked(target_port_id, comp_ctx);
-            debug_assert_eq!(_peer_comp_id, target_comp_id);
+            comp_ctx.set_port_state(port_handle, PortState::Blocked);
+            let (peer_handle, message) =
+                self.control.initiate_port_blocking(comp_ctx, port_handle);
 
-            let peer = comp_ctx.get_peer(target_comp_id);
-            peer.handle.send_message(sched_ctx, Message::Control(block_message), true);
+            let peer = comp_ctx.get_peer(peer_handle);
+            peer.handle.send_message(sched_ctx, Message::Control(message), true);
         }
 
         // But we still need to remember the message, so:
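The delivery rule implemented above: each port has one direct slot in `inbox_main`; a second undelivered message spills into `inbox_backup` and triggers the `BlockPort` round-trip. A self-contained toy of just that decision:

    // Toy model of the per-port inbox: one direct slot, overflow -> backup + block.
    struct Inbox {
        main: Vec<Option<String>>,    // one slot per port
        backup: Vec<(usize, String)>, // (port index, message)
    }

    impl Inbox {
        /// Returns true when the sending port must be told to block.
        fn deliver(&mut self, port: usize, msg: String) -> bool {
            if self.main[port].is_none() {
                self.main[port] = Some(msg);
                false
            } else {
                self.backup.push((port, msg));
                true // slot occupied: initiate port blocking
            }
        }
    }

    fn main() {
        let mut inbox = Inbox{ main: vec![None], backup: Vec::new() };
        assert!(!inbox.deliver(0, "first".into()));
        assert!(inbox.deliver(0, "second".into())); // would trigger BlockPort
    }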
@@ -528,36 +388,38 @@
     /// code. We check to see if there are more messages waiting and, if not,
     /// then we handle the case where the port might have been blocked
     /// previously.
-    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
-        let port_index = comp_ctx.get_port_index(port_id).unwrap();
-        debug_assert!(self.inbox_main[port_index].is_none()); // because we just received it
+    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
+        let port_index = comp_ctx.get_port_index(port_handle);
+        debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out
 
         // Check for any more messages
+        let port_info = comp_ctx.get_port(port_handle);
         for message_index in 0..self.inbox_backup.len() {
             let message = &self.inbox_backup[message_index];
-            if message.data_header.target_port == port_id {
+            if message.data_header.target_port == port_info.self_id {
                 // One more message for this port
                 let message = self.inbox_backup.remove(message_index);
-                debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port
+                debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port
                 self.inbox_main[port_index] = Some(message);
+                return;
             }
         }
 
         // Did not have any more messages. So if we were blocked, then we need
         // to send the "unblock" message.
-        let port_info = &comp_ctx.ports[port_index];
         if port_info.state == PortState::Blocked {
-            let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
-            let peer_info = comp_ctx.get_peer(peer_comp_id);
+            comp_ctx.set_port_state(port_handle, PortState::Open);
+            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
+            let peer_info = comp_ctx.get_peer(peer_handle);
             peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
         }
     }
 
     fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
         // Little local utility to send an Ack
-        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) {
-            let peer_info = comp_ctx.get_peer(peer_comp_id);
+        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) {
+            let peer_info = comp_ctx.get_peer(peer_handle);
             peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
                 id: causer_id,
                 sender_comp_id: comp_ctx.id,
@@ -578,7 +440,7 @@
                         let mut handle = sched_ctx.runtime.get_component_public(target_comp);
                         handle.send_message(sched_ctx, Message::Control(message), true);
                         let _should_remove = handle.decrement_users();
-                        debug_assert!(!_should_remove);
+                        debug_assert!(_should_remove.is_none());
                         to_ack = new_to_ack;
                     },
                     AckAction::ScheduleComponent(to_schedule) => {
@@ -591,7 +453,7 @@
                         let key = unsafe{ to_schedule.upgrade() };
                         sched_ctx.runtime.enqueue_work(key);
                         let _should_remove = handle.decrement_users();
-                        debug_assert!(!_should_remove);
+                        debug_assert!(_should_remove.is_none());
                         break;
                     },
                     AckAction::None => {
@@ -603,45 +465,32 @@
             ControlMessageContent::BlockPort(port_id) => {
                 // One of our messages was accepted, but the port should be
                 // blocked.
-                let port_info = comp_ctx.get_port_mut(port_id);
+                let port_handle = comp_ctx.get_port_handle(port_id);
+                let port_info = comp_ctx.get_port(port_handle);
                 debug_assert_eq!(port_info.kind, PortKind::Putter);
                 if port_info.state != PortState::Closed {
-                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
-                    port_info.state = PortState::Blocked;
+                    comp_ctx.set_port_state(port_handle, PortState::Blocked);
                }
             },
             ControlMessageContent::ClosePort(port_id) => {
                 // Request to close the port. We immediately comply and remove
                 // the component handle as well
-                let port_index = comp_ctx.get_port_index(port_id).unwrap();
-                let port_info = &mut comp_ctx.ports[port_index];
-                let peer_port_id = port_info.peer_id;
-                let peer_comp_id = port_info.peer_comp_id;
-                port_info.state = PortState::Closed;
-
-                let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap();
-                let peer_info = &mut comp_ctx.peers[peer_index];
-                peer_info.num_associated_ports -= 1;
-                if peer_info.num_associated_ports == 0 {
-                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
-                    let should_remove = peer_info.handle.decrement_users();
-                    if should_remove {
-                        let comp_key = unsafe{ peer_info.id.upgrade() };
-                        sched_ctx.runtime.destroy_component(comp_key);
-                    }
+                let port_handle = comp_ctx.get_port_handle(port_id);
+                let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
+                let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
-                    comp_ctx.peers.remove(peer_index);
-                }
-
-                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
-            }
+                comp_ctx.set_port_state(port_handle, PortState::Closed);
+                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
+                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id);
+            },
             ControlMessageContent::UnblockPort(port_id) => {
                 // We were previously blocked (or already closed)
-                let port_info = comp_ctx.get_port(port_id);
+                let port_handle = comp_ctx.get_port_handle(port_id);
+                let port_info = comp_ctx.get_port(port_handle);
                 debug_assert_eq!(port_info.kind, PortKind::Putter);
                 debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
                 if port_info.state == PortState::Blocked {
-                    self.unblock_local_port(sched_ctx, comp_ctx, port_id);
+                    self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
                }
             },
             ControlMessageContent::PortPeerChangedBlock(port_id) => {
                 // The peer of this port has just changed. So we are expected
                 // to block the port (and the peer will be changed by the
                 // message that will follow, potentially rerouting some of the
                 // in-flight messages) and Ack. Then we wait for the `unblock` call.
                 debug_assert_eq!(message.target_port_id, Some(port_id));
-                let port_info = comp_ctx.get_port_mut(port_id);
-                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
-                if port_info.state == PortState::Open {
-                    port_info.state = PortState::Blocked;
-                }
+                let port_handle = comp_ctx.get_port_handle(port_id);
+                comp_ctx.set_port_state(port_handle, PortState::Blocked);
+
+                let port_info = comp_ctx.get_port(port_handle);
+                let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
-                let peer_port_id = port_info.peer_id;
-                let peer_comp_id = port_info.peer_comp_id;
-                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
+                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
             },
-            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
-                debug_assert_eq!(message.target_port_id, Some(port_id));
-                let port_info = comp_ctx.get_port_mut(port_id);
-                let old_peer_comp_id = port_info.peer_comp_id;
+            ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
+                let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
+                let port_info = comp_ctx.get_port(port_handle);
                 debug_assert!(port_info.state == PortState::Blocked);
+                let old_peer_id = port_info.peer_comp_id;
+
+                comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id);
+
+                let port_info = comp_ctx.get_port_mut(port_handle);
                 port_info.peer_comp_id = new_comp_id;
-                comp_ctx.add_peer(sched_ctx, new_comp_id, None);
-                comp_ctx.remove_peer(sched_ctx, old_peer_comp_id);
-                self.unblock_local_port(sched_ctx, comp_ctx, port_id);
+                port_info.peer_port_id = new_port_id;
+                comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
+                self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
             }
         }
     }
 
-    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
+    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
         let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
-        return self.handle_sync_decision(sched_ctx, comp_ctx, decision);
+        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd);
+        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
+        if self.mode == Mode::Exit {
+            // TODO: Bit hacky, move this around
+            self.mode = Mode::StartExit;
+        }
     }
 
     // -------------------------------------------------------------------------
     // Handling ports
     // -------------------------------------------------------------------------
 
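For reference, the port-transfer handshake that the two arms above implement, reduced to its message sequence (components A and B and the concrete IDs are hypothetical; the message names are the patch's):

    // Hedged trace of the peer-change handshake driven by the control messages.
    #[derive(Debug)]
    enum Ctl { PortPeerChangedBlock, Ack, PortPeerChangedUnblock { new_port: u32, new_comp: u32 } }

    fn main() {
        // A moves one end of a channel to a newly created component; B holds
        // the other end and must not send while the move is in flight.
        let handshake = [
            ("A -> B", Ctl::PortPeerChangedBlock),                               // B blocks its port and Acks
            ("B -> A", Ctl::Ack),                                                // all Acks in: A may schedule the new component
            ("A -> B", Ctl::PortPeerChangedUnblock{ new_port: 7, new_comp: 3 }), // B retargets the port, then unblocks
        ];
        for (direction, message) in handshake {
            println!("{direction}: {message:?}");
        }
    }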
-    /// Marks the local port as being unblocked. If the execution was blocked on
-    /// sending a message over this port, then execution will continue and the
-    /// message will be sent.
-    fn unblock_local_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
-        let port_info = comp_ctx.get_port_mut(port_id);
+    /// Unblocks a port, potentially continuing execution of the component, in
+    /// response to a message that told us to unblock a previously blocked port.
+    fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
+        let port_info = comp_ctx.get_port_mut(port_handle);
+        let port_id = port_info.self_id;
         debug_assert_eq!(port_info.state, PortState::Blocked);
         port_info.state = PortState::Open;
 
@@ -696,19 +552,24 @@
             debug_assert_eq!(port_info.kind, PortKind::Putter);
             let mut replacement = ValueGroup::default();
             std::mem::swap(&mut replacement, &mut self.mode_value);
-            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
+            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement);
             self.mode = Mode::Sync;
             self.mode_port = PortId::new_invalid();
         }
     }
 
-    fn create_component_and_transfer_ports2(
+    fn create_component_and_transfer_ports(
         &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
         definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
     ) {
-        struct PortPair{ creator: PortId, created: PortId }
+        struct PortPair{
+            creator_handle: LocalPortHandle,
+            creator_id: PortId,
+            created_handle: LocalPortHandle,
+            created_id: PortId,
+        }
         let mut port_id_pairs = Vec::new();
 
         let reservation = sched_ctx.runtime.start_create_pdl_component();
@@ -721,13 +582,20 @@
         while let Some(port_reference) = arg_iter.next() {
             // Create port entry for new component
             let creator_port_id = port_reference.id;
-            let creator_port = creator_ctx.get_port(creator_port_id);
-            let created_port = created_ctx.adopt_port(creator_port);
+            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
+            let creator_port = creator_ctx.get_port(creator_port_handle);
+            let created_port_handle = created_ctx.add_port(
+                creator_port.peer_comp_id, creator_port.peer_port_id,
+                creator_port.kind, creator_port.state
+            );
+            let created_port = created_ctx.get_port(created_port_handle);
             let created_port_id = created_port.self_id;
 
             port_id_pairs.push(PortPair{
-                creator: creator_port_id,
-                created: created_port_id,
+                creator_handle: creator_port_handle,
+                creator_id: creator_port_id,
+                created_handle: created_port_handle,
+                created_id: created_port_id,
             });
 
             // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
@@ -749,31 +617,34 @@
 
         let mut created_component_has_remote_peers = false;
 
         for pair in port_id_pairs.iter() {
-            let creator_port_info = creator_ctx.get_port(pair.creator);
-            let created_port_info = created_ctx.get_port_mut(pair.created);
+            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
+            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 
             if created_port_info.peer_comp_id == creator_ctx.id {
                 // Port peer is owned by the creator as well
                 let created_peer_port_index = port_id_pairs
                     .iter()
-                    .position(|v| v.creator == creator_port_info.peer_id);
+                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
                 match created_peer_port_index {
                     Some(created_peer_port_index) => {
-                        // Peer port moved to the new component as well
+                        // Peer port moved to the new component as well. So
+                        // adjust IDs appropriately.
                        let peer_pair = &port_id_pairs[created_peer_port_index];
-                        created_port_info.peer_id = peer_pair.created;
+                        created_port_info.peer_port_id = peer_pair.created_id;
                         created_port_info.peer_comp_id = reservation.id();
+                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
                     },
                     None => {
                         // Peer port remains with creator component.
                         created_port_info.peer_comp_id = creator_ctx.id;
-                        created_ctx.add_peer(sched_ctx, creator_ctx.id, None);
+                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
                     }
                 }
             } else {
                 // Peer is a different component
-                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
-                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
+                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
+                let peer_info = creator_ctx.get_peer(peer_handle);
+                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
                 created_component_has_remote_peers = true;
             }
         }
@@ -797,28 +668,27 @@
         // transfer messages in the main inbox.
         for pair in port_id_pairs.iter() {
             // Remove peer if appropriate
-            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
-            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
-            if creator_port_info.peer_comp_id != creator_ctx.id {
-                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
-            }
+            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
+            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
+            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id);
+            creator_ctx.remove_port(pair.creator_handle);
 
             // Transfer any messages
-            let created_port_index = created_ctx.get_port_index(pair.created).unwrap();
-            let created_port_info = &created_ctx.ports[created_port_index];
+            let created_port_index = created_ctx.get_port_index(pair.created_handle);
+            let created_port_info = created_ctx.get_port(pair.created_handle);
             debug_assert!(component.code.inbox_main[created_port_index].is_none());
             if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
-                message.data_header.target_port = pair.created;
+                message.data_header.target_port = pair.created_id;
                 component.code.inbox_main[created_port_index] = Some(message);
             }
 
             let mut message_index = 0;
             while message_index < self.inbox_backup.len() {
                 let message = &self.inbox_backup[message_index];
-                if message.data_header.target_port == pair.creator {
+                if message.data_header.target_port == pair.creator_id {
                     // transfer message
                     let mut message = self.inbox_backup.remove(message_index);
-                    message.data_header.target_port = pair.created;
+                    message.data_header.target_port = pair.created_id;
                     component.code.inbox_backup.push(message);
                 } else {
                     message_index += 1;
@@ -827,9 +697,10 @@
 
             // Handle potential channel between creator and created component
             if created_port_info.peer_comp_id == creator_ctx.id {
-                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
+                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
+                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
                 peer_port_info.peer_comp_id = created_ctx.id;
-                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
+                creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None);
             }
         }
 
@@ -838,14 +709,15 @@
         if created_component_has_remote_peers {
             let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
             for pair in port_id_pairs.iter() {
-                let port_info = created_ctx.get_port(pair.created);
+                let port_info = created_ctx.get_port(pair.created_handle);
                 if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
                     let message = self.control.add_reroute_entry(
-                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
-                        pair.creator, pair.created, created_ctx.id,
+                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
+                        pair.creator_id, pair.created_id, created_ctx.id,
                         schedule_entry_id
                     );
-                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
+                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
+                    let peer_info = created_ctx.get_peer(peer_handle);
                     peer_info.handle.send_message(sched_ctx, message, true);
                 }
             }
@@ -854,72 +726,6 @@
             sched_ctx.runtime.enqueue_work(created_key);
         }
     }
-
-    /// Removes a port from a component. Also decrements the port counter in
-    /// the peer component's entry. If that hits 0 then it will be removed and
-    /// returned. If returned then the caller is responsible for decrementing
-    /// the atomic counters of the peer component's handle.
-    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
-        let port_index = comp_ctx.get_port_index(port_id).unwrap();
-        let port_info = comp_ctx.ports.remove(port_index);
-
-        // If the component owns the peer, then we don't have to decrement the
-        // number of peers (because we don't have an entry for ourselves)
-        if port_info.peer_comp_id == comp_ctx.id {
-            return (port_info, None);
-        }
-
-        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
-        let peer_info = &mut comp_ctx.peers[peer_index];
-        peer_info.num_associated_ports -= 1;
-
-        // Check if we still have other ports referencing this peer
-        if peer_info.num_associated_ports != 0 {
-            return (port_info, None);
-        }
-
-        let peer_info = comp_ctx.peers.remove(peer_index);
-        return (port_info, Some(peer_info));
-    }
-
-    /// Only adds/updates a peer for a given port. This function assumes (but
-    /// does not check!) that the port was not considered to belong to that
-    /// peer before calling this function.
-    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
-        match comp_ctx.get_peer_index(peer_id) {
-            Some(peer_index) => {
-                let peer_info = &mut comp_ctx.peers[peer_index];
-                peer_info.num_associated_ports += 1;
-            },
-            None => {
-                let handle = sched_ctx.runtime.get_component_public(peer_id);
-                comp_ctx.peers.push(Peer{
-                    id: peer_id,
-                    num_associated_ports: 1,
-                    handle,
-                });
-            }
-        }
-    }
-
-    fn change_port_peer_component(
-        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
-        port_id: PortId, new_peer_comp_id: CompId
-    ) {
-        let port_info = comp_ctx.get_port_mut(port_id);
-        let cur_peer_comp_id = port_info.peer_comp_id;
-        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
-        cur_peer_info.num_associated_ports -= 1;
-
-        if cur_peer_info.num_associated_ports == 0 {
-            let should_remove = cur_peer_info.handle.decrement_users();
-            if should_remove {
-                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
-                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
-            }
-        }
-    }
 }
 
 #[inline]
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index 428f88e615e5459211b971730ded896652c8c5ce..2be94616b20a62067d567a01f3bf70b108bbc889 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -4,6 +4,7 @@ use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
 
 use super::component_pdl::*;
+use super::component_context::*;
 
 pub struct PortAnnotation {
     self_comp_id: CompId,
@@ -286,7 +287,8 @@ impl Consensus {
         let mut local_solution = Vec::with_capacity(self.ports.len());
         for port in &self.ports {
             if let Some(mapping) = port.mapping {
-                let port_info = comp_ctx.get_port(port.self_port_id);
+                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
+                let port_info = comp_ctx.get_port(port_handle);
                 let new_entry = match port_info.kind {
                     PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
                         self_comp_id: comp_ctx.id,
@@ -327,11 +329,11 @@ impl Consensus {
     fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
         let mut needs_setting_ports = false;
-        if comp_ctx.ports.len() != self.ports.len() {
+        if comp_ctx.num_ports() != self.ports.len() {
             needs_setting_ports = true;
         } else {
-            for idx in 0..comp_ctx.ports.len() {
-                let comp_port_id = comp_ctx.ports[idx].self_id;
+            for (idx, port) in comp_ctx.iter_ports().enumerate() {
+                let comp_port_id = port.self_id;
                 let cons_port_id = self.ports[idx].self_port_id;
                 if comp_port_id != cons_port_id {
                     needs_setting_ports = true;
@@ -342,8 +344,8 @@ impl Consensus {
 
         if needs_setting_ports {
             self.ports.clear();
-            self.ports.reserve(comp_ctx.ports.len());
-            for port in &comp_ctx.ports {
+            self.ports.reserve(comp_ctx.num_ports());
+            for port in comp_ctx.iter_ports() {
                 self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
             }
         }
@@ -419,7 +421,7 @@ impl Consensus {
                 // Sender knows of someone with a higher ID. So store highest ID,
                 // notify all peers, and forward local solutions
                 self.highest_id = header.highest_id;
-                for peer in &comp_ctx.peers {
+                for peer in comp_ctx.iter_peers() {
                     if peer.id == header.sending_id {
                         continue;
                     }
@@ -438,7 +440,8 @@
                     sync_header: self.create_sync_header(comp_ctx),
                     content: SyncMessageContent::NotificationOfLeader,
                 };
-                let peer_info = comp_ctx.get_peer(header.sending_id);
+                let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
+                let peer_info = comp_ctx.get_peer(peer_handle);
                 peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
             } // else: exactly equal
         }
@@ -589,7 +592,7 @@
                 expected_mapping, new_mapping,
                 source_port: port_info.self_id,
-                target_port: port_info.peer_id,
+                target_port: port_info.peer_port_id,
             };
         }
diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs
index 86d9e8f1f040229c5a56a9336370a13ee9327aa6..c026ef88f9d3836dd87fc8795230c68bbc27d343 100644
--- a/src/runtime2/component/control_layer.rs
+++ b/src/runtime2/component/control_layer.rs
@@ -2,6 +2,8 @@ use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
 use crate::runtime2::component::*;
 
+use super::component_context::*;
+
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub(crate) struct ControlId(u32);
 
@@ -72,7 +74,7 @@ impl ControlLayer {
     }
 
     pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
-        let entry_index = self.get_entry_index(entry_id).unwrap();
+        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
         let entry = &mut self.entries[entry_index];
         debug_assert!(entry.ack_countdown > 0);
 
@@ -111,22 +113,10 @@ impl ControlLayer {
             ControlContent::ClosedPort(port_id) => {
                 // If a closed port is Ack'd, then we remove the reference to
                 // that component.
-                let port_index = comp_ctx.get_port_index(*port_id).unwrap();
-                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked);
-                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
-                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
-                let peer_info = &mut comp_ctx.peers[peer_index];
-                peer_info.num_associated_ports -= 1;
-
-                if peer_info.num_associated_ports == 0 {
-                    let should_remove = peer_info.handle.decrement_users();
-                    if should_remove {
-                        let comp_key = unsafe{ peer_info.id.upgrade() };
-                        sched_ctx.runtime.destroy_component(comp_key);
-                    }
-
-                    comp_ctx.peers.remove(peer_index);
-                }
+                let port_handle = comp_ctx.get_port_handle(*port_id);
+                let port_info = comp_ctx.get_port(port_handle);
+                debug_assert_eq!(port_info.state, PortState::Closed);
+                comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id);
 
                 return AckAction::None;
             }
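Control entries are Ack-counted: `handle_ack` decrements `ack_countdown` and the entry resolves when it hits zero, at which point the follow-up action runs. A self-contained toy of that bookkeeping (illustrative; the real entry and action types are in the patch):

    // Toy Ack-countdown: an entry resolves once every expected peer has Ack'd.
    struct Entry { id: u32, ack_countdown: u32 }

    fn handle_ack(entries: &mut Vec<Entry>, id: u32) -> bool {
        let index = entries.iter().position(|e| e.id == id).unwrap();
        entries[index].ack_countdown -= 1;
        if entries[index].ack_countdown == 0 {
            entries.remove(index); // resolved: caller performs the follow-up action
            return true;
        }
        false
    }

    fn main() {
        let mut entries = vec![Entry{ id: 1, ack_countdown: 2 }];
        assert!(!handle_ack(&mut entries, 1)); // first peer Ack'd
        assert!(handle_ack(&mut entries, 1));  // second peer Ack'd -> resolved
    }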
@@ -153,7 +143,7 @@
     /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
     /// hence the `ack_countdown` in the scheduling entry is at 0.
     pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
-        let index = self.get_entry_index(schedule_entry_id).unwrap();
+        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
         debug_assert_eq!(self.entries[index].ack_countdown, 0);
         self.entries.remove(index);
     }
@@ -198,9 +188,10 @@
     // Blocking, unblocking, and closing ports
     // -------------------------------------------------------------------------
 
-    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
-        let port = comp_ctx.get_port_mut(port_id);
-        let peer_port_id = port.peer_id;
+    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
+        let port = comp_ctx.get_port_mut(port_handle);
+        let port_id = port.self_id;
+        let peer_port_id = port.peer_port_id;
         let peer_comp_id = port.peer_comp_id;
         debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
 
@@ -229,63 +220,67 @@
         ));
     }
 
-    pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
-        // TODO: Feels like this shouldn't be an entry. Hence this class should
-        // be renamed. Lets see where the code ends up being
-        let entry_id = self.take_id();
-        let port_info = comp_ctx.get_port_mut(port_id);
-        let peer_port_id = port_info.peer_id;
+    /// Adds a control entry to track that a port is blocked. Expects the
+    /// caller to have set the port's state to blocked already. The returned
+    /// tuple contains a message and the peer to send it to.
+    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
+        let port_info = comp_ctx.get_port(port_handle);
+        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
+        debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller
+
+        let peer_port_id = port_info.peer_port_id;
         let peer_comp_id = port_info.peer_comp_id;
-        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
-        port_info.state = PortState::Blocked;
+        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
+        let entry_id = self.take_id();
         self.entries.push(ControlEntry{
             id: entry_id,
             ack_countdown: 0,
-            content: ControlContent::BlockedPort(port_id),
+            content: ControlContent::BlockedPort(port_info.self_id),
         });
 
         return (
-            peer_comp_id,
+            peer_handle,
             ControlMessage{
                 id: entry_id,
                 sender_comp_id: comp_ctx.id,
-                target_port_id: Some(peer_port_id),
+                target_port_id: Some(port_info.peer_port_id),
                 content: ControlMessageContent::BlockPort(peer_port_id),
             }
         );
     }
 
-    pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
-        // Find the entry that contains the blocking entry for the port
-        let mut entry_index = usize::MAX;
-        let mut entry_id = ControlId::new_invalid();
-        for (index, entry) in self.entries.iter().enumerate() {
-            if let ControlContent::BlockedPort(blocked_port) = &entry.content {
-                if *blocked_port == port_id {
-                    entry_index = index;
-                    entry_id = entry.id;
-                    break;
+    /// Removes the control entry that tracks that a port is blocked. Expects
+    /// the caller to have already marked the port as unblocked. Again the
Again the + /// returned tuple contains a message and the target it is intended for + pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking + debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before + + let position = self.entries.iter() + .position(|v| { + if let ControlContent::BlockedPort(blocked_port_id) = &v.content { + if *blocked_port_id == port_info.self_id { + return true; + } } - } - } + return false; + }) + .unwrap(); - let port_info = comp_ctx.get_port_mut(port_id); - let peer_port_id = port_info.peer_id; - let peer_comp_id = port_info.peer_comp_id; - debug_assert_eq!(port_info.state, PortState::Blocked); - debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages - port_info.state = PortState::Open; + let entry = self.entries.remove(position); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); return ( - peer_comp_id, + peer_handle, ControlMessage{ - id: entry_id, + id: entry.id, sender_comp_id: comp_ctx.id, - target_port_id: Some(peer_port_id), - content: ControlMessageContent::UnblockPort(peer_port_id), + target_port_id: Some(port_info.peer_port_id), + content: ControlMessageContent::UnblockPort(port_info.peer_port_id) } - ) + ); } // ------------------------------------------------------------------------- @@ -298,7 +293,7 @@ impl ControlLayer { return id; } - fn get_entry_index(&self, entry_id: ControlId) -> Option { + fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option { for (index, entry) in self.entries.iter().enumerate() { if entry.id == entry_id { return Some(index); diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index a7f2b521f7d3c51c08b6a9feae8d61fd27e36520..58b0ea3defc4d26d844fcc1bb86f53a6d84d98ff 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -1,8 +1,10 @@ mod component_pdl; +mod component_context; mod control_layer; mod consensus; -pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling}; +pub(crate) use component_pdl::{CompPDL, CompScheduling}; +pub(crate) use component_context::CompCtx; pub(crate) use control_layer::{ControlId}; use super::scheduler::*; diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 9b4c2ad4eda55e0eaf97a0b220c8417f253acc8e..8265a8c8b88e2c94d7279d3a96f3ec595a8e1b65 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -3,10 +3,9 @@ use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; use std::collections::VecDeque; use crate::protocol::*; -use crate::runtime2::component::wake_up_if_sleeping; use super::communication::Message; -use super::component::{CompCtx, CompPDL}; +use super::component::{wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; @@ -107,12 +106,17 @@ impl CompHandle { debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed } - /// Returns true if the component should be destroyed - pub(crate) fn decrement_users(&mut self) -> bool { + /// Returns the `CompKey` to the component if it should be destroyed + pub(crate) fn decrement_users(&mut self) -> Option { 
         debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
         dbg_code!(self.decremented = true);
         let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
-        return old_count == 1;
+        let new_count = old_count - 1;
+        if new_count == 0 {
+            return Some(unsafe{ self.id.upgrade() });
+        }
+
+        return None;
     }
 }
 
@@ -132,6 +136,7 @@ impl std::ops::Deref for CompHandle {
     type Target = CompPublic;
 
     fn deref(&self) -> &Self::Target {
+        debug_assert!(!self.decremented); // cannot access if control is relinquished
         return unsafe{ &*self.target };
     }
 }
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index c2b6c834d7720443944a418ba07dd0353de7499e..ba86851b7b78e55725512edcefc15e23a9f91cd9 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -91,14 +91,24 @@ impl Scheduler {
     }
 
     fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
+        // Send messages that all ports will be closed
         for port_index in 0..component.ctx.ports.len() {
             let port_info = &component.ctx.ports[port_index];
-            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
+            if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) {
                 let peer_info = component.ctx.get_peer(peer_id);
                 peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
             }
         }
 
+        // Remove all references to the peers that we have
+        for mut peer in component.ctx.peers.drain(..) {
+            if let Some(key) = peer.handle.decrement_users() {
+                sched_ctx.runtime.destroy_component(key);
+            }
+        }
+
         let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
         let new_count = old_count - 1;
         if new_count == 0 {
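The `decrement_users` change threads the destruction decision through the type system: instead of a `bool` plus a separate `unsafe` upgrade at every call site, the key comes back only when the count actually hit zero. A self-contained model of the pattern (`u32` stands in for `CompKey`; toy types, not the runtime's code):

    use std::sync::atomic::{AtomicU32, Ordering};

    // Toy model: the last handle to drop gets back the key needed for destruction.
    struct Handle<'a> { num_handles: &'a AtomicU32, id: u32 }

    impl<'a> Handle<'a> {
        fn decrement_users(&mut self) -> Option<u32> {
            let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
            if old_count == 1 { Some(self.id) } else { None } // Some(key) == "you destroy it"
        }
    }

    fn main() {
        let count = AtomicU32::new(2);
        let mut a = Handle{ num_handles: &count, id: 7 };
        let mut b = Handle{ num_handles: &count, id: 7 };
        assert_eq!(a.decrement_users(), None);    // someone else still holds a handle
        assert_eq!(b.decrement_users(), Some(7)); // last one out destroys the component
    }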