From 03f278e76a41c0f952030de58d86b5ba5514accc 2021-11-22 13:26:14 From: MH Date: 2021-11-22 13:26:14 Subject: [PATCH] WIP on fixing consensus bug, finishing up error handling --- diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 64db29608cbe90b1fc3e2601f345642dbea7b1be..bdf9a9cb2b715ca4b6dc13ab5267ff845deac23b 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -272,8 +272,19 @@ impl ProtocolDescription { }, CTP::Input => if let Value::Input(_) = argument { true } else { false }, CTP::Output => if let Value::Output(_) = argument { true } else { false }, - CTP::Instance(_definition_id, _num_embedded) => { - todo!("implement full type checking on user-supplied arguments"); + CTP::Instance(definition_id, _num_embedded) => { + let definition = self.types.get_base_definition(definition_id).unwrap(); + match &definition.definition { + DefinedTypeVariant::Enum(definition) => { + if let Value::Enum(variant_value) = argument { + let is_valid = definition.variants.iter() + .any(|v| v.value == *variant_value); + return is_valid; + } + }, + _ => todo!("implement full type checking on user-supplied arguments"), + } + return false; }, } diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 53ed270193deb762acee1587609efe37beb04349..3899d7d4c6bd87e943fad19518069369204f4a99 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -36,7 +36,7 @@ use crate::protocol::{RunContext, RunResult}; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement}; use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group}; -use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, PublicInbox}; +use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx}; @@ -159,8 +159,8 @@ impl Connector for 
ConnectorPDL { return scheduling; }, Mode::SyncError => { - todo!("write"); - return ConnectorScheduling::Exit; + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + return scheduling; }, Mode::Error => { // This shouldn't really be called. Because when we reach exit @@ -194,6 +194,11 @@ impl ConnectorPDL { } }, Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx), + Message::SyncControl(message) => { + if let Some(result) = self.handle_new_sync_control_message(message, ctx) { + return Some(result); + } + }, Message::Control(_) => unreachable!("control message in component"), } } @@ -244,6 +249,14 @@ impl ConnectorPDL { self.consensus.handle_new_sync_port_message(message, ctx); } + pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option { + if let Some(round_conclusion) = self.consensus.handle_new_sync_control_message(message, ctx) { + return Some(self.enter_non_sync_mode(round_conclusion, ctx)); + } + + return None; + } + // --- Running code pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index bd8510e2c1bd789afe73cebd37e385d9acf2b32a..8e5e985f91578aa95625780c1bd0b1d30e83042a 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -6,8 +6,11 @@ use super::ConnectorId; use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ - Message, ChannelAnnotation, BranchMarker, DataMessage, DataHeader, - SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader, + Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker, + DataMessage, + SyncCompMessage, SyncCompContent, + SyncPortMessage, SyncPortContent, + SyncControlMessage, SyncControlContent }; use super::scheduler::{ComponentCtx, ComponentPortChange}; @@ -469,6 +472,19 @@ impl Consensus { } } + pub fn 
handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option { + if message.in_response_to_sync_round < self.sync_round { + // Old message + return None + } + + match message.content { + SyncControlContent::ChannelIsClosed(_) => { + return Some(RoundConclusion::Failure); + } + } + } + pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) { debug_assert!(self.branch_can_receive(branch_id, message)); diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index f5bc869b78ce3cc081b673b240a2d3fb2847d24f..6e5854c85dd7b27d7967b48bc2baa1f472bc0aeb 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -94,6 +94,10 @@ pub(crate) enum SyncPortContent { NotificationWave, } +/// A sync message intended for the consensus algorithm. This message does not +/// go to a component, but through a channel (and results in potential +/// rerouting) because we're not sure about the ID of the component that holds +/// the other end of the channel. #[derive(Debug)] pub(crate) struct SyncPortMessage { pub sync_header: SyncHeader, @@ -102,6 +106,24 @@ pub(crate) struct SyncPortMessage { pub content: SyncPortContent, } +#[derive(Debug)] +pub(crate) enum SyncControlContent { + ChannelIsClosed(PortIdLocal), // contains port that is owned by the recipient of the message +} + +/// A sync control message: originating from the scheduler, but intended for the +/// current sync round of the recipient. Every kind of consensus algorithm must +/// be able to handle such a message. +#[derive(Debug)] +pub(crate) struct SyncControlMessage { + // For now these control messages are only aimed at components. Might change + // in the future. But for now we respond to messages from components that + // have, because of that message, published their ID. 
+ pub in_response_to_sync_round: u32, + pub target_component_id: ConnectorId, + pub content: SyncControlContent, +} + /// A control message is a message intended for the scheduler that is executing /// a component. #[derive(Debug)] @@ -125,6 +147,7 @@ pub(crate) enum Message { Data(DataMessage), SyncComp(SyncCompMessage), SyncPort(SyncPortMessage), + SyncControl(SyncControlMessage), Control(ControlMessage), } @@ -137,6 +160,7 @@ impl Message { Message::Data(message) => return Some(message.data_header.sending_port), Message::SyncPort(message) => return Some(message.source_port), Message::SyncComp(_) => return None, + Message::SyncControl(_) => return None, Message::Control(_) => return None, } } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index c32927832211a8e771d9de395585e7bdb9589d34..bb039dbccc97e7707b8108ff31d0b73c5527b785 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -431,6 +431,7 @@ impl ConnectorStore { } } + println!("DEBUG [ global store ] Created component at {}", key.index); return key; } @@ -443,6 +444,7 @@ impl ConnectorStore { // Note: but not deallocating! 
} + println!("DEBUG [ global store ] Destroyed component at {}", key.index); self.free.push(key.index as usize); } } diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 0b8891c0696a36688f310327125984d11753f596..7fa49811e7ed9d11bc76bfe75864b66da837015e 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -116,6 +116,7 @@ impl ConnectorApplication { Message::Data(message) => self.handle_new_data_message(message, comp_ctx), Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), + Message::SyncControl(message) => todo!("implement"), Message::Control(_) => unreachable!("control message in native API component"), } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index bafdf0dc3198bdff60ba18befa21bf461c0d7edc..3bc9c94477458743706ed7f8c32e622a41ef3bd6 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -9,7 +9,11 @@ use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::branch::{BranchId}; use super::connector::{ConnectorPDL, ConnectorScheduling}; -use super::inbox::{Message, DataMessage, ControlMessage, ControlContent}; +use super::inbox::{ + Message, DataMessage, SyncHeader, + ControlMessage, ControlContent, + SyncControlMessage, SyncControlContent, +}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -203,9 +207,29 @@ impl Scheduler { fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) { // Note: we're not handling the public inbox, we're dealing with the // private one! 
+ debug_assert!(scheduled.shutting_down); while let Some(message) = scheduled.ctx.read_next_message() { - if let Some(target_port) = Self::get_message_target_port(&message) { - todo!("handle this, send back 'my thing is closed yo'") + let target_port_and_round_number = match &message { + Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)), + Message::SyncComp(_) => None, + Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)), + Message::SyncControl(_) => None, + Message::Control(_) => None, + }; + + if let Some((target_port, sync_round)) = target_port_and_round_number { + // This message is aimed at a port, but we're shutting down, so + // notify the peer that it was not received properly. + // (also: since we're shutting down, we're not in sync mode and + // the context contains the definitive set of owned ports) + let port = scheduled.ctx.get_port_by_id(target_port).unwrap(); + let message = SyncControlMessage{ + in_response_to_sync_round: sync_round, + target_component_id: port.peer_connector, + content: SyncControlContent::ChannelIsClosed(port.peer_id), + }; + self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message)); + self.runtime.send_message(port.peer_connector, Message::SyncControl(message)); } } } @@ -249,10 +273,9 @@ impl Scheduler { } port_desc.peer_connector - } - Message::Control(_) => { - unreachable!("component sending control messages directly"); - } + }, + Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"), + Message::Control(_) => unreachable!("component sending 'Control' messages directly"), }; self.runtime.send_message(target_component_id, message); @@ -367,6 +390,7 @@ impl Scheduler { Message::Data(data) => return Some(data.data_header.target_port), Message::SyncComp(_) => {}, Message::SyncPort(content) => return Some(content.target_port), + Message::SyncControl(_) => return None, Message::Control(control)
=> { match &control.content { ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), @@ -381,11 +405,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } @@ -506,8 +530,16 @@ impl ComponentCtx { pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> { debug_assert!(self.is_in_sync); if let Some(port_id) = contents.source_port() { - if self.get_port_by_id(port_id).is_none() { + let port_info = self.get_port_by_id(port_id); + let is_valid = match port_info { + Some(port_info) => { + port_info.state == PortState::Open + }, + None => false, + }; + if !is_valid { // We don't own the port + println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index); return Err(()); } } @@ -554,19 +586,17 @@ impl ComponentCtx { // should only handle them once. Control messages should never be in // here. 
let message = &self.inbox_messages[self.inbox_len_read]; - match message { + match &message { Message::Data(content) => { + // Keep message in inbox for later reading self.inbox_len_read += 1; return Some(Message::Data(content.clone())); }, - Message::SyncComp(_) => { + Message::SyncComp(_) | Message::SyncPort(_) | Message::SyncControl(_) => { + // Remove message from inbox let message = self.inbox_messages.remove(self.inbox_len_read); return Some(message); }, - Message::SyncPort(_) => { - let message = self.inbox_messages.remove(self.inbox_len_read); - return Some(message); - } Message::Control(_) => unreachable!("control message ended up in component inbox"), } } diff --git a/src/runtime2/tests/basics.rs b/src/runtime2/tests/data_transmission.rs similarity index 80% rename from src/runtime2/tests/basics.rs rename to src/runtime2/tests/data_transmission.rs index 0a3a06db525162b88b488d57b0e6ce94bd204458..1320e4f6b2f6721a39ce094a7c0e5c6390ed49f7 100644 --- a/src/runtime2/tests/basics.rs +++ b/src/runtime2/tests/data_transmission.rs @@ -1,3 +1,6 @@ +// data_transmission.rs +// +// The most basic of testing: sending a message, receiving a message, etc.
use super::*; @@ -22,28 +25,6 @@ fn test_doing_nothing() { }); } -#[test] -fn test_local_sync_failure() { - // If the component exits cleanly, then the runtime exits cleanly, and the - // test will finish - const CODE: &'static str = " - primitive immediate_failure() { - u32[] only_allows_index_0 = { 1 }; - while (true) sync { // note the infinite loop - auto value = only_allows_index_0[1]; - } - } - "; - - let thing = TestTimer::new("immediate_local_failure"); - run_test_in_runtime(CODE, |api| { - api.create_connector("", "immediate_failure", ValueGroup::new_stack(Vec::new())) - .expect("create component"); - }) -} - - - #[test] fn test_single_put_and_get() { const CODE: &'static str = " @@ -85,6 +66,36 @@ fn test_single_put_and_get() { }); } +#[test] +fn test_combined_put_and_get() { + const CODE: &'static str = " + primitive put_then_get(out output, in input, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync { + put(output, true); + auto value = get(input); + assert(value); + index += 1; + } + } + } + + composite constructor(u32 num_loops) { + channel output_a -> input_a; + channel output_b -> input_b; + new put_then_get(output_a, input_b, num_loops); + new put_then_get(output_b, input_a, num_loops); + } + "; + + run_test_in_runtime(CODE, |api| { + api.create_connector("", "constructor", ValueGroup::new_stack(vec![ + Value::UInt32(NUM_LOOPS), + ])).expect("create connector"); + }) +} + #[test] fn test_multi_put_and_get() { const CODE: &'static str = " diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 46cf663a10725b3578b54782bfb256bb79b8cc93..29c6c9621e87b27bf8a198a446a90a16aa477171 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -1,7 +1,8 @@ mod network_shapes; mod api_component; -mod speculation_basic; -mod basics; +mod speculation; +mod data_transmission; +mod sync_failure; use super::*; use crate::{PortId, ProtocolDescription}; diff --git a/src/runtime2/tests/speculation_basic.rs 
b/src/runtime2/tests/speculation.rs similarity index 100% rename from src/runtime2/tests/speculation_basic.rs rename to src/runtime2/tests/speculation.rs diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime2/tests/sync_failure.rs new file mode 100644 index 0000000000000000000000000000000000000000..392b8a22fa18fbf5c6fe934059189f7bfd84bea9 --- /dev/null +++ b/src/runtime2/tests/sync_failure.rs @@ -0,0 +1,73 @@ +// sync_failure.rs +// +// Various tests to ensure that failing components fail in a consistent way. + +use super::*; + +#[test] +fn test_local_sync_failure() { + // If the component exits cleanly, then the runtime exits cleanly, and the + // test will finish + const CODE: &'static str = " + primitive immediate_failure_inside_sync() { + u32[] only_allows_index_0 = { 1 }; + while (true) sync { // note the infinite loop + auto value = only_allows_index_0[1]; + } + } + + primitive immediate_failure_outside_sync() { + u32[] only_allows_index_0 = { 1 }; + auto never_gonna_get = only_allows_index_0[1]; + while (true) sync {} + } + "; + + // let thing = TestTimer::new("local_sync_failure"); + run_test_in_runtime(CODE, |api| { + api.create_connector("", "immediate_failure_outside_sync", ValueGroup::new_stack(Vec::new())) + .expect("create component"); + + api.create_connector("", "immediate_failure_inside_sync", ValueGroup::new_stack(Vec::new())) + .expect("create component"); + }) +} + +#[test] +fn test_shared_sync_failure() { + // Same as above. One of the components should fail, the other should follow + // suit because it cannot complete a sync round. 
+ const CODE: &'static str = " + enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never } + primitive failing_at_location(in input, out output, Location loc) { + u32[] failure_array = {}; + while (true) { + if (loc == Location::BeforeSync) failure_array[0]; + sync { + put(output, true); + if (loc == Location::AfterPut) failure_array[0]; + auto received = get(input); + assert(received); + if (loc == Location::AfterGet) failure_array[0]; + } + if (loc == Location::AfterSync) failure_array[0]; + } + } + + composite constructor(Location loc) { + channel output_a -> input_a; + channel output_b -> input_b; + new failing_at_location(input_a, output_b, Location::Never); + new failing_at_location(input_b, output_a, loc); + } + "; + + run_test_in_runtime(CODE, |api| { + for variant in 0..1 { + // Create the channels + api.create_connector("", "constructor", ValueGroup::new_stack(vec![ + Value::Enum(variant) + ])).expect("create connector"); + } + }) +} \ No newline at end of file