use crate::protocol::*;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt, ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};
use crate::runtime2::store::QueueDynMpsc;
use crate::runtime2::runtime::*;
use crate::runtime2::scheduler::SchedulerCtx;
use crate::runtime2::communication::*;

use super::control_layer::*;
use super::consensus::Consensus;

pub enum CompScheduling {
    Immediate,
    Requeue,
    Sleep,
    Exit,
}

pub struct CompCtx {
    pub id: CompId,
    pub ports: Vec<Port>,
    pub peers: Vec<Peer>,
    pub messages: Vec<ValueGroup>, // same size as "ports"
    pub port_id_counter: u32,
}

impl Default for CompCtx {
    fn default() -> Self {
        return Self{
            id: CompId(0),
            ports: Vec::new(),
            peers: Vec::new(),
            messages: Vec::new(),
            port_id_counter: 0,
        }
    }
}

impl CompCtx {
    /// Takes the message stored for a port (if any), leaving an empty value
    /// group in its place.
    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
        let port_index = self.get_port_index(port_id).unwrap();
        let old_value = &mut self.messages[port_index];
        if old_value.values.is_empty() {
            return None;
        }

        // Replace value in array with an empty one
        let mut message = ValueGroup::new_stack(Vec::new());
        std::mem::swap(old_value, &mut message);
        return Some(message);
    }

    fn create_channel(&mut self) -> Channel {
        let putter_id = PortId(self.take_port_id());
        let getter_id = PortId(self.take_port_id());
        self.ports.push(Port{
            self_id: putter_id,
            peer_id: getter_id,
            kind: PortKind::Putter,
            state: PortState::Open,
            peer_comp_id: self.id,
        });
        self.ports.push(Port{
            self_id: getter_id,
            peer_id: putter_id,
            kind: PortKind::Getter,
            state: PortState::Open, // fresh ports start open, like their putter half
            peer_comp_id: self.id,
        });

        return Channel{ putter_id, getter_id };
    }

    fn get_port(&self, port_id: PortId) -> &Port {
        let index = self.get_port_index(port_id).unwrap();
        return &self.ports[index];
    }

    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
        let index = self.get_port_index(port_id).unwrap();
        return &mut self.ports[index];
    }

    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
        for (index, port) in self.ports.iter().enumerate() {
            if port.self_id == port_id {
                return Some(index);
            }
        }
        return None;
    }

    fn get_peer(&self, peer_id: CompId) -> &Peer {
        let index = self.get_peer_index(peer_id).unwrap();
        return &self.peers[index];
    }

    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
        let index = self.get_peer_index(peer_id).unwrap();
        return &mut self.peers[index];
    }

    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
        for (index, peer) in self.peers.iter().enumerate() {
            if peer.id == peer_id {
                return Some(index);
            }
        }
        return None;
    }

    fn take_port_id(&mut self) -> u32 {
        let port_id = self.port_id_counter;
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
        return port_id;
    }
}
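// A minimal usage sketch (not part of the runtime; assumes this mid-file test
// module fits the crate's layout): `create_channel` allocates two fresh port
// IDs and registers two complementary ports that list each other as peer,
// both initially owned by the creating component itself.
#[cfg(test)]
mod channel_wiring_sketch {
    use super::*;

    #[test]
    fn channel_ports_reference_each_other() {
        let mut ctx = CompCtx::default();
        let channel = ctx.create_channel();

        let putter = ctx.get_port(channel.putter_id);
        let getter = ctx.get_port(channel.getter_id);
        assert!(putter.peer_id == channel.getter_id);
        assert!(getter.peer_id == channel.putter_id);
        assert!(putter.peer_comp_id == ctx.id && getter.peer_comp_id == ctx.id);
    }
}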
pub enum ExecStmt {
    CreatedChannel((Value, Value)),
    PerformedPut,
    PerformedGet(ValueGroup),
    None,
}

impl ExecStmt {
    fn take(&mut self) -> ExecStmt {
        let mut value = ExecStmt::None;
        std::mem::swap(self, &mut value);
        return value;
    }

    fn is_none(&self) -> bool {
        match self {
            ExecStmt::None => return true,
            _ => return false,
        }
    }
}

pub struct ExecCtx {
    stmt: ExecStmt,
}

impl RunContext for ExecCtx {
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
        match self.stmt.take() {
            ExecStmt::None => return false,
            ExecStmt::PerformedPut => return true,
            _ => unreachable!(),
        }
    }

    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::PerformedGet(value) => return Some(value),
            _ => unreachable!(),
        }
    }

    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
        todo!("remove fires")
    }

    fn performed_fork(&mut self) -> Option<bool> {
        todo!("remove fork")
    }

    fn created_channel(&mut self) -> Option<(Value, Value)> {
        match self.stmt.take() {
            ExecStmt::None => return None,
            ExecStmt::CreatedChannel(ports) => return Some(ports),
            _ => unreachable!(),
        }
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Mode {
    NonSync,
    Sync,
    BlockedGet,
    BlockedPut,
}

pub(crate) struct CompPDL {
    pub mode: Mode,
    pub mode_port: PortId, // when blocked on a port
    pub mode_value: ValueGroup, // when blocked on a put
    pub prompt: Prompt,
    pub control: ControlLayer,
    pub consensus: Consensus,
    pub sync_counter: u32,
    pub exec_ctx: ExecCtx,
    // TODO: Temporary field, simulates future plans of having one storage
    //  place reserved per port. Should be same length as the number of ports.
    //  Corresponding indices imply the message is intended for that port.
    pub inbox_main: Vec<Option<DataMessage>>,
    pub inbox_backup: Vec<DataMessage>,
}

impl CompPDL {
    pub(crate) fn new(initial_state: Prompt) -> Self {
        return Self{
            mode: Mode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
            prompt: initial_state,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            sync_counter: 0,
            exec_ctx: ExecCtx{
                stmt: ExecStmt::None,
            },
            inbox_main: Vec::new(),
            inbox_backup: Vec::new(),
        }
    }

    pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        self.inbox_main.resize(comp_ctx.ports.len(), None);
        // Keep the per-port message storage in sync with the ports as well
        comp_ctx.messages.resize_with(comp_ctx.ports.len(), ValueGroup::default);
    }

    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        if let Some(new_target) = self.control.should_reroute(&message) {
            let target = sched_ctx.runtime.get_component_public(new_target);
            target.inbox.push(message);
            return;
        }

        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
            },
        }
    }
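    // How `run` talks to the PDL interpreter (sketch): the `Prompt` suspends
    // with an `EvalContinuation` whenever it needs the runtime (get, put,
    // channel creation). The scheduler performs the effect, stores the result
    // in `exec_ctx.stmt`, and the next `Prompt::step` consumes it through the
    // `RunContext` trait. For example, a completed get travels as:
    //
    //     EC::BlockGet(port) -> exec_ctx.stmt = ExecStmt::PerformedGet(msg)
    //                        -> RunContext::performed_get(port) == Some(msg)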
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        let run_result = self.execute_prompt(sched_ctx)?;

        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.mode, Mode::Sync);
                self.handle_sync_end(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.mode, Mode::Sync);

                let port_id = port_id_from_eval(port_id);
                if let Some(message) = comp_ctx.take_message(port_id) {
                    // We can immediately receive and continue
                    debug_assert!(self.exec_ctx.stmt.is_none());
                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
                    return Ok(CompScheduling::Immediate);
                } else {
                    // We need to wait
                    self.mode = Mode::BlockedGet;
                    self.mode_port = port_id;
                    return Ok(CompScheduling::Sleep);
                }
            },
            EC::Put(port_id, value) => {
                debug_assert_eq!(self.mode, Mode::Sync);

                let port_id = port_id_from_eval(port_id);
                let port_info = comp_ctx.get_port(port_id);
                if port_info.state == PortState::Blocked {
                    // Peer cannot accept the message right now: store the
                    // value and sleep until `UnblockPort` comes in.
                    self.mode = Mode::BlockedPut;
                    self.mode_port = port_id;
                    self.mode_value = value;
                    return Ok(CompScheduling::Sleep);
                } else {
                    self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
                    // Let the prompt observe the completed put when it resumes
                    debug_assert!(self.exec_ctx.stmt.is_none());
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
                    return Ok(CompScheduling::Immediate);
                }
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                return Ok(CompScheduling::Exit);
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                debug_assert_eq!(self.mode, Mode::NonSync);

                // TODO: Optimize
                let mut ports = Vec::new();
                let protocol = &sched_ctx.runtime.protocol;
                find_ports_in_value_group(&arguments, &mut ports);
                let prompt = Prompt::new(
                    &protocol.types, &protocol.heap,
                    definition_id, monomorph_idx, arguments
                );
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
                return Ok(CompScheduling::Requeue);
            },
            EC::NewChannel => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                debug_assert!(self.exec_ctx.stmt.is_none());
                let channel = comp_ctx.create_channel();
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
                    Value::Output(port_id_to_eval(channel.putter_id)),
                    Value::Input(port_id_to_eval(channel.getter_id))
                ));
                return Ok(CompScheduling::Immediate);
            }
        }
    }

    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        let mut step_result = EvalContinuation::Stepping;
        while let EvalContinuation::Stepping = step_result {
            step_result = self.prompt.step(
                &sched_ctx.runtime.protocol.types,
                &sched_ctx.runtime.protocol.heap,
                &sched_ctx.runtime.protocol.modules,
                &mut self.exec_ctx,
            )?;
        }

        return Ok(step_result);
    }

    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        self.consensus.notify_sync_start(comp_ctx);
        debug_assert_eq!(self.mode, Mode::NonSync);
        self.mode = Mode::Sync;
    }

    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        self.consensus.notify_sync_end();
        debug_assert_eq!(self.mode, Mode::Sync);
        self.mode = Mode::NonSync;
    }
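    // Mode transitions driven by `run` and the message handlers (sketch):
    //
    //     NonSync --SyncBlockStart--> Sync --SyncBlockEnd--> NonSync
    //     Sync --BlockGet, no message--> BlockedGet --data message arrives--> Sync
    //     Sync --Put on blocked port---> BlockedPut --UnblockPort received--> Sync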
    fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
        use std::sync::atomic::Ordering;

        let port_info = comp_ctx.get_port(source_port_id);
        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
        let annotated_message = self.consensus.annotate_message_data(port_info, value);
        peer_info.handle.inbox.push(Message::Data(annotated_message));

        let should_wake_up = peer_info.handle.sleeping.compare_exchange(
            true, false, Ordering::AcqRel, Ordering::Relaxed
        ).is_ok();
        if should_wake_up {
            let comp_key = unsafe{ peer_info.id.upgrade() };
            sched_ctx.runtime.enqueue_work(comp_key);
        }
    }

    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        // Check if we can insert it directly into the storage associated with
        // the port
        let target_port_id = message.data_header.target_port;
        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);

            // After direct insertion, check if this component's execution is
            // blocked on receiving a message on that port
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
                // We were indeed blocked
                self.mode = Mode::Sync;
                self.mode_port = PortId::new_invalid();
            }

            return;
        }

        // The direct inbox is full, so the port will become (or was already)
        // blocked
        debug_assert!(
            comp_ctx.ports[port_index].state == PortState::Open ||
            comp_ctx.ports[port_index].state == PortState::Blocked
        );
        if comp_ctx.ports[port_index].state == PortState::Open {
            let (target_comp_id, block_message) =
                self.control.mark_port_blocked(target_port_id, comp_ctx);
            debug_assert_eq!(comp_ctx.ports[port_index].peer_comp_id, target_comp_id);

            let peer = comp_ctx.get_peer(target_comp_id);
            peer.handle.inbox.push(Message::Control(block_message));
            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
        }

        // But we still need to remember the message, so:
        self.inbox_backup.push(message);
    }

    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
        match message.content {
            ControlMessageContent::Ack => {
                let mut to_ack = message.id;
                loop {
                    let action = self.control.handle_ack(to_ack, comp_ctx);
                    match action {
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
                            // FIX @NoDirectHandle
                            let handle = sched_ctx.runtime.get_component_public(target_comp);
                            handle.inbox.push(Message::Control(message));
                            wake_up_if_sleeping(sched_ctx, target_comp, &handle);
                            to_ack = new_to_ack;
                        },
                        AckAction::ScheduleComponent(to_schedule) => {
                            // FIX @NoDirectHandle
                            let handle = sched_ctx.runtime.get_component_public(to_schedule);
                            wake_up_if_sleeping(sched_ctx, to_schedule, &handle);
                            break;
                        },
                        AckAction::None => {
                            break;
                        }
                    }
                }
            },
            ControlMessageContent::BlockPort(port_id) => {
                // One of our messages was accepted, but the port should be
                // blocked.
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
                if port_info.state != PortState::Closed {
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
                    port_info.state = PortState::Blocked;
                }
            },
            ControlMessageContent::UnblockPort(port_id) => {
                // We were previously blocked (or already closed)
                let port_info = comp_ctx.get_port(port_id);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
                if port_info.state == PortState::Blocked {
                    self.unblock_port(sched_ctx, comp_ctx, port_id);
                }
            },
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
                // The peer of our port has just changed. So we are asked to
                // temporarily block the port (while our original recipient is
                // potentially rerouting some of the in-flight messages) and
                // Ack. Then we wait for the `unblock` call.
                debug_assert_eq!(message.target_port_id, port_id);
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
                if port_info.state == PortState::Open {
                    port_info.state = PortState::Blocked;
                }
            },
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
                debug_assert_eq!(message.target_port_id, port_id);
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert!(port_info.state == PortState::Blocked);
                port_info.peer_comp_id = new_comp_id;
                self.unblock_port(sched_ctx, comp_ctx, port_id);
            }
        }
    }
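    // Backpressure in a nutshell (sketch): when a data message arrives and the
    // per-port slot in `inbox_main` is already occupied, the receiver parks
    // the message in `inbox_backup` and sends `BlockPort` to the sending side,
    // which flips the putter to `PortState::Blocked`. Subsequent puts on that
    // port park the sender in `Mode::BlockedPut` until `UnblockPort` arrives.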
    fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
        let port_info = comp_ctx.get_port_mut(port_id);
        debug_assert_eq!(port_info.state, PortState::Blocked);
        port_info.state = PortState::Open;

        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
            // We were blocked on the port that just became unblocked, so
            // send the message.
            let mut replacement = ValueGroup::default();
            std::mem::swap(&mut replacement, &mut self.mode_value);
            self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);

            // Let the prompt observe the completed put when it resumes
            self.exec_ctx.stmt = ExecStmt::PerformedPut;
            self.mode = Mode::Sync;
            self.mode_port = PortId::new_invalid();
        }
    }

    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
        let component = CompPDL::new(prompt);
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
        let created_ctx = &mut component.ctx;

        let mut has_reroute_entry = false;
        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);

        for port_id in ports.iter().copied() {
            let port_info = creator_ctx.get_port(port_id);
            debug_assert_ne!(port_info.state, PortState::Blocked);
            let peer_port_id = port_info.peer_id;
            let peer_comp_id = port_info.peer_comp_id;

            if peer_comp_id == creator_ctx.id {
                // We own the peer port. So retrieve it and modify the peer
                // directly
                let peer_port_info = creator_ctx.get_port_mut(peer_port_id);
                peer_port_info.peer_comp_id = created_ctx.id;
            } else {
                // We don't own the peer port, so create a temporary reroute
                // entry in the control layer and send the appropriate message
                has_reroute_entry = true;
                let message = self.control.add_reroute_entry(
                    creator_ctx.id, peer_port_id, peer_comp_id,
                    port_id, created_ctx.id, schedule_entry_id
                );
                let peer_info = creator_ctx.get_peer(peer_comp_id);
                peer_info.handle.inbox.push(message);
                // Wake the peer so it actually processes the reroute request
                wake_up_if_sleeping(sched_ctx, peer_comp_id, &peer_info.handle);
            }

            // Transfer the port to the newly created component
            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
            if port_info.state == PortState::Blocked {
                todo!("Think about this when you're not tired!");
            }
            Self::add_port_to_component(sched_ctx, created_ctx, port_info);

            // Maybe remove peer from the creator
            if let Some(mut peer_info) = peer_info {
                let remove_from_runtime = peer_info.handle.decrement_users();
                if remove_from_runtime {
                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
                    sched_ctx.runtime.destroy_component(removed_comp_key);
                }
            }
        }

        if !has_reroute_entry {
            // We can schedule the component immediately
            self.control.remove_schedule_entry(schedule_entry_id);
            sched_ctx.runtime.enqueue_work(comp_key);
        }
        // else: wait for the `Ack`s, they will trigger the scheduling of the
        // component
    }
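    // Port transfer during component creation (sketch): ports whose peer lives
    // in another component cannot simply be moved, because that peer may have
    // messages in flight. The creator therefore installs a reroute entry and
    // sends `PortPeerChangedBlock`; the peer blocks the port and `Ack`s; once
    // every `Ack` is in, the control layer schedules the new component and the
    // peers learn the new owner via `PortPeerChangedUnblock`.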
    /// Removes a port from a component. Also decrements the port counter in
    /// the peer component's entry. If that hits 0 then the peer entry will be
    /// removed and returned. If returned then the caller is responsible for
    /// decrementing the atomic counters of the peer component's handle.
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
        let port_info = comp_ctx.ports.remove(port_index);

        // If the component owns the peer, then we don't have to decrement the
        // number of peers (because we don't have an entry for ourselves)
        if port_info.peer_comp_id == comp_ctx.id {
            return (port_info, None);
        }

        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
        let peer_info = &mut comp_ctx.peers[peer_index];
        peer_info.num_associated_ports -= 1;

        // Check if we still have other ports referencing this peer
        if peer_info.num_associated_ports != 0 {
            return (port_info, None);
        }

        let peer_info = comp_ctx.peers.remove(peer_index);
        return (port_info, Some(peer_info));
    }

    fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
        // Add the port info
        let peer_comp_id = port_info.peer_comp_id;
        debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
        comp_ctx.ports.push(port_info);

        // Increment counters on the peer, or create an entry for the peer if
        // it doesn't exist yet.
        match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) {
            Some(peer_index) => {
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports += 1;
            },
            None => {
                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
                comp_ctx.peers.push(Peer{
                    id: peer_comp_id,
                    num_associated_ports: 1,
                    handle,
                });
            }
        }
    }

    fn change_port_peer_component(
        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
        port_id: PortId, new_peer_comp_id: CompId
    ) {
        let port_info = comp_ctx.get_port_mut(port_id);
        let cur_peer_comp_id = port_info.peer_comp_id;
        port_info.peer_comp_id = new_peer_comp_id;

        // Drop our reference to the old peer component
        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
        cur_peer_info.num_associated_ports -= 1;
        if cur_peer_info.num_associated_ports == 0 {
            let should_remove = cur_peer_info.handle.decrement_users();
            if should_remove {
                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
            }
        }
    }
}

#[inline]
fn port_id_from_eval(port_id: EvalPortId) -> PortId {
    return PortId(port_id.id);
}

#[inline]
fn port_id_to_eval(port_id: PortId) -> EvalPortId {
    return EvalPortId{ id: port_id.0 };
}

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
    // Helper to check a value for a port and recurse if needed.
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortId(port_id.id);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }

                ports.push(cur_port);
            },
            Value::Array(heap_pos) |
            Value::Message(heap_pos) |
            Value::String(heap_pos) |
            Value::Struct(heap_pos) |
            Value::Union(_, heap_pos) => {
                // Reference to some dynamic thing which might contain ports,
                // so recurse
                let heap_region = &group.regions[*heap_pos as usize];
                for embedded_value in heap_region {
                    find_port_in_value(group, embedded_value, ports);
                }
            },
            _ => {}, // values we don't care about
        }
    }

    // Clear the ports, then scan all the available values
    ports.clear();
    for value in &value_group.values {
        find_port_in_value(value_group, value, ports);
    }
}
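// A small sketch exercising the deduplication above (assuming `ValueGroup`
// and `Value` can be constructed here the same way they are elsewhere in
// this file):
#[cfg(test)]
mod find_ports_sketch {
    use super::*;

    #[test]
    fn ports_are_deduplicated() {
        let group = ValueGroup::new_stack(vec![
            Value::Output(EvalPortId{ id: 1 }),
            Value::Output(EvalPortId{ id: 1 }), // duplicate, reported once
            Value::Input(EvalPortId{ id: 2 }),
        ]);
        let mut ports = Vec::new();
        find_ports_in_value_group(&group, &mut ports);
        assert!(ports.len() == 2);
        assert!(ports[0] == PortId(1) && ports[1] == PortId(2));
    }
}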
/// If the component is sleeping, then its sleeping flag will atomically be
/// set to false. If we are the one that made that happen, then we add the
/// component to the work queue.
fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
    use std::sync::atomic::Ordering;

    let should_wake_up = handle.sleeping
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();

    if should_wake_up {
        let comp_key = unsafe{ comp_id.upgrade() };
        sched_ctx.runtime.enqueue_work(comp_key);
    }
}
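// Standalone sketch of the sleeping/waking handshake: with compare_exchange,
// exactly one of two racing wakers observes `sleeping == true`, so a component
// is never enqueued twice for the same wake-up.
#[cfg(test)]
mod wake_up_sketch {
    use std::sync::atomic::{AtomicBool, Ordering};

    #[test]
    fn only_one_waker_wins() {
        let sleeping = AtomicBool::new(true);
        let cas = |flag: &AtomicBool| flag
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();

        let first = cas(&sleeping);
        let second = cas(&sleeping);
        assert!(first && !second); // only the first waker schedules the component
    }
}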