Changeset - d06da4e9296c
mh <contact@maxhenger.nl> - 2022-01-20 21:55:02
WIP: Reimplementing messaging and consensus
4 files changed with 762 insertions and 73 deletions
src/runtime2/communication.rs
@@ -2,10 +2,13 @@ use crate::protocol::eval::*;
 use super::runtime::*;
 use super::component::*;
 
 // -----------------------------------------------------------------------------
 // Generic types
 // -----------------------------------------------------------------------------
 
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct PortId(pub u32);
+
 impl PortId {
     /// This value is not significant, it is chosen to make debugging easier: a
     /// very large port number is more likely to shine a light on bugs.
@@ -20,7 +23,7 @@ pub struct Peer {
     pub(crate) handle: CompHandle,
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub enum PortKind {
     Putter,
     Getter,
@@ -46,6 +49,10 @@ pub struct Channel {
     pub getter_id: PortId,
 }
 
+// -----------------------------------------------------------------------------
+// Data messages
+// -----------------------------------------------------------------------------
+
 #[derive(Debug)]
 pub struct DataMessage {
     pub data_header: MessageDataHeader,
@@ -53,19 +60,78 @@ pub struct DataMessage {
     pub content: ValueGroup,
 }
 
-#[derive(Debug)]
-pub struct MessageSyncHeader {
-    pub sync_round: u32,
-}
-
 #[derive(Debug)]
 pub struct MessageDataHeader {
-    pub expected_mapping: Vec<(PortId, u32)>,
+    pub expected_mapping: Vec<(PortId, Option<u32>)>,
     pub new_mapping: u32,
     pub source_port: PortId,
     pub target_port: PortId,
 }
 
+// -----------------------------------------------------------------------------
+// Sync messages
+// -----------------------------------------------------------------------------
+
+#[derive(Debug)]
+pub struct SyncMessage {
+    pub sync_header: MessageSyncHeader,
+    pub content: SyncMessageContent,
+}
+
+#[derive(Debug, Clone)]
+pub struct SyncLocalSolutionEntry {
+    pub self_port_id: PortId,
+    pub peer_comp_id: CompId,
+    pub peer_port_id: PortId,
+    pub mapping: u32,
+    pub port_kind: PortKind,
+}
+
+pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
+
+#[derive(Debug, Clone)]
+pub struct SyncSolutionPort {
+    pub self_comp_id: CompId,
+    pub self_port_id: PortId,
+    pub peer_comp_id: CompId,
+    pub peer_port_id: PortId,
+    pub mapping: u32,
+    pub port_kind: PortKind,
+}
+
+#[derive(Debug, Clone)]
+pub struct SyncSolutionChannel {
+    pub putter: Option<SyncSolutionPort>,
+    pub getter: Option<SyncSolutionPort>,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum RoundDecision {
+    None,
+    Solution,
+    Failure,
+}
+
+impl RoundDecision {
+    fn is_some(&self) -> bool {
+        return *self != RoundDecision::None;
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SyncPartialSolution {
+    pub submissions_by: Vec<(CompId, bool)>,
+    pub channel_mapping: Vec<SyncSolutionChannel>,
+    pub decision: RoundDecision,
+}
+
+#[derive(Debug, Clone)]
+pub enum SyncMessageContent {
+    NotificationOfLeader,
+    LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
+    PartialSolution(SyncPartialSolution), // partial solution of multiple components
+    GlobalSolution,
+    GlobalFailure,
+}
+
 // -----------------------------------------------------------------------------
 // Control messages
 // -----------------------------------------------------------------------------
 
 #[derive(Debug)]
 pub struct ControlMessage {
     pub(crate) id: ControlId,
@@ -84,9 +150,21 @@ pub enum ControlMessageContent {
     PortPeerChangedUnblock(PortId, CompId),
 }
 
 // -----------------------------------------------------------------------------
 // Messages (generic)
 // -----------------------------------------------------------------------------
 
+#[derive(Debug)]
+pub struct MessageSyncHeader {
+    pub sync_round: u32,
+    pub sending_id: CompId,
+    pub highest_id: CompId,
+}
+
 #[derive(Debug)]
 pub enum Message {
     Data(DataMessage),
+    Sync(SyncMessage),
     Control(ControlMessage),
 }
 
@@ -97,6 +175,8 @@ impl Message {
                 return Some(v.data_header.target_port),
             Message::Control(v) =>
                 return v.target_port_id,
+            Message::Sync(_) =>
+                return None,
         }
     }
 }
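
The change from `Vec<(PortId, u32)>` to `Vec<(PortId, Option<u32>)>` in `MessageDataHeader::expected_mapping` lets a sender also describe ports that have no mapping yet. To make the intended receive-side check concrete, here is a minimal self-contained sketch (stand-in types, not the runtime's definitions) of how such a header is compared against local port annotations before a message is accepted, mirroring `try_receive_data_message` in consensus.rs below:

    // Stand-in types; the runtime's PortId and port annotations differ.
    #[derive(Clone, Copy, PartialEq, Eq)]
    struct PortId(u32);

    struct Annotations {
        mapping: Vec<(PortId, Option<u32>)>, // one entry per owned port
    }

    impl Annotations {
        fn get(&self, id: PortId) -> Option<u32> {
            for (port, mapping) in &self.mapping {
                if *port == id {
                    return *mapping;
                }
            }
            None
        }

        // Accept only if every mapping the sender observed matches ours;
        // on success the target port adopts the message's new mapping.
        fn try_receive(
            &mut self,
            expected: &[(PortId, Option<u32>)],
            target: PortId,
            new_mapping: u32,
        ) -> bool {
            for (port, expected_mapping) in expected {
                if self.get(*port) != *expected_mapping {
                    return false; // speculative branches diverged
                }
            }
            for (port, mapping) in &mut self.mapping {
                if *port == target {
                    *mapping = Some(new_mapping);
                }
            }
            true
        }
    }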
src/runtime2/component/component_pdl.rs
@@ -24,7 +24,7 @@ pub struct CompCtx {
     pub id: CompId,
     pub ports: Vec<Port>,
     pub peers: Vec<Peer>,
-    pub messages: Vec<ValueGroup>, // same size as "ports"
+    pub messages: Vec<Option<DataMessage>>, // same size as "ports"
     pub port_id_counter: u32,
 }
 
@@ -40,20 +40,12 @@ impl Default for CompCtx {
     }
 }
 
-impl CompCtx {
-    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
-        let port_index = self.get_port_index(port_id).unwrap();
-        let old_value = &mut self.messages[port_index];
-        if old_value.values.is_empty() {
-            return None;
-        }
-
-        // Replace value in array with an empty one
-        let mut message = ValueGroup::new_stack(Vec::new());
-        std::mem::swap(old_value, &mut message);
-        return Some(message);
-    }
+struct MessageView<'a> {
+    index: usize,
+    pub message: &'a DataMessage,
+}
 
+impl CompCtx {
     fn create_channel(&mut self) -> Channel {
         let putter_id = PortId(self.take_port_id());
         let getter_id = PortId(self.take_port_id());
@@ -75,7 +67,7 @@ impl CompCtx {
         return Channel{ putter_id, getter_id };
     }
 
-    fn get_port(&self, port_id: PortId) -> &Port {
+    pub(crate) fn get_port(&self, port_id: PortId) -> &Port {
         let index = self.get_port_index(port_id).unwrap();
         return &self.ports[index];
     }
@@ -184,12 +176,25 @@ impl RunContext for ExecCtx {
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub(crate) enum Mode {
-    NonSync,
-    Sync,
+    NonSync, // not in sync mode
+    Sync, // in sync mode, can interact with other components
+    SyncFail, // something went wrong during sync mode (deadlocked, error, whatever)
+    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
     BlockedGet,
     BlockedPut,
 }
 
+impl Mode {
+    fn can_run(&self) -> bool {
+        match self {
+            Mode::NonSync | Mode::Sync =>
+                return true,
+            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut =>
+                return false,
+        }
+    }
+}
+
 pub(crate) struct CompPDL {
     pub mode: Mode,
     pub mode_port: PortId, // when blocked on a port
@@ -247,13 +252,21 @@ impl CompPDL {
             Message::Control(message) => {
                 self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
             },
+            Message::Sync(message) => {
+                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
+            }
         }
     }
 
     pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
 
-        sched_ctx.log("Running component");
+        let can_run = self.mode.can_run();
+        sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
+        if !can_run {
+            return Ok(CompScheduling::Sleep);
+        }
 
         let run_result = self.execute_prompt(&sched_ctx)?;
 
         match run_result {
@@ -267,13 +280,23 @@ impl CompPDL {
             },
             EC::BlockGet(port_id) => {
                 debug_assert_eq!(self.mode, Mode::Sync);
                 debug_assert!(self.exec_ctx.stmt.is_none());
 
                 let port_id = port_id_from_eval(port_id);
-                if let Some(message) = comp_ctx.take_message(port_id) {
-                    // We can immediately receive and continue
-                    debug_assert!(self.exec_ctx.stmt.is_none());
-                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
-                    return Ok(CompScheduling::Immediate);
+                let port_index = comp_ctx.get_port_index(port_id).unwrap();
+                if let Some(message) = &self.inbox_main[port_index] {
+                    // Check if we can actually receive the message
+                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
+                        // Message was received. Make sure any blocked peers and
+                        // pending messages are handled.
+                        let message = self.inbox_main[port_index].take().unwrap();
+
+                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
+                        return Ok(CompScheduling::Immediate);
+                    } else {
+                        self.mode = Mode::SyncFail;
+                        return Ok(CompScheduling::Sleep);
+                    }
                 } else {
                     // We need to wait
                     self.mode = Mode::BlockedGet;
@@ -288,7 +311,7 @@ impl CompPDL {
                 if port_info.state == PortState::Blocked {
                     todo!("handle blocked port");
                 }
-                self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
+                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
                 self.exec_ctx.stmt = ExecStmt::PerformedPut;
                 return Ok(CompScheduling::Immediate);
             },
@@ -349,27 +372,23 @@ impl CompPDL {
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
         self.consensus.notify_sync_end();
         debug_assert_eq!(self.mode, Mode::Sync);
-        self.mode = Mode::NonSync;
+        self.mode = Mode::SyncEnd;
     }
 
-    fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
-        use std::sync::atomic::Ordering;
-
+    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
         let port_info = comp_ctx.get_port(source_port_id);
         let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
-        let annotated_message = self.consensus.annotate_message_data(port_info, value);
+        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
         peer_info.handle.inbox.push(Message::Data(annotated_message));
 
-        let should_wake_up = peer_info.handle.sleeping.compare_exchange(
-            true, false, Ordering::AcqRel, Ordering::Relaxed
-        ).is_ok();
-
-        if should_wake_up {
-            let comp_key = unsafe{ peer_info.id.upgrade() };
-            sched_ctx.runtime.enqueue_work(comp_key);
-        }
+        wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
     }
 
     /// Handles a message that came in through the public inbox. This function
     /// will handle putting it in the correct place, and potentially blocking
     /// the port in case too many messages are being received.
     fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
         // Check if we can insert it directly into the storage associated with
         // the port
@@ -397,7 +416,7 @@ impl CompPDL {
 
         if port_info.state == PortState::Open {
             let (target_comp_id, block_message) =
-                self.control.mark_port_blocked(target_port_id, comp_ctx);
+                self.control.set_port_and_peer_blocked(target_port_id, comp_ctx);
             debug_assert_eq!(_peer_comp_id, target_comp_id);
 
             let peer = comp_ctx.get_peer(target_comp_id);
@@ -409,6 +428,37 @@ impl CompPDL {
         self.inbox_backup.push(message);
     }
 
+    /// Handles when a message has been handed off from the inbox to the PDL
+    /// code. We check to see if there are more messages waiting and, if not,
+    /// then we handle the case where the port might have been blocked
+    /// previously.
+    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
+        let port_index = comp_ctx.get_port_index(port_id).unwrap();
+        debug_assert!(self.inbox_main[port_index].is_none()); // because we just received it
+
+        // Check for any more messages
+        for message_index in 0..self.inbox_backup.len() {
+            let message = &self.inbox_backup[message_index];
+            if message.data_header.target_port == port_id {
+                // One more message for this port
+                let message = self.inbox_backup.remove(message_index);
+                debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port
+                self.inbox_main[port_index] = Some(message);
+                return;
+            }
+        }
+
+        // Did not have any more messages. So if we were blocked, then we need
+        // to send the "unblock" message.
+        let port_info = &comp_ctx.ports[port_index];
+        if port_info.state == PortState::Blocked {
+            let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
+            let peer_info = comp_ctx.get_peer(peer_comp_id);
+            peer_info.handle.inbox.push(Message::Control(message));
+            wake_up_if_sleeping(sched_ctx, peer_comp_id, &peer_info.handle);
+        }
+    }
+
     fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
         match message.content {
             ControlMessageContent::Ack => {
@@ -473,7 +523,7 @@ impl CompPDL {
                 debug_assert_eq!(port_info.kind, PortKind::Putter);
                 debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
                 if port_info.state == PortState::Blocked {
-                    self.unblock_port(sched_ctx, comp_ctx, port_id);
+                    self.unblock_local_port(sched_ctx, comp_ctx, port_id);
                 }
             },
             ControlMessageContent::PortPeerChangedBlock(port_id) => {
@@ -493,12 +543,19 @@ impl CompPDL {
                 let port_info = comp_ctx.get_port_mut(port_id);
                 debug_assert!(port_info.state == PortState::Blocked);
                 port_info.peer_comp_id = new_comp_id;
-                self.unblock_port(sched_ctx, comp_ctx, port_id);
+                self.unblock_local_port(sched_ctx, comp_ctx, port_id);
             }
         }
     }
 
-    fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
+    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
+
+    }
+
+    /// Marks the local port as being unblocked. If the execution was blocked on
+    /// sending a message over this port, then execution will continue and the
+    /// message will be sent.
+    fn unblock_local_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
         let port_info = comp_ctx.get_port_mut(port_id);
         debug_assert_eq!(port_info.state, PortState::Blocked);
         port_info.state = PortState::Open;
@@ -506,9 +563,10 @@ impl CompPDL {
         if self.mode == Mode::BlockedPut && port_id == self.mode_port {
             // We were blocked on the port that just became unblocked, so
             // send the message.
             debug_assert_eq!(port_info.kind, PortKind::Putter);
             let mut replacement = ValueGroup::default();
             std::mem::swap(&mut replacement, &mut self.mode_value);
-            self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
+            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
 
             self.mode = Mode::Sync;
             self.mode_port = PortId::new_invalid();
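
The inlined compare-exchange on the peer's `sleeping` flag is factored out into `wake_up_if_sleeping` throughout this file. A sketch of what such a helper plausibly looks like, reconstructed from the inline code removed above (the `Handle` type and `enqueue` callback are illustrative stand-ins for the runtime's component handle and scheduler queue):

    use std::sync::atomic::{AtomicBool, Ordering};

    struct Handle {
        sleeping: AtomicBool, // true while the component has no work scheduled
    }

    fn wake_up_if_sleeping(handle: &Handle, enqueue: impl FnOnce()) {
        // Only the caller that wins the true -> false race may enqueue the
        // component, so a sleeping component is scheduled at most once.
        if handle
            .sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            enqueue();
        }
    }

Centralizing the CAS keeps the wake-up invariant (exactly one winner schedules the component) in one place instead of repeated at every send site.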
src/runtime2/component/consensus.rs
 use crate::protocol::eval::ValueGroup;
+use crate::runtime2::scheduler::*;
 use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
+use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::component_pdl::*;
 
@@ -14,42 +17,383 @@ impl PortAnnotation {
     }
 }
 
+#[derive(Eq, PartialEq)]
+enum Mode {
+    NonSync,
+    SyncBusy,
+    SyncAwaitingSolution,
+}
+
+struct SolutionCombiner {
+    solution: SyncPartialSolution,
+    all_present: bool, // set if the `submissions_by` only contains (_, true) entries.
+}
+
+impl SolutionCombiner {
+    fn new() -> Self {
+        return Self{
+            solution: SyncPartialSolution{
+                submissions_by: Vec::new(),
+                channel_mapping: Vec::new(),
+                decision: RoundDecision::None,
+            },
+            all_present: false,
+        }
+    }
+
+    /// Returns a decision for the current round. If there is no decision (yet)
+    /// then `RoundDecision::None` is returned.
+    fn get_decision(&self) -> RoundDecision {
+        if self.all_present {
+            debug_assert_ne!(self.solution.decision, RoundDecision::None);
+            return self.solution.decision;
+        }
+
+        return RoundDecision::None; // even if failure: wait for everyone.
+    }
+
+    fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
+        // Combine the submission tracking
+        for (comp_id, present) in partial.submissions_by {
+            self.mark_single_component_submission(comp_id, present);
+        }
+
+        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
+        debug_assert_ne!(partial.decision, RoundDecision::Solution);
+
+        // Combine our partial solution with the provided partial solution.
+        // This algorithm *could* allow overlap in the partial solutions, but
+        // in practice this means something is going wrong (a component stored
+        // a local solution *and* transmitted it to the leader, then later
+        // submitted its partial solution), hence we will do some debug asserts
+        // for now.
+        for new_entry in partial.channel_mapping {
+            let channel_index = if new_entry.putter.is_some() && new_entry.getter.is_some() {
+                // Channel is completely specified
+                debug_assert!(
+                    self.find_channel_index_for_partial_entry(new_entry.putter.as_ref().unwrap()).is_none() &&
+                    self.find_channel_index_for_partial_entry(new_entry.getter.as_ref().unwrap()).is_none()
+                );
+                let channel_index = self.solution.channel_mapping.len();
+                self.solution.channel_mapping.push(new_entry);
+
+                channel_index
+            } else if let Some(new_port) = new_entry.putter {
+                // Only putter is present in new entry
+                match self.find_channel_index_for_partial_entry(&new_port) {
+                    Some(channel_index) => {
+                        let entry = &mut self.solution.channel_mapping[channel_index];
+                        debug_assert!(entry.putter.is_none());
+                        entry.putter = Some(new_port);
+
+                        channel_index
+                    },
+                    None => {
+                        let channel_index = self.solution.channel_mapping.len();
+                        self.solution.channel_mapping.push(SyncSolutionChannel{
+                            putter: Some(new_port),
+                            getter: None,
+                        });
+
+                        channel_index
+                    }
+                }
+            } else if let Some(new_port) = new_entry.getter {
+                // Only getter is present in new entry
+                match self.find_channel_index_for_partial_entry(&new_port) {
+                    Some(channel_index) => {
+                        let entry = &mut self.solution.channel_mapping[channel_index];
+                        debug_assert!(entry.getter.is_none());
+                        entry.getter = Some(new_port);
+
+                        channel_index
+                    },
+                    None => {
+                        let channel_index = self.solution.channel_mapping.len();
+                        self.solution.channel_mapping.push(SyncSolutionChannel{
+                            putter: None,
+                            getter: Some(new_port),
+                        });
+
+                        channel_index
+                    }
+                }
+            } else {
+                unreachable!()
+            };
+
+            // Make sure the new entry is consistent
+            let channel = &self.solution.channel_mapping[channel_index];
+            if !Self::channel_is_consistent(channel) {
+                self.solution.decision = RoundDecision::Failure;
+            }
+        }
+
+        // Check to see if we have a global solution already
+        self.update_all_present();
+        if self.all_present && self.solution.decision != RoundDecision::Failure {
+            debug_assert_eq!(self.solution.decision, RoundDecision::None);
+            dbg_code!(for entry in &self.solution.channel_mapping {
+                debug_assert!(entry.putter.is_some() && entry.getter.is_some());
+            });
+            self.solution.decision = RoundDecision::Solution;
+        }
+    }
+
+    /// Combines the currently stored global solution (if any) with the newly
+    /// provided local solution. Make sure to check the `get_decision` return
+    /// value afterwards.
+    fn combine_with_local_solution(&mut self, comp_id: CompId, solution: SyncLocalSolution) {
+        // Mark the contribution of the component and detect components whose
+        // submissions we do not yet have
+        self.mark_single_component_submission(comp_id, true);
+        for entry in solution.iter() {
+            self.mark_single_component_submission(entry.peer_comp_id, false);
+        }
+
+        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
+
+        // Go through all entries and check if the submitted local solution is
+        // consistent with our partial solution
+        let mut had_new_entry = false;
+        for entry in solution.iter() {
+            let preexisting_index = self.find_channel_index_for_local_entry(comp_id, entry);
+            let is_putter = entry.port_kind == PortKind::Putter;
+            let new_port = SyncSolutionPort{
+                self_comp_id: comp_id,
+                self_port_id: entry.self_port_id,
+                peer_comp_id: entry.peer_comp_id,
+                peer_port_id: entry.peer_port_id,
+                mapping: entry.mapping,
+                port_kind: entry.port_kind,
+            };
+
+            match preexisting_index {
+                Some(entry_index) => {
+                    // Add the local solution's entry to the existing entry in
+                    // the global solution. We'll handle any mismatches along
+                    // the way.
+                    let channel = &mut self.solution.channel_mapping[entry_index];
+                    if is_putter {
+                        // Getter should be present in existing entry
+                        debug_assert!(channel.getter.is_some() && channel.putter.is_none());
+                        channel.putter = Some(new_port);
+                    } else {
+                        // Putter should be present in existing entry
+                        debug_assert!(channel.putter.is_some() && channel.getter.is_none());
+                        channel.getter = Some(new_port);
+                    }
+
+                    if !Self::channel_is_consistent(channel) {
+                        self.solution.decision = RoundDecision::Failure;
+                    }
+                },
+                None => {
+                    // No entry yet. So add it
+                    let new_solution = if is_putter {
+                        SyncSolutionChannel{ putter: Some(new_port), getter: None }
+                    } else {
+                        SyncSolutionChannel{ putter: None, getter: Some(new_port) }
+                    };
+                    self.solution.channel_mapping.push(new_solution);
+                    had_new_entry = true;
+                }
+            }
+        }
+
+        if !had_new_entry {
+            self.update_all_present();
+            if self.all_present && self.solution.decision != RoundDecision::Failure {
+                // No new entries and every component is present. This implies that
+                // every component successfully added its local solution to the
+                // global solution. Hence: we have a global solution
+                debug_assert_eq!(self.solution.decision, RoundDecision::None);
+                dbg_code!(for entry in &self.solution.channel_mapping {
+                    debug_assert!(entry.putter.is_some() && entry.getter.is_some());
+                });
+                self.solution.decision = RoundDecision::Solution;
+            }
+        }
+    }
+
+    fn mark_single_component_submission(&mut self, comp_id: CompId, will_contribute: bool) {
+        debug_assert!(!will_contribute || !self.solution.submissions_by.iter().any(|(id, val)| *id == comp_id && *val)); // if submitting a solution, then we do not expect an existing entry
+        for (entry, has_contributed) in self.solution.submissions_by.iter_mut() {
+            if *entry == comp_id {
+                *has_contributed = *has_contributed || will_contribute;
+                return;
+            }
+        }
+
+        self.solution.submissions_by.push((comp_id, will_contribute));
+    }
+
+    fn update_all_present(&mut self) {
+        debug_assert!(!self.all_present); // upheld by caller
+        for (_, present) in self.solution.submissions_by.iter() {
+            if !*present {
+                return;
+            }
+        }
+
+        self.all_present = true;
+    }
+
+    /// Given the partial solution entry of a channel's port, check if there is
+    /// an entry for the other port. If there is we return its index, and we
+    /// return `None` otherwise.
+    fn find_channel_index_for_partial_entry(&self, new_entry: &SyncSolutionPort) -> Option<usize> {
+        fn might_belong_to_same_channel(cur_entry: &SyncSolutionPort, new_entry: &SyncSolutionPort) -> bool {
+            (
+                cur_entry.peer_comp_id == new_entry.self_comp_id &&
+                cur_entry.peer_port_id == new_entry.self_port_id
+            ) || (
+                cur_entry.self_comp_id == new_entry.peer_comp_id &&
+                cur_entry.self_port_id == new_entry.peer_port_id
+            )
+        }
+
+        for (entry_index, cur_entry) in self.solution.channel_mapping.iter().enumerate() {
+            if new_entry.port_kind == PortKind::Putter {
+                if let Some(cur_entry) = &cur_entry.getter {
+                    if might_belong_to_same_channel(cur_entry, new_entry) {
+                        return Some(entry_index);
+                    }
+                }
+            } else {
+                if let Some(cur_entry) = &cur_entry.putter {
+                    if might_belong_to_same_channel(cur_entry, new_entry) {
+                        return Some(entry_index);
+                    }
+                }
+            }
+        }
+
+        return None;
+    }
+
+    /// Given the local solution entry for one end of a channel, check if there
+    /// is an entry for the other end of the channel such that they can be
+    /// paired up.
+    fn find_channel_index_for_local_entry(&self, comp_id: CompId, new_entry: &SyncLocalSolutionEntry) -> Option<usize> {
+        fn might_belong_to_same_channel(cur_entry: &SyncSolutionPort, new_comp_id: CompId, new_entry: &SyncLocalSolutionEntry) -> bool {
+            (
+                new_entry.peer_comp_id == cur_entry.self_comp_id &&
+                new_entry.peer_port_id == cur_entry.self_port_id
+            ) || (
+                new_comp_id == cur_entry.peer_comp_id &&
+                new_entry.self_port_id == cur_entry.peer_port_id
+            )
+        }
+
+        for (entry_index, cur_entry) in self.solution.channel_mapping.iter().enumerate() {
+            // Note that the check that determines whether two ports belong to
+            // the same channel is one-sided. That is: port A may decide that
+            // port B is part of its channel, but port B may consider port A not
+            // to be part of its channel. Before merging the entries (outside of
+            // this function) we'll make sure this is not the case.
+            if new_entry.port_kind == PortKind::Putter {
+                // Expect getter to be present
+                if let Some(cur_entry) = &cur_entry.getter {
+                    if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
+                        return Some(entry_index);
+                    }
+                }
+            } else {
+                if let Some(cur_entry) = &cur_entry.putter {
+                    if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
+                        return Some(entry_index);
+                    }
+                }
+            }
+        }
+
+        return None;
+    }
+
+    // Makes sure that two ports agree that they are each other's peers
+    fn ports_belong_to_same_channel(a: &SyncSolutionPort, b: &SyncSolutionPort) -> bool {
+        return
+            a.self_comp_id == b.peer_comp_id && a.self_port_id == b.peer_port_id &&
+            a.peer_comp_id == b.self_comp_id && a.peer_port_id == b.self_port_id;
+    }
+
+    // Makes sure channel is consistently mapped (or not yet fully specified)
+    fn channel_is_consistent(channel: &SyncSolutionChannel) -> bool {
+        debug_assert!(channel.putter.is_some() || channel.getter.is_some());
+        if channel.putter.is_none() || channel.getter.is_none() {
+            // Not yet fully specified, so not yet inconsistent
+            return true;
+        }
+
+        let putter = channel.putter.as_ref().unwrap();
+        let getter = channel.getter.as_ref().unwrap();
+        return
+            Self::ports_belong_to_same_channel(putter, getter) &&
+            putter.mapping == getter.mapping;
+    }
+}
 
 /// Tracking consensus state
 pub struct Consensus {
-    round: u32,
+    // General state of consensus manager
     mapping_counter: u32,
+    mode: Mode,
+    // State associated with sync round
+    round_index: u32,
     highest_id: CompId,
     ports: Vec<PortAnnotation>,
+    // State associated with arriving at a solution and being a (temporary)
+    // leader in the consensus round
+    solution: SolutionCombiner,
 }
 
 impl Consensus {
     pub(crate) fn new() -> Self {
         return Self{
-            round: 0,
-            mapping_counter: 0,
+            round_index: 0,
             highest_id: CompId::new_invalid(),
             ports: Vec::new(),
+            mapping_counter: 0,
+            mode: Mode::NonSync,
+            solution: SolutionCombiner::new(),
         }
     }
 
+    // -------------------------------------------------------------------------
+    // Managing sync state
+    // -------------------------------------------------------------------------
+
     pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
-        // Make sure we locally still have all of the same ports
-        self.transfer_ports(comp_ctx);
+        debug_assert_eq!(self.mode, Mode::NonSync);
         self.highest_id = comp_ctx.id;
         self.mapping_counter = 0;
+        self.mode = Mode::SyncBusy;
+        self.make_ports_consistent_with_ctx(comp_ctx);
     }
 
-    pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage {
-        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
-        let data_header = self.create_data_header(port_info);
-        let sync_header = self.create_sync_header();
-
-        return DataMessage{ data_header, sync_header, content };
-    }
-
-    pub(crate) fn notify_sync_end(&mut self) {
-        self.round = self.round.wrapping_add(1);
-        todo!("implement sync end")
-    }
+    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> RoundDecision {
+        debug_assert_eq!(self.mode, Mode::SyncBusy);
+        self.mode = Mode::SyncAwaitingSolution;
+
+        // Submit our port mapping as a solution
+        let mut local_solution = Vec::with_capacity(self.ports.len());
+        for port in &self.ports {
+            if let Some(mapping) = port.mapping {
+                let port_info = comp_ctx.get_port(port.id);
+                local_solution.push(SyncLocalSolutionEntry{
+                    self_port_id: port.id,
+                    peer_comp_id: port_info.peer_comp_id,
+                    peer_port_id: port_info.peer_id,
+                    mapping,
+                    port_kind: port_info.kind,
+                });
+            }
+        }
+
+        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
+        return decision;
+    }
 
-    pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
+    fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
         let mut needs_setting_ports = false;
         if comp_ctx.ports.len() != self.ports.len() {
             needs_setting_ports = true;
@@ -73,32 +417,238 @@ impl Consensus {
         }
     }
 
+    // -------------------------------------------------------------------------
+    // Handling inbound and outbound messages
+    // -------------------------------------------------------------------------
+
+    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
+        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
+        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
+        let data_header = self.create_data_header_and_update_mapping(port_info);
+        let sync_header = self.create_sync_header(comp_ctx);
+
+        return DataMessage{ data_header, sync_header, content };
+    }
+
+    /// Checks if the data message can be received (due to port annotations). If
+    /// it can, then `true` is returned and the caller is responsible for handing
+    /// the message off to the PDL code. Otherwise the message cannot be
+    /// received.
+    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
+        debug_assert_eq!(self.mode, Mode::SyncBusy);
+        debug_assert!(self.ports.iter().any(|v| v.id == message.data_header.target_port));
+
+        // Make sure the expected mapping matches the currently stored mapping
+        for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
+            let got_annotation = self.get_annotation(*expected_id);
+            if got_annotation != *expected_annotation {
+                return false;
+            }
+        }
+
+        // Expected mapping matches current mapping, so we will receive the message
+        self.set_annotation(message.data_header.target_port, message.data_header.new_mapping);
+
+        // Handle the sync header embedded within the data message
+        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
+
+        return true;
+    }
+
+    /// Receives the sync message and updates the consensus state appropriately.
+    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> RoundDecision {
+        // Whatever happens: handle the sync header (possibly changing the
+        // currently registered leader)
+        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
+
+        match message.content {
+            SyncMessageContent::NotificationOfLeader => {
+                return RoundDecision::None;
+            },
+            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
+                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
+            },
+            SyncMessageContent::PartialSolution(partial_solution) => {
+                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
+            },
+            SyncMessageContent::GlobalSolution => {
+                // Global solution has been found
+                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find a global solution if we submitted our local solution
+                todo!("clear port mapping or something");
+                return RoundDecision::Solution;
+            },
+            SyncMessageContent::GlobalFailure => {
+                // Round has failed
+                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+                return RoundDecision::Failure;
+            },
+        }
+    }
+
+    fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
+        if header.highest_id > self.highest_id {
+            // Sender knows of someone with a higher ID. So store the highest
+            // ID, notify all peers, and forward local solutions
+            self.highest_id = header.highest_id;
+            for peer in &comp_ctx.peers {
+                if peer.id == header.sending_id {
+                    continue;
+                }
+
+                let message = SyncMessage{
+                    sync_header: self.create_sync_header(comp_ctx),
+                    content: SyncMessageContent::NotificationOfLeader,
+                };
+                peer.handle.inbox.push(Message::Sync(message));
+                wake_up_if_sleeping(sched_ctx, peer.id, &peer.handle);
+            }
+
+            self.forward_local_solutions(sched_ctx, comp_ctx);
+        } else if header.highest_id < self.highest_id {
+            // Sender has a lower ID, so notify it of our higher one
+            let message = SyncMessage{
+                sync_header: self.create_sync_header(comp_ctx),
+                content: SyncMessageContent::NotificationOfLeader,
+            };
+            let peer_info = comp_ctx.get_peer(header.sending_id);
+            peer_info.handle.inbox.push(Message::Sync(message));
+            wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+        } // else: exactly equal
+    }
+
+    fn get_annotation(&self, port_id: PortId) -> Option<u32> {
+        for annotation in self.ports.iter() {
+            if annotation.id == port_id {
+                return annotation.mapping;
+            }
+        }
+
+        debug_assert!(false);
+        return None;
+    }
+
+    fn set_annotation(&mut self, port_id: PortId, mapping: u32) {
+        for annotation in self.ports.iter_mut() {
+            if annotation.id == port_id {
+                annotation.mapping = Some(mapping);
+            }
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Leader-related methods
+    // -------------------------------------------------------------------------
+
+    fn forward_local_solutions(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        todo!("implement")
+    }
+
+    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> RoundDecision {
+        if self.highest_id == comp_ctx.id {
+            // We are the leader
+            self.solution.combine_with_local_solution(solution_sender_id, solution);
+            let round_decision = self.solution.get_decision();
+            let decision_is_solution = match round_decision {
+                RoundDecision::None => {
+                    // No solution yet
+                    return RoundDecision::None;
+                },
+                RoundDecision::Solution => true,
+                RoundDecision::Failure => false,
+            };
+
+            // If here then we've reached a decision, broadcast it
+            for (peer_id, _is_present) in self.solution.solution.submissions_by.iter().copied() {
+                debug_assert!(_is_present);
+                if peer_id == comp_ctx.id {
+                    // Do not send the result to ourselves
+                    continue;
+                }
+
+                let mut handle = sched_ctx.runtime.get_component_public(peer_id);
+                handle.inbox.push(Message::Sync(SyncMessage{
+                    sync_header: self.create_sync_header(comp_ctx),
+                    content: if decision_is_solution {
+                        SyncMessageContent::GlobalSolution
+                    } else {
+                        SyncMessageContent::GlobalFailure
+                    },
+                }));
+                wake_up_if_sleeping(sched_ctx, peer_id, &handle);
+
+                let _should_remove = handle.decrement_users();
+                debug_assert!(!_should_remove);
+            }
+
+            return round_decision;
+        } else {
+            // Forward the solution
+            let message = SyncMessage{
+                sync_header: self.create_sync_header(comp_ctx),
+                content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
+            };
+            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
+            return RoundDecision::None;
+        }
+    }
+
+    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> RoundDecision {
+        if self.highest_id == comp_ctx.id {
+            // We are the leader, combine existing and new solution
+            self.solution.combine_with_partial_solution(solution);
+            let round_decision = self.solution.get_decision();
+
+            return round_decision;
+        } else {
+            // Forward the partial solution
+            let message = SyncMessage{
+                sync_header: self.create_sync_header(comp_ctx),
+                content: SyncMessageContent::PartialSolution(solution),
+            };
+            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
+            return RoundDecision::None;
+        }
+    }
+
+    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
+        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
+        let leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
+        leader_info.inbox.push(message);
+        wake_up_if_sleeping(sched_ctx, self.highest_id, &leader_info);
+    }
+
+    // -------------------------------------------------------------------------
+    // Creating message headers
+    // -------------------------------------------------------------------------
+
-    fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader {
+    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
         let mut expected_mapping = Vec::with_capacity(self.ports.len());
-        for port in &self.ports {
-            if let Some(mapping) = port.mapping {
-                expected_mapping.push((port.id, mapping));
-            }
-        }
+        let mut port_index = usize::MAX;
+        for (index, port) in self.ports.iter().enumerate() {
+            if port.id == port_info.self_id {
+                port_index = index;
+            }
+            expected_mapping.push((port.id, port.mapping));
+        }
 
+        let new_mapping = self.take_mapping();
+        self.ports[port_index].mapping = Some(new_mapping);
+        debug_assert_eq!(port_info.kind, PortKind::Putter);
         return MessageDataHeader{
             expected_mapping,
-            new_mapping: self.take_mapping(),
+            new_mapping,
             source_port: port_info.self_id,
             target_port: port_info.peer_id,
         };
     }
 
-    fn create_sync_header(&self) -> MessageSyncHeader {
+    fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
         return MessageSyncHeader{
-            sync_round: self.round,
+            sync_round: self.round_index,
+            sending_id: comp_ctx.id,
+            highest_id: self.highest_id,
         };
     }
 
     fn take_mapping(&mut self) -> u32 {
         let mapping = self.mapping_counter;
-        self.mapping_counter += 1;
+        self.mapping_counter = self.mapping_counter.wrapping_add(1);
         return mapping;
     }
 }
\ No newline at end of file
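
The leader election implicit in `handle_sync_header` reduces to a simple rule: every component adopts the highest component id it has seen this round, and because ids are totally ordered, all participants converge on a single leader. A condensed sketch of that rule with stand-in types (the runtime's CompId and the concrete reactions are richer):

    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct CompId(u32);

    enum Reaction {
        None,                 // ids already agree
        AdoptAndNotifyPeers,  // adopt new leader, notify peers, forward solutions
        NotifySenderOfLeader, // sender lags behind: send NotificationOfLeader
    }

    fn on_sync_header(our_highest: &mut CompId, header_highest: CompId) -> Reaction {
        if header_highest > *our_highest {
            *our_highest = header_highest;
            Reaction::AdoptAndNotifyPeers
        } else if header_highest < *our_highest {
            Reaction::NotifySenderOfLeader
        } else {
            Reaction::None
        }
    }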
src/runtime2/component/control_layer.rs
@@ -228,7 +228,7 @@ impl ControlLayer {
         ));
     }
 
-    pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
+    pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
         // TODO: Feels like this shouldn't be an entry. Hence this class should
         //  be renamed. Lets see where the code ends up being
         let entry_id = self.take_id();
@@ -255,7 +255,7 @@ impl ControlLayer {
         );
     }
 
-    pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
+    pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
         // Find the entry that contains the blocking entry for the port
         let mut entry_index = usize::MAX;
         let mut entry_id = ControlId::new_invalid();
@@ -273,6 +273,7 @@ impl ControlLayer {
         let peer_port_id = port_info.peer_id;
         let peer_comp_id = port_info.peer_comp_id;
         debug_assert_eq!(port_info.state, PortState::Blocked);
+        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages
        port_info.state = PortState::Open;
 
         return (
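
The renamed `set_port_and_peer_blocked` / `set_port_and_peer_unblocked` pair implements a small state machine on getter ports. A simplified sketch of those transitions (control messages and peer bookkeeping omitted; the states follow the runtime's PortState):

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum PortState {
        Open,
        Blocked,
        Closed,
    }

    // A getter port blocks when a message arrives while one is already
    // buffered; the control layer then tells the putter to stop sending.
    fn on_extra_pending_message(state: &mut PortState) {
        assert_eq!(*state, PortState::Open);
        *state = PortState::Blocked;
    }

    // Once the buffered message is handed to the PDL code and no backup
    // message remains, the port reopens and an unblock message is sent.
    fn on_inbox_drained(state: &mut PortState) {
        if *state == PortState::Blocked {
            *state = PortState::Open;
        }
    }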