diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 0937682ed8884505429fe1e2ac2ef4671afe234f..f1d24bd7abf6821c74823e93f5f4e59401463f83 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -2,6 +2,7 @@ use crate::protocol::eval::*; use crate::protocol::*; use crate::runtime2::*; use super::{CompCtx, CompPDL}; +use super::component_ip::*; pub enum CompScheduling { Immediate, @@ -39,7 +40,12 @@ pub(crate) fn create_component( if definition.source.is_builtin() { // Builtin component - todo!("implement") + let component = match definition.source { + ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), + _ => unreachable!(), + }; + + return component; } else { // User-defined component let prompt = Prompt::new( @@ -49,4 +55,15 @@ pub(crate) fn create_component( let component = CompPDL::new(prompt, num_ports); return Box::new(component); } -} \ No newline at end of file +} + + +#[inline] +pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { + return PortId(port_id.id); +} + +#[inline] +pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { + return EvalPortId{ id: port_id.0 }; +} diff --git a/src/runtime2/component/component_ip.rs b/src/runtime2/component/component_ip.rs index 9f63e10b2f4b5b173c8b5b0b1da36f46e4acf710..c4f438ed4c93990fd82b1e16345ed6f1c78d5a73 100644 --- a/src/runtime2/component/component_ip.rs +++ b/src/runtime2/component/component_ip.rs @@ -1,13 +1,17 @@ use crate::protocol::eval::*; use crate::runtime2::*; use super::*; +use super::component::*; /// TODO: Temporary component to figure out what to do with custom components. /// This component sends random numbers between two u32 limits -struct ComponentRandSend { +pub struct ComponentRandomU32 { + output_port_id: PortId, + random_minimum: u32, + random_maximum: u32, } -impl Component for ComponentRandSend { +impl Component for ComponentRandomU32 { fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { unreachable!("should not adopt messages"); } @@ -19,4 +23,20 @@ impl Component for ComponentRandSend { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { todo!() } +} + +impl ComponentRandomU32 { + pub(crate) fn new(arguments: ValueGroup) -> Self { + debug_assert_eq!(arguments.values.len(), 3); + debug_assert!(arguments.regions.is_empty()); + let port_id = port_id_from_eval(arguments.values[0].as_port_id()); + let minimum = arguments.values[1].as_uint32(); + let maximum = arguments.values[2].as_uint32(); + + return ComponentRandomU32{ + output_port_id: port_id, + random_minimum: minimum, + random_maximum: maximum, + } + } } \ No newline at end of file diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 3f3450e88a7a301c828c75b40e73fa7568a59583..4e735be3142b0246610035fb13e7d4a7befa2b01 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -799,11 +799,22 @@ impl CompPDL { created_handle: LocalPortHandle, created_id: PortId, } - let mut port_id_pairs = Vec::new(); + let mut opened_port_id_pairs = Vec::new(); + let mut closed_port_id_pairs = Vec::new(); + + // TODO: @Nocommit + let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; + let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; let reservation = sched_ctx.runtime.start_create_pdl_component(); let mut created_ctx = CompCtx::new(&reservation); + println!( + "DEBUG: Comp '{}' is creating comp '{}' at ID {:?}", + self_proc.identifier.value.as_str(), other_proc.identifier.value.as_str(), + reservation.id() + ); + // Take all the ports ID that are in the `args` (and currently belong to // the creator component) and translate them into new IDs that are // associated with the component we're about to create @@ -820,12 +831,18 @@ impl CompPDL { let created_port = created_ctx.get_port(created_port_handle); let created_port_id = created_port.self_id; - port_id_pairs.push(PortPair{ + let port_id_pair = PortPair { creator_handle: creator_port_handle, creator_id: creator_port_id, created_handle: created_port_handle, created_id: created_port_id, - }); + }; + + if creator_port.state == PortState::Closed { + closed_port_id_pairs.push(port_id_pair) + } else { + opened_port_id_pairs.push(port_id_pair); + } // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) let arg_value = if let Some(heap_pos) = port_reference.heap_pos { @@ -845,20 +862,20 @@ impl CompPDL { // the new component. let mut created_component_has_remote_peers = false; - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { let creator_port_info = creator_ctx.get_port(pair.creator_handle); let created_port_info = created_ctx.get_port_mut(pair.created_handle); if created_port_info.peer_comp_id == creator_ctx.id { // Port peer is owned by the creator as well - let created_peer_port_index = port_id_pairs + let created_peer_port_index = opened_port_id_pairs .iter() .position(|v| v.creator_id == creator_port_info.peer_port_id); match created_peer_port_index { Some(created_peer_port_index) => { // Peer port moved to the new component as well. So // adjust IDs appropriately. - let peer_pair = &port_id_pairs[created_peer_port_index]; + let peer_pair = &opened_port_id_pairs[created_peer_port_index]; created_port_info.peer_port_id = peer_pair.created_id; created_port_info.peer_comp_id = reservation.id(); todo!("either add 'self peer', or remove that idea from Ctx altogether") @@ -883,18 +900,15 @@ impl CompPDL { // actual component. Note that we initialize it as "not sleeping" as // its initial scheduling might be performed based on `Ack`s in response // to message exchanges between remote peers. - let prompt = Prompt::new( - &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, - definition_id, type_id, arguments, - ); - let component = CompPDL::new(prompt, port_id_pairs.len()); + let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); + let component = create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); // Now modify the creator's ports: remove every transferred port and // potentially remove the peer component. - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); @@ -933,13 +947,25 @@ impl CompPDL { } } + // Do the same for the closed ports + for pair in closed_port_id_pairs.iter() { + let port_index = creator_ctx.get_port_index(pair.creator_handle); + creator_ctx.remove_port(pair.creator_handle); + let _removed_message = self.inbox_main.remove(port_index); + + // In debug mode: since we've closed the port we shouldn't have any + // messages for that port. + debug_assert!(_removed_message.is_none()); + debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id)); + } + // By now all ports and messages have been transferred. If there are any // peers that need to be notified about this new component, then we // initiate the protocol that will notify everyone here. if created_component_has_remote_peers { let created_ctx = &component.ctx; let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { let port_info = created_ctx.get_port(pair.created_handle); if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { let message = self.control.add_reroute_entry( @@ -959,16 +985,6 @@ impl CompPDL { } } -#[inline] -fn port_id_from_eval(port_id: EvalPortId) -> PortId { - return PortId(port_id.id); -} - -#[inline] -fn port_id_to_eval(port_id: PortId) -> EvalPortId { - return EvalPortId{ id: port_id.0 }; -} - /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 4f2a18d823c102b3582bfd433ff9c2ae37619d4c..d3e432ed679759a8d87bbe6cbcff6433e4a8660c 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -194,7 +194,8 @@ impl Runtime { )?; let reserved = self.inner.start_create_pdl_component(); let ctx = CompCtx::new(&reserved); - let (key, _) = self.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false); + let component = Box::new(CompPDL::new(prompt, 0)); + let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false); self.inner.enqueue_work(key); return Ok(()) @@ -247,9 +248,9 @@ impl RuntimeInner { return CompReserved{ reservation }; } - pub(crate) fn finish_create_pdl_component( + pub(crate) fn finish_create_pdl_component( &self, reserved: CompReserved, - component: C, mut context: CompCtx, initially_sleeping: bool, + component: Box, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); @@ -262,7 +263,7 @@ impl RuntimeInner { num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, - component: Box::new(component), + component, ctx: context, inbox: inbox_queue, exiting: false, diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 7900d9cfb8b1c8b8b355b75b695ce51d89678b8d..30568689f005d5c7ab31d56b024adb674253a55c 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -9,7 +9,8 @@ fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: V ).expect("create prompt"); let reserved = rt.inner.start_create_pdl_component(); let ctx = CompCtx::new(&reserved); - let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false); + let component = Box::new(CompPDL::new(prompt, 0)); + let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false); rt.inner.enqueue_work(key); } @@ -236,4 +237,6 @@ fn test_random_u32_temporary_thingo() { new random_taker(rx); } ").expect("compilation"); + let rt = Runtime::new(1, true, pd); + create_component(&rt, "", "constructor", no_args()); } \ No newline at end of file