diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index 4fee962a9b41daa870d84b2b60e71585d5b630a3..c4bee587535d12a0410fcc24ff6545f853431c3c 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -1,10 +1,14 @@
 use crate::protocol::eval::*;
 use super::runtime::*;
+use super::component::*;
 
 #[derive(Copy, Clone)]
 pub struct PortId(pub u32);
+
 impl PortId {
+    /// This value is not significant; it is chosen to make debugging easier: a
+    /// very large port number is more likely to shine a light on bugs.
    pub fn new_invalid() -> Self {
        return Self(u32::MAX);
    }
@@ -16,11 +20,13 @@ pub struct Peer {
     pub(crate) handle: CompHandle,
 }
 
+#[derive(Debug, PartialEq, Eq)]
 pub enum PortKind {
     Putter,
     Getter,
 }
 
+#[derive(Debug, PartialEq, Eq)]
 pub enum PortState {
     Open,
     Blocked,
@@ -41,22 +47,36 @@ pub struct Channel {
 }
 
 pub struct DataMessage {
-    pub source_port_id: PortId,
-    pub target_port_id: PortId,
+    pub data_header: MessageDataHeader,
+    pub sync_header: MessageSyncHeader,
     pub content: ValueGroup,
 }
 
+pub struct MessageSyncHeader {
+    pub sync_round: u32,
+}
+
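+/// Consensus header for data messages. `expected_mapping` carries the
+/// sender's current `(port, mapping)` annotations for every port that was
+/// already assigned a mapping this sync round; `new_mapping` is the fresh
+/// mapping value assigned to this message (see `Consensus::create_data_header`).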
+pub struct MessageDataHeader {
+    pub expected_mapping: Vec<(PortId, u32)>,
+    pub new_mapping: u32,
+    pub source_port: PortId,
+    pub target_port: PortId,
+}
+
 pub struct ControlMessage {
-    pub id: u32,
+    pub id: ControlId,
     pub sender_comp_id: CompId,
-    pub content: ControlContent,
+    pub target_port_id: Option<PortId>,
+    pub content: ControlMessageContent,
 }
 
-pub enum ControlContent {
+#[derive(Copy, Clone)]
+pub enum ControlMessageContent {
     Ack,
-    Ping,
-    PortPeerChangedBlock,
-    PortPeerChangedUnblock,
+    BlockPort(PortId),
+    UnblockPort(PortId),
+    PortPeerChangedBlock(PortId),
+    PortPeerChangedUnblock(PortId, CompId),
 }
 
@@ -64,4 +84,15 @@ pub enum Message {
     Data(DataMessage),
     Control(ControlMessage),
 }
 
+impl Message {
+    pub(crate) fn target_port(&self) -> Option<PortId> {
+        match self {
+            Message::Data(v) =>
+                return Some(v.data_header.target_port),
+            Message::Control(v) =>
+                return v.target_port_id,
+        }
+    }
+}
+
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index 48f002ef7e5f0619d249bdcee564d62977161b9f..2ced679fcbfca7f6f706bb51df86ac43baf9e775 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -10,6 +10,9 @@ use crate::runtime2::runtime::*;
 use crate::runtime2::scheduler::SchedulerCtx;
 use crate::runtime2::communication::*;
 
+use super::control_layer::*;
+use super::consensus::Consensus;
+
 pub enum CompScheduling {
     Immediate,
     Requeue,
@@ -51,14 +54,6 @@ impl CompCtx {
         return Some(message);
     }
 
-    fn find_peer(&self, port_id: PortId) -> (&Port, &Peer) {
-        let port_index = self.get_port_index(port_id).unwrap();
-        let port_info = &self.ports[port_index];
-        let peer_index = self.get_peer_index(port_info.peer_comp_id).unwrap();
-        let peer_info = &self.peers[peer_index];
-        return (port_info, peer_info);
-    }
-
     fn create_channel(&mut self) -> Channel {
         let putter_id = PortId(self.take_port_id());
         let getter_id = PortId(self.take_port_id());
@@ -80,7 +75,17 @@ impl CompCtx {
         return Channel{ putter_id, getter_id };
     }
 
-    fn get_port_index(&self, port_id: PortId) -> Option<usize> {
+    fn get_port(&self, port_id: PortId) -> &Port {
+        let index = self.get_port_index(port_id).unwrap();
+        return &self.ports[index];
+    }
+
+    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
+        let index = self.get_port_index(port_id).unwrap();
+        return &mut self.ports[index];
+    }
+
+    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
         for (index, port) in self.ports.iter().enumerate() {
             if port.self_id == port_id {
                 return Some(index);
@@ -90,7 +95,17 @@ impl CompCtx {
         return None;
     }
 
-    fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
+    fn get_peer(&self, peer_id: CompId) -> &Peer {
+        let index = self.get_peer_index(peer_id).unwrap();
+        return &self.peers[index];
+    }
+
+    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
+        let index = self.get_peer_index(peer_id).unwrap();
+        return &mut self.peers[index];
+    }
+
+    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
         for (index, peer) in self.peers.iter().enumerate() {
             if peer.id == peer_id {
                 return Some(index);
@@ -180,7 +195,16 @@ pub(crate) struct CompPDL {
     pub mode_port: PortId, // when blocked on a port
     pub mode_value: ValueGroup, // when blocked on a put
     pub prompt: Prompt,
+    pub control: ControlLayer,
+    pub consensus: Consensus,
+    pub sync_counter: u32,
     pub exec_ctx: ExecCtx,
+    // TODO: Temporary fields, simulating future plans of having one storage
+    // place reserved per port. `inbox_main` should have the same length as
+    // the number of ports; the entry at a given index holds a message
+    // intended for the port at that same index.
+    pub inbox_main: Vec<Option<DataMessage>>,
+    pub inbox_backup: Vec<DataMessage>,
 }
 
 impl CompPDL {
@@ -190,13 +214,40 @@ impl CompPDL {
             mode_port: PortId::new_invalid(),
             mode_value: ValueGroup::default(),
             prompt: initial_state,
+            control: ControlLayer::default(),
+            consensus: Consensus::new(),
+            sync_counter: 0,
             exec_ctx: ExecCtx{
                 stmt: ExecStmt::None,
-            }
+            },
+            inbox_main: Vec::new(),
+            inbox_backup: Vec::new(),
         }
     }
 
+    pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        self.inbox_main.resize(comp_ctx.ports.len(), None);
+    }
+
+    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
+        if let Some(new_target) = self.control.should_reroute(&message) {
+            // FIX @NoDirectHandle
+            let target = sched_ctx.runtime.get_component_public(new_target);
+            target.inbox.push(message);
+
+            return;
+        }
+
+        match message {
+            Message::Data(message) => {
+                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
+            },
+            Message::Control(message) => {
+                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
+            },
+        }
+    }
+
-    pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
 
         let run_result = self.execute_prompt(&sched_ctx)?;
@@ -208,6 +259,7 @@
             EC::SyncBlockEnd => {
                 debug_assert_eq!(self.mode, Mode::Sync);
                 self.handle_sync_end(sched_ctx, comp_ctx);
+                return Ok(CompScheduling::Immediate);
             },
             EC::BlockGet(port_id) => {
                 debug_assert_eq!(self.mode, Mode::Sync);
@@ -227,8 +279,15 @@
             },
             EC::Put(port_id, value) => {
                 debug_assert_eq!(self.mode, Mode::Sync);
-                let port_id = transform_port_id(port_id);
-                Self::send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
+                let port_id = port_id_from_eval(port_id);
+                let port_info = comp_ctx.get_port(port_id);
+                if port_info.state == PortState::Blocked {
+                    // The port is blocked: store the value and suspend the
+                    // put; `unblock_port` resumes it once the port opens up.
+                    self.mode = Mode::BlockedPut;
+                    self.mode_port = port_id;
+                    self.mode_value = value;
+                    return Ok(CompScheduling::Sleep);
+                }
+
+                self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
+                return Ok(CompScheduling::Immediate);
             },
             // Results that can be returned outside of sync mode
             EC::ComponentTerminated => {
@@ -238,9 +297,20 @@
             EC::SyncBlockStart => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
                 self.handle_sync_start(sched_ctx, comp_ctx);
+                return Ok(CompScheduling::Immediate);
             },
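+            // Create the new component, transferring ownership of any ports
+            // inside the arguments; peers of those ports may have to be
+            // notified first (see `create_component_and_transfer_ports`).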
             EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
+
+                let mut ports = Vec::new(); // TODO: Optimize
+                let protocol = &sched_ctx.runtime.protocol;
+                find_ports_in_value_group(&arguments, &mut ports);
+                let prompt = Prompt::new(
+                    &protocol.types, &protocol.heap,
+                    definition_id, monomorph_idx, arguments
+                );
+                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
+                return Ok(CompScheduling::Requeue);
             },
             EC::NewChannel => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
@@ -253,8 +323,6 @@
                 return Ok(CompScheduling::Immediate);
             }
         }
-
-        return Ok(CompScheduling::Sleep);
     }
 
     fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
@@ -270,22 +338,24 @@
     }
 
     fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-
+        debug_assert_eq!(self.mode, Mode::NonSync);
+        self.consensus.notify_sync_start(comp_ctx);
+        self.mode = Mode::Sync;
     }
 
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-
+        debug_assert_eq!(self.mode, Mode::Sync);
+        self.consensus.notify_sync_end();
+        self.mode = Mode::NonSync;
     }
 
-    fn send_message_and_wake_up(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, port_id: PortId, value: ValueGroup) {
+    fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
         use std::sync::atomic::Ordering;
 
-        let (port_info, peer_info) = comp_ctx.find_peer(port_id);
-        peer_info.handle.inbox.push(Message::Data(DataMessage{
-            source_port_id: port_id,
-            target_port_id: port_info.peer_id,
-            content: value,
-        }));
+        let port_info = comp_ctx.get_port(source_port_id);
+        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
+        let annotated_message = self.consensus.annotate_message_data(port_info, value);
+        peer_info.handle.inbox.push(Message::Data(annotated_message));
 
         let should_wake_up = peer_info.handle.sleeping.compare_exchange(
             true, false, Ordering::AcqRel, Ordering::Relaxed
@@ -297,18 +367,165 @@
         }
     }
 
-    fn create_component_and_transfer_ports(sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
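+    // A data message first tries to land in the `inbox_main` slot of its
+    // target port. If that slot is still occupied, the message is parked in
+    // `inbox_backup` and the sender is asked to block the port until the
+    // slot frees up again.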
+    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
+        // Check if we can insert it directly into the storage associated with
+        // the port
+        let target_port_id = message.data_header.target_port;
+        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
+        if self.inbox_main[port_index].is_none() {
+            self.inbox_main[port_index] = Some(message);
+
+            // After direct insertion, check if this component's execution is
+            // blocked on receiving a message on that port
+            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
+            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
+                // We were indeed blocked
+                self.mode = Mode::Sync;
+                self.mode_port = PortId::new_invalid();
+            }
+
+            return;
+        }
+
+        // The direct inbox is full, so the port will become (or was already) blocked
+        let port_info = &comp_ctx.ports[port_index];
+        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
+
+        if port_info.state == PortState::Open {
+            let (target_comp_id, block_message) =
+                self.control.mark_port_blocked(target_port_id, comp_ctx);
+            debug_assert_eq!(comp_ctx.ports[port_index].peer_comp_id, target_comp_id);
+
+            let peer = comp_ctx.get_peer(target_comp_id);
+            peer.handle.inbox.push(Message::Control(block_message));
+            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
+        }
+
+        // But we still need to remember the message, so:
+        self.inbox_backup.push(message);
+    }
+
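+    // Control messages implement the blocking and rerouting protocols: `Ack`
+    // confirms a control request, `BlockPort`/`UnblockPort` throttle a
+    // remote putter, and `PortPeerChangedBlock`/`PortPeerChangedUnblock`
+    // bracket the period in which a port is transferred to a new component.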
+    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
+        match message.content {
+            ControlMessageContent::Ack => {
+                let mut to_ack = message.id;
+                loop {
+                    let action = self.control.handle_ack(to_ack, comp_ctx);
+                    match action {
+                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
+                            // FIX @NoDirectHandle
+                            let handle = sched_ctx.runtime.get_component_public(target_comp);
+                            handle.inbox.push(Message::Control(message));
+                            wake_up_if_sleeping(sched_ctx, target_comp, &handle);
+                            to_ack = new_to_ack;
+                        },
+                        AckAction::ScheduleComponent(to_schedule) => {
+                            // FIX @NoDirectHandle
+                            let handle = sched_ctx.runtime.get_component_public(to_schedule);
+                            wake_up_if_sleeping(sched_ctx, to_schedule, &handle);
+                            break;
+                        },
+                        AckAction::None => {
+                            break;
+                        }
+                    }
+                }
+            },
+            ControlMessageContent::BlockPort(port_id) => {
+                // One of our messages was accepted, but the port should be
+                // blocked.
+                let port_info = comp_ctx.get_port_mut(port_id);
+                debug_assert_eq!(port_info.kind, PortKind::Putter);
+                if port_info.state != PortState::Closed {
+                    debug_assert_ne!(port_info.state, PortState::Blocked); // would imply unnecessary messages
+                    port_info.state = PortState::Blocked;
+                }
+            },
+            ControlMessageContent::UnblockPort(port_id) => {
+                // We were previously blocked (or already closed)
+                let port_info = comp_ctx.get_port(port_id);
+                debug_assert_eq!(port_info.kind, PortKind::Putter);
+                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
+                if port_info.state == PortState::Blocked {
+                    self.unblock_port(sched_ctx, comp_ctx, port_id);
+                }
+            },
+            ControlMessageContent::PortPeerChangedBlock(port_id) => {
+                // The peer of our port has just changed. So we are asked to
+                // temporarily block the port (while our original recipient is
+                // potentially rerouting some of the in-flight messages) and
+                // `Ack`. Then we wait for the `PortPeerChangedUnblock` message.
+                debug_assert_eq!(message.target_port_id, Some(port_id));
+                let port_info = comp_ctx.get_port_mut(port_id);
+                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
+                if port_info.state == PortState::Open {
+                    port_info.state = PortState::Blocked;
+                }
+
+                // Send the `Ack` that the sender of this message is waiting for
+                // FIX @NoDirectHandle
+                let handle = sched_ctx.runtime.get_component_public(message.sender_comp_id);
+                handle.inbox.push(Message::Control(ControlMessage{
+                    id: message.id,
+                    sender_comp_id: comp_ctx.id,
+                    target_port_id: None,
+                    content: ControlMessageContent::Ack,
+                }));
+                wake_up_if_sleeping(sched_ctx, message.sender_comp_id, &handle);
+            },
+            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
+                debug_assert_eq!(message.target_port_id, Some(port_id));
+                let port_info = comp_ctx.get_port_mut(port_id);
+                debug_assert!(port_info.state == PortState::Blocked);
+                port_info.peer_comp_id = new_comp_id;
+                self.unblock_port(sched_ctx, comp_ctx, port_id);
+            }
+        }
+    }
+
+    fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
+        let port_info = comp_ctx.get_port_mut(port_id);
+        debug_assert_eq!(port_info.state, PortState::Blocked);
+        port_info.state = PortState::Open;
+
+        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
+            // We were blocked on the port that just became unblocked, so
+            // send the stored message.
+            let mut replacement = ValueGroup::default();
+            std::mem::swap(&mut replacement, &mut self.mode_value);
+            self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
+
+            self.mode = Mode::Sync;
+            self.mode_port = PortId::new_invalid();
+        }
+    }
+
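+    // Transferring ports to a newly created component is a three-step dance:
+    // register a schedule entry, send `PortPeerChangedBlock` to the remote
+    // peer of every transferred port (each send incrementing the entry's
+    // ack-countdown), and schedule the component once all peers have `Ack`d.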
+    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
         let component = CompPDL::new(prompt);
         let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
         let created_ctx = &mut component.ctx;
 
+        let mut has_reroute_entry = false;
+        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
+
         for port_id in ports.iter().copied() {
-            // Transfer port
-            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
+            // Create a temporary reroute entry if the peer is another component
+            let port_info = creator_ctx.get_port(port_id);
+            debug_assert_ne!(port_info.state, PortState::Blocked);
+            if port_info.peer_comp_id == creator_ctx.id {
+                // We own the peer port, so retrieve it and modify the peer directly
+                let port_info = creator_ctx.get_port_mut(port_info.peer_id);
+                port_info.peer_comp_id = created_ctx.id;
+            } else {
+                // We don't own the peer port, so send the appropriate message
+                // and notify the control layer
+                has_reroute_entry = true;
+                let message = self.control.add_reroute_entry(
+                    creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
+                    port_info.self_id, created_ctx.id, schedule_entry_id
+                );
+                let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
+                peer_info.handle.inbox.push(message);
+                wake_up_if_sleeping(sched_ctx, port_info.peer_comp_id, &peer_info.handle);
+            }
+
+            // Transfer the port itself
+            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
+            if port_info.state == PortState::Blocked {
+                todo!("Think about this when you're not tired!");
+            }
             Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 
             // Maybe remove peer from the creator
-            if let Some(peer_info) = peer_info {
+            if let Some(mut peer_info) = peer_info {
                 let remove_from_runtime = peer_info.handle.decrement_users();
                 if remove_from_runtime {
                     let removed_comp_key = unsafe{ peer_info.id.upgrade() };
@@ -317,8 +534,11 @@
                 }
             }
         }
 
-        // Start scheduling
-        sched_ctx.runtime.enqueue_work(comp_key);
+        if !has_reroute_entry {
+            // We can schedule the component immediately
+            self.control.remove_schedule_entry(schedule_entry_id);
+            sched_ctx.runtime.enqueue_work(comp_key);
+        } // else: wait for the `Ack`s, they will trigger the scheduling of the component
     }
 
     /// Removes a port from a component. Also decrements the port counter in
@@ -365,7 +585,6 @@
             },
             None => {
                 let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
-                handle.increment_users();
                 comp_ctx.peers.push(Peer{
                     id: peer_comp_id,
                     num_associated_ports: 1,
@@ -374,6 +593,25 @@
             }
         }
     }
+
+    fn change_port_peer_component(
+        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
+        port_id: PortId, new_peer_comp_id: CompId
+    ) {
+        let port_info = comp_ctx.get_port_mut(port_id);
+        let cur_peer_comp_id = port_info.peer_comp_id;
+        port_info.peer_comp_id = new_peer_comp_id;
+        // TODO: Register the new peer component (and increment its handle count)
+        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
+        cur_peer_info.num_associated_ports -= 1;
+
+        if cur_peer_info.num_associated_ports == 0 {
+            let should_remove = cur_peer_info.handle.decrement_users();
+            if should_remove {
+                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
+                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
+            }
+        }
+    }
 }
 
 #[inline]
@@ -427,4 +665,20 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
     for value in &value_group.values {
         find_port_in_value(value_group, value, ports);
     }
-}
\ No newline at end of file
+}
+
+/// If the component is sleeping, then that flag will be atomically set to
+/// false. If we're the ones that made that happen then we add it to the work
+/// queue.
+fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
+    use std::sync::atomic::Ordering;
+
+    let should_wake_up = handle.sleeping
+        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
+        .is_ok();
+
+    if should_wake_up {
+        let comp_key = unsafe{ comp_id.upgrade() };
+        sched_ctx.runtime.enqueue_work(comp_key);
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
new file mode 100644
index 0000000000000000000000000000000000000000..68ea40f957186ec832cff45e89cb9bbd8a25be29
--- /dev/null
+++ b/src/runtime2/component/consensus.rs
@@ -0,0 +1,104 @@
+use crate::protocol::eval::*;
+use crate::runtime2::communication::*;
+
+use super::component_pdl::*;
+
+pub struct PortAnnotation {
+    id: PortId,
+    mapping: Option<u32>,
+}
+
+impl PortAnnotation {
+    fn new(id: PortId) -> Self {
+        return Self{ id, mapping: None }
+    }
+}
+
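+// Mapping scheme: within a sync round the first message sent gets
+// `new_mapping = 0`, the next 1, and so on, while `expected_mapping` lists
+// the mappings already handed out. The receiving side is not implemented in
+// this patch; presumably it checks that both components agree on the order
+// of messages within the round.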
+/// Tracking consensus state
+pub struct Consensus {
+    round: u32,
+    mapping_counter: u32,
+    ports: Vec<PortAnnotation>,
+}
+
+impl Consensus {
+    pub(crate) fn new() -> Self {
+        return Self{
+            round: 0,
+            mapping_counter: 0,
+            ports: Vec::new(),
+        }
+    }
+
+    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
+        // Make sure we locally still have all of the same ports
+        self.transfer_ports(comp_ctx);
+        self.mapping_counter = 0;
+    }
+
+    pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage {
+        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
+        let data_header = self.create_data_header(port_info);
+        let sync_header = self.create_sync_header();
+
+        return DataMessage{ data_header, sync_header, content };
+    }
+
+    pub(crate) fn notify_sync_end(&mut self) {
+        self.round = self.round.wrapping_add(1);
+        todo!("implement sync end")
+    }
+
+    pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
+        let mut needs_setting_ports = false;
+        if comp_ctx.ports.len() != self.ports.len() {
+            needs_setting_ports = true;
+        } else {
+            for idx in 0..comp_ctx.ports.len() {
+                let comp_port_id = comp_ctx.ports[idx].self_id;
+                let cons_port_id = self.ports[idx].id;
+                if comp_port_id != cons_port_id {
+                    needs_setting_ports = true;
+                    break;
+                }
+            }
+        }
+
+        if needs_setting_ports {
+            self.ports.clear();
+            self.ports.reserve(comp_ctx.ports.len());
+            for port in &comp_ctx.ports {
+                self.ports.push(PortAnnotation::new(port.self_id))
+            }
+        }
+    }
+
+    fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader {
+        let mut expected_mapping = Vec::with_capacity(self.ports.len());
+        for port in &self.ports {
+            if let Some(mapping) = port.mapping {
+                expected_mapping.push((port.id, mapping));
+            }
+        }
+
+        debug_assert_eq!(port_info.kind, PortKind::Putter);
+        return MessageDataHeader{
+            expected_mapping,
+            new_mapping: self.take_mapping(),
+            source_port: port_info.self_id,
+            target_port: port_info.peer_id,
+        };
+    }
+
+    fn create_sync_header(&self) -> MessageSyncHeader {
+        return MessageSyncHeader{
+            sync_round: self.round,
+        };
+    }
+
+    fn take_mapping(&mut self) -> u32 {
+        let mapping = self.mapping_counter;
+        self.mapping_counter += 1;
+        return mapping;
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs
index 75bd7c1f096fa10c0b355d0c4ea5577bab7eedcd..67b481c073e8c16a09533a7711d9864bab934326 100644
--- a/src/runtime2/component/control_layer.rs
+++ b/src/runtime2/component/control_layer.rs
@@ -2,51 +2,270 @@ use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
 use crate::runtime2::component::*;
 
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub(crate) struct ControlId(u32);
+
+impl ControlId {
+    /// Like other invalid IDs, this one doesn't carry any significance, but is
+    /// just set to u32::MAX to hopefully bring out bugs sooner.
+    fn new_invalid() -> Self {
+        return ControlId(u32::MAX);
+    }
+}
+
 struct ControlEntry {
-    id: u32,
+    id: ControlId,
     ack_countdown: u32,
     content: ControlContent,
-    ack_action: ControlAction,
 }
 
 enum ControlContent {
-    PeerChange(ControlPeerChange),
+    PeerChange(ContentPeerChange),
+    ScheduleComponent(ContentScheduleComponent),
+    BlockedPort(ContentBlockedPort),
 }
 
-struct ControlPeerChange {
+struct ContentPeerChange {
     source_port: PortId,
-    target_port: PortId, // if sent to this port
-    new_target_comp: CompId, // redirect to this component
+    source_comp: CompId,
+    target_port: PortId,
+    new_target_comp: CompId,
+    schedule_entry_id: ControlId,
+}
+
+struct ContentScheduleComponent {
+    to_schedule: CompId,
 }
 
-/// Action to be taken when the `Ack`s for a control entry has come in.
-enum ControlAction {
-    Nothing,
-    AckOwnEntry(u32), // ack an entry we own ourselves
-    ScheduleComponent(CompId), // schedule a particular component for execution
+struct ContentBlockedPort {
+    blocked_port: PortId,
+}
+
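+/// Action for the owner of the control layer to take after an incoming
+/// `Ack` completes a control entry: send a follow-up control message (and
+/// `Ack` the next entry in the chain), schedule the component that was
+/// waiting on the entry, or do nothing.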
+pub(crate) enum AckAction {
+    None,
+    SendMessageAndAck(CompId, ControlMessage, ControlId),
+    ScheduleComponent(CompId),
 }
 
 /// Handling/sending control messages.
 pub(crate) struct ControlLayer {
-    id_counter: u32,
+    id_counter: ControlId,
     entries: Vec<ControlEntry>,
 }
 
 impl ControlLayer {
-    fn handle_created_component(&mut self, creator_ctx: &CompCtx, created_ctx: &CompCtx) {
-        for peer in &created_ctx.peers {
-            // TODO: Optimize when we ourselves are the peer.
+    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
+        // Safety note: rerouting should occur during the time when we're
+        // notifying a peer of a new component. During this period that new
+        // component hasn't been executed yet, so it cannot have died yet.
+        // FIX @NoDirectHandle
+        let target_port = message.target_port();
+        if target_port.is_none() {
+            return None;
+        }
+
+        let target_port = target_port.unwrap();
+        for entry in &self.entries {
+            if let ControlContent::PeerChange(entry) = &entry.content {
+                if entry.target_port == target_port {
+                    return Some(entry.new_target_comp);
+                }
+            }
+        }
+
+        return None;
+    }
+
+    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction {
+        let entry_index = self.get_entry_index(entry_id).unwrap();
+        let entry = &mut self.entries[entry_index];
+        debug_assert!(entry.ack_countdown > 0);
+
+        entry.ack_countdown -= 1;
+        if entry.ack_countdown != 0 {
+            return AckAction::None;
+        }
+
+        // All `Ack`s received, take action based on the kind of entry
+        match &entry.content {
+            ControlContent::PeerChange(content) => {
+                // If the change of peer is ack'd, then we are certain we have
+                // rerouted all of the messages, and the sender's port can now
+                // be unblocked again.
+                let target_comp_id = content.source_comp;
+                let message_to_send = ControlMessage{
+                    id: ControlId::new_invalid(),
+                    sender_comp_id: comp_ctx.id,
+                    target_port_id: Some(content.source_port),
+                    content: ControlMessageContent::PortPeerChangedUnblock(
+                        content.source_port,
+                        content.new_target_comp
+                    )
+                };
+                let to_ack = content.schedule_entry_id;
-            // Create entry that will unblock the peer if it confirms that all
-            // of its ports have been blocked
+                self.entries.remove(entry_index);
-            peer.handle.inbox.push(Message::)
+                // Note: `to_ack` is handed back to the caller, whose `Ack`
+                // handling loop follows the chain; acking it here as well
+                // would decrement the schedule entry twice.
+                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
+            },
+            ControlContent::ScheduleComponent(content) => {
+                // If all change-of-peers are `Ack`d, then we're ready to
+                // schedule the component!
+                return AckAction::ScheduleComponent(content.to_schedule);
+            },
+            ControlContent::BlockedPort(_) => unreachable!(),
         }
     }
 
-    fn take_id(&mut self) -> u32 {
+    // -------------------------------------------------------------------------
+    // Port transfer (due to component creation)
+    // -------------------------------------------------------------------------
+
+    /// Adds an entry that, when completely ack'd, will schedule a component.
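+    /// For example: a component created with three ports whose peers are all
+    /// other components gets an `ack_countdown` of 3 (one `add_reroute_entry`
+    /// call per port); every `Ack`d `PortPeerChangedBlock` decrements it, and
+    /// at zero the new component is finally scheduled.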
+    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
+        let entry_id = self.take_id();
+        self.entries.push(ControlEntry{
+            id: entry_id,
+            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
+            content: ControlContent::ScheduleComponent(ContentScheduleComponent{
+                to_schedule: to_schedule_id
+            }),
+        });
+
+        return entry_id;
+    }
+
+    /// Removes a schedule entry. Only used if the caller preemptively called
+    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
+    /// hence the `ack_countdown` in the scheduling entry is still at 0.
+    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
+        let index = self.get_entry_index(schedule_entry_id).unwrap();
+        debug_assert_eq!(self.entries[index].ack_countdown, 0);
+        self.entries.remove(index);
+    }
+
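+    /// Adds an entry that reroutes messages aimed at a transferred port, and
+    /// returns the `PortPeerChangedBlock` message that should be sent to the
+    /// transferred port's peer. The entry expects a single `Ack`; receiving
+    /// it unblocks the peer and `Ack`s the associated schedule entry.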
+    pub(crate) fn add_reroute_entry(
+        &mut self, creator_comp_id: CompId,
+        source_port_id: PortId, source_comp_id: CompId,
+        target_port_id: PortId, new_comp_id: CompId,
+        schedule_entry_id: ControlId,
+    ) -> Message {
+        let entry_id = self.take_id();
+        self.entries.push(ControlEntry{
+            id: entry_id,
+            ack_countdown: 1,
+            content: ControlContent::PeerChange(ContentPeerChange{
+                source_port: source_port_id,
+                source_comp: source_comp_id,
+                target_port: target_port_id,
+                new_target_comp: new_comp_id,
+                schedule_entry_id,
+            }),
+        });
+
+        // Increment the counter on the schedule entry
+        for entry in &mut self.entries {
+            if entry.id == schedule_entry_id {
+                entry.ack_countdown += 1;
+                break;
+            }
+        }
+
+        return Message::Control(ControlMessage{
+            id: entry_id,
+            sender_comp_id: creator_comp_id,
+            target_port_id: Some(source_port_id),
+            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
+        })
+    }
+
+    // -------------------------------------------------------------------------
+    // Blocking and unblocking ports
+    // -------------------------------------------------------------------------
+
+    pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
+        // TODO: Feels like this shouldn't be an entry. Hence this class should
+        //  be renamed. Let's see where the code ends up.
+        let entry_id = self.take_id();
+        let comp_id = comp_ctx.id; // read before mutably borrowing the port
+        let port_info = comp_ctx.get_port_mut(port_id);
+        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
+        port_info.state = PortState::Blocked;
+
+        self.entries.push(ControlEntry{
+            id: entry_id,
+            ack_countdown: 0,
+            content: ControlContent::BlockedPort(ContentBlockedPort{
+                blocked_port: port_id,
+            }),
+        });
+
+        return (
+            port_info.peer_comp_id,
+            ControlMessage{
+                id: entry_id,
+                sender_comp_id: comp_id,
+                target_port_id: Some(port_info.peer_id),
+                content: ControlMessageContent::BlockPort(port_info.peer_id),
+            }
+        );
+    }
+
+    pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
+        // Find the entry that marks this port as blocked, and remove it
+        let mut entry_index = usize::MAX;
+        let mut entry_id = ControlId::new_invalid();
+        for (index, entry) in self.entries.iter().enumerate() {
+            if let ControlContent::BlockedPort(block_entry) = &entry.content {
+                if block_entry.blocked_port == port_id {
+                    entry_index = index;
+                    entry_id = entry.id;
+                    break;
+                }
+            }
+        }
+
+        debug_assert_ne!(entry_index, usize::MAX); // the entry must exist
+        self.entries.remove(entry_index);
+
+        let comp_id = comp_ctx.id;
+        let port_info = comp_ctx.get_port_mut(port_id);
+        debug_assert_eq!(port_info.state, PortState::Blocked);
+        port_info.state = PortState::Open;
+
+        return (
+            port_info.peer_comp_id,
+            ControlMessage{
+                id: entry_id,
+                sender_comp_id: comp_id,
+                target_port_id: Some(port_info.peer_id),
+                content: ControlMessageContent::UnblockPort(port_info.peer_id),
+            }
+        )
+    }
+
+    // -------------------------------------------------------------------------
+    // Internal utilities
+    // -------------------------------------------------------------------------
+
+    fn take_id(&mut self) -> ControlId {
         let id = self.id_counter;
-        self.id_counter += 1;
+        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
         return id;
     }
+
+    fn get_entry_index(&self, entry_id: ControlId) -> Option<usize> {
+        for (index, entry) in self.entries.iter().enumerate() {
+            if entry.id == entry_id {
+                return Some(index);
+            }
+        }
+
+        return None;
+    }
+}
+
+impl Default for ControlLayer {
+    fn default() -> Self {
+        return ControlLayer{
+            id_counter: ControlId(0),
+            entries: Vec::new(),
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs
index cda01712f7e718a45a591d978e57d3b6f2c92d9c..5c92219776421421c209667fb96bfad54503828a 100644
--- a/src/runtime2/component/mod.rs
+++ b/src/runtime2/component/mod.rs
@@ -1,4 +1,6 @@
 mod component_pdl;
 mod control_layer;
+mod consensus;
 
-pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
\ No newline at end of file
+pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
+pub(crate) use control_layer::{ControlId};
\ No newline at end of file
diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs
index 621b50e17daa9b070687640d9e6baab332fe03f5..4255058472a74d8a9f8ce05a1720bc0169d71cf7 100644
--- a/src/runtime2/runtime.rs
+++ b/src/runtime2/runtime.rs
@@ -59,24 +59,48 @@ pub(crate) struct CompPublic {
 
 /// Handle to public part of a component. Would be nice if we could
 /// automagically manage the `num_handles` counter. But when it reaches zero we
-/// need to manually remove the handle from the runtime. So be careful.
+/// need to manually remove the handle from the runtime. So we just have debug
+/// code to make sure this actually happens.
 pub(crate) struct CompHandle {
     target: *const CompPublic,
+    #[cfg(debug_assertions)] decremented: bool,
 }
 
 impl CompHandle {
-    pub(crate) fn increment_users(&self) {
+    fn new(public: &CompPublic) -> CompHandle {
+        let handle = CompHandle{
+            target: public,
+            #[cfg(debug_assertions)] decremented: false,
+        };
+        handle.increment_users();
+        return handle;
+    }
+
+    fn increment_users(&self) {
         let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
         debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
     }
 
     /// Returns true if the component should be destroyed
-    pub(crate) fn decrement_users(&self) -> bool {
+    pub(crate) fn decrement_users(&mut self) -> bool {
+        dbg_code!(debug_assert!(!self.decremented, "illegal to call 'decrement_users' twice"));
+        dbg_code!(self.decremented = true);
         let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
         return old_count == 1;
     }
 }
 
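+// Cloning increments the shared `num_handles` counter, so every clone must
+// also receive its own `decrement_users` call before being dropped.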
+impl Clone for CompHandle {
+    fn clone(&self) -> Self {
+        dbg_code!(debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'"));
+        self.increment_users();
+        return CompHandle{
+            target: self.target,
+            #[cfg(debug_assertions)] decremented: false,
+        };
+    }
+}
+
 impl std::ops::Deref for CompHandle {
     type Target = CompPublic;
 
@@ -85,6 +109,12 @@ impl std::ops::Deref for CompHandle {
     }
 }
 
+impl Drop for CompHandle {
+    fn drop(&mut self) {
+        dbg_code!(debug_assert!(self.decremented, "need call to 'decrement_users' before dropping"));
+    }
+}
+
 // -----------------------------------------------------------------------------
 // Runtime
 // -----------------------------------------------------------------------------
@@ -165,7 +195,7 @@ impl Runtime {
 
     pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
         let component = self.components.get(id.0);
-        return CompHandle{ target: &component.public };
+        return CompHandle::new(&component.public);
     }
 
     pub(crate) fn destroy_component(&self, key: CompKey) {
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index bf620950f3942fc085ebf3d4db60f089afd0f2ae..1bcdcbc7ab628629a8cb5a46bbc1a57d553e59f3 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -14,6 +14,14 @@ pub(crate) struct SchedulerCtx<'a> {
     pub runtime: &'a Runtime,
 }
 
+impl<'a> SchedulerCtx<'a> {
+    pub fn new(runtime: &'a Runtime) -> Self {
+        return Self{
+            runtime,
+        }
+    }
+}
+
 impl Scheduler {
     // public interface to thread
 
@@ -22,7 +30,7 @@ impl Scheduler {
     }
 
     pub fn run(&mut self) {
-        let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
+        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime);
 
         'run_loop: loop {
             // Wait until we have something to do (or need to quit)
@@ -39,7 +47,7 @@ impl Scheduler {
             // be re-executed immediately.
             let mut new_scheduling = CompScheduling::Immediate;
             while let CompScheduling::Immediate = new_scheduling {
-                new_scheduling = component.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
+                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
             }
 
             // Handle the new scheduling