diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 20955eba657a3074c44b00e754cca9207bac0063..c8691e1acf8b09a2c770d95d69fb4dfdcaf2d9d3 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -24,7 +24,7 @@ pub struct CompCtx { pub id: CompId, pub ports: Vec, pub peers: Vec, - pub messages: Vec, // same size as "ports" + pub messages: Vec>, // same size as "ports" pub port_id_counter: u32, } @@ -40,20 +40,12 @@ impl Default for CompCtx { } } -impl CompCtx { - fn take_message(&mut self, port_id: PortId) -> Option { - let port_index = self.get_port_index(port_id).unwrap(); - let old_value = &mut self.messages[port_index]; - if old_value.values.is_empty() { - return None; - } - - // Replace value in array with an empty one - let mut message = ValueGroup::new_stack(Vec::new()); - std::mem::swap(old_value, &mut message); - return Some(message); - } +struct MessageView<'a> { + index: usize, + pub message: &'a DataMessage, +} +impl CompCtx { fn create_channel(&mut self) -> Channel { let putter_id = PortId(self.take_port_id()); let getter_id = PortId(self.take_port_id()); @@ -75,7 +67,7 @@ impl CompCtx { return Channel{ putter_id, getter_id }; } - fn get_port(&self, port_id: PortId) -> &Port { + pub(crate) fn get_port(&self, port_id: PortId) -> &Port { let index = self.get_port_index(port_id).unwrap(); return &self.ports[index]; } @@ -184,12 +176,25 @@ impl RunContext for ExecCtx { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum Mode { - NonSync, - Sync, + NonSync, // not in sync mode + Sync, // in sync mode, can interact with other components + SyncFail, // something went wrong during sync mode (deadlocked, error, whatever) + SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block BlockedGet, BlockedPut, } +impl Mode { + fn can_run(&self) -> bool { + match self { + Mode::NonSync | Mode::Sync => + return true, + Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => + return false, + } + } +} + pub(crate) struct CompPDL { pub mode: Mode, pub mode_port: PortId, // when blocked on a port @@ -247,13 +252,21 @@ impl CompPDL { Message::Control(message) => { self.handle_incoming_control_message(sched_ctx, comp_ctx, message); }, + Message::Sync(message) => { + self.handle_incoming_sync_message(sched_ctx, comp_ctx, message); + } } } pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; - sched_ctx.log("Running component"); + let can_run = self.mode.can_run(); + sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run)); + if !can_run { + return Ok(CompScheduling::Sleep); + } + let run_result = self.execute_prompt(&sched_ctx)?; match run_result { @@ -267,13 +280,23 @@ impl CompPDL { }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); + debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); - if let Some(message) = comp_ctx.take_message(port_id) { - // We can immediately receive and continue - debug_assert!(self.exec_ctx.stmt.is_none()); - self.exec_ctx.stmt = ExecStmt::PerformedGet(message); - return Ok(CompScheduling::Immediate); + let port_index = comp_ctx.get_port_index(port_id).unwrap(); + if let Some(message) = &self.inbox_main[port_index] { + // Check if we can actually receive the message + if self.consensus.try_receive_data_message(message) { + // Message was received. Make sure any blocked peers and + // pending messages are handled. + let message = self.inbox_main[port_index].take().unwrap(); + + self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); + return Ok(CompScheduling::Immediate); + } else { + self.mode = Mode::SyncFail; + return Ok(CompScheduling::Sleep); + } } else { // We need to wait self.mode = Mode::BlockedGet; @@ -288,7 +311,7 @@ impl CompPDL { if port_info.state == PortState::Blocked { todo!("handle blocked port"); } - self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); + self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); self.exec_ctx.stmt = ExecStmt::PerformedPut; return Ok(CompScheduling::Immediate); }, @@ -349,27 +372,23 @@ impl CompPDL { fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { self.consensus.notify_sync_end(); debug_assert_eq!(self.mode, Mode::Sync); - self.mode = Mode::NonSync; + self.mode = Mode::SyncEnd; } - fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) { + fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) { use std::sync::atomic::Ordering; let port_info = comp_ctx.get_port(source_port_id); let peer_info = comp_ctx.get_peer(port_info.peer_comp_id); - let annotated_message = self.consensus.annotate_message_data(port_info, value); + let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value); peer_info.handle.inbox.push(Message::Data(annotated_message)); - let should_wake_up = peer_info.handle.sleeping.compare_exchange( - true, false, Ordering::AcqRel, Ordering::Relaxed - ).is_ok(); - - if should_wake_up { - let comp_key = unsafe{ peer_info.id.upgrade() }; - sched_ctx.runtime.enqueue_work(comp_key); - } + wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle); } + /// Handles a message that came in through the public inbox. This function + /// will handle putting it in the correct place, and potentially blocking + /// the port in case too many messages are being received. fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { // Check if we can insert it directly into the storage associated with // the port @@ -397,7 +416,7 @@ impl CompPDL { if port_info.state == PortState::Open { let (target_comp_id, block_message) = - self.control.mark_port_blocked(target_port_id, comp_ctx); + self.control.set_port_and_peer_blocked(target_port_id, comp_ctx); debug_assert_eq!(_peer_comp_id, target_comp_id); let peer = comp_ctx.get_peer(target_comp_id); @@ -409,6 +428,37 @@ impl CompPDL { self.inbox_backup.push(message); } + /// Handles when a message has been handed off from the inbox to the PDL + /// code. We check to see if there are more messages waiting and, if not, + /// then we handle the case where the port might have been blocked + /// previously. + fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) { + let port_index = comp_ctx.get_port_index(port_id).unwrap(); + debug_assert!(self.inbox_main[port_index].is_none()); // because we just received it + + // Check for any more messages + for message_index in 0..self.inbox_backup.len() { + let message = &self.inbox_backup[message_index]; + if message.data_header.target_port == port_id { + // One more message for this port + let message = self.inbox_backup.remove(message_index); + debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port + self.inbox_main[port_index] = Some(message); + return; + } + } + + // Did not have any more messages. So if we were blocked, then we need + // to send the "unblock" message. + let port_info = &comp_ctx.ports[port_index]; + if port_info.state == PortState::Blocked { + let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx); + let peer_info = comp_ctx.get_peer(peer_comp_id); + peer_info.handle.inbox.push(Message::Control(message)); + wake_up_if_sleeping(sched_ctx, peer_comp_id, &peer_info.handle); + } + } + fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { match message.content { ControlMessageContent::Ack => { @@ -473,7 +523,7 @@ impl CompPDL { debug_assert_eq!(port_info.kind, PortKind::Putter); debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed); if port_info.state == PortState::Blocked { - self.unblock_port(sched_ctx, comp_ctx, port_id); + self.unblock_local_port(sched_ctx, comp_ctx, port_id); } }, ControlMessageContent::PortPeerChangedBlock(port_id) => { @@ -493,12 +543,19 @@ impl CompPDL { let port_info = comp_ctx.get_port_mut(port_id); debug_assert!(port_info.state == PortState::Blocked); port_info.peer_comp_id = new_comp_id; - self.unblock_port(sched_ctx, comp_ctx, port_id); + self.unblock_local_port(sched_ctx, comp_ctx, port_id); } } } - fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) { + fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { + + } + + /// Marks the local port as being unblocked. If the execution was blocked on + /// sending a message over this port, then execution will continue and the + /// message will be sent. + fn unblock_local_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) { let port_info = comp_ctx.get_port_mut(port_id); debug_assert_eq!(port_info.state, PortState::Blocked); port_info.state = PortState::Open; @@ -506,9 +563,10 @@ impl CompPDL { if self.mode == Mode::BlockedPut && port_id == self.mode_port { // We were blocked on the port that just became unblocked, so // send the message. + debug_assert_eq!(port_info.kind, PortKind::Putter); let mut replacement = ValueGroup::default(); std::mem::swap(&mut replacement, &mut self.mode_value); - self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement); + self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement); self.mode = Mode::Sync; self.mode_port = PortId::new_invalid();