diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index 0f2fd6c887a4062a0dff4fccee2df85abde65ce4..bb07b2eebc6ae0795010dcbafac7bff60d5c7f92 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -286,7 +286,8 @@ impl Component for CompPDL {
             },
             CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect |
-            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady => {
+            CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady |
+            CompMode::NewComponentBlocked => {
                 return CompScheduling::Sleep;
             }
             CompMode::StartExit => return component::default_handle_start_exit(
@@ -424,8 +425,9 @@ impl Component for CompPDL {
             },
             EC::NewComponent(definition_id, type_id, arguments) => {
                 debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
-                self.create_component_and_transfer_ports(
-                    sched_ctx, comp_ctx,
+                component::default_start_create_component(
+                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
+                    &mut self.inbox_main, &mut self.inbox_backup,
                     definition_id, type_id, arguments
                 );
                 return CompScheduling::Requeue;
@@ -555,302 +557,4 @@ impl CompPDL {
         self.exec_state.set_as_start_exit(exit_reason);
     }
-
-    // -------------------------------------------------------------------------
-    // Handling ports
-    // -------------------------------------------------------------------------
-
-    /// Creates a new component and transfers ports. Because of the stepwise
-    /// process in which memory is allocated, ports are transferred, messages
-    /// are exchanged, component lifecycle methods are called, etc. This
-    /// function relies on a lot of implicit assumptions (e.g. when the
-    /// `Component::on_creation` method is called, the component is already
-    /// registered at the runtime).
-    fn create_component_and_transfer_ports(
-        &mut self,
-        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
-        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
-    ) {
-        struct PortPair{
-            creator_handle: LocalPortHandle,
-            creator_id: PortId,
-            created_handle: LocalPortHandle,
-            created_id: PortId,
-        }
-        let mut opened_port_id_pairs = Vec::new();
-        let mut closed_port_id_pairs = Vec::new();
-
-        let reservation = sched_ctx.runtime.start_create_component();
-        let mut created_ctx = CompCtx::new(&reservation);
-
-        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
-        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
-        dbg_code!({
-            sched_ctx.info(&format!(
-                "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
-                self_proc.identifier.value.as_str(), creator_ctx.id,
-                other_proc.identifier.value.as_str(), reservation.id()
-            ));
-        });
-
-        // Take all the port IDs that are in the `args` (and currently belong
-        // to the creator component) and translate them into new IDs that are
-        // associated with the component we're about to create
-        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
-        while let Some(port_reference) = arg_iter.next() {
-            // Create port entry for new component
-            let creator_port_id = port_reference.id;
-            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
-            let creator_port = creator_ctx.get_port(creator_port_handle);
-            let created_port_handle = created_ctx.add_port(
-                creator_port.peer_comp_id, creator_port.peer_port_id,
-                creator_port.kind, creator_port.state
-            );
-            let created_port = created_ctx.get_port(created_port_handle);
-            let created_port_id = created_port.self_id;
-
-            let port_id_pair = PortPair {
-                creator_handle: creator_port_handle,
-                creator_id: creator_port_id,
-                created_handle: created_port_handle,
-                created_id: created_port_id,
-            };
-
-            if creator_port.state.is_closed() {
-                closed_port_id_pairs.push(port_id_pair)
-            } else {
-                opened_port_id_pairs.push(port_id_pair);
-            }
-
-            // Modify value in arguments (a bit dirty, but the double vec in
-            // ValueGroup causes lifetime issues)
-            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
-                &mut arg_iter.group.regions[heap_pos][port_reference.index]
-            } else {
-                &mut arg_iter.group.values[port_reference.index]
-            };
-            match arg_value {
-                Value::Input(id) => *id = port_id_to_eval(created_port_id),
-                Value::Output(id) => *id = port_id_to_eval(created_port_id),
-                _ => unreachable!(),
-            }
-        }
-
-        // For each transferred port pair, set their peer components to the
-        // correct values. This will only change the values for the ports of
-        // the new component.
-        let mut created_component_has_remote_peers = false;
-
-        for pair in opened_port_id_pairs.iter() {
-            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
-            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
-
-            if created_port_info.peer_comp_id == creator_ctx.id {
-                // Peer of the transferred port is the component that is
-                // creating the new component.
-                let created_peer_port_index = opened_port_id_pairs
-                    .iter()
-                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
-                match created_peer_port_index {
-                    Some(created_peer_port_index) => {
-                        // Addendum to the above comment: but that port is also
-                        // moving to the new component
-                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
-                        created_port_info.peer_port_id = peer_pair.created_id;
-                        created_port_info.peer_comp_id = reservation.id();
-                        todo!("either add 'self peer', or remove that idea from Ctx altogether");
-                    },
-                    None => {
-                        // Peer port remains with creator component.
-                        created_port_info.peer_comp_id = creator_ctx.id;
-                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id));
-                    }
-                }
-            } else {
-                // Peer is a different component. We'll deal with sending the
-                // appropriate messages later
-                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
-                let peer_info = creator_ctx.get_peer(peer_handle);
-                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
-                created_component_has_remote_peers = true;
-            }
-        }
-
-        // We'll now actually turn our reservation for a new component into an
-        // actual component. Note that we initialize it as "not sleeping", as
-        // its initial scheduling might be performed based on `Ack`s in response
-        // to message exchanges between remote peers.
-        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
-        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
-        let (created_key, component) = sched_ctx.runtime.finish_create_component(
-            reservation, component, created_ctx, false,
-        );
-        component.component.on_creation(created_key.downgrade(), sched_ctx);
-
-        // Now modify the creator's ports: remove every transferred port and
-        // potentially remove the peer component.
-        for pair in opened_port_id_pairs.iter() {
-            // Remove peer if appropriate
-            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
-            creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None);
-            creator_ctx.remove_port(pair.creator_handle);
-
-            // Transfer any messages
-            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
-                message.data_header.target_port = pair.created_id;
-                component.component.adopt_message(&mut component.ctx, message)
-            }
-
-            let mut message_index = 0;
-            while message_index < self.inbox_backup.len() {
-                let message = &self.inbox_backup[message_index];
-                if message.data_header.target_port == pair.creator_id {
-                    // transfer message
-                    let mut message = self.inbox_backup.remove(message_index);
-                    message.data_header.target_port = pair.created_id;
-                    component.component.adopt_message(&mut component.ctx, message);
-                } else {
-                    message_index += 1;
-                }
-            }
-
-            let created_port_info = component.ctx.get_port(pair.created_handle);
-            if created_port_info.peer_comp_id == creator_ctx.id {
-                // This handles the creation of a channel between the creator
-                // component and the newly created component. So if the creator
-                // had an `a -> b` channel, and `b` is moved to the new
-                // component, then `a` needs to set its peer component.
-                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
-                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
-                peer_port_info.peer_comp_id = component.ctx.id;
-                peer_port_info.peer_port_id = created_port_info.self_id;
-                creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id));
-            }
-        }
-
-        // Do the same for the closed ports. Note that we might still have to
-        // transfer messages that cause the new owner of the port to fail.
-        for pair in closed_port_id_pairs.iter() {
-            let port_index = creator_ctx.get_port_index(pair.creator_handle);
-            creator_ctx.remove_port(pair.creator_handle);
-            if let Some(mut message) = self.inbox_main.remove(port_index) {
-                message.data_header.target_port = pair.created_id;
-                component.component.adopt_message(&mut component.ctx, message);
-            }
-
-            let mut message_index = 0;
-            while message_index < self.inbox_backup.len() {
-                let message = &self.inbox_backup[message_index];
-                if message.data_header.target_port == pair.created_id {
-                    // Transfer message
-                    let mut message = self.inbox_backup.remove(message_index);
-                    message.data_header.target_port = pair.created_id;
-                    component.component.adopt_message(&mut component.ctx, message);
-                } else {
-                    message_index += 1;
-                }
-            }
-        }
-
-        // By now all ports and messages have been transferred. If there are any
-        // peers that need to be notified about this new component, then we
-        // initiate the protocol that will notify everyone here.
-        if created_component_has_remote_peers {
-            let created_ctx = &component.ctx;
-            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
-            for pair in opened_port_id_pairs.iter() {
-                let port_info = created_ctx.get_port(pair.created_handle);
-                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
-                    let message = self.control.add_reroute_entry(
-                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
-                        pair.creator_id, pair.created_id, created_ctx.id,
-                        schedule_entry_id
-                    );
-                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
-                    let peer_info = created_ctx.get_peer(peer_handle);
-                    peer_info.handle.send_message_logged(sched_ctx, message, true);
-                }
-            }
-        } else {
-            // The new component can be scheduled immediately
-            sched_ctx.runtime.enqueue_work(created_key);
-        }
-    }
-}
-
-struct ValueGroupPortIter<'a> {
-    group: &'a mut ValueGroup,
-    heap_stack: Vec<(usize, usize)>,
-    index: usize,
-}
-
-impl<'a> ValueGroupPortIter<'a> {
-    fn new(group: &'a mut ValueGroup) -> Self {
-        return Self{ group, heap_stack: Vec::new(), index: 0 }
-    }
-}
-
-struct ValueGroupPortRef {
-    id: PortId,
-    heap_pos: Option<usize>, // otherwise: on stack
-    index: usize,
-}
-
-impl<'a> Iterator for ValueGroupPortIter<'a> {
-    type Item = ValueGroupPortRef;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        // Enter loop that keeps iterating until a port is found
-        loop {
-            if let Some(pos) = self.heap_stack.last() {
-                let (heap_pos, region_index) = *pos;
-                if region_index >= self.group.regions[heap_pos].len() {
-                    self.heap_stack.pop();
-                    continue;
-                }
-
-                let value = &self.group.regions[heap_pos][region_index];
-                self.heap_stack.last_mut().unwrap().1 += 1;
-
-                match value {
-                    Value::Input(id) | Value::Output(id) => {
-                        let id = PortId(id.id);
-                        return Some(ValueGroupPortRef{
-                            id,
-                            heap_pos: Some(heap_pos),
-                            index: region_index,
-                        });
-                    },
-                    _ => {},
-                }
-
-                if let Some(heap_pos) = value.get_heap_pos() {
-                    self.heap_stack.push((heap_pos as usize, 0));
-                }
-            } else {
-                if self.index >= self.group.values.len() {
-                    return None;
-                }
-
-                let value = &mut self.group.values[self.index];
-                self.index += 1;
-
-                match value {
-                    Value::Input(id) | Value::Output(id) => {
-                        let id = PortId(id.id);
-                        return Some(ValueGroupPortRef{
-                            id,
-                            heap_pos: None,
-                            index: self.index - 1
-                        });
-                    },
-                    _ => {},
-                }
-
-                // Not a port, check if we need to enter a heap region
-                if let Some(heap_pos) = value.get_heap_pos() {
-                    self.heap_stack.push((heap_pos as usize, 0));
-                } // else: just consider the next value
-            }
-        }
-    }
 }
\ No newline at end of file
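
Review note: the deleted helper followed a two-phase creation pattern — reserve a component ID, translate every transferred port to a fresh ID owned by the reservation, only then register the component (so `on_creation` runs against a registered component), and finally schedule it unless remote peers still have to acknowledge a reroute. The sketch below is a hypothetical, self-contained model of that ordering; `Runtime`, `Reservation`, `create_and_transfer`, and the fresh-ID scheme are illustrative stand-ins, not the crate's actual types.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct PortId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct CompId(u32);

// Phase 1 result: an ID is claimed but no component is registered yet, so
// ports and control-protocol bookkeeping can already name the future component.
struct Reservation(CompId);

#[derive(Default)]
struct Runtime {
    next_comp: u32,
    next_port: u32,
}

impl Runtime {
    fn start_create_component(&mut self) -> Reservation {
        self.next_comp += 1;
        Reservation(CompId(self.next_comp))
    }

    // Phase 2: the component becomes visible to the runtime. Lifecycle hooks
    // may assume registration has completed from this point on.
    fn finish_create_component(&mut self, reservation: Reservation) -> CompId {
        reservation.0
    }

    fn enqueue_work(&mut self, comp: CompId) {
        println!("scheduling {:?}", comp);
    }

    fn fresh_port(&mut self) -> PortId {
        self.next_port += 1;
        PortId(self.next_port)
    }
}

// Reserve, re-ID every transferred port, register, then schedule (unless
// remote peers must first Ack a reroute, which is elided here).
fn create_and_transfer(
    runtime: &mut Runtime,
    transferred_ports: &[PortId],
    has_remote_peers: bool,
) -> (CompId, HashMap<PortId, PortId>) {
    let reservation = runtime.start_create_component();

    // Old -> new mapping, used to rewrite argument values and in-flight messages.
    let mut old_to_new = HashMap::new();
    for old in transferred_ports {
        old_to_new.insert(*old, runtime.fresh_port());
    }

    let comp = runtime.finish_create_component(reservation);
    if !has_remote_peers {
        runtime.enqueue_work(comp);
    }
    (comp, old_to_new)
}

fn main() {
    let mut rt = Runtime::default();
    let (comp, mapping) = create_and_transfer(&mut rt, &[PortId(10), PortId(11)], false);
    println!("created {:?}, port mapping {:?}", comp, mapping);
}
```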
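The inbox hand-off (the `while message_index < self.inbox_backup.len()` loops above) is easy to misread: the index only advances on the non-matching branch because `Vec::remove` shifts the tail left. A minimal standalone version of that loop, with illustrative names (`Message`, `transfer_messages`) rather than the crate's:

```rust
#[derive(Debug)]
struct Message {
    target_port: u32,
    payload: &'static str,
}

// Drain all messages aimed at `old_port` from the backup inbox, retarget them
// to `new_port`, and hand them to the new owner via `adopt`.
fn transfer_messages(
    inbox_backup: &mut Vec<Message>,
    old_port: u32,
    new_port: u32,
    adopt: &mut dyn FnMut(Message),
) {
    let mut i = 0;
    while i < inbox_backup.len() {
        if inbox_backup[i].target_port == old_port {
            // `Vec::remove` shifts the tail left, so the next candidate is
            // already at index `i`; do not increment.
            let mut message = inbox_backup.remove(i);
            message.target_port = new_port;
            adopt(message);
        } else {
            i += 1;
        }
    }
}

fn main() {
    let mut inbox = vec![
        Message { target_port: 1, payload: "a" },
        Message { target_port: 2, payload: "b" },
        Message { target_port: 1, payload: "c" },
    ];
    let mut adopted = Vec::new();
    transfer_messages(&mut inbox, 1, 9, &mut |m| adopted.push(m));
    assert_eq!(inbox.len(), 1);
    assert!(adopted.iter().all(|m| m.target_port == 9));
    println!("kept: {:?}, adopted: {:?}", inbox, adopted);
}
```

The manual index loop is needed because `Vec::retain` only exposes `&T` and cannot move the matching messages out; recent Rust also offers `Vec::extract_if` for this drain-and-keep pattern.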
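Finally, `ValueGroupPortIter` is an explicit depth-first walk: a cursor into the stack values plus a stack of `(region, index)` cursors for nested heap regions, yielding every port encountered. Below is a condensed re-creation of the same technique with toy types (`Value`, `ValueGroup`, `PortIter` here are simplified stand-ins; the real iterator borrows mutably so the caller can rewrite port IDs in place, which this sketch does not need):

```rust
#[derive(Clone)]
enum Value {
    Port(u32),        // stands in for Value::Input / Value::Output
    Composite(usize), // index of a heap region holding nested values
    Scalar(i64),
}

struct ValueGroup {
    values: Vec<Value>,       // the "stack" values
    regions: Vec<Vec<Value>>, // heap regions referenced by Composite
}

struct PortIter<'a> {
    group: &'a ValueGroup,
    heap_stack: Vec<(usize, usize)>, // (region, next index) cursors
    index: usize,                    // cursor into group.values
}

impl<'a> Iterator for PortIter<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        loop {
            // Prefer draining the deepest open heap region (depth-first).
            if let Some(&(region, idx)) = self.heap_stack.last() {
                if idx >= self.group.regions[region].len() {
                    self.heap_stack.pop(); // region exhausted, drop its cursor
                    continue;
                }
                self.heap_stack.last_mut().unwrap().1 += 1;
                match &self.group.regions[region][idx] {
                    Value::Port(id) => return Some(*id),
                    Value::Composite(r) => self.heap_stack.push((*r, 0)),
                    Value::Scalar(_) => {}
                }
            } else {
                // No open region: advance over the top-level values.
                if self.index >= self.group.values.len() {
                    return None;
                }
                let value = &self.group.values[self.index];
                self.index += 1;
                match value {
                    Value::Port(id) => return Some(*id),
                    Value::Composite(r) => self.heap_stack.push((*r, 0)),
                    Value::Scalar(_) => {}
                }
            }
        }
    }
}

fn main() {
    let group = ValueGroup {
        values: vec![Value::Port(1), Value::Composite(0), Value::Scalar(7)],
        regions: vec![
            vec![Value::Scalar(0), Value::Port(2), Value::Composite(1)],
            vec![Value::Port(3)],
        ],
    };
    let iter = PortIter { group: &group, heap_stack: Vec::new(), index: 0 };
    assert_eq!(iter.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```

The explicit cursor stack avoids recursion and lets the walk be suspended and resumed between `next()` calls, which is what makes it usable as an `Iterator` in the first place.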