From 6bb433f85dc728b847a04b04ba6f5b537635744f 2022-04-29 13:23:11 From: MH Date: 2022-04-29 13:23:11 Subject: [PATCH] Refactored port state into flags --- diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index f563446fba8df70db1e867ae1f42ab2d0d0c9b80..335f4c6d0864416a0135f0c9522eb433f19f3328 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -17,29 +17,6 @@ impl PortId { } } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum PortKind { - Putter, - Getter, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PortState { - Open, - BlockedDueToPeerChange, - BlockedDueToFullBuffers, - Closed, -} - -impl PortState { - pub fn is_blocked(&self) -> bool { - match self { - PortState::BlockedDueToPeerChange | PortState::BlockedDueToFullBuffers => true, - PortState::Open | PortState::Closed => false, - } - } -} - pub struct Channel { pub putter_id: PortId, pub getter_id: PortId, @@ -173,19 +150,20 @@ pub struct ControlMessage { pub content: ControlMessageContent, } +/// Content of a control message. If the content refers to a port then the +/// `target_port_id` field is the one that it refers to. #[derive(Copy, Clone, Debug)] pub enum ControlMessageContent { Ack, - BlockPort(PortId), - UnblockPort(PortId), + BlockPort, + UnblockPort, ClosePort(ControlMessageClosePort), - PortPeerChangedBlock(PortId), - PortPeerChangedUnblock(PortId, CompId), + PortPeerChangedBlock, + PortPeerChangedUnblock(PortId, CompId), // contains (new_port_id, new_component_id) } #[derive(Copy, Clone, Debug)] pub struct ControlMessageClosePort { - pub port_to_close: PortId, // ID of the receiving port pub closed_in_sync_round: bool, // needed to ensure correct handling of errors } diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 461fc510cb5480691d3578b2a811defe5880ef73..57d89d9ba93a959fd7f7c14ebe9ff3d5f0fa7f0c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -243,7 +243,7 @@ pub(crate) fn default_send_data_message( let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Closed { + if port_info.state.is_closed() { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) return Err(( @@ -312,10 +312,9 @@ pub(crate) fn default_handle_incoming_data_message( // Slot is already full, so if the port was previously opened, it will // now become closed let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future + if port_info.state.is_open() { + port_info.state.set(PortStateFlag::BlockedDueToFullBuffers); - if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); let (peer_handle, message) = control.initiate_port_blocking(comp_ctx, port_handle); let peer = comp_ctx.get_peer(peer_handle); @@ -346,9 +345,7 @@ pub(crate) fn default_attempt_get( let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = target_port_instruction; - - let port_is_closed = port_info.state == PortState::Closed; - if port_is_closed { + if port_info.state.is_closed() { let peer_id = port_info.peer_comp_id; return GetResult::Error(( target_port_instruction, @@ -402,18 +399,17 @@ pub(crate) fn default_handle_received_data_message( debug_assert!(slot.is_none()); // because we've just received from it // Modify last-known location where port instruction was retrieved - let port_info = comp_ctx.get_port_mut(port_handle); + let port_info = comp_ctx.get_port(port_handle); debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller - debug_assert_ne!(port_info.state, PortState::Closed); // checked by caller + debug_assert!(port_info.state.is_open()); // checked by caller // Check if there are any more messages in the backup buffer - let port_info = comp_ctx.get_port(port_handle); for message_index in 0..inbox_backup.len() { let message = &inbox_backup[message_index]; if message.data_header.target_port == targeted_port { // One more message, place it in the slot let message = inbox_backup.remove(message_index); - debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup + debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup *slot = Some(message); return Ok(()); @@ -422,8 +418,10 @@ pub(crate) fn default_handle_received_data_message( // Did not have any more messages, so if we were blocked, then we need to // unblock the port now (and inform the peer of this unblocking) - if port_info.state == PortState::BlockedDueToFullBuffers { - comp_ctx.set_port_state(port_handle, PortState::Open); + if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) { + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); + let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); @@ -445,21 +443,20 @@ pub(crate) fn default_handle_control_message( ControlMessageContent::Ack => { default_handle_ack(control, message.id, sched_ctx, comp_ctx); }, - ControlMessageContent::BlockPort(port_id) => { + ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be // blocked. - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); + let port_to_block = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_block); + let port_info = comp_ctx.get_port_mut(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Open { - // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - } + port_info.state.set(PortStateFlag::BlockedDueToFullBuffers); }, ControlMessageContent::ClosePort(content) => { // Request to close the port. We immediately comply and remove // the component handle as well - let port_handle = comp_ctx.get_port_handle(content.port_to_close); + let port_to_close = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_close); // We're closing the port, so we will always update the peer of the // port (in case of error messages) @@ -467,7 +464,6 @@ pub(crate) fn default_handle_control_message( port_info.peer_comp_id = message.sender_comp_id; port_info.close_at_sync_end = true; // might be redundant (we might set it closed now) - let port_info = comp_ctx.get_port(port_handle); let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); @@ -481,6 +477,7 @@ pub(crate) fn default_handle_control_message( default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { // Respond to the message + let port_info = comp_ctx.get_port(port_handle); let last_instruction = port_info.last_instruction; let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); @@ -507,37 +504,43 @@ pub(crate) fn default_handle_control_message( )); } } else { - comp_ctx.set_port_state(port_handle, PortState::Closed); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.state.set(PortStateFlag::Closed); } } }, - ControlMessageContent::UnblockPort(port_id) => { + ControlMessageContent::UnblockPort => { // We were previously blocked (or already closed) - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); + let port_to_unblock = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_unblock); + let port_info = comp_ctx.get_port_mut(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::BlockedDueToFullBuffers { - default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); - } + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); + + port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); + default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); }, - ControlMessageContent::PortPeerChangedBlock(port_id) => { + ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to // temporarily block the port (while our original recipient is // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); + let port_to_change = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_change); - let port_info = comp_ctx.get_port(port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let port_info = comp_ctx.get_port_mut(port_handle); + let peer_comp_id = port_info.peer_comp_id; + port_info.state.set(PortStateFlag::BlockedDueToPeerChange); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); }, ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { - let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); + let port_to_change = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); let old_peer_id = port_info.peer_comp_id; comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); @@ -545,8 +548,10 @@ pub(crate) fn default_handle_control_message( let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = new_comp_id; port_info.peer_port_id = new_port_id; + + port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); - default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } } @@ -615,14 +620,14 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state == PortState::Closed || port.close_at_sync_end { + if port.state.is_closed() || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } // Mark as closed let port_id = port.self_id; - port.state = PortState::Closed; + port.state.set(PortStateFlag::Closed); // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); @@ -687,7 +692,7 @@ pub(crate) fn default_handle_sync_decision( for port_index in 0..comp_ctx.num_ports() { let port_info = comp_ctx.get_port_by_index_mut(port_index); if port_info.close_at_sync_end { - port_info.state = PortState::Closed; + port_info.state.set(PortStateFlag::Closed); } } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); @@ -789,14 +794,13 @@ fn default_send_ack( /// Handles the unblocking of a putter port. In case there is a pending message /// on that port then it will be sent. -fn default_handle_unblock_put( +fn default_handle_recently_unblocked_port( exec_state: &mut CompExecState, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, ) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert!(port_info.state.is_blocked()); - port_info.state = PortState::Open; + debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller if exec_state.is_blocked_on_put(port_id) { // Annotate the message that we're going to send diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index c02320d58b382798cd59dff48078bd4f00e9b7d6..bb1d8c6254cdff67386d18035d0d3d23c0ac757f 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -1,3 +1,5 @@ +use std::fmt::{Debug, Formatter, Result as FmtResult}; + use crate::runtime2::scheduler::*; use crate::runtime2::runtime::*; use crate::runtime2::communication::*; @@ -12,6 +14,86 @@ pub enum PortInstruction { SourceLocation(ExpressionId), } +/// Directionality of a port +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PortKind { + Putter, + Getter, +} + +/// Bitflags for port +// TODO: Incorporate remaining flags from `Port` struct +#[repr(u32)] +#[derive(Debug, Copy, Clone)] +pub enum PortStateFlag { + Closed = 0x01, // If not closed, then the port is open + BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked + BlockedDueToFullBuffers = 0x04, +} + +#[derive(Copy, Clone)] +pub struct PortState { + flags: u32 +} + +impl PortState { + pub(crate) fn new() -> PortState { + return PortState{ flags: 0 } + } + + // high-level + + #[inline] + pub fn is_open(&self) -> bool { + return !self.is_closed(); + } + + #[inline] + pub fn is_closed(&self) -> bool { + return self.is_set(PortStateFlag::Closed); + } + + #[inline] + pub fn is_blocked(&self) -> bool { + return + self.is_set(PortStateFlag::BlockedDueToPeerChange) || + self.is_set(PortStateFlag::BlockedDueToFullBuffers); + } + + // lower-level utils + #[inline] + pub fn set(&mut self, flag: PortStateFlag) { + self.flags |= flag as u32; + } + + #[inline] + pub fn clear(&mut self, flag: PortStateFlag) { + self.flags &= !(flag as u32); + } + + #[inline] + pub const fn is_set(&self, flag: PortStateFlag) -> bool { + return (self.flags & (flag as u32)) != 0; + } +} + +impl Debug for PortState { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + use PortStateFlag::*; + + let mut s = f.debug_struct("PortState"); + for (flag_name, flag_value) in &[ + ("closed", Closed), + ("blocked_peer_change", BlockedDueToPeerChange), + ("blocked_full_buffers", BlockedDueToFullBuffers) + ] { + s.field(flag_name, &self.is_set(*flag_value)); + } + + return s.finish(); + } +} + #[derive(Debug)] pub struct Port { // Identifiers @@ -25,9 +107,7 @@ pub struct Port { pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message - // Debugging flag to make sure each port is properly associated and - // disassociated with a peer component - #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool, + pub(crate) associated_with_peer: bool, } pub struct Peer { @@ -74,23 +154,23 @@ impl CompCtx { self_id: putter_id, peer_port_id: getter_id, kind: PortKind::Putter, - state: PortState::Open, + state: PortState::new(), peer_comp_id: self.id, last_instruction: PortInstruction::None, close_at_sync_end: false, received_message_for_sync: false, - #[cfg(debug_assertions)] associated_with_peer: false, + associated_with_peer: false, }); self.ports.push(Port{ self_id: getter_id, peer_port_id: putter_id, kind: PortKind::Getter, - state: PortState::Open, + state: PortState::new(), peer_comp_id: self.id, last_instruction: PortInstruction::None, close_at_sync_end: false, received_message_for_sync: false, - #[cfg(debug_assertions)] associated_with_peer: false, + associated_with_peer: false, }); return Channel{ putter_id, getter_id }; @@ -104,7 +184,7 @@ impl CompCtx { last_instruction: PortInstruction::None, close_at_sync_end: false, received_message_for_sync: false, - #[cfg(debug_assertions)] associated_with_peer: false, + associated_with_peer: false, }); return LocalPortHandle(self_id); } @@ -173,12 +253,6 @@ impl CompCtx { } } - pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) { - let port_info = self.get_port_mut(port_handle); - debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state - port_info.state = new_state; - } - pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle { return LocalPortHandle(port_id); } @@ -242,7 +316,7 @@ impl CompCtx { #[inline] fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool { - return (port.state != PortState::Closed || required_if_closed) && port.peer_comp_id != self_id; + return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id; } fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index c125b9dbe01c19ce69428411d420711a9c634d0d..afa053bf3d65d24b3d13b14d3c8fcbc5921973cf 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -371,7 +371,7 @@ impl Component for CompPDL { // the synchronous round is satisfied. let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = PortInstruction::SourceLocation(expr_id); - let port_is_closed = port_info.state == PortState::Closed; + let port_is_closed = port_info.state.is_closed(); // Register port as part of select guard if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { @@ -614,7 +614,7 @@ impl CompPDL { created_id: created_port_id, }; - if creator_port.state == PortState::Closed { + if creator_port.state.is_closed() { closed_port_id_pairs.push(port_id_pair) } else { opened_port_id_pairs.push(port_id_pair); diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 17d575a6a87403c4d4ee81b8b29f8004a42df580..94cd052e5014343afd81807c3f2efe4bc4af2717 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -126,7 +126,7 @@ impl ControlLayer { let port_handle = comp_ctx.get_port_handle(closed_port); let port_info = comp_ctx.get_port(port_handle); let port_peer_comp_id = port_info.peer_comp_id; - debug_assert_eq!(port_info.state, PortState::Closed); + debug_assert!(port_info.state.is_closed()); comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id, true); // remove if closed return (AckAction::None, None); @@ -191,7 +191,7 @@ impl ControlLayer { id: entry_id, sender_comp_id: creator_comp_id, target_port_id: Some(source_port_id), - content: ControlMessageContent::PortPeerChangedBlock(source_port_id) + content: ControlMessageContent::PortPeerChangedBlock }) } @@ -218,7 +218,7 @@ impl ControlLayer { pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) { let port = comp_ctx.get_port(port_handle); let peer_port_id = port.peer_port_id; - debug_assert!(port.state == PortState::Closed); + debug_assert!(port.state.is_closed()); // Construct the port-closing entry let entry_id = self.take_id(); @@ -237,7 +237,6 @@ impl ControlLayer { sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::ClosePort(ControlMessageClosePort{ - port_to_close: peer_port_id, closed_in_sync_round: exit_inside_sync, }), } @@ -250,7 +249,7 @@ impl ControlLayer { pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block - debug_assert_eq!(port_info.state, PortState::BlockedDueToFullBuffers); // contract with caller + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller let peer_port_id = port_info.peer_port_id; let peer_comp_id = port_info.peer_comp_id; @@ -261,8 +260,8 @@ impl ControlLayer { ControlMessage{ id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, - target_port_id: Some(port_info.peer_port_id), - content: ControlMessageContent::BlockPort(peer_port_id), + target_port_id: Some(peer_port_id), + content: ControlMessageContent::BlockPort, } ); } @@ -272,7 +271,6 @@ impl ControlLayer { pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking - debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); @@ -282,7 +280,7 @@ impl ControlLayer { id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), - content: ControlMessageContent::UnblockPort(port_info.peer_port_id) + content: ControlMessageContent::UnblockPort, } ); } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 50fd6dc2813384d890c9d4fe875f745b6d872f48..e69329094ac7b32b1a663dbc8c4f6b21098044f1 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -12,7 +12,7 @@ pub use runtime::Runtime; pub(crate) use error::RtError; pub(crate) use scheduler::SchedulerCtx; pub(crate) use communication::{ - PortId, PortKind, PortState, + PortId, Message, ControlMessage, SyncMessage, DataMessage, SyncRoundDecision }; \ No newline at end of file