diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index ed94ded96a5f9d0277ae3ffe542378e49cf3b5f3..36f5c267d82c7242ee2da12950b07ab58a05f5b1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -218,7 +218,7 @@ pub(crate) struct CompPDL { // Should be same length as the number of ports. Corresponding indices imply // message is intended for that port. pub inbox_main: InboxMain, - pub inbox_backup: Vec, + pub inbox_backup: InboxBackup, } impl Component for CompPDL { @@ -241,15 +241,17 @@ impl Component for CompPDL { } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { - // sched_ctx.log(&format!("handling message: {:?}", message)); + sched_ctx.debug(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle + sched_ctx.debug(&format!("rerouting to: {:?}", new_target)); target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(_should_remove.is_none()); return; } + sched_ctx.debug("handling message myself"); match message { Message::Data(message) => { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); @@ -257,7 +259,7 @@ impl Component for CompPDL { Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, - message, sched_ctx, comp_ctx + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup ) { self.handle_generic_component_error(sched_ctx, location_and_message); } @@ -282,7 +284,10 @@ impl Component for CompPDL { CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { + CompMode::SyncEnd | CompMode::BlockedGet | + CompMode::BlockedPut | CompMode::BlockedSelect | CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked => { return CompScheduling::Sleep; } CompMode::StartExit => return component::default_handle_start_exit( @@ -342,7 +347,7 @@ impl Component for CompPDL { let send_result = component::default_send_data_message( &mut self.exec_state, target_port_id, PortInstruction::SourceLocation(expr_id), value, - sched_ctx, &mut self.consensus, comp_ctx + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx ); if let Err(location_and_message) = send_result { self.handle_generic_component_error(sched_ctx, location_and_message); @@ -420,14 +425,14 @@ impl Component for CompPDL { }, EC::NewComponent(definition_id, type_id, arguments) => { debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); - self.create_component_and_transfer_ports( - sched_ctx, comp_ctx, + component::default_start_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, definition_id, type_id, arguments ); return CompScheduling::Requeue; }, EC::NewChannel => { - debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); debug_assert!(self.exec_ctx.stmt.is_none()); let channel = comp_ctx.create_channel(); self.exec_ctx.stmt = ExecStmt::CreatedChannel(( @@ -552,343 +557,4 @@ impl CompPDL { self.exec_state.set_as_start_exit(exit_reason); } - - // ------------------------------------------------------------------------- - // Handling ports - // ------------------------------------------------------------------------- - - /// Creates a new component and transfers ports. Because of the stepwise - /// process in which memory is allocated, ports are transferred, messages - /// are exchanged, component lifecycle methods are called, etc. This - /// function facilitates a lot of implicit assumptions (e.g. when the - /// `Component::on_creation` method is called, the component is already - /// registered at the runtime). - fn create_component_and_transfer_ports( - &mut self, - sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, - definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup - ) { - struct PortPair{ - creator_handle: LocalPortHandle, - creator_id: PortId, - created_handle: LocalPortHandle, - created_id: PortId, - } - let mut opened_port_id_pairs = Vec::new(); - let mut closed_port_id_pairs = Vec::new(); - - let reservation = sched_ctx.runtime.start_create_pdl_component(); - let mut created_ctx = CompCtx::new(&reservation); - - // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; - // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - // dbg_code!({ - // sched_ctx.log(&format!( - // "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", - // self_proc.identifier.value.as_str(), creator_ctx.id, - // other_proc.identifier.value.as_str(), reservation.id() - // )); - // }); - - // Take all the ports ID that are in the `args` (and currently belong to - // the creator component) and translate them into new IDs that are - // associated with the component we're about to create - let mut arg_iter = ValueGroupPortIter::new(&mut arguments); - while let Some(port_reference) = arg_iter.next() { - // Create port entry for new component - let creator_port_id = port_reference.id; - let creator_port_handle = creator_ctx.get_port_handle(creator_port_id); - let creator_port = creator_ctx.get_port(creator_port_handle); - let created_port_handle = created_ctx.add_port( - creator_port.peer_comp_id, creator_port.peer_port_id, - creator_port.kind, creator_port.state - ); - let created_port = created_ctx.get_port(created_port_handle); - let created_port_id = created_port.self_id; - - let port_id_pair = PortPair { - creator_handle: creator_port_handle, - creator_id: creator_port_id, - created_handle: created_port_handle, - created_id: created_port_id, - }; - - if creator_port.state.is_closed() { - closed_port_id_pairs.push(port_id_pair) - } else { - opened_port_id_pairs.push(port_id_pair); - } - - // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) - let arg_value = if let Some(heap_pos) = port_reference.heap_pos { - &mut arg_iter.group.regions[heap_pos][port_reference.index] - } else { - &mut arg_iter.group.values[port_reference.index] - }; - match arg_value { - Value::Input(id) => *id = port_id_to_eval(created_port_id), - Value::Output(id) => *id = port_id_to_eval(created_port_id), - _ => unreachable!(), - } - } - - // For each transferred port pair set their peer components to the - // correct values. This will only change the values for the ports of - // the new component. - let mut created_component_has_remote_peers = false; - - for pair in opened_port_id_pairs.iter() { - let creator_port_info = creator_ctx.get_port(pair.creator_handle); - let created_port_info = created_ctx.get_port_mut(pair.created_handle); - - if created_port_info.peer_comp_id == creator_ctx.id { - // Peer of the transferred port is the component that is - // creating the new component. - let created_peer_port_index = opened_port_id_pairs - .iter() - .position(|v| v.creator_id == creator_port_info.peer_port_id); - match created_peer_port_index { - Some(created_peer_port_index) => { - // Addendum to the above comment: but that port is also - // moving to the new component - let peer_pair = &opened_port_id_pairs[created_peer_port_index]; - created_port_info.peer_port_id = peer_pair.created_id; - created_port_info.peer_comp_id = reservation.id(); - todo!("either add 'self peer', or remove that idea from Ctx altogether");` - }, - None => { - // Peer port remains with creator component. - created_port_info.peer_comp_id = creator_ctx.id; - created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id)); - } - } - } else { - // Peer is a different component. We'll deal with sending the - // appropriate messages later - let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id); - let peer_info = creator_ctx.get_peer(peer_handle); - created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); - created_component_has_remote_peers = true; - } - } - - // We'll now actually turn our reservation for a new component into an - // actual component. Note that we initialize it as "not sleeping" as - // its initial scheduling might be performed based on `Ack`s in response - // to message exchanges between remote peers. - let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); - let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); - let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( - reservation, component, created_ctx, false, - ); - component.component.on_creation(created_key.downgrade(), sched_ctx); - - // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component. - for pair in opened_port_id_pairs.iter() { - // Remove peer if appropriate - let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None); - creator_ctx.remove_port(pair.creator_handle); - - // Transfer any messages - if let Some(mut message) = self.inbox_main.remove(creator_port_index) { - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message) - } - - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.creator_id { - // transfer message - let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } else { - message_index += 1; - } - } - - let created_port_info = component.ctx.get_port(pair.created_handle); - if created_port_info.peer_comp_id == creator_ctx.id { - // This handles the creation of a channel between the creator - // component and the newly created component. So if the creator - // had a `a -> b` channel, and `b` is moved to the new - // component, then `a` needs to set its peer component. - let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); - let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); - peer_port_info.peer_comp_id = component.ctx.id; - peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id)); - } - } - - // Do the same for the closed ports. Note that we might still have to - // transfer messages that cause the new owner of the port to fail. - for pair in closed_port_id_pairs.iter() { - let port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.remove_port(pair.creator_handle); - if let Some(mut message) = self.inbox_main.remove(port_index) { - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } - - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == pair.created_id { - // Transfer message - let mut message = self.inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - component.component.adopt_message(&mut component.ctx, message); - } else { - message_index += 1; - } - } - } - - // By now all ports and messages have been transferred. If there are any - // peers that need to be notified about this new component, then we - // initiate the protocol that will notify everyone here. - if created_component_has_remote_peers { - let created_ctx = &component.ctx; - let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - for pair in opened_port_id_pairs.iter() { - let port_info = created_ctx.get_port(pair.created_handle); - if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { - let message = self.control.add_reroute_entry( - creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, - pair.creator_id, pair.created_id, created_ctx.id, - schedule_entry_id - ); - let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = created_ctx.get_peer(peer_handle); - peer_info.handle.send_message_logged(sched_ctx, message, true); - } - } - } else { - // Peer can be scheduled immediately - sched_ctx.runtime.enqueue_work(created_key); - } - } -} - -/// Recursively goes through the value group, attempting to find ports. -/// Duplicates will only be added once. -pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { - // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { - match value { - Value::Input(port_id) | Value::Output(port_id) => { - // This is an actual port - let cur_port = PortId(port_id.id); - for prev_port in ports.iter() { - if *prev_port == cur_port { - // Already added - return; - } - } - - ports.push(cur_port); - }, - Value::Array(heap_pos) | - Value::Message(heap_pos) | - Value::String(heap_pos) | - Value::Struct(heap_pos) | - Value::Union(_, heap_pos) => { - // Reference to some dynamic thing which might contain ports, - // so recurse - let heap_region = &group.regions[*heap_pos as usize]; - for embedded_value in heap_region { - find_port_in_value(group, embedded_value, ports); - } - }, - _ => {}, // values we don't care about - } - } - - // Clear the ports, then scan all the available values - ports.clear(); - for value in &value_group.values { - find_port_in_value(value_group, value, ports); - } -} - -struct ValueGroupPortIter<'a> { - group: &'a mut ValueGroup, - heap_stack: Vec<(usize, usize)>, - index: usize, -} - -impl<'a> ValueGroupPortIter<'a> { - fn new(group: &'a mut ValueGroup) -> Self { - return Self{ group, heap_stack: Vec::new(), index: 0 } - } -} - -struct ValueGroupPortRef { - id: PortId, - heap_pos: Option, // otherwise: on stack - index: usize, -} - -impl<'a> Iterator for ValueGroupPortIter<'a> { - type Item = ValueGroupPortRef; - - fn next(&mut self) -> Option { - // Enter loop that keeps iterating until a port is found - loop { - if let Some(pos) = self.heap_stack.last() { - let (heap_pos, region_index) = *pos; - if region_index >= self.group.regions[heap_pos].len() { - self.heap_stack.pop(); - continue; - } - - let value = &self.group.regions[heap_pos][region_index]; - self.heap_stack.last_mut().unwrap().1 += 1; - - match value { - Value::Input(id) | Value::Output(id) => { - let id = PortId(id.id); - return Some(ValueGroupPortRef{ - id, - heap_pos: Some(heap_pos), - index: region_index, - }); - }, - _ => {}, - } - - if let Some(heap_pos) = value.get_heap_pos() { - self.heap_stack.push((heap_pos as usize, 0)); - } - } else { - if self.index >= self.group.values.len() { - return None; - } - - let value = &mut self.group.values[self.index]; - self.index += 1; - - match value { - Value::Input(id) | Value::Output(id) => { - let id = PortId(id.id); - return Some(ValueGroupPortRef{ - id, - heap_pos: None, - index: self.index - 1 - }); - }, - _ => {}, - } - - // Not a port, check if we need to enter a heap region - if let Some(heap_pos) = value.get_heap_pos() { - self.heap_stack.push((heap_pos as usize, 0)); - } // else: just consider the next value - } - } - } } \ No newline at end of file