diff --git a/src/collections/raw_vec.rs b/src/collections/raw_vec.rs
index 0abf43a06ac1d39f500751c698e40150ded015b4..088430ecf66725df09cb21e7c4edf7b675f80a37 100644
--- a/src/collections/raw_vec.rs
+++ b/src/collections/raw_vec.rs
@@ -70,7 +70,7 @@ impl RawVec {
         debug_assert!(additional > 0);
         let new_cap = self.len.checked_add(additional).unwrap();
         let new_cap = cmp::max(new_cap, self.cap * Self::GROWTH_RATE);
-
+
         let layout = Layout::array::<T>(new_cap)
             .map_err(|_| AllocError::CapacityOverflow)?;
         debug_assert_eq!(new_cap * Self::T_SIZE, layout.size());
@@ -83,7 +83,7 @@ impl RawVec {
             let old_base = self.base as *mut u8;
             let (old_size, old_layout) = self.current_layout();

-            ptr::copy_nonoverlapping(new_base, old_base, old_size);
+            ptr::copy_nonoverlapping(old_base, new_base, old_size);
             dealloc(old_base, old_layout);
         }

diff --git a/src/collections/sets.rs b/src/collections/sets.rs
index d2ce409d7f0291e643c46b987804d5a8eb57679d..ee00c2983f8283ab04a7a4d5b75f277c238c7f87 100644
--- a/src/collections/sets.rs
+++ b/src/collections/sets.rs
@@ -93,6 +93,11 @@ impl VecSet {
         return self.inner.iter();
     }

+    #[inline]
+    pub fn contains(&self, item: &T) -> bool {
+        return self.inner.contains(item);
+    }
+
     #[inline]
     pub fn is_empty(&self) -> bool {
         self.inner.is_empty()
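
Note (reviewer aside, not part of the patch): the raw_vec.rs hunk above fixes the argument order of `std::ptr::copy_nonoverlapping`, whose signature is `(src, dst, count)`. The old call copied from the freshly allocated block into the old one, so the grown vector lost its contents. Below is a minimal standalone sketch of the corrected grow-and-copy pattern; it uses a plain `u8` buffer instead of the crate's generic `RawVec`, and every name in it is illustrative only.

```rust
// Standalone sketch: grow a raw allocation and copy the old contents over.
use std::alloc::{alloc, dealloc, Layout};
use std::ptr;

unsafe fn grow(old_base: *mut u8, old_cap: usize, new_cap: usize) -> *mut u8 {
    let new_layout = Layout::array::<u8>(new_cap).unwrap();
    let new_base = alloc(new_layout);
    assert!(!new_base.is_null());

    // copy_nonoverlapping(src, dst, count): copy from the old allocation
    // into the new one; this is the argument order the hunk above restores.
    ptr::copy_nonoverlapping(old_base, new_base, old_cap);

    dealloc(old_base, Layout::array::<u8>(old_cap).unwrap());
    new_base
}

fn main() {
    unsafe {
        let base = alloc(Layout::array::<u8>(4).unwrap());
        assert!(!base.is_null());
        for i in 0..4 { *base.add(i) = i as u8; }

        let grown = grow(base, 4, 8);
        assert_eq!(*grown.add(3), 3); // old contents survived the grow
        dealloc(grown, Layout::array::<u8>(8).unwrap());
    }
}
```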
diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs
index 297585f7909df4eb0857c86295c3e2782ff1e78c..80a2a191b1a8f0a8eaed8f68b46fd08309d1bd2d 100644
--- a/src/runtime2/branch.rs
+++ b/src/runtime2/branch.rs
@@ -273,8 +273,9 @@ impl ExecTree {
     pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
         debug_assert!(self.is_in_sync());
         let parent_branch = &self[parent_branch_id];
-        let new_branch = Branch::new_sync(1, parent_branch);
+        let new_branch = Branch::new_sync(self.branches.len() as u32, parent_branch);
         let new_branch_id = new_branch.id;
+        self.branches.push(new_branch);

         return new_branch_id;
     }
diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs
index b716d3fa20090f606e03cd887197bcb116e49753..f33e6c5c2bb5bdbcfa4949ebb9fe7ee0c7de128c 100644
--- a/src/runtime2/connector2.rs
+++ b/src/runtime2/connector2.rs
@@ -34,6 +34,7 @@ use crate::common::ComponentState;
 use crate::protocol::eval::{Prompt, Value, ValueGroup};
 use crate::protocol::{RunContext, RunResult};
 use crate::runtime2::consensus::find_ports_in_value_group;
+use crate::runtime2::inbox2::DataContent;
 use crate::runtime2::port::PortKind;

 use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
@@ -110,10 +111,11 @@ impl Connector for ConnectorPDL {
         if self.tree.is_in_sync() {
             let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
             if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
-                todo!("call handler");
+                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
+                return ConnectorScheduling::Immediate;
+            } else {
+                return scheduling
             }
-
-            return scheduling;
         } else {
             let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
             return scheduling;
@@ -154,8 +156,8 @@ impl ConnectorPDL {
         self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
         let receiving_branch = &mut self.tree[receiving_branch_id];
-        receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
-        self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content);
+        receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
+        self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);

         // And prepare the branch for running
         self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
     }
@@ -164,7 +166,7 @@ impl ConnectorPDL {
     pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
         if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
-
+            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
         }
     }
@@ -239,16 +241,17 @@
             // a message that targets this branch, so check now.
             let mut any_branch_received = false;
             for message in comp_ctx.get_read_data_messages(port_id) {
-                if self.consensus.branch_can_receive(branch_id, &message.data_header) {
+                if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) {
                     // This branch can receive the message, so we do the
                     // fork-and-receive dance
-                    let recv_branch_id = self.tree.fork_branch(branch_id);
-                    let branch = &mut self.tree[recv_branch_id];
-                    branch.insert_message(port_id, message.content.clone());
+                    let receiving_branch_id = self.tree.fork_branch(branch_id);
+                    let branch = &mut self.tree[receiving_branch_id];
+
+                    branch.insert_message(port_id, message.content.as_message().unwrap().clone());

-                    self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
-                    self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content);
-                    self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);
+                    self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
+                    self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
+                    self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);

                     any_branch_received = true;
                 }
@@ -267,7 +270,7 @@
                     branch.sync_state = SpeculativeState::ReachedSyncEnd;
                     self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                 } else if consistency == Consistency::Inconsistent {
-                    branch.sync_state == SpeculativeState::Inconsistent;
+                    branch.sync_state = SpeculativeState::Inconsistent;
                 }
             },
             RunResult::BranchPut(port_id, content) => {
@@ -278,7 +281,8 @@
                 // `put()` is valid.
                 let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
                 comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{
-                    sync_header, data_header, content
+                    sync_header, data_header,
+                    content: DataContent::Message(content),
                 }));

                 self.tree.push_into_queue(QueueKind::Runnable, branch_id);
@@ -324,6 +328,7 @@
             comp_ctx.notify_sync_start();
             let sync_branch_id = self.tree.start_sync();
             self.consensus.start_sync(comp_ctx);
+            self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
             self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);

             return ConnectorScheduling::Immediate;
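
Note (reviewer aside, not part of the patch): the branch.rs hunk above makes `fork_branch` allocate the new branch id from `self.branches.len()` and actually push the new branch, so branch ids keep matching their vector indices; the connector2.rs fork-and-receive code relies on indexing `self.tree[receiving_branch_id]` right after the fork. The sketch below shows the same index-as-id arena pattern with simplified stand-in types, not the crate's real `Branch`/`ExecTree`.

```rust
// Standalone sketch of the index-as-id pattern restored by the branch.rs hunk.
#[derive(Debug)]
struct Branch { id: u32, parent: u32 }

#[derive(Default)]
struct ExecTree { branches: Vec<Branch> }

impl ExecTree {
    fn fork_branch(&mut self, parent_id: u32) -> u32 {
        // The id must be the index the new branch will occupy...
        let new_id = self.branches.len() as u32;
        let new_branch = Branch { id: new_id, parent: parent_id };
        // ...and the branch has to be pushed, otherwise the id dangles.
        self.branches.push(new_branch);
        new_id
    }
}

fn main() {
    let mut tree = ExecTree::default();
    tree.branches.push(Branch { id: 0, parent: 0 });
    let a = tree.fork_branch(0);
    let b = tree.fork_branch(a);
    assert_eq!((a, b), (1, 2));
    assert_eq!(tree.branches[b as usize].parent, 1);
}
```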
diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index 1affea56d580a3dcf628f13a2f71142a82e4525c..d0c833ceb22ef431c45ac02b134bac28b70bd53a 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -1,5 +1,6 @@
 use crate::collections::VecSet;
 use crate::protocol::eval::ValueGroup;
+use crate::runtime2::inbox2::DataContent;

 use super::branch::{BranchId, ExecTree, QueueKind};
 use super::ConnectorId;
@@ -42,15 +43,18 @@ pub(crate) struct GlobalSolution {
 // TODO: A lot of stuff should be batched. Like checking all the sync headers
 // and sending "I have a higher ID" messages.
 pub(crate) struct Consensus {
+    // --- State that is cleared after each round
     // Local component's state
     highest_connector_id: ConnectorId,
     branch_annotations: Vec,
     last_finished_handled: Option<BranchId>,

-    // Gathered state (in case we are currently the leader of the distributed
-    // consensus protocol)
-    encountered_peers: VecSet<ConnectorId>,
+    // Gathered state from communication
+    encountered_peers: VecSet<ConnectorId>, // to determine when we should send "found a higher ID" messages.
+    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
     solution_combiner: SolutionCombiner,
-    // Workspaces
+    // --- Persistent state
+    // TODO: Tracking sync round numbers
+    // --- Workspaces
     workspace_ports: Vec<PortIdLocal>,
 }
@@ -67,6 +71,7 @@ impl Consensus {
             branch_annotations: Vec::new(),
             last_finished_handled: None,
             encountered_peers: VecSet::new(),
+            encountered_ports: VecSet::new(),
             solution_combiner: SolutionCombiner::new(),
             workspace_ports: Vec::new(),
         }
@@ -188,9 +193,32 @@
         let mut target_mapping = Vec::with_capacity(source_mapping.len());
         for port in source_mapping {
+            // Note: if the port is silent, and we've never communicated
+            // over the port, then we need to do so now, to let the peer
+            // component know about our sync leader state.
             let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
+            let peer_port_id = port_desc.peer_id;
+            let channel_id = port_desc.channel_id;
+
+            if !self.encountered_ports.contains(&port.port_id) {
+                ctx.submit_message(MessageFancy::Data(DataMessageFancy{
+                    sync_header: SyncHeader{
+                        sending_component_id: ctx.id,
+                        highest_component_id: self.highest_connector_id,
+                    },
+                    data_header: DataHeader{
+                        expected_mapping: source_mapping.clone(),
+                        sending_port: port.port_id,
+                        target_port: peer_port_id,
+                        new_mapping: BranchId::new_invalid(),
+                    },
+                    content: DataContent::SilentPortNotification,
+                }));
+                self.encountered_ports.push(port.port_id);
+            }
+
             target_mapping.push((
-                port_desc.channel_id,
+                channel_id,
                 port.registered_id.unwrap_or(BranchId::new_invalid())
             ));
         }
@@ -229,6 +257,7 @@
         self.branch_annotations.clear();
         self.last_finished_handled = None;
         self.encountered_peers.clear();
+        self.encountered_ports.clear();
         self.solution_combiner.clear();
     }
@@ -238,11 +267,10 @@
     /// sending the message is consistent with the speculative state.
     pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
         debug_assert!(self.is_in_sync());
-        let sync_header = self.create_sync_header(ctx);
-
         let branch = &mut self.branch_annotations[branch_id.index as usize];
         if cfg!(debug_assertions) {
+            // Check for consistent mapping
             let port = branch.port_mapping.iter()
                 .find(|v| v.port_id == source_port_id)
                 .unwrap();
@@ -257,17 +285,19 @@
             self.workspace_ports.clear();
         }

+        // Construct data header
         // TODO: Handle multiple firings. Right now we just assign the current
        // branch to the `None` value because we know we can only send once.
         debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
         let port_info = ctx.get_port_by_id(source_port_id).unwrap();
         let data_header = DataHeader{
             expected_mapping: branch.port_mapping.clone(),
-            sending_port: port_info.peer_id,
+            sending_port: port_info.self_id,
             target_port: port_info.peer_id,
             new_mapping: branch_id
         };

+        // Update port mapping
         for mapping in &mut branch.port_mapping {
             if mapping.port_id == source_port_id {
                 mapping.expected_firing = Some(true);
@@ -275,7 +305,9 @@
             }
         }

-        return (sync_header, data_header);
+        self.encountered_ports.push(source_port_id);
+
+        return (self.create_sync_header(ctx), data_header);
     }

     /// Handles a new data message by handling the data and sync header, and
@@ -286,7 +318,7 @@
     /// 2. We return the branches that *can* receive the message, you still
     /// have to explicitly call `notify_of_received_message`.
     pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec<BranchId>) {
-        self.handle_received_data_header(exec_tree, &message.data_header, target_ids);
+        self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids);
         self.handle_received_sync_header(&message.sync_header, ctx);
     }
@@ -318,8 +350,9 @@
         }
     }

-    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
-        debug_assert!(self.branch_can_receive(branch_id, data_header));
+    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) {
+        debug_assert!(self.branch_can_receive(branch_id, data_header, content));
+
         let branch = &mut self.branch_annotations[branch_id.index as usize];
         for mapping in &mut branch.port_mapping {
             if mapping.port_id == data_header.target_port {
@@ -328,7 +361,7 @@
         // Check for sent ports
         debug_assert!(self.workspace_ports.is_empty());
-        find_ports_in_value_group(content, &mut self.workspace_ports);
+        find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports);
         if !self.workspace_ports.is_empty() {
             todo!("handle received ports");
             self.workspace_ports.clear();
@@ -345,7 +378,12 @@
     /// Matches the mapping between the branch and the data message. If they
     /// match then the branch can receive the message.
-    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
+    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool {
+        if let DataContent::SilentPortNotification = content {
+            // No port can receive a "silent" notification.
+            return false;
+        }
+
         let annotation = &self.branch_annotations[branch_id.index as usize];
         for expected in &data_header.expected_mapping {
             // If we own the port, then we have an entry in the
@@ -369,12 +407,12 @@
     /// Checks data header and consults the stored port mapping and the
     /// execution tree to see which branches may receive the data message's
     /// contents.
-    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
+    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
         for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
             if branch.awaiting_port == data_header.target_port {
                 // Found a branch awaiting the message, but we need to make sure
                 // the mapping is correct
-                if self.branch_can_receive(branch.id, data_header) {
+                if self.branch_can_receive(branch.id, data_header, content) {
                     target_ids.push(branch.id);
                 }
             }
@@ -419,6 +457,8 @@
     }

     fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option<GlobalSolution> {
+        println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index);
+
         if self.highest_connector_id == ctx.id {
             // We are the leader
             if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
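
Note (reviewer aside, not part of the patch): the consensus.rs changes introduce "silent port" bookkeeping. When a branch finishes a sync round, any port in its mapping that never carried a message gets a one-off `DataContent::SilentPortNotification`, so the peer component still learns the sender's presumed sync leader, and `encountered_ports` ensures each port is notified at most once. Below is a rough standalone sketch of that bookkeeping; it uses a `HashSet` and plain integers instead of the crate's `VecSet`, `PortIdLocal` and message types, so all names are illustrative.

```rust
// Standalone sketch of the silent-port bookkeeping added in consensus.rs.
use std::collections::HashSet;

type PortId = u32;

struct Notification { port: PortId, leader: u32 }

fn notify_silent_ports(
    ports_in_mapping: &[PortId],
    encountered_ports: &mut HashSet<PortId>,
    leader: u32,
    outbox: &mut Vec<Notification>,
) {
    for &port in ports_in_mapping {
        // Only ports we never sent or received anything on need the
        // explicit "this port stays silent" message.
        if encountered_ports.insert(port) {
            outbox.push(Notification { port, leader });
        }
    }
}

fn main() {
    let mut encountered = HashSet::new();
    encountered.insert(2); // port 2 already carried a real message this round
    let mut outbox = Vec::new();

    notify_silent_ports(&[1, 2, 3], &mut encountered, 7, &mut outbox);

    let notified: Vec<PortId> = outbox.iter().map(|n| n.port).collect();
    assert_eq!(notified, vec![1, 3]);
    assert_eq!(outbox[0].leader, 7);
}
```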
diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs
index a38bdd08b1ace47a29d4c6fcd2cddfa67261444c..238188d73d34d89f3c378e7262557fdaf37dfab5 100644
--- a/src/runtime2/inbox2.rs
+++ b/src/runtime2/inbox2.rs
@@ -32,13 +32,34 @@ pub(crate) struct DataHeader {
     pub new_mapping: BranchId,
 }

+// TODO: Very much on the fence about this. On one hand I thought making it a
+// data message was neat because "silent port notification" should be rerouted
+// like any other data message to determine the component ID of the receiver
+// and to make it part of the leader election algorithm for the sync leader.
+// However: it complicates logic quite a bit. Really it might be easier to
+// create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
+#[derive(Debug, Clone)]
+pub(crate) enum DataContent {
+    SilentPortNotification,
+    Message(ValueGroup),
+}
+
+impl DataContent {
+    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
+        match self {
+            DataContent::SilentPortNotification => None,
+            DataContent::Message(message) => Some(message),
+        }
+    }
+}
+
 /// A data message is a message that is intended for the receiver's PDL code,
 /// but will also be handled by the consensus algorithm
 #[derive(Debug, Clone)]
 pub(crate) struct DataMessageFancy {
     pub sync_header: SyncHeader,
     pub data_header: DataHeader,
-    pub content: ValueGroup,
+    pub content: DataContent,
 }

 #[derive(Debug)]
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index 800fdf37cab02cfb191f2ec5a12d1422e237f9ed..af4373aca8c049f02a7d7bb0d07ba639819c9714 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -389,6 +389,7 @@ impl ConnectorStore {
     /// Retrieves public part of connector - accessible by many threads at once.
     fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
         unsafe {
+            debug_assert!(!self.free.contains(&(id.0 as usize)));
             let connector = self.connectors.get(id.0 as usize);
             debug_assert!(!connector.is_null());
             return &(**connector).public;
@@ -399,6 +400,7 @@ impl ConnectorStore {
     /// time.
     fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
         unsafe {
+            debug_assert!(!self.free.contains(&(key.index as usize)));
             let connector = self.connectors.get_mut(key.index as usize);
             debug_assert!(!connector.is_null());
             return &mut (**connector);
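
Note (reviewer aside, not part of the patch): the new `DataContent` enum in inbox2.rs separates real payloads from silent-port notifications; `as_message()` returns `None` for the latter, which is why the receive paths in connector2.rs and consensus.rs only call `.unwrap()` after `branch_can_receive` has filtered notifications out. A standalone sketch of consuming the two variants, with a plain `String` standing in for `ValueGroup`:

```rust
// Standalone sketch mirroring the DataContent enum added in inbox2.rs.
#[derive(Debug, Clone)]
enum DataContent {
    SilentPortNotification,
    Message(String),
}

impl DataContent {
    fn as_message(&self) -> Option<&String> {
        match self {
            DataContent::SilentPortNotification => None,
            DataContent::Message(m) => Some(m),
        }
    }
}

fn handle(content: &DataContent) {
    match content.as_message() {
        // A silent-port notification only feeds the consensus algorithm;
        // there is nothing to hand to the receiving branch.
        None => println!("silent port: update consensus state only"),
        // A real message is cloned into the receiving branch's inbox.
        Some(m) => println!("deliver payload: {m}"),
    }
}

fn main() {
    handle(&DataContent::SilentPortNotification);
    handle(&DataContent::Message("hello".to_string()));
}
```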
diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs
index de2a0fd4a5cfd1528bb6210ade13015c83478af6..0b6b5fdaf98b09d9f591a2b2b7cbe80e438e1b25 100644
--- a/src/runtime2/native.rs
+++ b/src/runtime2/native.rs
@@ -188,7 +188,7 @@ impl ApplicationInterface {
         connector.inbox.insert_message(MessageFancy::Control(ControlMessageFancy{
             id: 0,
             sending_component_id: self.connector_id,
-            content: ControlContent::Ack
+            content: ControlContent::Ping,
         }));

         let should_wake_up = connector.sleeping
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 9ed1ce2cf657e3b23eb5de8932b9253a740d1f03..adf1a5be8bd2bca197139e5483975ede611e42e7 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -130,7 +130,7 @@ impl Scheduler {
         // Check if the message has to be rerouted because we have moved the
         // target port to another component.
         self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
-        if let Some(target_port) = Self::get_data_message_target_port(&message) {
+        if let Some(target_port) = Self::get_message_target_port(&message) {
             if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
                 self.debug_conn(connector_id, " ... Rerouting the message");
                 self.runtime.send_message(other_component_id, message);
@@ -221,7 +221,7 @@
                     // the sender must make sure it actually wants to send to
                     // the specified component (and is not using an inconsistent
                    // component ID associated with a port).
-                    content.sync_header.highest_component_id
+                    content.target_component_id
                 },
                 MessageFancy::Control(_) => {
                     unreachable!("component sending control messages directly");
                 }
@@ -246,7 +246,7 @@
         let mut message_idx = 0;
         while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
             let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
-            if Self::get_data_message_target_port(message) == Some(port_id) {
+            if Self::get_message_target_port(message) == Some(port_id) {
                 // Need to transfer this message
                 let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
                 new_connector.ctx_fancy.inbox_messages.push(message);
@@ -330,9 +330,17 @@
     }

     #[inline]
-    fn get_data_message_target_port(message: &MessageFancy) -> Option<PortIdLocal> {
-        if let MessageFancy::Data(message) = message {
-            return Some(message.data_header.target_port)
+    fn get_message_target_port(message: &MessageFancy) -> Option<PortIdLocal> {
+        match message {
+            MessageFancy::Data(data) => return Some(data.data_header.target_port),
+            MessageFancy::Sync(_) => {},
+            MessageFancy::Control(control) => {
+                match &control.content {
+                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
+                    ControlContent::CloseChannel(port_id) => return Some(*port_id),
+                    ControlContent::Ping | ControlContent::Ack => {},
+                }
+            },
         }

         return None
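
Note (reviewer aside, not part of the patch): the scheduler.rs hunk widens the reroute/transfer check from data messages to any message that names a specific port, including the `PortPeerChanged` and `CloseChannel` control messages. A standalone sketch of the same dispatch over simplified message types, with a small check in `main`; none of these names are the crate's real API.

```rust
// Standalone sketch of the widened target-port dispatch in scheduler.rs.
type PortId = u32;
type ComponentId = u32;

enum ControlContent {
    PortPeerChanged(PortId, ComponentId),
    CloseChannel(PortId),
    Ping,
    Ack,
}

enum Message {
    Data { target_port: PortId },
    Sync,
    Control(ControlContent),
}

fn get_message_target_port(message: &Message) -> Option<PortId> {
    match message {
        // Data messages always target a port.
        Message::Data { target_port } => Some(*target_port),
        // Sync messages target a component, not a port.
        Message::Sync => None,
        // Port-specific control messages must follow the port as well.
        Message::Control(content) => match content {
            ControlContent::PortPeerChanged(port_id, _) => Some(*port_id),
            ControlContent::CloseChannel(port_id) => Some(*port_id),
            ControlContent::Ping | ControlContent::Ack => None,
        },
    }
}

fn main() {
    assert_eq!(get_message_target_port(&Message::Data { target_port: 3 }), Some(3));
    assert_eq!(get_message_target_port(&Message::Control(ControlContent::PortPeerChanged(9, 1))), Some(9));
    assert_eq!(get_message_target_port(&Message::Control(ControlContent::CloseChannel(5))), Some(5));
    assert_eq!(get_message_target_port(&Message::Control(ControlContent::Ping)), None);
    assert_eq!(get_message_target_port(&Message::Control(ControlContent::Ack)), None);
    assert_eq!(get_message_target_port(&Message::Sync), None);
}
```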
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 7b2900640ff08e539807ed562ad6de738598aa01..79417ff2af586b2530f86f6610348643eea104e5 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -5,7 +5,7 @@ use crate::{PortId, ProtocolDescription};
 use crate::common::Id;
 use crate::protocol::eval::*;

-const NUM_THREADS: u32 = 1; // number of threads in runtime
+const NUM_THREADS: u32 = 10; // number of threads in runtime
 const NUM_INSTANCES: u32 = 10; // number of test instances constructed
 const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests)

@@ -50,7 +50,6 @@ fn test_put_and_get() {
                 print(\"getting!\");
                 auto result = get(receiver);
                 assert(result);
-            }
                 index += 1;
             }
