diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index d5712961e4599ebea7ed0b73bbdf3aa03a2d2c81..f5e74559dca3ec8e1c6e26f10e21079df43d5b7c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -234,13 +234,14 @@ pub(crate) fn create_component( arguments: ValueGroup, num_ports: usize ) -> Box { let definition = &protocol.heap[definition_id]; - debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite); + debug_assert_eq!(definition.kind, ProcedureKind::Component); if definition.source.is_builtin() { // Builtin component let component: Box = match definition.source { ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), + ProcedureSource::CompTcpListener => Box::new(ComponentTcpListener::new(arguments)), _ => unreachable!(), }; @@ -472,7 +473,7 @@ pub(crate) fn default_handle_received_data_message( let new_port = comp_ctx.get_port(new_port_handle); // Add the port tho the consensus - consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx); // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { @@ -735,7 +736,6 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - println!("DEBUG: Considering port:\n{:?}", port); if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; @@ -826,7 +826,113 @@ pub(crate) fn default_handle_sync_decision( } } +/// Special component creation function. This function assumes that the +/// transferred ports are NOT blocked, and that the channels to whom the ports +/// belong are fully owned by the creating component. This will be checked in +/// debug mode. +pub(crate) fn special_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + component_instance: Box, component_ports: Vec, +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + let mut port_pairs = Vec::new(); + + // Retrieve ports + for instantiator_port_id in component_ports.iter() { + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Check if conditions for calling this function are valid + debug_assert!(!instantiator_port.state.is_blocked_due_to_port_change()); + debug_assert_eq!(instantiator_port.peer_comp_id, instantiator_ctx.id); + + // Create port at new component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Store in port pairs + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + } + + // Set peer of the port for the new component + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + // Note: we checked above (in debug mode) that the peer of the port is + // owned by the creator as well, now check if the peer is transferred + // as well. + let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // Both ends of the channel are moving to the new component + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } + + // Store component in runtime storage and retrieve component fields in their + // name memory location + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component_instance, created_ctx, false + ); + + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // Transfer messages and link instantiator to created component + for pair in port_pairs.iter() { + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut()); + instantiator_ctx.remove_port(pair.instantiator_handle); + + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Set up channel between instantiator component port, and its peer, + // which is owned by the new component + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id)); + } + } + // By definition we did not have any remote peers for the transferred ports, + // so we can schedule the new component immediately + sched_ctx.runtime.enqueue_work(created_key); +} + +/// Puts the component in an execution state where the specified component will +/// end up being created. The component goes through state changes (driven by +/// incoming control messages) to make sure that all of the ports that are going +/// to be transferred are not in a blocked state. Once finished the component +/// returns to the `NonSync` mode. pub(crate) fn default_start_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, @@ -851,15 +957,6 @@ pub(crate) fn perform_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { - // Small internal utilities - struct PortPair { - instantiator_id: PortId, - instantiator_handle: LocalPortHandle, - created_id: PortId, - created_handle: LocalPortHandle, - is_open: bool, - } - // Retrieve ports from the arguments debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); @@ -978,30 +1075,10 @@ pub(crate) fn perform_create_component( for pair in port_pairs.iter() { // Transferring the messages and removing the port from the // instantiator component - let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut()); instantiator_ctx.remove_port(pair.instantiator_handle); - if let Some(mut message) = inbox_main[instantiator_port_index].take() { - message.data_header.target_port = pair.created_id; - created_component.adopt_message(created_ctx, message); - } - - let mut message_index = 0; - while message_index < inbox_backup.len() { - let message = &inbox_backup[message_index]; - if message.data_header.target_port == pair.instantiator_id { - // Transfer the message - let mut message = inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - created_component.adopt_message(created_ctx, message); - } else { - // Message does not belong to the port pair that we're - // transferring to the new component. - message_index += 1; - } - } - // Here we take care of the case where the instantiator previously owned // both ends of the channel, but has transferred one port to the new // component (hence creating a channel between the instantiator @@ -1047,6 +1124,24 @@ pub(crate) fn perform_create_component( exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); } +#[inline] +pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { + debug_assert_eq!(_exec_state.mode, CompMode::Exit); + return CompScheduling::Exit; +} + +// ----------------------------------------------------------------------------- +// Internal messaging/state utilities +// ----------------------------------------------------------------------------- + +struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, +} + pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { for (_port_locations, port_id) in ports { let port_handle = comp_ctx.get_port_handle(*port_id); @@ -1060,6 +1155,32 @@ pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> return true; } +fn transfer_messages( + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, port_pair: &PortPair, + instantiator_ctx: &mut CompCtx, created_ctx: &mut CompCtx, created_component: &mut dyn Component +) { + let instantiator_port_index = instantiator_ctx.get_port_index(port_pair.instantiator_handle); + if let Some(mut message) = inbox_main.remove(instantiator_port_index) { + message.data_header.target_port = port_pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = port_pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -1080,16 +1201,6 @@ pub(crate) fn default_handle_error_for_builtin( exec_state.set_as_start_exit(exit_reason); } -#[inline] -pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { - debug_assert_eq!(_exec_state.mode, CompMode::Exit); - return CompScheduling::Exit; -} - -// ----------------------------------------------------------------------------- -// Internal messaging/state utilities -// ----------------------------------------------------------------------------- - /// Sends a message without any transmitted ports. Does not check if sending /// is actually valid. fn send_message_without_ports( @@ -1180,7 +1291,6 @@ fn perform_send_message_with_ports_notify_peers( // Block the peer of the port let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); - println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); @@ -1294,7 +1404,6 @@ fn default_handle_ack( AckAction::UnblockPutWithPorts => { // Send the message (containing ports) stored in the component // execution state to the recipient - println!("DEBUG: Unblocking put with ports"); debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); exec_state.mode = CompMode::PutPortsBlockedSendingPort; let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);