diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 48f002ef7e5f0619d249bdcee564d62977161b9f..2ced679fcbfca7f6f706bb51df86ac43baf9e775 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,6 +10,9 @@ use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; +use super::control_layer::*; +use super::consensus::Consensus; + pub enum CompScheduling { Immediate, Requeue, @@ -51,14 +54,6 @@ impl CompCtx { return Some(message); } - fn find_peer(&self, port_id: PortId) -> (&Port, &Peer) { - let port_index = self.get_port_index(port_id).unwrap(); - let port_info = &self.ports[port_index]; - let peer_index = self.get_peer_index(port_info.peer_comp_id).unwrap(); - let peer_info = &self.peers[peer_index]; - return (port_info, peer_info); - } - fn create_channel(&mut self) -> Channel { let putter_id = PortId(self.take_port_id()); let getter_id = PortId(self.take_port_id()); @@ -80,7 +75,17 @@ impl CompCtx { return Channel{ putter_id, getter_id }; } - fn get_port_index(&self, port_id: PortId) -> Option { + fn get_port(&self, port_id: PortId) -> &Port { + let index = self.get_port_index(port_id).unwrap(); + return &self.ports[index]; + } + + pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port { + let index = self.get_port_index(port_id).unwrap(); + return &mut self.ports[index]; + } + + pub(crate) fn get_port_index(&self, port_id: PortId) -> Option { for (index, port) in self.ports.iter().enumerate() { if port.self_id == port_id { return Some(index); @@ -90,7 +95,17 @@ impl CompCtx { return None; } - fn get_peer_index(&self, peer_id: CompId) -> Option { + fn get_peer(&self, peer_id: CompId) -> &Peer { + let index = self.get_peer_index(peer_id).unwrap(); + return &self.peers[index]; + } + + fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer { + let index = self.get_peer_index(peer_id).unwrap(); + return &mut self.peers[index]; + } + + pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option { for (index, peer) in self.peers.iter().enumerate() { if peer.id == peer_id { return Some(index); @@ -180,7 +195,16 @@ pub(crate) struct CompPDL { pub mode_port: PortId, // when blocked on a port pub mode_value: ValueGroup, // when blocked on a put pub prompt: Prompt, + pub control: ControlLayer, + pub consensus: Consensus, + pub sync_counter: u32, pub exec_ctx: ExecCtx, + // TODO: Temporary field, simulates future plans of having one storage place + // reserved per port. + // Should be same length as the number of ports. Corresponding indices imply + // message is intended for that port. + pub inbox_main: Vec>, + pub inbox_backup: Vec, } impl CompPDL { @@ -190,13 +214,40 @@ impl CompPDL { mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), prompt: initial_state, + control: ControlLayer::default(), + consensus: Consensus::new(), + sync_counter: u32, exec_ctx: ExecCtx{ stmt: ExecStmt::None, - } + }, + inbox_main: Vec::new(), + inbox_backup: Vec::new(), + } + } + + pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + self.inbox.resize(comp_ctx.ports.len(), None); + } + + pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + if let Some(new_target) = self.control.should_reroute(&message) { + let target = sched_ctx.runtime.get_component_public(new_target); + target.inbox.push(message); + + return; + } + + match message { + Message::Data(message) => { + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); + }, + Message::Control(message) => { + self.handle_incoming_control_message(sched_ctx, comp_Ctx, message); + }, } } - pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; let run_result = self.execute_prompt(&sched_ctx)?; @@ -208,6 +259,7 @@ impl CompPDL { EC::SyncBlockEnd => { debug_assert_eq!(self.mode, Mode::Sync); self.handle_sync_end(sched_ctx, comp_ctx); + return Ok(CompScheduling::Immediate); }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); @@ -227,8 +279,15 @@ impl CompPDL { }, EC::Put(port_id, value) => { debug_assert_eq!(self.mode, Mode::Sync); - let port_id = transform_port_id(port_id); - Self::send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); + let port_id = port_id_from_eval(port_id); + let port_info = comp_ctx.get_port(port_id); + if port_info.state == PortState::Blocked { + + } else { + + } + self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); + return Ok(CompScheduling::Immediate); }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { @@ -238,9 +297,20 @@ impl CompPDL { EC::SyncBlockStart => { debug_assert_eq!(self.mode, Mode::NonSync); self.handle_sync_start(sched_ctx, comp_ctx); + return Ok(CompScheduling::Immediate); }, EC::NewComponent(definition_id, monomorph_idx, arguments) => { debug_assert_eq!(self.mode, Mode::NonSync); + + let mut ports = Vec::new(); // TODO: Optimize + let protocol = &sched_ctx.runtime.protocol; + find_ports_in_value_group(&arguments, &mut ports); + let prompt = Prompt::new( + &protocol.types, &protocol.heap, + definition_id, monomorph_idx, arguments + ); + self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports); + return Ok(CompScheduling::Requeue); }, EC::NewChannel => { debug_assert_eq!(self.mode, Mode::NonSync); @@ -253,8 +323,6 @@ impl CompPDL { return Ok(CompScheduling::Immediate); } } - - return Ok(CompScheduling::Sleep); } fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { @@ -270,22 +338,24 @@ impl CompPDL { } fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - + self.consensus.notify_sync_start(comp_ctx); + debug_assert_eq!(self.mode, Mode::NonSync); + self.mode = Mode::Sync; } fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - + self.consensus.notify_sync_end(); + debug_assert_eq!(self.mode, Mode::Sync); + self.mode = Mode::NonSync; } - fn send_message_and_wake_up(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, port_id: PortId, value: ValueGroup) { + fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) { use std::sync::atomic::Ordering; - let (port_info, peer_info) = comp_ctx.find_peer(port_id); - peer_info.handle.inbox.push(Message::Data(DataMessage{ - source_port_id: port_id, - target_port_id: port_info.peer_id, - content: value, - })); + let port_info = comp_ctx.get_port(source_port_id); + let peer_info = comp_ctx.get_peer(port_info.peer_comp_id); + let annotated_message = self.consensus.annotate_message_data(port_info, value); + peer_info.handle.inbox.push(Message::Data(annotated_message)); let should_wake_up = peer_info.handle.sleeping.compare_exchange( true, false, Ordering::AcqRel, Ordering::Relaxed @@ -297,18 +367,165 @@ impl CompPDL { } } - fn create_component_and_transfer_ports(sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) { + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + // Check if we can insert it directly into the storage associated with + // the port + let target_port_id = message.data_header.target_port; + let port_index = comp_ctx.get_port_index(target_port_id).unwrap(); + if self.inbox_main[port_index].is_none() { + self.inbox_main[port_index] = Some(message); + + // After direct insertion, check if this component's execution is + // blocked on receiving a message on that port + debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly + if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port { + // We were indeed blocked + self.mode = Mode::Sync; + self.mode_port = PortId::new_invalid(); + } + + return; + } + + // The direct inbox is full, so the port will become (or was already) blocked + let port_info = &mut comp_ctx.ports[port_index]; + debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); + + if port_info.state == PortState::Open { + let (target_comp_id, block_message) = + self.control.mark_port_blocked(target_port_id, comp_ctx); + debug_assert_eq!(port_info.peer_comp_id, target_comp_id); + + let peer = comp_ctx.get_peer(target_comp_id); + peer.handle.inbox.push(Message::Control(block_message)); + wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle); + } + + // But we still need to remember the message, so: + self.inbox_backup.push(message); + } + + fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { + match message.content { + ControlMessageContent::Ack => { + let mut to_ack = message.id; + loop { + let action = self.control.handle_ack(to_ack, comp_ctx); + match action { + AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => { + // FIX @NoDirectHandle + let handle = sched_ctx.runtime.get_component_public(target_comp); + handle.inbox.push(Message::Control(message)); + wake_up_if_sleeping(sched_ctx, target_comp, &handle); + to_ack = new_to_ack; + }, + AckAction::ScheduleComponent(to_schedule) => { + // FIX @NoDirectHandle + let handle = sched_ctx.runtime.get_component_public(to_schedule); + wake_up_if_sleeping(sched_ctx, to_schedule, &handle); + break; + }, + AckAction::None => { + break; + } + } + } + }, + ControlMessageContent::BlockPort(port_id) => { + // On of our messages was accepted, but the port should be + // blocked. + let port_info = comp_ctx.get_port_mut(port_id); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state != PortState::Closed { + debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages + port_info.state = PortState::Blocked; + } + }, + ControlMessageContent::UnblockPort(port_id) => { + // We were previously blocked (or already closed) + let port_info = comp_ctx.get_port(port_id); + debug_assert_eq!(port_info.kind, PortKind::Putter); + debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed); + if port_info.state == PortState::Blocked { + self.unblock_port(sched_ctx, comp_ctx, port_id); + } + }, + ControlMessageContent::PortPeerChangedBlock(port_id) => { + // The peer of our port has just changed. So we are asked to + // temporarily block the port (while our original recipient is + // potentially rerouting some of the in-flight messages) and + // Ack. Then we wait for the `unblock` call. + debug_assert_eq!(message.target_port_id, port_id); + let port_info = comp_ctx.get_port_mut(port_id); + debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); + if port_info.state == PortState::Open { + port_info.state = PortState::Blocked; + } + }, + ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { + debug_assert_eq!(message.target_port_id, port_id); + let port_info = comp_ctx.get_port_mut(port_id); + debug_assert!(port_info.state == PortState::Blocked); + port_info.peer_comp_id = new_comp_id; + self.unblock_port(sched_ctx, comp_ctx, port_id); + } + } + } + + fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) { + let port_info = comp_ctx.get_port_mut(port_id); + debug_assert_eq!(port_info.state, PortState::Blocked); + port_info.state = PortState::Open; + + if self.mode == Mode::BlockedPut && port_id == self.mode_port { + // We were blocked on the port that just became unblocked, so + // send the message. + let mut replacement = ValueGroup::default(); + std::mem::swap(&mut replacement, &mut self.mode_value); + self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement); + + self.mode = Mode::Sync; + self.mode_port = PortId::new_invalid(); + } + } + + fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) { let component = CompPDL::new(prompt); let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true); let created_ctx = &mut component.ctx; + let mut has_reroute_entry = false; + let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); + for port_id in ports.iter().copied() { - // Transfer port - let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); + // Create temporary reroute entry if the peer is another component + let port_info = creator_ctx.get_port(port_id); + debug_assert_ne!(port_info.state, PortState::Blocked); + if port_info.peer_comp_id == creator_ctx.id { + // We own the peer port. So retrieve it and modify the peer directly + let port_info = creator_ctx.get_port_mut(port_info.peer_id); + port_info.peer_comp_id = created_ctx.id; + } else { + // We don't own the port, so send the appropriate messages and + // notify the control layer + has_reroute_entry = true; + let message = self.control.add_reroute_entry( + creator_ctx.id, port_info.peer_id, port_info.peer_comp_id, + port_info.self_id, created_ctx.id, schedule_entry_id + ); + let peer_info = creator_ctx.get_peer(port_info.peer_comp_id); + peer_info.handle.inbox.push(message); + } + + // Transfer port and create temporary reroute entry + let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); + if port_info.state == PortState::Blocked { + todo!("Think about this when you're not tired!"); + } Self::add_port_to_component(sched_ctx, created_ctx, port_info); // Maybe remove peer from the creator - if let Some(peer_info) = peer_info { + if let Some(mut peer_info) = peer_info { let remove_from_runtime = peer_info.handle.decrement_users(); if remove_from_runtime { let removed_comp_key = unsafe{ peer_info.id.upgrade() }; @@ -317,8 +534,11 @@ impl CompPDL { } } - // Start scheduling - sched_ctx.runtime.enqueue_work(comp_key); + if !has_reroute_entry { + // We can schedule the component immediately + self.control.remove_schedule_entry(schedule_entry_id); + sched_ctx.runtime.enqueue_work(comp_key); + } // else: wait for the `Ack`s, they will trigger the scheduling of the component } /// Removes a port from a component. Also decrements the port counter in @@ -365,7 +585,6 @@ impl CompPDL { }, None => { let handle = sched_ctx.runtime.get_component_public(peer_comp_id); - handle.increment_users(); comp_ctx.peers.push(Peer{ id: peer_comp_id, num_associated_ports: 1, @@ -374,6 +593,25 @@ impl CompPDL { } } } + + fn change_port_peer_component( + &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + port_id: PortId, new_peer_comp_id: CompId + ) { + let port_info = comp_ctx.get_port_mut(port_id); + let cur_peer_comp_id = port_info.peer_comp_id; + let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id); + cur_peer_info.num_associated_ports -= 1; + + if cur_peer_info.num_associated_ports == 0 { + let should_remove = cur_peer_info.handle.decrement_users(); + if should_remove { + let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() }; + sched_ctx.runtime.destroy_component(cur_peer_comp_key); + + } + } + } } #[inline] @@ -427,4 +665,20 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve for value in &value_group.values { find_port_in_value(value_group, value, ports); } +} + +/// If the component is sleeping, then that flag will be atomically set to +/// false. If we're the ones that made that happen then we add it to the work +/// queue. +fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) { + use std::sync::atomic::Ordering; + + let should_wake_up = handle.sleeping + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let comp_key = unsafe{ comp_id.upgrade() }; + sched_ctx.runtime.enqueue_work(comp_key); + } } \ No newline at end of file