diff --git a/src/protocol/eval/value.rs b/src/protocol/eval/value.rs index c4c5060485d669df5943ff6af48090d89b0f604e..97a9f0e8b47f17bdf8d9a75297b8256b2515bea7 100644 --- a/src/protocol/eval/value.rs +++ b/src/protocol/eval/value.rs @@ -261,6 +261,14 @@ impl ValueGroup { } } + /// Retrieves a mutable reference to the value given its ValueId. + pub(crate) fn get_value_mut(&mut self, id: ValueId) -> &mut Value { + match id { + ValueId::Stack(pos) => return &mut self.values[pos as usize], + ValueId::Heap(heap_pos, pos) => return &mut self.regions[heap_pos as usize][pos as usize], + } + } + fn provide_value(&self, value: &Value, to_store: &mut Store) -> Value { if let Some(from_heap_pos) = value.get_heap_pos() { let from_heap_pos = from_heap_pos as usize; diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 5e7bc0486149e21eb5da1081357caaf12687971d..9bbaebbf75274df3ddf976599aee226d2be1d5cc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,3 +1,8 @@ +/* + * Default toolkit for creating components. Contains handlers for initiating and + * responding to various events. + */ + use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId}; @@ -80,6 +85,7 @@ pub(crate) enum CompMode { BlockedSelect, // waiting on message to complete the select statement BlockedPutPortsAwaitingAcks,// blocked because we're waiting to send a data message containing ports, but first need to receive Acks for the PortPeerChanged messages BlockedPutPortsReady, // blocked because we're waitingto send a data message containing ports + NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 @@ -92,7 +98,7 @@ impl CompMode { match self { Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => true, - NonSync | StartExit | BusyExit | Exit => false, + NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false, } } @@ -101,7 +107,8 @@ impl CompMode { match self { NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | - BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => false, + BlockedPutPortsAwaitingAcks | BlockedPutPortsReady | + NewComponentBlocked => false, StartExit | BusyExit => true, Exit => false, } @@ -142,6 +149,7 @@ pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put + pub mode_component: (ProcedureDefinitionId, TypeId), pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } @@ -151,6 +159,7 @@ impl CompExecState { mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), exit_reason: ExitReason::Termination, } } @@ -166,6 +175,15 @@ impl CompExecState { debug_assert!(self.mode_value.values.is_empty()); } + pub(crate) fn set_as_create_component_blocked( + &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId, + arguments: ValueGroup + ) { + self.mode = CompMode::NewComponentBlocked; + self.mode_value = arguments; + self.mode_component = (proc_id, type_id); + } + pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedGet && @@ -195,6 +213,10 @@ impl CompExecState { self.mode == CompMode::BlockedPutPortsReady && self.mode_port == port; } + + pub(crate) fn is_blocked_on_create_component(&self) -> bool { + return self.mode == CompMode::NewComponentBlocked; + } } // TODO: Replace when implementing port sending. Should probably be incorporated @@ -451,10 +473,7 @@ pub(crate) fn default_handle_received_data_message( // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { - let value = match message_location { - ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize], - ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize], - }; + let value = message.content.get_value_mut(message_location); match value { Value::Input(_) => { @@ -604,8 +623,8 @@ pub(crate) fn default_handle_control_message( port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); default_handle_recently_unblocked_port( - exec_state, consensus, port_handle, sched_ctx, comp_ctx, - inbox_main, inbox_backup + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup ); }, ControlMessageContent::PortPeerChangedBlock => { @@ -635,8 +654,8 @@ pub(crate) fn default_handle_control_message( port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); default_handle_recently_unblocked_port( - exec_state, consensus, port_handle, sched_ctx, comp_ctx, - inbox_main, inbox_backup + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup ); } } @@ -804,6 +823,240 @@ pub(crate) fn default_handle_sync_decision( } } + +pub(crate) fn default_start_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + + let mut transferred_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut transferred_ports); + + // Set execution state as waiting until we can create the component. If we + // can do so right away, then we will. + exec_state.set_as_create_component_blocked(definition_id, type_id, arguments); + if ports_not_blocked(comp_ctx, &transferred_ports) { + perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup); + } +} + +/// Actually creates a component (and assumes that the caller made sure that +/// none of the ports are involved in a blocking operation). +pub(crate) fn perform_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) { + // Small internal utilities + struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, + } + + // Retrieve ports from the arguments + debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); + + let (procedure_id, procedure_type_id) = exec_state.mode_component; + let mut arguments = exec_state.mode_value.take(); + let mut ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut ports); + debug_assert!(ports_not_blocked(instantiator_ctx, &ports)); + + // Reserve a location for the new component + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + + let mut port_pairs = Vec::with_capacity(ports.len()); + + // Go over all the ports that will be transferred. Since the ports will get + // a new ID in the new component, we will take care of that here. + for (port_location, instantiator_port_id) in &ports { + // Retrieve port information from instantiator + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Create port at created component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Modify port ID in the arguments to the new component and store them + // for later access + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + + for location in port_location.iter().copied() { + let value = arguments.get_value_mut(location); + match value { + Value::Input(id) => *id = port_id_to_eval(created_port_id), + Value::Output(id) => *id = port_id_to_eval(created_port_id), + _ => unreachable!(), + } + } + } + + // For each of the ports in the newly created component we set the peer to + // the correct value. We will not yet change the peer on the instantiator's + // ports (as we haven't yet stored the new component in the runtime's + // component storage) + let mut created_component_has_remote_peers = false; + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + if created_port_info.peer_comp_id == instantiator_ctx.id { + // The peer of the created component's port seems to be the + // instantiator. + let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // However, the peer port is also moved to the new + // component, so the complete channel is owned by the new + // component. + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + // Peer port remains with instantiator. However, we cannot + // set the peer on the instantiator yet, because the new + // component has not yet been stored in the runtime's + // component storage. So we do this later + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } else { + // Peer is a different component + if pair.is_open { + // And the port is still open, so we need to notify the peer + let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id); + let peer_info = instantiator_ctx.get_peer(peer_handle); + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); + created_component_has_remote_peers = true; + } + } + } + + // Now we store the new component into the runtime's component storage using + // the reservation. + let component = create_component( + &sched_ctx.runtime.protocol, procedure_id, procedure_type_id, + arguments, port_pairs.len() + ); + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component, created_ctx, false + ); + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // We now pass along the messages that the instantiator component still has + // that belong to the new component. At the same time we'll take care of + // setting the correct peer of the instantiator component + for pair in port_pairs.iter() { + // Transferring the messages and removing the port from the + // instantiator component + let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + instantiator_ctx.remove_port(pair.instantiator_handle); + + if let Some(mut message) = inbox_main[instantiator_port_index].take() { + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } + + // Here we take care of the case where the instantiator previously owned + // both ends of the channel, but has transferred one port to the new + // component (hence creating a channel between the instantiator + // component and the new component). + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Note: the port we're receiving here belongs to the instantiator + // and is NOT in the "port_pairs" array. + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id)); + } + } + + // Finally: if we did move ports around whose peers are different + // components, then we'll initiate the appropriate protocol to notify them. + if created_component_has_remote_peers { + let schedule_entry_id = control.add_schedule_entry(created_ctx.id); + for pair in &port_pairs { + let port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id { + // Peer component is not the instantiator, and it is not the + // new component itself + let message = control.add_reroute_entry( + instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, + pair.instantiator_id, pair.created_id, created_ctx.id, + schedule_entry_id + ); + let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = created_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + } + } else { + // We can schedule the component immediately, we do not have to wait + // for any peers: there are none. + sched_ctx.runtime.enqueue_work(created_key); + } + + exec_state.mode = CompMode::NonSync; + exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); +} + +pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { + for (_port_locations, port_id) in ports { + let port_handle = comp_ctx.get_port_handle(*port_id); + let port_info = comp_ctx.get_port(port_handle); + + if port_info.state.is_blocked_due_to_port_change() { + return false; + } + } + + return true; +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -1000,8 +1253,8 @@ fn default_handle_ack( let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); default_handle_recently_unblocked_port( - exec_state, consensus, port_handle, sched_ctx, comp_ctx, - inbox_main, inbox_backup + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup ); }, AckAction::None => {} @@ -1035,7 +1288,7 @@ fn default_send_ack( /// (stored in the execution state). In this latter case we might still have /// a blocked port. fn default_handle_recently_unblocked_port( - exec_state: &mut CompExecState, consensus: &mut Consensus, + exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { @@ -1076,6 +1329,14 @@ fn default_handle_recently_unblocked_port( exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); debug_assert!(exec_state.mode_value.values.is_empty()); + } else if exec_state.is_blocked_on_create_component() { + let mut ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut ports); + if ports_not_blocked(comp_ctx, &ports) { + perform_create_component( + exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup + ); + } } } diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 4ee51581b3f46d0b8c43f278b58a8b3fc7e391cb..1d00c2c83fa9b0ae269fe690f659ef94cf4c9e1d 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -79,6 +79,11 @@ impl PortState { self.is_set(PortStateFlag::BlockedDueToFullBuffers); } + #[inline] + pub fn is_blocked_due_to_port_change(&self) -> bool { + return self.is_set(PortStateFlag::BlockedDueToPeerChange); + } + // lower-level utils #[inline] pub fn set(&mut self, flag: PortStateFlag) { diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index cf10c736583f7d97e9f3040e6c5256eae01d2a11..b99866bd3bdf0ced33a409c11e79177ef3e32e37 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -127,7 +127,8 @@ impl Component for ComponentTcpClient { match self.exec_state.mode { CompMode::BlockedSelect | CompMode::BlockedPutPortsAwaitingAcks | - CompMode::BlockedPutPortsReady => { + CompMode::BlockedPutPortsReady | + CompMode::NewComponentBlocked => { // Not possible: we never enter this state unreachable!(); }, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 0f2fd6c887a4062a0dff4fccee2df85abde65ce4..bb07b2eebc6ae0795010dcbafac7bff60d5c7f92 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -286,7 +286,8 @@ impl Component for CompPDL { }, CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect | - CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => { + CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady | + CompMode::NewComponentBlocked => { return CompScheduling::Sleep; } CompMode::StartExit => return component::default_handle_start_exit( @@ -424,8 +425,9 @@ impl Component for CompPDL { }, EC::NewComponent(definition_id, type_id, arguments) => { debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); - self.create_component_and_transfer_ports( - sched_ctx, comp_ctx, + component::default_start_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, definition_id, type_id, arguments ); return CompScheduling::Requeue; @@ -555,302 +557,4 @@ impl CompPDL { self.exec_state.set_as_start_exit(exit_reason); } - - // ------------------------------------------------------------------------- - // Handling ports - // ------------------------------------------------------------------------- - - /// Creates a new component and transfers ports. Because of the stepwise - /// process in which memory is allocated, ports are transferred, messages - /// are exchanged, component lifecycle methods are called, etc. This - /// function facilitates a lot of implicit assumptions (e.g. when the - /// `Component::on_creation` method is called, the component is already - /// registered at the runtime). - fn create_component_and_transfer_ports( - &mut self, - sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, - definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup - ) { - struct PortPair{ - creator_handle: LocalPortHandle, - creator_id: PortId, - created_handle: LocalPortHandle, - created_id: PortId, - } - let mut opened_port_id_pairs = Vec::new(); - let mut closed_port_id_pairs = Vec::new(); - - let reservation = sched_ctx.runtime.start_create_component(); - let mut created_ctx = CompCtx::new(&reservation); - - let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; - let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - dbg_code!({ - sched_ctx.info(&format!( - "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", - self_proc.identifier.value.as_str(), creator_ctx.id, - other_proc.identifier.value.as_str(), reservation.id() - )); - }); - - // Take all the ports ID that are in the `args` (and currently belong to - // the creator component) and translate them into new IDs that are - // associated with the component we're about to create - let mut arg_iter = ValueGroupPortIter::new(&mut arguments); - while let Some(port_reference) = arg_iter.next() { - // Create port entry for new component - let creator_port_id = port_reference.id; - let creator_port_handle = creator_ctx.get_port_handle(creator_port_id); - let creator_port = creator_ctx.get_port(creator_port_handle); - let created_port_handle = created_ctx.add_port( - creator_port.peer_comp_id, creator_port.peer_port_id, - creator_port.kind, creator_port.state - ); - let created_port = created_ctx.get_port(created_port_handle); - let created_port_id = created_port.self_id; - - let port_id_pair = PortPair { - creator_handle: creator_port_handle, - creator_id: creator_port_id, - created_handle: created_port_handle, - created_id: created_port_id, - }; - - if creator_port.state.is_closed() { - closed_port_id_pairs.push(port_id_pair) - } else { - opened_port_id_pairs.push(port_id_pair); - } - - // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) - let arg_value = if let Some(heap_pos) = port_reference.heap_pos { - &mut arg_iter.group.regions[heap_pos][port_reference.index] - } else { - &mut arg_iter.group.values[port_reference.index] - }; - match arg_value { - Value::Input(id) => *id = port_id_to_eval(created_port_id), - Value::Output(id) => *id = port_id_to_eval(created_port_id), - _ => unreachable!(), - } - } - - // For each transferred port pair set their peer components to the - // correct values. This will only change the values for the ports of - // the new component. - let mut created_component_has_remote_peers = false; - - for pair in opened_port_id_pairs.iter() { - let creator_port_info = creator_ctx.get_port(pair.creator_handle); - let created_port_info = created_ctx.get_port_mut(pair.created_handle); - - if created_port_info.peer_comp_id == creator_ctx.id { - // Peer of the transferred port is the component that is - // creating the new component. - let created_peer_port_index = opened_port_id_pairs - .iter() - .position(|v| v.creator_id == creator_port_info.peer_port_id); - match created_peer_port_index { - Some(created_peer_port_index) => { - // Addendum to the above comment: but that port is also - // moving to the new component - let peer_pair = &opened_port_id_pairs[created_peer_port_index]; - created_port_info.peer_port_id = peer_pair.created_id; - created_port_info.peer_comp_id = reservation.id(); - todo!("either add 'self peer', or remove that idea from Ctx altogether"); - }, - None => { - // Peer port remains with creator component. - created_port_info.peer_comp_id = creator_ctx.id; - created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id)); - } - } - } else { - // Peer is a different component. We'll deal with sending the - // appropriate messages later - let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id); - let peer_info = creator_ctx.get_peer(peer_handle); - created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); - created_component_has_remote_peers = true; - } - } - - // We'll now actually turn our reservation for a new component into an - // actual component. Note that we initialize it as "not sleeping" as - // its initial scheduling might be performed based on `Ack`s in response - // to message exchanges between remote peers. - let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); - let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); - let (created_key, component) = sched_ctx.runtime.finish_create_component( - reservation, component, created_ctx, false, - ); - component.component.on_creation(created_key.downgrade(), sched_ctx); - - // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component. - for pair in opened_port_id_pairs.iter() { - // Remove peer if appropriate - let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None); - creator_ctx.remove_port(pair.creator_handle); - - // Transfer any messages - if let Some(mut message) = self.inbox_main.remove(creator_port_index) { - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message) - } - - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.creator_id { - // transfer message - let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } else { - message_index += 1; - } - } - - let created_port_info = component.ctx.get_port(pair.created_handle); - if created_port_info.peer_comp_id == creator_ctx.id { - // This handles the creation of a channel between the creator - // component and the newly created component. So if the creator - // had a `a -> b` channel, and `b` is moved to the new - // component, then `a` needs to set its peer component. - let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); - let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); - peer_port_info.peer_comp_id = component.ctx.id; - peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id)); - } - } - - // Do the same for the closed ports. Note that we might still have to - // transfer messages that cause the new owner of the port to fail. - for pair in closed_port_id_pairs.iter() { - let port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.remove_port(pair.creator_handle); - if let Some(mut message) = self.inbox_main.remove(port_index) { - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } - - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.created_id { - // Transfer message - let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } else { - message_index += 1; - } - } - } - - // By now all ports and messages have been transferred. If there are any - // peers that need to be notified about this new component, then we - // initiate the protocol that will notify everyone here. - if created_component_has_remote_peers { - let created_ctx = &component.ctx; - let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - for pair in opened_port_id_pairs.iter() { - let port_info = created_ctx.get_port(pair.created_handle); - if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { - let message = self.control.add_reroute_entry( - creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, - pair.creator_id, pair.created_id, created_ctx.id, - schedule_entry_id - ); - let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = created_ctx.get_peer(peer_handle); - peer_info.handle.send_message_logged(sched_ctx, message, true); - } - } - } else { - // Peer can be scheduled immediately - sched_ctx.runtime.enqueue_work(created_key); - } - } -} - -struct ValueGroupPortIter<'a> { - group: &'a mut ValueGroup, - heap_stack: Vec<(usize, usize)>, - index: usize, -} - -impl<'a> ValueGroupPortIter<'a> { - fn new(group: &'a mut ValueGroup) -> Self { - return Self{ group, heap_stack: Vec::new(), index: 0 } - } -} - -struct ValueGroupPortRef { - id: PortId, - heap_pos: Option, // otherwise: on stack - index: usize, -} - -impl<'a> Iterator for ValueGroupPortIter<'a> { - type Item = ValueGroupPortRef; - - fn next(&mut self) -> Option { - // Enter loop that keeps iterating until a port is found - loop { - if let Some(pos) = self.heap_stack.last() { - let (heap_pos, region_index) = *pos; - if region_index >= self.group.regions[heap_pos].len() { - self.heap_stack.pop(); - continue; - } - - let value = &self.group.regions[heap_pos][region_index]; - self.heap_stack.last_mut().unwrap().1 += 1; - - match value { - Value::Input(id) | Value::Output(id) => { - let id = PortId(id.id); - return Some(ValueGroupPortRef{ - id, - heap_pos: Some(heap_pos), - index: region_index, - }); - }, - _ => {}, - } - - if let Some(heap_pos) = value.get_heap_pos() { - self.heap_stack.push((heap_pos as usize, 0)); - } - } else { - if self.index >= self.group.values.len() { - return None; - } - - let value = &mut self.group.values[self.index]; - self.index += 1; - - match value { - Value::Input(id) | Value::Output(id) => { - let id = PortId(id.id); - return Some(ValueGroupPortRef{ - id, - heap_pos: None, - index: self.index - 1 - }); - }, - _ => {}, - } - - // Not a port, check if we need to enter a heap region - if let Some(heap_pos) = value.get_heap_pos() { - self.heap_stack.push((heap_pos as usize, 0)); - } // else: just consider the next value - } - } - } } \ No newline at end of file diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index 7c6063bc4d436adb21fdfb9c83290a3957d2e9ed..538448c89e06a2e7e4d17383a2fed66c921d9d02 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -63,7 +63,8 @@ impl Component for ComponentRandomU32 { match self.exec_state.mode { CompMode::BlockedGet | CompMode::BlockedSelect | - CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => { + CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady | + CompMode::NewComponentBlocked => { // impossible for this component, no input ports and no select // blocks unreachable!();