diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index e7b2d674b32c4330a5e07c5cb859aadfa740735e..803db6e48ef4c9ef920d8ffc26b4a86a2d13ad86 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -65,10 +65,12 @@ pub struct MessageDataHeader { #[derive(Debug)] pub struct TransmittedPort { - location: ValueId, // within `content` - messages: Vec, // owned by previous component - peer_comp: CompId, - peer_port: PortId, + pub locations: Vec, // within `content` + pub messages: Vec, // owned by previous component + pub peer_comp: CompId, + pub peer_port: PortId, + pub kind: PortKind, + pub state: PortState, } // ----------------------------------------------------------------------------- diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index f71dfb0556f1fb8627dc2c7a333ee96b53b84ddf..7e450523700cc9aebe4153b98da061478f4cc1c5 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -363,7 +363,7 @@ pub(crate) enum GetResult { /// instruction we're attempting on this port. pub(crate) fn default_attempt_get( exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus ) -> GetResult { let port_handle = comp_ctx.get_port_handle(target_port); @@ -382,13 +382,14 @@ pub(crate) fn default_attempt_get( if let Some(message) = &inbox_main[port_index] { if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // We're allowed to receive this message - let message = inbox_main[port_index].take().unwrap(); + let mut message = inbox_main[port_index].take().unwrap(); debug_assert_eq!(target_port, message.data_header.target_port); // Note: we can still run into an unrecoverable error when actually // receiving this message match default_handle_received_data_message( - target_port, target_port_instruction, inbox_main, inbox_backup, + target_port, target_port_instruction, + &mut message, inbox_main, inbox_backup, comp_ctx, sched_ctx, control, ) { Ok(()) => return GetResult::Received(message), @@ -415,8 +416,8 @@ pub(crate) fn default_attempt_get( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, + targeted_port: PortId, port_instruction: PortInstruction, message: &mut DataMessage, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); @@ -424,6 +425,59 @@ pub(crate) fn default_handle_received_data_message( let slot = &mut inbox_main[port_index]; debug_assert!(slot.is_none()); // because we've just received from it + // If we received any ports, add them to the port tracking and inbox struct. + // Then notify the peers that they can continue sending to this port, but + // now at a new address. + for received_port in &mut message.ports { + // Transfer messages to main/backup inbox + let new_inbox_index = inbox_main.len(); + if !received_port.messages.is_empty() { + inbox_main.push(Some(received_port.messages.remove(0))); + } + inbox_backup.extend(received_port.messages.drain(..)); + + // Create a new port locally + let mut new_port_state = received_port.state; + new_port_state.set(PortStateFlag::Received); + let new_port_handle = comp_ctx.add_port( + received_port.peer_comp, received_port.peer_port, + received_port.kind, new_port_state + ); + let new_port = comp_ctx.get_port(new_port_handle); + comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(new_port.peer_comp_id)); + + // Replace all references to the port in the received message + for message_location in received_port.locations { + let value = match message_location { + ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize], + ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize], + }; + + match value { + Value::Input(_) => { + debug_assert_eq!(new_port.kind, PortKind::Getter); + *value = Value::Input(port_id_to_eval(new_port.self_id)); + }, + Value::Output(_) => { + debug_assert_eq!(new_port.kind, PortKind::Putter); + *value = Value::Output(port_id_to_eval(new_port.self_id)); + }, + _ => unreachable!(), + } + } + + // Let the peer know that the port can now be used + let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ + id: ControlId::new_invalid(), + sender_comp_id: comp_ctx.id, + target_port_id: Some(new_port.peer_port_id), + content: ControlMessageContent::PortPeerChangedUnblock(new_port.id, comp_ctx.id) + }), true); + } + // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port(port_handle); debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller @@ -463,11 +517,12 @@ pub(crate) fn default_handle_received_data_message( /// state may all change. pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, - message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(control, message.id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, inbox_main, inbox_backup); }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -500,7 +555,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. - default_handle_ack(control, control_id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, inbox_main, inbox_backup); } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -777,14 +832,17 @@ fn send_message_without_ports( peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); } -fn send_message_with_ports( - sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction, - value: ValueGroup, +/// Prepares sending a message that contains ports. Only once a particular +/// protocol has completed (where we notify all the peers that the ports will +/// be transferred) will we actually send the message to the recipient. +fn prepare_send_message_with_ports( + sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, - control: &mut ControlLayer, consensus: &mut Consensus + control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + let sending_port_handle = comp_ctx.get_port_handle(sending_port_id); let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); sending_port_info.last_instruction = sending_port_instruction; @@ -797,37 +855,98 @@ fn send_message_with_ports( for (_, port_id) in &transmit_ports { let transmit_port_handle = comp_ctx.get_port_handle(*port_id); let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); + let peer_comp_id = transmit_port_info.peer_comp_id; + let peer_port_id = transmit_port_info.peer_port_id; // Note: we checked earlier that we are currently in sync mode. Now we // will check if we've already used the port we're about to transmit. if !transmit_port_info.last_instruction.is_none() { return Err(( sending_port_instruction, - String::from("Cannot transmit one of the ports, as it is used in this sync round") + String::from("Cannot transmit one of the ports in this message, as it is used in this sync round") )); } if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { return Err(( sending_port_instruction, - String::from("Cannot transmit one of the ports, as that port is already transmitted") + String::from("Cannot transmit one of the ports in this message, as that port is already transmitted") )); } + // Set the flag for transmission transmit_port_info.state.set(PortStateFlag::Transmitted); - } + // Block the peer of the port + let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); - let port_info = comp_ctx.get_port(sending_port_handle); + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + // We've set up the protocol, once all the PPC's are blocked we are supposed + // to transfer the message to the recipient. So store it temporarily + exec_state.mode = CompMode::BlockedPutPorts; + exec_state.mode_port = sending_port_id; + exec_state.mode_value = value; return Ok(()); } +/// Performs the transmission of a data message that contains ports. These were +/// all stored in the component's execution state by the +/// `prepare_send_message_with_ports` function. +fn perform_send_message_with_ports( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) { + debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPorts); + + // Find all ports again + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports); + + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let port_info = comp_ctx.get_port(port_handle); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + + // Annotate the data message + let message_value = exec_state.mode_value.take(); + let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value); + + // And further enhance the message by adding data about the ports that are + // being transferred + for (port_locations, port_id) in transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(port_id); + let transmit_port_info = comp_ctx.get_port(transmit_port_handle); + + let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); + + let mut transmit_port_state = transmit_port_info.state; + debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted)); + transmit_port_state.clear(PortStateFlag::Transmitted); + + annotated_message.ports.push(TransmittedPort{ + locations: port_locations, + messages: transmit_messages, + peer_comp: transmit_port_info.peer_comp_id, + peer_port: transmit_port_info.peer_port_id, + kind: transmit_port_info.kind, + state: transmit_port_state + }) + } + + // And finally, send the message to the peer + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); +} + /// Handles an `Ack` for the control layer. fn default_handle_ack( - control: &mut ControlLayer, control_id: ControlId, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, + inbox_backup: &mut InboxBackup ) { // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; @@ -853,6 +972,12 @@ fn default_handle_ack( let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, + AckAction::UnblockPutWithPorts => { + // Send the message (containing ports) to the recipient + perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); + exec_state.mode = CompMode::Sync; + exec_state.mode_port = PortId::new_invalid(); + }, AckAction::None => {} } @@ -962,4 +1087,32 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut En for (value_index, value) in &value_group.values.iter().enumerate() { find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); } +} + +/// Goes through the inbox of a component and takes out all the messages that +/// are targeted at a specific port +pub(crate) fn take_port_messages( + comp_ctx: &CompCtx, port_id: PortId, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Vec { + let mut messages = Vec::new(); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_index = comp_ctx.get_port_index(port_handle); + + if let Some(message) = inbox_main[port_index].take() { + messages.push(message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_id { + let message = inbox_backup.remove(message_index); + messages.push(message); + } else { + message_index += 1; + } + } + + return messages; } \ No newline at end of file diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 9aab35cfd66f36787b4f538ce99d455de7475cee..94dff5badfa7677ddbe3d4b2425ddff7366afc0a 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -39,6 +39,7 @@ pub enum PortStateFlag { BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked BlockedDueToFullBuffers = 0x04, Transmitted = 0x08, // Transmitted, so cannot be used anymore + Received = 0x10, // Received, so cannot be used yet, only after the sync round } #[derive(Copy, Clone)] @@ -62,7 +63,8 @@ impl PortState { pub fn can_send(&self) -> bool { return !self.is_set(PortStateFlag::Closed) && - !self.is_set(PortStateFlag::Transmitted); + !self.is_set(PortStateFlag::Transmitted) && + !self.is_set(PortStateFlag::Received); } #[inline] @@ -102,7 +104,8 @@ impl Debug for PortState { for (flag_name, flag_value) in &[ ("closed", Closed), ("blocked_peer_change", BlockedDueToPeerChange), - ("blocked_full_buffers", BlockedDueToFullBuffers) + ("blocked_full_buffers", BlockedDueToFullBuffers), + ("transmitted", Transmitted), ] { s.field(flag_name, &self.is_set(*flag_value)); } diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index da6a34d2932f0cdf9a8f072b5966d0fdc47481a3..ccce27eca977256b682599edd1db0dc6e7946604 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -10,7 +10,7 @@ pub(crate) struct ControlId(u32); impl ControlId { /// Like other invalid IDs, this one doesn't care any significance, but is /// just set at u32::MAX to hopefully bring out bugs sooner. - fn new_invalid() -> Self { + pub(crate) fn new_invalid() -> Self { return ControlId(u32::MAX); } } @@ -142,7 +142,7 @@ impl ControlLayer { } // ------------------------------------------------------------------------- - // Port transfer (due to component creation) + // Port transfer // ------------------------------------------------------------------------- /// Adds an entry that, when completely ack'd, will schedule a component. @@ -169,15 +169,8 @@ impl ControlLayer { return entry_id; } - /// Removes a schedule entry. Only used if the caller preemptively called - /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, - /// hence the `ack_countdown` in the scheduling entry is at 0. - pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { - let index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); - debug_assert_eq!(self.entries[index].ack_countdown, 0); - self.entries.remove(index); - } - + /// Adds a rerouting entry (used to ensure all messages will end up at a + /// newly created component). Used when creating a new component. pub(crate) fn add_reroute_entry( &mut self, creator_comp_id: CompId, source_port_id: PortId, source_comp_id: CompId, @@ -210,6 +203,24 @@ impl ControlLayer { }) } + /// Creates a PortPeerChanged message (and increments ack-counter on a + /// pre-created control entry) that is used as a preliminary step before + /// transferring a port over a channel. + pub(crate) fn create_port_transfer_message( + &mut self, associated_control_id: ControlId, + sender_comp_id: CompId, target_port_id: PortId + ) -> Message { + let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap(); + self.entries[entry_index].ack_countdown += 1; + + return Message::Control(ControlMessage{ + id: associated_control_id, + sender_comp_id, + target_port_id: Some(target_port_id), + content: ControlMessageContent::PortPeerChangedBlock + }) + } + // ------------------------------------------------------------------------- // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index e64f6bd1b2674b57f3bb68a2a470c5f51c2f3ebc..7dc79f4287745a0f7fcb5bb7a2942228b1b33370 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -8,7 +8,7 @@ mod component_internet; pub(crate) use component::{Component, CompScheduling}; pub(crate) use component_pdl::{CompPDL}; -pub(crate) use component_context::{CompCtx, PortInstruction}; +pub(crate) use component_context::{CompCtx, PortInstruction, PortKind, PortState, PortStateFlag}; pub(crate) use control_layer::{ControlId}; use super::scheduler::*;