diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 5e7bc0486149e21eb5da1081357caaf12687971d..9bbaebbf75274df3ddf976599aee226d2be1d5cc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,3 +1,8 @@ +/* + * Default toolkit for creating components. Contains handlers for initiating and + * responding to various events. + */ + use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId}; @@ -80,6 +85,7 @@ pub(crate) enum CompMode { BlockedSelect, // waiting on message to complete the select statement BlockedPutPortsAwaitingAcks,// blocked because we're waiting to send a data message containing ports, but first need to receive Acks for the PortPeerChanged messages BlockedPutPortsReady, // blocked because we're waitingto send a data message containing ports + NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 @@ -92,7 +98,7 @@ impl CompMode { match self { Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => true, - NonSync | StartExit | BusyExit | Exit => false, + NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false, } } @@ -101,7 +107,8 @@ impl CompMode { match self { NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | - BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => false, + BlockedPutPortsAwaitingAcks | BlockedPutPortsReady | + NewComponentBlocked => false, StartExit | BusyExit => true, Exit => false, } @@ -142,6 +149,7 @@ pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put + pub mode_component: (ProcedureDefinitionId, TypeId), pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } @@ -151,6 +159,7 @@ impl CompExecState { mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), exit_reason: ExitReason::Termination, } } @@ -166,6 +175,15 @@ impl CompExecState { debug_assert!(self.mode_value.values.is_empty()); } + pub(crate) fn set_as_create_component_blocked( + &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId, + arguments: ValueGroup + ) { + self.mode = CompMode::NewComponentBlocked; + self.mode_value = arguments; + self.mode_component = (proc_id, type_id); + } + pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedGet && @@ -195,6 +213,10 @@ impl CompExecState { self.mode == CompMode::BlockedPutPortsReady && self.mode_port == port; } + + pub(crate) fn is_blocked_on_create_component(&self) -> bool { + return self.mode == CompMode::NewComponentBlocked; + } } // TODO: Replace when implementing port sending. Should probably be incorporated @@ -451,10 +473,7 @@ pub(crate) fn default_handle_received_data_message( // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { - let value = match message_location { - ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize], - ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize], - }; + let value = message.content.get_value_mut(message_location); match value { Value::Input(_) => { @@ -604,8 +623,8 @@ pub(crate) fn default_handle_control_message( port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); default_handle_recently_unblocked_port( - exec_state, consensus, port_handle, sched_ctx, comp_ctx, - inbox_main, inbox_backup + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup ); }, ControlMessageContent::PortPeerChangedBlock => { @@ -635,8 +654,8 @@ pub(crate) fn default_handle_control_message( port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); default_handle_recently_unblocked_port( - exec_state, consensus, port_handle, sched_ctx, comp_ctx, - inbox_main, inbox_backup + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup ); } } @@ -804,6 +823,240 @@ pub(crate) fn default_handle_sync_decision( } } + +pub(crate) fn default_start_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + + let mut transferred_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut transferred_ports); + + // Set execution state as waiting until we can create the component. If we + // can do so right away, then we will. + exec_state.set_as_create_component_blocked(definition_id, type_id, arguments); + if ports_not_blocked(comp_ctx, &transferred_ports) { + perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup); + } +} + +/// Actually creates a component (and assumes that the caller made sure that +/// none of the ports are involved in a blocking operation). +pub(crate) fn perform_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) { + // Small internal utilities + struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, + } + + // Retrieve ports from the arguments + debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); + + let (procedure_id, procedure_type_id) = exec_state.mode_component; + let mut arguments = exec_state.mode_value.take(); + let mut ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut ports); + debug_assert!(ports_not_blocked(instantiator_ctx, &ports)); + + // Reserve a location for the new component + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + + let mut port_pairs = Vec::with_capacity(ports.len()); + + // Go over all the ports that will be transferred. Since the ports will get + // a new ID in the new component, we will take care of that here. + for (port_location, instantiator_port_id) in &ports { + // Retrieve port information from instantiator + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Create port at created component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Modify port ID in the arguments to the new component and store them + // for later access + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + + for location in port_location.iter().copied() { + let value = arguments.get_value_mut(location); + match value { + Value::Input(id) => *id = port_id_to_eval(created_port_id), + Value::Output(id) => *id = port_id_to_eval(created_port_id), + _ => unreachable!(), + } + } + } + + // For each of the ports in the newly created component we set the peer to + // the correct value. We will not yet change the peer on the instantiator's + // ports (as we haven't yet stored the new component in the runtime's + // component storage) + let mut created_component_has_remote_peers = false; + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + if created_port_info.peer_comp_id == instantiator_ctx.id { + // The peer of the created component's port seems to be the + // instantiator. + let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // However, the peer port is also moved to the new + // component, so the complete channel is owned by the new + // component. + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + // Peer port remains with instantiator. However, we cannot + // set the peer on the instantiator yet, because the new + // component has not yet been stored in the runtime's + // component storage. So we do this later + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } else { + // Peer is a different component + if pair.is_open { + // And the port is still open, so we need to notify the peer + let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id); + let peer_info = instantiator_ctx.get_peer(peer_handle); + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); + created_component_has_remote_peers = true; + } + } + } + + // Now we store the new component into the runtime's component storage using + // the reservation. + let component = create_component( + &sched_ctx.runtime.protocol, procedure_id, procedure_type_id, + arguments, port_pairs.len() + ); + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component, created_ctx, false + ); + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // We now pass along the messages that the instantiator component still has + // that belong to the new component. At the same time we'll take care of + // setting the correct peer of the instantiator component + for pair in port_pairs.iter() { + // Transferring the messages and removing the port from the + // instantiator component + let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + instantiator_ctx.remove_port(pair.instantiator_handle); + + if let Some(mut message) = inbox_main[instantiator_port_index].take() { + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } + + // Here we take care of the case where the instantiator previously owned + // both ends of the channel, but has transferred one port to the new + // component (hence creating a channel between the instantiator + // component and the new component). + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Note: the port we're receiving here belongs to the instantiator + // and is NOT in the "port_pairs" array. + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id)); + } + } + + // Finally: if we did move ports around whose peers are different + // components, then we'll initiate the appropriate protocol to notify them. + if created_component_has_remote_peers { + let schedule_entry_id = control.add_schedule_entry(created_ctx.id); + for pair in &port_pairs { + let port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id { + // Peer component is not the instantiator, and it is not the + // new component itself + let message = control.add_reroute_entry( + instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, + pair.instantiator_id, pair.created_id, created_ctx.id, + schedule_entry_id + ); + let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = created_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + } + } else { + // We can schedule the component immediately, we do not have to wait + // for any peers: there are none. + sched_ctx.runtime.enqueue_work(created_key); + } + + exec_state.mode = CompMode::NonSync; + exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); +} + +pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { + for (_port_locations, port_id) in ports { + let port_handle = comp_ctx.get_port_handle(*port_id); + let port_info = comp_ctx.get_port(port_handle); + + if port_info.state.is_blocked_due_to_port_change() { + return false; + } + } + + return true; +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -1000,8 +1253,8 @@ fn default_handle_ack( let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); default_handle_recently_unblocked_port( - exec_state, consensus, port_handle, sched_ctx, comp_ctx, - inbox_main, inbox_backup + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup ); }, AckAction::None => {} @@ -1035,7 +1288,7 @@ fn default_send_ack( /// (stored in the execution state). In this latter case we might still have /// a blocked port. fn default_handle_recently_unblocked_port( - exec_state: &mut CompExecState, consensus: &mut Consensus, + exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { @@ -1076,6 +1329,14 @@ fn default_handle_recently_unblocked_port( exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); debug_assert!(exec_state.mode_value.values.is_empty()); + } else if exec_state.is_blocked_on_create_component() { + let mut ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut ports); + if ports_not_blocked(comp_ctx, &ports) { + perform_create_component( + exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup + ); + } } }