From c1fd3f18416671696a1f79cf2a67fa232959e3d4 2021-11-19 18:42:29 From: MH Date: 2021-11-19 18:42:29 Subject: [PATCH] Fix introduced bug --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index f4b4e6bce2b76eda2f3da9ed9c8c1b3ef09b2666..0407db8aa68a774673bd42935987886773a78196 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -193,7 +193,9 @@ impl ConnectorPDL { match message { Message::Data(message) => self.handle_new_data_message(message, ctx), Message::SyncComp(message) => { - return self.handle_new_sync_comp_message(message, ctx) + if let Some(result) = self.handle_new_sync_comp_message(message, ctx) { + return Some(result); + } }, Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx), Message::Control(_) => unreachable!("control message in component"), @@ -221,7 +223,6 @@ impl ConnectorPDL { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); - println!("DEBUG: ### Branching due to new data message"); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; @@ -327,7 +328,6 @@ impl ConnectorPDL { branch.awaiting_port = PortIdLocal::new_invalid(); branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); - println!("DEBUG: ### Branching due to BlockGet with existing message"); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -418,7 +418,6 @@ impl ConnectorPDL { match run_result { EvalContinuation::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; - println!("DEBUG: ************ DOING THEM EXITS"); return ConnectorScheduling::Exit; }, EvalContinuation::SyncBlockStart => { diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index e0ae0d1004a4669e57bab2fcaff4615debac4a99..828dbe3f6c374a16fc5bbb171365482950c1b05b 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -147,7 +147,6 @@ impl Consensus { pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) { // If called correctly. Then each time we are notified the new branch's // index is the length in `branch_annotations`. - println!("DEBUG: Branch {} became forked into {}", parent_branch_id.index, new_branch_id.index); debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; let new_marker = BranchMarker::new(self.branch_markers.len() as u32); diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 911674012bf95991b546a585dc61ef9570afbc16..9cb007655638c88dc662e2d132a2a396909659d7 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -71,7 +71,7 @@ impl Scheduler { self.debug_conn(connector_id, "Running ..."); let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx); - self.debug_conn(connector_id, "Finished running"); + self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule)); // Handle all of the output from the current run: messages to // send and connectors to instantiate. @@ -263,7 +263,11 @@ impl Scheduler { let message = &scheduled.ctx.inbox_messages[message_idx]; if Self::get_message_target_port(message) == Some(port_id) { // Need to transfer this message + // TODO: Revise messages, this is becoming messy and error-prone let message = scheduled.ctx.inbox_messages.remove(message_idx); + if message_idx < scheduled.ctx.inbox_len_read { + scheduled.ctx.inbox_len_read -= 1; + } new_connector.ctx.inbox_messages.push(message); } else { message_idx += 1; @@ -365,11 +369,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 46cf663a10725b3578b54782bfb256bb79b8cc93..d25324b1f3299f70dc7c80998af5ae4aff815450 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -10,13 +10,13 @@ use crate::protocol::eval::*; use crate::runtime2::native::{ApplicationSyncAction}; // Generic testing constants, use when appropriate to simplify stress-testing -// pub(crate) const NUM_THREADS: u32 = 3; // number of threads in runtime -// pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed -// pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) +pub(crate) const NUM_THREADS: u32 = 3; // number of threads in runtime +pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed +pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) -pub(crate) const NUM_THREADS: u32 = 1; -pub(crate) const NUM_INSTANCES: u32 = 1; -pub(crate) const NUM_LOOPS: u32 = 1; +// pub(crate) const NUM_THREADS: u32 = 1; +// pub(crate) const NUM_INSTANCES: u32 = 1; +// pub(crate) const NUM_LOOPS: u32 = 1; fn create_runtime(pdl: &str) -> Runtime {