diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 3b3a89c572da00dc21fe1a87acea0c4e911b30f3..962841d303fbecfe98385fc1f3479cf172042d11 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -472,7 +472,7 @@ pub(crate) fn default_handle_received_data_message( let new_port = comp_ctx.get_port(new_port_handle); // Add the port tho the consensus - consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx); // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { @@ -825,7 +825,113 @@ pub(crate) fn default_handle_sync_decision( } } +/// Special component creation function. This function assumes that the +/// transferred ports are NOT blocked, and that the channels to whom the ports +/// belong are fully owned by the creating component. This will be checked in +/// debug mode. +pub(crate) fn special_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + component_instance: Box, component_ports: Vec, +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + let mut port_pairs = Vec::new(); + + // Retrieve ports + for instantiator_port_id in component_ports.iter() { + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Check if conditions for calling this function are valid + debug_assert!(!instantiator_port.state.is_blocked_due_to_port_change()); + debug_assert_eq!(instantiator_port.peer_comp_id, instantiator_ctx.id); + + // Create port at new component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Store in port pairs + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + } + + // Set peer of the port for the new component + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + // Note: we checked above (in debug mode) that the peer of the port is + // owned by the creator as well, now check if the peer is transferred + // as well. + let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // Both ends of the channel are moving to the new component + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } + + // Store component in runtime storage and retrieve component fields in their + // name memory location + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component_instance, created_ctx, false + ); + + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // Transfer messages and link instantiator to created component + for pair in port_pairs.iter() { + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + instantiator_ctx.remove_port(pair.instantiator_handle); + transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut()); + + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Set up channel between instantiator component port, and its peer, + // which is owned by the new component + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id)); + } + } + // By definition we did not have any remote peers for the transferred ports, + // so we can schedule the new component immediately + sched_ctx.runtime.enqueue_work(created_key); +} + +/// Puts the component in an execution state where the specified component will +/// end up being created. The component goes through state changes (driven by +/// incoming control messages) to make sure that all of the ports that are going +/// to be transferred are not in a blocked state. Once finished the component +/// returns to the `NonSync` mode. pub(crate) fn default_start_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, @@ -850,15 +956,6 @@ pub(crate) fn perform_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { - // Small internal utilities - struct PortPair { - instantiator_id: PortId, - instantiator_handle: LocalPortHandle, - created_id: PortId, - created_handle: LocalPortHandle, - is_open: bool, - } - // Retrieve ports from the arguments debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); @@ -977,29 +1074,9 @@ pub(crate) fn perform_create_component( for pair in port_pairs.iter() { // Transferring the messages and removing the port from the // instantiator component - let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); instantiator_ctx.remove_port(pair.instantiator_handle); - - if let Some(mut message) = inbox_main.remove(instantiator_port_index) { - message.data_header.target_port = pair.created_id; - created_component.adopt_message(created_ctx, message); - } - - let mut message_index = 0; - while message_index < inbox_backup.len() { - let message = &inbox_backup[message_index]; - if message.data_header.target_port == pair.instantiator_id { - // Transfer the message - let mut message = inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - created_component.adopt_message(created_ctx, message); - } else { - // Message does not belong to the port pair that we're - // transferring to the new component. - message_index += 1; - } - } + transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut()); // Here we take care of the case where the instantiator previously owned // both ends of the channel, but has transferred one port to the new @@ -1046,6 +1123,24 @@ pub(crate) fn perform_create_component( exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); } +#[inline] +pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { + debug_assert_eq!(_exec_state.mode, CompMode::Exit); + return CompScheduling::Exit; +} + +// ----------------------------------------------------------------------------- +// Internal messaging/state utilities +// ----------------------------------------------------------------------------- + +struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, +} + pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { for (_port_locations, port_id) in ports { let port_handle = comp_ctx.get_port_handle(*port_id); @@ -1059,6 +1154,32 @@ pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> return true; } +fn transfer_messages( + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, port_pair: &PortPair, + instantiator_ctx: &mut CompCtx, created_ctx: &mut CompCtx, created_component: &mut dyn Component +) { + let instantiator_port_index = instantiator_ctx.get_port_index(port_pair.instantiator_handle); + if let Some(mut message) = inbox_main.remove(instantiator_port_index) { + message.data_header.target_port = port_pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = port_pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -1079,16 +1200,6 @@ pub(crate) fn default_handle_error_for_builtin( exec_state.set_as_start_exit(exit_reason); } -#[inline] -pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { - debug_assert_eq!(_exec_state.mode, CompMode::Exit); - return CompScheduling::Exit; -} - -// ----------------------------------------------------------------------------- -// Internal messaging/state utilities -// ----------------------------------------------------------------------------- - /// Sends a message without any transmitted ports. Does not check if sending /// is actually valid. fn send_message_without_ports( diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 9fec80daf7169dba1681825cdfe547e42714ec11..5656f68fddc3c037ed64027f389f582fb509a08d 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -330,7 +330,7 @@ impl ComponentTcpClient { pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self { return Self{ - socket_state: ClientSocketState::Connected(socket);, + socket_state: ClientSocketState::Connected(socket), sync_state: ClientSyncState::AwaitingCmd, poll_ticket: None, inbox_main: vec![None, None], @@ -413,13 +413,33 @@ impl ListenerSocketState { } } -#[derive(PartialEq, Debug)] enum ListenerSyncState { AwaitingCmd, - Accept, + AcceptCommandReceived, + AcceptChannelGenerated{ client: SocketTcpClient, cmd_rx: PortId, data_tx: PortId }, FinishSyncThenQuit, } +impl ListenerSyncState { + // Bit of a hacky solution: keeps the listener sync state intact, except + // if it is `AcceptChannelGenerated`, that will be replaced with + // `AwaitingCmd`. The reason is to move the `client` out. + fn take(&mut self) -> ListenerSyncState { + use ListenerSyncState::*; + + match self { + AwaitingCmd => return AwaitingCmd, + AcceptCommandReceived => return AcceptCommandReceived, + FinishSyncThenQuit => return FinishSyncThenQuit, + AcceptChannelGenerated{ .. } => { + let mut swapped = ListenerSyncState::AwaitingCmd; + std::mem::swap(self, &mut swapped); + return swapped; + } + } + } +} + pub struct ComponentTcpListener { // Properties for the tcp socket socket_state: ListenerSocketState, @@ -516,14 +536,29 @@ impl Component for ComponentTcpListener { CompMode::NonSync => { match &self.socket_state { ListenerSocketState::Connected(_socket) => { - if self.sync_state == ListenerSyncState::FinishSyncThenQuit { - self.exec_state.set_as_start_exit(ExitReason::Termination); - } else { - self.sync_state = ListenerSyncState::AwaitingCmd; - component::default_handle_sync_start( - &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus - ); + match self.sync_state.take() { + ListenerSyncState::AwaitingCmd => { + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); + }, + ListenerSyncState::AcceptCommandReceived => unreachable!(), + ListenerSyncState::AcceptChannelGenerated{ client, cmd_rx, data_tx } => { + // Now that we're outside the sync round, create the tcp client + // component + let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection(client, cmd_rx, data_tx)); + component::special_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, socket_component, + vec![cmd_rx, data_tx] + ); + self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take() + }, + ListenerSyncState::FinishSyncThenQuit => { + self.exec_state.set_as_start_exit(ExitReason::Termination); + }, } + return CompScheduling::Immediate; }, ListenerSocketState::Error => { @@ -543,7 +578,7 @@ impl Component for ComponentTcpListener { GetResult::Received(message) => { let (tag_value, _) = message.content.values[0].as_union(); if tag_value == self.input_union_accept_tag { - self.sync_state = ListenerSyncState::Accept; + self.sync_state = ListenerSyncState::AcceptCommandReceived; } else if tag_value == self.input_union_shutdown_tag { self.sync_state = ListenerSyncState::FinishSyncThenQuit; } else { @@ -561,33 +596,74 @@ impl Component for ComponentTcpListener { } } }, - ListenerSyncState::Accept => { + ListenerSyncState::AcceptCommandReceived => { let socket = self.socket_state.get_socket(); match socket.accept() { Ok(client) => { - // TODO: Continue here, somehow precreate component, but do the transfer correctly + // Create the channels (and the inbox entries, to stay consistent + // with the expectations from the `component` module's functions) let client = client.unwrap(); - component::default_start_create_component() - todo!("actually create the component") + let cmd_channel = comp_ctx.create_channel(); + let data_channel = comp_ctx.create_channel(); + + let port_ids = [ + cmd_channel.putter_id, cmd_channel.getter_id, + data_channel.putter_id, data_channel.getter_id, + ]; + for port_id in port_ids { + let expected_port_index = self.inbox_main.len(); + let port_handle = comp_ctx.get_port_handle(port_id); + self.inbox_main.push(None); + self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx); + } + + // Construct the message containing the appropriate ports that will + // be sent to the component commanding this listener. + let mut values = ValueGroup::new_stack(vec![ + Value::Unassigned, Value::Unassigned + ]); + values.values[self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); + values.values[self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); + if let Err(location_and_message) = component::default_send_data_message( + &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values, + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx + ) { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, location_and_message + ); + } + + // And finish the consensus round + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + + // Enter a state such that when we leave the consensus round and + // go back to the nonsync state, that we will actually create the + // tcp client component. + self.sync_state = ListenerSyncState::AcceptChannelGenerated { + client, + cmd_rx: cmd_channel.getter_id, + data_tx: data_channel.putter_id + }; + + return CompScheduling::Requeue; }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { return CompScheduling::Sleep; } else { - todo!("handle listener.accept error {:?}", err); + todo!("handle listener.accept error {:?}", err) } } } }, + ListenerSyncState::AcceptChannelGenerated{ .. } => unreachable!(), ListenerSyncState::FinishSyncThenQuit => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; } } - return CompScheduling::Sleep; }, CompMode::BlockedGet => { - debug_assert_eq!(self.sync_state, ListenerSyncState::AwaitingCmd); return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index dc11dd2c27b065ab112be629ac1ce3d31144a575..05856ffc3820837f251057c98e963c98eb51ddf4 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -355,7 +355,7 @@ impl Consensus { self.solution.clear(); } - pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) { + pub(crate) fn notify_of_new_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) { debug_assert_eq!(_expected_index, self.ports.len()); let port_info = comp_ctx.get_port(port_handle); self.ports.push(PortAnnotation{ diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index 88605e5d9808734a936ecfeb38b77f2e79f46ffa..f1e52354ea015e4d79ed598efb5178670b2a088d 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -154,6 +154,12 @@ impl Drop for SocketTcpListener { } } +impl AsFileDescriptor for SocketTcpListener { + fn as_file_descriptor(&self) -> FileDescriptor { + return self.socket_handle; + } +} + /// Raw socket receiver. Essentially a listener that accepts a single connection struct SocketRawRx { listen_handle: c_int,