Changeset 6555f56a22a9
mh <contact@maxhenger.nl>, 2022-01-21 17:20:57 (3 years ago)
WIP: First sync test, partially correct
7 files changed with 288 insertions and 138 deletions:
src/runtime2/communication.rs

@@ -78,6 +78,7 @@ pub struct SyncMessage {
     pub content: SyncMessageContent,
 }
 
+#[derive(Debug)]
 pub struct SyncLocalSolutionEntry {
     pub self_port_id: PortId,
     pub peer_comp_id: CompId,
@@ -88,6 +89,7 @@ pub struct SyncLocalSolutionEntry {
 
 pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
 
+#[derive(Debug)]
 pub struct SyncSolutionPort {
     pub self_comp_id: CompId,
     pub self_port_id: PortId,
@@ -97,29 +99,37 @@ pub struct SyncSolutionPort {
     pub port_kind: PortKind,
 }
 
+#[derive(Debug)]
 pub struct SyncSolutionChannel {
     pub putter: Option<SyncSolutionPort>,
     pub getter: Option<SyncSolutionPort>,
 }
 
-#[derive(Copy, Clone)]
-pub enum RoundDecision {
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum SyncRoundDecision {
     None,
     Solution,
     Failure,
 }
 
-impl RoundDecision {
-    fn is_some(&self)
-}
-
+#[derive(Debug)]
 pub struct SyncPartialSolution {
     pub submissions_by: Vec<(CompId, bool)>,
     pub channel_mapping: Vec<SyncSolutionChannel>,
     pub decision: SyncRoundDecision,
 }
 
-#[derive(Debug, Clone)]
+impl Default for SyncPartialSolution {
+    fn default() -> Self {
+        return Self{
+            submissions_by: Vec::new(),
+            channel_mapping: Vec::new(),
+            decision: SyncRoundDecision::None,
+        }
+    }
+}
 
+#[derive(Debug)]
 pub enum SyncMessageContent {
     NotificationOfLeader,
     LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
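
The types above are the vocabulary of a consensus round: each component submits a local solution, the leader folds those into a SyncPartialSolution, and the round's SyncRoundDecision stays None until every participant has been heard from. A self-contained sketch of just that decision rule follows; the u32 ids and the reduced struct are stand-ins for the runtime's real CompId and channel mapping, not the crate's actual API.

    // Sketch: decision rule only. A failure may be recorded eagerly, but it
    // is only *reported* once all expected components have submitted.
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
    enum SyncRoundDecision { None, Solution, Failure }

    impl Default for SyncRoundDecision {
        fn default() -> Self { SyncRoundDecision::None }
    }

    #[derive(Default)]
    struct PartialSolution {
        submissions_by: Vec<(u32, bool)>, // (component id, has submitted?)
        decision: SyncRoundDecision,
    }

    impl PartialSolution {
        // Mirrors get_decision in consensus.rs: even a known failure
        // waits until everyone has contributed.
        fn get_decision(&self) -> SyncRoundDecision {
            let all_present = !self.submissions_by.is_empty()
                && self.submissions_by.iter().all(|&(_, present)| present);
            if all_present { self.decision } else { SyncRoundDecision::None }
        }
    }

    fn main() {
        let mut partial = PartialSolution::default();
        partial.submissions_by.push((1, true));
        partial.submissions_by.push((2, false)); // component 2 still missing
        partial.decision = SyncRoundDecision::Failure;
        assert_eq!(partial.get_decision(), SyncRoundDecision::None); // wait
        partial.submissions_by[1].1 = true;
        assert_eq!(partial.get_decision(), SyncRoundDecision::Failure);
    }
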
src/runtime2/component/component_pdl.rs

@@ -182,6 +182,7 @@ pub(crate) enum Mode {
     SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
     BlockedGet,
     BlockedPut,
+    Exit,
 }
 
 impl Mode {
@@ -189,7 +190,7 @@ impl Mode {
         match self {
             Mode::NonSync | Mode::Sync =>
                 return true,
-            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut =>
+            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
                 return false,
         }
     }
@@ -239,9 +240,10 @@ impl CompPDL {
     pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
         sched_ctx.log(&format!("handling message: {:?}", message));
         if let Some(new_target) = self.control.should_reroute(&message) {
-            let target = sched_ctx.runtime.get_component_public(new_target);
-            target.inbox.push(message);
-
+            let mut target = sched_ctx.runtime.get_component_public(new_target);
+            target.send_message(sched_ctx, message, true);
+            let _should_remove = target.decrement_users();
+            debug_assert!(!_should_remove);
             return;
         }
@@ -258,6 +260,10 @@ impl CompPDL {
         }
     }
 
+    // -------------------------------------------------------------------------
+    // Running component and handling changes in global component state
+    // -------------------------------------------------------------------------
+
     pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
 
@@ -286,7 +292,7 @@ impl CompPDL {
                 let port_index = comp_ctx.get_port_index(port_id).unwrap();
                 if let Some(message) = &self.inbox_main[port_index] {
                     // Check if we can actually receive the message
-                    if self.consensus.try_receive_data_message(message) {
+                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                         // Message was received. Make sure any blocked peers and
                         // pending messages are handled.
                         let message = self.inbox_main[port_index].take().unwrap();
@@ -317,7 +323,7 @@ impl CompPDL {
             },
             // Results that can be returned outside of sync mode
             EC::ComponentTerminated => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
+                self.handle_component_exit(sched_ctx, comp_ctx);
                 return Ok(CompScheduling::Exit);
             },
             EC::SyncBlockStart => {
@@ -346,6 +352,8 @@ impl CompPDL {
                     Value::Output(port_id_to_eval(channel.putter_id)),
                     Value::Input(port_id_to_eval(channel.getter_id))
                 ));
+                self.inbox_main.push(None);
+                self.inbox_main.push(None);
                 return Ok(CompScheduling::Immediate);
             }
         }
@@ -370,20 +378,28 @@ impl CompPDL {
     }
 
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        self.consensus.notify_sync_end();
+        self.consensus.notify_sync_end(sched_ctx, comp_ctx);
         debug_assert_eq!(self.mode, Mode::Sync);
         self.mode = Mode::SyncEnd;
     }
 
-    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
-        use std::sync::atomic::Ordering;
+    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync
+
+        // Note: for now we have that the scheduler handles exiting. I don't
+        // know if that is a good idea, we'll see
+        self.mode = Mode::Exit;
+    }
+
+    // -------------------------------------------------------------------------
+    // Handling messages
+    // -------------------------------------------------------------------------
+
+    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
         let port_info = comp_ctx.get_port(source_port_id);
         let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
         let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
-        peer_info.handle.inbox.push(Message::Data(annotated_message));
-
-        wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
     }
 
     /// Handles a message that came in through the public inbox. This function
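
The pattern being removed throughout this changeset is the two-step inbox.push(...) followed by wake_up_if_sleeping(...); the new CompHandle::send_message (defined in runtime.rs further down) bundles both. A rough standalone model of why the wake-up should be a single atomic transition; the Mutex-backed queue here is a simplification, the real inbox is the lock-free QueueDynProducer, and scheduling would go through the runtime's work queue.

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;

    // Simplified stand-in for the runtime's CompPublic.
    struct CompPublic {
        sleeping: AtomicBool,
        inbox: Mutex<VecDeque<String>>,
    }

    impl CompPublic {
        // Mirrors send_message(_, message, try_wake_up): push first, then
        // try to transition sleeping -> awake exactly once so the component
        // gets scheduled to drain its inbox.
        fn send_message(&self, message: String, try_wake_up: bool) {
            self.inbox.lock().unwrap().push_back(message);
            if try_wake_up {
                // compare_exchange ensures only one sender "wins" the
                // wake-up and enqueues the component with the scheduler.
                if self
                    .sleeping
                    .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    println!("component woken up; would be enqueued for scheduling");
                }
            }
        }
    }

    fn main() {
        let comp = CompPublic {
            sleeping: AtomicBool::new(true),
            inbox: Mutex::new(VecDeque::new()),
        };
        comp.send_message("hello".to_string(), true); // wakes the component
        comp.send_message("world".to_string(), true); // already awake: no enqueue
        assert_eq!(comp.inbox.lock().unwrap().len(), 2);
    }
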
 
@@ -420,8 +436,7 @@ impl CompPDL {
             debug_assert_eq!(_peer_comp_id, target_comp_id);
 
             let peer = comp_ctx.get_peer(target_comp_id);
-            peer.handle.inbox.push(Message::Control(block_message));
-            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
+            peer.handle.send_message(sched_ctx, Message::Control(block_message), true);
         }
 
         // But we still need to remember the message, so:
@@ -454,8 +469,7 @@ impl CompPDL {
         if port_info.state == PortState::Blocked {
             let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
             let peer_info = comp_ctx.get_peer(peer_comp_id);
-            peer_info.handle.inbox.push(Message::Control(message));
-            wake_up_if_sleeping(sched_ctx, peer_comp_id, &peer_info.handle);
+            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
         }
     }
 
@@ -469,8 +483,7 @@ impl CompPDL {
                         AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
                             // FIX @NoDirectHandle
                             let handle = sched_ctx.runtime.get_component_public(target_comp);
-                            handle.inbox.push(Message::Control(message));
-                            wake_up_if_sleeping(sched_ctx, target_comp, &handle);
+                            handle.send_message(sched_ctx, Message::Control(message), true);
                             to_ack = new_to_ack;
                         },
                         AckAction::ScheduleComponent(to_schedule) => {
@@ -537,6 +550,12 @@ impl CompPDL {
                 if port_info.state == PortState::Open {
                     port_info.state = PortState::Blocked;
                 }
+
+                let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
+                // TODO: Continue here. Send ack, but think about whether we
+                //  always have the peer in our list of peers? Quickly thinking
+                //  about it, I think so, but we may have a series of port
+                //  transfers. Does that change things?
             },
             ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
                 debug_assert_eq!(message.target_port_id, Some(port_id));
@@ -548,10 +567,36 @@ impl CompPDL {
         }
     }
 
-    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
+    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
         let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
+        let is_success = match decision {
+            SyncRoundDecision::None => {
+                // No decision yet
+                return None;
+            },
+            SyncRoundDecision::Solution => true,
+            SyncRoundDecision::Failure => false,
+        };
+
+        // If here then we've reached a conclusion
+        debug_assert_eq!(self.mode, Mode::SyncEnd);
+        self.mode = Mode::NonSync;
+
+        if is_success {
+            // We can simply continue executing. So we do nothing extra!
+        } else {
+            todo!("handle this better, show some kind of error");
+            self.handle_component_exit(sched_ctx, comp_ctx);
+            return Some(CompScheduling::Exit);
+        }
+
+        return None;
+    }
 
     // -------------------------------------------------------------------------
     // Handling ports
     // -------------------------------------------------------------------------
 
     /// Marks the local port as being unblocked. If the execution was blocked on
     /// sending a message over this port, then execution will continue and the
     /// message will be sent.
@@ -590,18 +635,23 @@ impl CompPDL {
                 let peer_port_id = port_info.peer_id;
                 let port_info = creator_ctx.get_port_mut(peer_port_id);
                 port_info.peer_comp_id = created_ctx.id;
+                Self::add_peer_associated_port_to_component(sched_ctx, creator_ctx, created_ctx.id);
             } else {
-                // We don't own the port, so send the appropriate messages and
-                // notify the control layer
+                // We don't own the peer port, so send the appropriate messages
+                // to the peer component and notify the control layer
                 has_reroute_entry = true;
                 let message = self.control.add_reroute_entry(
                     creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
                     port_info.self_id, created_ctx.id, schedule_entry_id
                 );
                 let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
-                peer_info.handle.inbox.push(message);
+                peer_info.handle.send_message(sched_ctx, message, true);
             }
 
+            // Take out any potential messages for the peer
+            let creator_port_index = creator_ctx.get_port_index(port_id).unwrap();
+            let port_main_message = self.inbox_main[creator_port_index].take();
+
             // Transfer port and create temporary reroute entry
             let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
             if port_info.state == PortState::Blocked {
@@ -609,6 +659,20 @@ impl CompPDL {
             }
             Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 
+            // Transfer the taken messages
+            let created_port_index = created_ctx.get_port_index(port_id).unwrap();
+            component.code.inbox_main[created_port_index] = port_main_message;
+            let mut message_index = 0;
+            while message_index < self.inbox_backup.len() {
+                if self.inbox_backup[message_index].data_header.target_port == port_id {
+                    // Move this message
+                    let message = self.inbox_backup.remove(message_index);
+                    component.code.inbox_backup.push(message);
+                } else {
+                    message_index += 1;
+                }
+            }
+
             // Maybe remove peer from the creator
             if let Some(mut peer_info) = peer_info {
                 let remove_from_runtime = peer_info.handle.decrement_users();
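
The message-transfer loop above moves every buffered message addressed to the transferred port from the creator's inbox_backup into the new component's backup inbox, advancing the index only when nothing was removed. An equivalent self-contained formulation with partition is sketched below; the (u32, String) tuples stand in for the real DataMessage and its data_header.target_port. The diff's in-place loop avoids allocating two new vectors, which is presumably why it is written that way in the runtime itself.

    // Sketch: split a backup inbox into (messages to move, messages to keep).
    fn split_messages_for_port(
        inbox_backup: Vec<(u32, String)>,
        moved_port: u32,
    ) -> (Vec<(u32, String)>, Vec<(u32, String)>) {
        // partition consumes the vector and sidesteps the manual index
        // bookkeeping of remove-while-iterating; relative order is kept.
        inbox_backup.into_iter().partition(|(port, _)| *port == moved_port)
    }

    fn main() {
        let backup = vec![
            (1, "for port 1".to_string()),
            (2, "for port 2".to_string()),
            (1, "also for port 1".to_string()),
        ];
        let (to_created, keep) = split_messages_for_port(backup, 1);
        assert_eq!(to_created.len(), 2);
        assert_eq!(keep.len(), 1);
    }
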
 
@@ -622,6 +686,7 @@ impl CompPDL {
         if !has_reroute_entry {
             // We can schedule the component immediately
             self.control.remove_schedule_entry(schedule_entry_id);
+            component.public.sleeping.store(false, std::sync::atomic::Ordering::Release);
             sched_ctx.runtime.enqueue_work(comp_key);
         } // else: wait for the `Ack`s, they will trigger the scheduling of the component
     }
@@ -653,23 +718,29 @@ impl CompPDL {
         return (port_info, Some(peer_info));
     }
 
     /// Adds a port to the component context. The peer (or its counter) will be
     /// updated accordingly.
     fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
         // Add the port info
         let peer_comp_id = port_info.peer_comp_id;
         debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
         comp_ctx.ports.push(port_info);
+        Self::add_peer_associated_port_to_component(sched_ctx, comp_ctx, peer_comp_id);
+    }
 
-        // Increment counters on peer, or create entry for peer if it doesn't
-        // exist yet.
-        match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) {
+    /// Only adds/updates a peer for a given port. This function assumes (but
+    /// does not check!) that the port was not considered to belong to that peer
+    /// before calling this function.
+    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
+        match comp_ctx.get_peer_index(peer_id) {
             Some(peer_index) => {
                 let peer_info = &mut comp_ctx.peers[peer_index];
                 peer_info.num_associated_ports += 1;
             },
             None => {
-                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
+                let handle = sched_ctx.runtime.get_component_public(peer_id);
                 comp_ctx.peers.push(Peer{
-                    id: peer_comp_id,
+                    id: peer_id,
                     num_associated_ports: 1,
                     handle,
                 });
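
The refactored add_port_to_component / add_peer_associated_port_to_component pair keeps one Peer entry per peer component and counts how many local ports reference it; elsewhere in this changeset the entry (and its handle) is released once the count drops to zero. A minimal sketch of that counting scheme, with plain u32 ids in place of CompId and without the CompHandle bookkeeping:

    // Sketch only: Peer is reduced to (id, count); the real entry also owns
    // a CompHandle whose user count is adjusted alongside it.
    struct Peer { id: u32, num_associated_ports: u32 }

    fn add_peer_associated_port(peers: &mut Vec<Peer>, peer_id: u32) {
        match peers.iter_mut().find(|p| p.id == peer_id) {
            Some(peer) => peer.num_associated_ports += 1,
            None => peers.push(Peer { id: peer_id, num_associated_ports: 1 }),
        }
    }

    fn remove_peer_associated_port(peers: &mut Vec<Peer>, peer_id: u32) {
        let index = peers.iter().position(|p| p.id == peer_id).unwrap();
        peers[index].num_associated_ports -= 1;
        if peers[index].num_associated_ports == 0 {
            peers.remove(index); // last port pointing at this peer: drop entry
        }
    }

    fn main() {
        let mut peers = Vec::new();
        add_peer_associated_port(&mut peers, 7);
        add_peer_associated_port(&mut peers, 7);
        remove_peer_associated_port(&mut peers, 7);
        assert_eq!(peers[0].num_associated_ports, 1);
    }
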
src/runtime2/component/consensus.rs

@@ -2,7 +2,6 @@ use crate::protocol::eval::ValueGroup;
 use crate::runtime2::scheduler::*;
 use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
-use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::component_pdl::*;
 
@@ -17,7 +16,7 @@ impl PortAnnotation {
     }
 }
 
-#[derive(Eq, PartialEq)]
+#[derive(Debug, Eq, PartialEq)]
 enum Mode {
     NonSync,
     SyncBusy,
@@ -32,24 +31,25 @@ struct SolutionCombiner {
 impl SolutionCombiner {
     fn new() -> Self {
         return Self {
-            solution: SyncPartialSolution{
-                submissions_by: Vec::new(),
-                channel_mapping: Vec::new(),
-                decision: RoundDecision::None,
-            },
+            solution: SyncPartialSolution::default(),
             all_present: false,
         }
     }
 
+    #[inline]
+    fn has_contributions(&self) -> bool {
+        return !self.solution.submissions_by.is_empty();
+    }
+
     /// Returns a decision for the current round. If there is no decision (yet)
     /// then `RoundDecision::None` is returned.
-    fn get_decision(&self) -> RoundDecision {
+    fn get_decision(&self) -> SyncRoundDecision {
         if self.all_present {
-            debug_assert_ne!(self.solution.decision, RoundDecision::None);
+            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
             return self.solution.decision;
         }
 
-        return RoundDecision::None; // even if failure: wait for everyone.
+        return SyncRoundDecision::None; // even in case of failure: wait for everyone.
     }
 
     fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
@@ -58,8 +58,8 @@ impl SolutionCombiner {
             self.mark_single_component_submission(comp_id, present);
         }
 
-        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
-        debug_assert_ne!(partial.decision, RoundDecision::Solution);
+        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
+        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 
         // Combine our partial solution with the provided partial solution.
         // This algorithm *could* allow overlap in the partial solutions, but
@@ -125,18 +125,18 @@ impl SolutionCombiner {
             // Make sure the new entry is consistent
             let channel = &self.solution.channel_mapping[channel_index];
             if !Self::channel_is_consistent(channel) {
-                self.solution.decision = RoundDecision::Failure;
+                self.solution.decision = SyncRoundDecision::Failure;
             }
         }
 
         // Check to see if we have a global solution already
         self.update_all_present();
-        if self.all_present && self.solution.decision != RoundDecision::Failure {
-            debug_assert_eq!(self.solution.decision, RoundDecision::None);
+        if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
+            debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
             dbg_code!(for entry in &self.solution.channel_mapping {
                     debug_assert!(entry.putter.is_some() && entry.getter.is_some());
                 });
-            self.solution.decision = RoundDecision::Solution;
+            self.solution.decision = SyncRoundDecision::Solution;
         }
     }
 
@@ -151,19 +151,20 @@ impl SolutionCombiner {
             self.mark_single_component_submission(entry.peer_comp_id, false);
         }
 
-        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
+        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
         // Go through all entries and check if the submitted local solution is
        // consistent with our partial solution
         let mut had_new_entry = false;
         for entry in solution.iter() {
             let preexisting_index = self.find_channel_index_for_local_entry(comp_id, entry);
-            let new_port = SolutionPort{
+            let new_port = SyncSolutionPort{
                 self_comp_id: comp_id,
                 self_port_id: entry.self_port_id,
                 peer_comp_id: entry.peer_comp_id,
                 peer_port_id: entry.peer_port_id,
+                mapping: entry.mapping,
                 port_kind: entry.port_kind,
             };
 
             match preexisting_index {
@@ -172,26 +173,28 @@ impl SolutionCombiner {
                     // the global solution. We'll handle any mismatches along
                     // the way.
                     let channel = &mut self.solution.channel_mapping[entry_index];
-                    if entry.is_putter {
+                    match entry.port_kind {
+                        PortKind::Putter => {
                             // Getter should be present in existing entry
                             debug_assert!(channel.getter.is_some() && channel.putter.is_none());
                             channel.putter = Some(new_port);
-                    } else {
+                        },
+                        PortKind::Getter => {
                             // Putter should be present in existing entry
                             debug_assert!(channel.putter.is_some() && channel.getter.is_none());
                             channel.getter = Some(new_port);
-                    };
+                        }
+                    }
 
                     if !Self::channel_is_consistent(channel) {
-                        self.solution.decision = RoundDecision::Failure;
+                        self.solution.decision = SyncRoundDecision::Failure;
                     }
                 },
                 None => {
                     // No entry yet. So add it
-                    let new_solution = if entry.is_putter {
-                        SolutionChannel{ putter: Some(new_port), getter: None }
-                    } else {
+                    let new_solution = match entry.port_kind {
+                        PortKind::Putter => SyncSolutionChannel{ putter: Some(new_port), getter: None },
+                        PortKind::Getter => SyncSolutionChannel{ putter: None, getter: Some(new_port) },
                     };
                     self.solution.channel_mapping.push(new_solution);
                     had_new_entry = true;
@@ -201,19 +204,39 @@ impl SolutionCombiner {
 
         if !had_new_entry {
             self.update_all_present();
-            if self.all_present && self.solution.decision != RoundDecision::Failure {
+            if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
                 // No new entries and every component is present. This implies that
                 // every component successfully added their local solutions to the
                 // global solution. Hence: we have a global solution
-                debug_assert_eq!(self.solution.decision, RoundDecision::None);
+                debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
                 dbg_code!(for entry in &self.solution.channel_mapping {
                     debug_assert!(entry.putter.is_some() && entry.getter.is_some());
                 });
-                self.solution.decision = RoundDecision::Solution;
+                self.solution.decision = SyncRoundDecision::Solution;
             }
         }
     }
 
+    /// Takes whatever partial solution is present in the solution combiner and
+    /// returns it. The solution combiner's solution will end up being empty.
+    /// This is used when a new leader is found and we need to pass along our
+    /// partial results.
+    fn take_partial_solution(&mut self) -> SyncPartialSolution {
+        let mut partial_solution = SyncPartialSolution::default();
+        std::mem::swap(&mut partial_solution, &mut self.solution);
+        self.all_present = false;
+
+        return partial_solution;
+    }
+
+    fn clear(&mut self) {
+        self.solution.submissions_by.clear();
+        self.solution.channel_mapping.clear();
+        self.solution.decision = SyncRoundDecision::None;
+    }
+
     // --- Small utilities for combining solutions
 
     fn mark_single_component_submission(&mut self, comp_id: CompId, will_contribute: bool) {
         debug_assert!(!will_contribute || !self.solution.submissions_by.iter().any(|(id, val)| *id == comp_id && *val)); // if submitting a solution, then we do not expect an existing entry
         for (entry, has_contributed) in self.solution.submissions_by.iter_mut() {
@@ -290,14 +313,16 @@ impl SolutionCombiner {
             // port B is part of its channel, but port B may consider port A not
             // to be part of its channel. Before merging the entries (outside of
             // this function) we'll make sure this is not the case.
-            if new_entry.is_putter {
+            match new_entry.port_kind {
+                PortKind::Putter => {
                     // Expect getter to be present
                     if let Some(cur_entry) = &cur_entry.getter {
                         if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
                             return Some(entry_index);
                         }
                     }
-            } else {
+                },
+                PortKind::Getter => {
                    if let Some(cur_entry) = &cur_entry.putter {
                        if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
                            return Some(entry_index);
@@ -305,6 +330,7 @@ impl SolutionCombiner {
                     }
                 }
             }
+        }
 
         return None;
     }
@@ -362,6 +388,8 @@ impl Consensus {
     // Managing sync state
     // -------------------------------------------------------------------------
 
+    /// Notifies the consensus management that the PDL code has reached the
+    /// start of a sync block.
     pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
         debug_assert_eq!(self.mode, Mode::NonSync);
         self.highest_id = comp_ctx.id;
@@ -370,7 +398,10 @@ impl Consensus {
         self.make_ports_consistent_with_ctx(comp_ctx);
     }
 
-    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> RoundDecision {
+    /// Notifies the consensus management that the PDL code has reached the end
+    /// of a sync block. A local solution will be submitted, after which we wait
+    /// until the participants in the round (hopefully) reach a conclusion.
+    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
         self.mode = Mode::SyncAwaitingSolution;
 
@@ -393,6 +424,22 @@ impl Consensus {
         return decision;
     }
 
+    /// Notifies that a decision has been reached. Note that the caller should
+    /// still take the appropriate actions based on the decision it is supplying
+    /// to the consensus layer.
+    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
+        // Reset everything for the next round
+        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+        self.mode = Mode::NonSync;
+        self.round_index = self.round_index.wrapping_add(1);
+
+        for port in self.ports.iter_mut() {
+            port.mapping = None;
+        }
+
+        self.solution.clear();
+    }
+
     fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
         let mut needs_setting_ports = false;
         if comp_ctx.ports.len() != self.ports.len() {
@@ -423,7 +470,7 @@ impl Consensus {
 
     pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
         debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
-        debug_assert!(self.round.ports.iter().any(|v| v.id == port_info.self_id));
+        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
         let data_header = self.create_data_header_and_update_mapping(port_info);
         let sync_header = self.create_sync_header(comp_ctx);
 
@@ -436,12 +483,12 @@ impl Consensus {
     /// received.
     pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
-        debug_assert!(self.round.ports.iter().any(|v| v.id == message.data_header.target_port));
+        debug_assert!(self.ports.iter().any(|v| v.id == message.data_header.target_port));
 
         // Make sure the expected mapping matches the currently stored mapping
         for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
             let got_annotation = self.get_annotation(*expected_id);
-            if got_annotation != expected_annotation {
+            if got_annotation != *expected_annotation {
                 return false;
             }
         }
@@ -456,35 +503,38 @@ impl Consensus {
     }
 
     /// Receives the sync message and updates the consensus state appropriately.
-    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> RoundDecision {
+    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
         // Whatever happens: handle the sync header (possibly changing the
         // currently registered leader)
         self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 
         match message.content {
             SyncMessageContent::NotificationOfLeader => {
-                return RoundDecision::None;
+                return SyncRoundDecision::None;
             },
             SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
                 return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
             },
             SyncMessageContent::PartialSolution(partial_solution) => {
                 return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
-            }
+            },
             SyncMessageContent::GlobalSolution => {
                 // Global solution has been found
                 debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
                 todo!("clear port mapping or something");
-                return RoundDecision::Solution;
+                return SyncRoundDecision::Solution;
            },
+            SyncMessageContent::GlobalFailure => {
+                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+                return SyncRoundDecision::Failure;
+            }
         }
     }
 
     fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
-        if header.highest_id > self.round.highest_id {
+        if header.highest_id.0 > self.highest_id.0 {
             // Sender knows of someone with a higher ID. So store highest ID,
             // notify all peers, and forward local solutions
-            self.round.highest_id = header.highest_id;
+            self.highest_id = header.highest_id;
             for peer in &comp_ctx.peers {
                 if peer.id == header.sending_id {
                     continue;
@@ -494,20 +544,18 @@ impl Consensus {
                     sync_header: self.create_sync_header(comp_ctx),
                     content: SyncMessageContent::NotificationOfLeader,
                 };
-                peer.handle.inbox.push(Message::Sync(message));
-                wake_up_if_sleeping(sched_ctx, peer.id, &peer.handle);
+                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
             }
 
-            self.forward_local_solutions(sched_ctx, comp_ctx);
-        } else if header.highest_id < self.round.highest_id {
+            self.forward_partial_solution(sched_ctx, comp_ctx);
+        } else if header.highest_id.0 < self.highest_id.0 {
             // Sender has a lower ID, so notify it of our higher one
             let message = SyncMessage{
                 sync_header: self.create_sync_header(comp_ctx),
                 content: SyncMessageContent::NotificationOfLeader,
             };
             let peer_info = comp_ctx.get_peer(header.sending_id);
-            peer_info.handle.inbox.push(Message::Sync(message));
-            wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
         } // else: exactly equal
     }
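
handle_sync_header doubles as the round's leader election: every sync header carries the highest component id its sender has seen, a receiver adopts a larger id (forwarding its partial solution to the new leader) and argues back when it knows a larger one. Repeated exchange converges on the maximum id. Below is a toy simulation of that flooding rule; the three-component ring, the ids and the fixed-point loop are invented for illustration and stand in for real message passing.

    // Toy model: each component tracks the highest id it has heard of;
    // repeatedly exchanging headers with neighbors converges on the maximum.
    fn main() {
        let ids = [4u32, 9, 2]; // component ids; 9 should become leader
        let mut highest = ids;  // each starts believing it is the leader
        let neighbors = [(0usize, 1usize), (1, 2), (2, 0)]; // ring topology

        let mut changed = true;
        while changed {
            changed = false;
            for &(a, b) in &neighbors {
                // A header exchanged between `a` and `b` carries the highest
                // known id; the receiver adopts it if it is larger, as in
                // handle_sync_header.
                let max = highest[a].max(highest[b]);
                if highest[a] != max || highest[b] != max {
                    highest[a] = max;
                    highest[b] = max;
                    changed = true;
                }
            }
        }
        assert!(highest.iter().all(|&h| h == 9));
    }
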
 

	
 
@@ -534,47 +582,30 @@ impl Consensus {
     // Leader-related methods
     // -------------------------------------------------------------------------
 
-    fn forward_local_solutions(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        todo!("implement")
-    }
-
-    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> RoundDecision {
-        if self.highest_id == comp_ctx.id {
-            // We are the leader
-            self.solution.combine_with_local_solution(solution_sender_id, solution);
-            let round_decision = self.solution.get_decision();
-            let decision_is_solution = match round_decision {
-                RoundDecision::None => {
-                    // No solution yet
-                    return RoundDecision::None;
-                },
-                RoundDecision::Solution => true,
-                RoundDecision::Failure => false,
-            };
-
-            // If here then we've reached a decision, broadcast it
-            for (peer_id, _is_present) in self.solution.solution.submissions_by.iter().copied() {
-                debug_assert!(_is_present);
-                if peer_id == comp_ctx.id {
-                    // Do not send the result to ourselves
-                    continue;
-                }
-
-                let mut handle = sched_ctx.runtime.get_component_public(peer_id);
-                handle.inbox.push(Message::Sync(SyncMessage{
-                    sync_header: self.create_sync_header(comp_ctx),
-                    content: if decision_is_solution {
-                        SyncMessageContent::GlobalSolution
-                    } else {
-                        SyncMessageContent::GlobalFailure
-                    },
-                }));
-                wake_up_if_sleeping(sched_ctx, peer_id, &handle);
-
-                let _should_remove = handle.decrement_users();
-                debug_assert!(!_should_remove);
-            }
+    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        debug_assert_ne!(self.highest_id, comp_ctx.id); // not leader
+
+        // Make sure that we have something to send
+        if !self.solution.has_contributions() {
+            return;
+        }
+
+        // Swap the container with the partial solution and then send it along
+        let partial_solution = self.solution.take_partial_solution();
+        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
+            sync_header: self.create_sync_header(comp_ctx),
+            content: SyncMessageContent::PartialSolution(partial_solution),
+        }));
+    }
+
+    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
+        if self.highest_id == comp_ctx.id {
+            // We are the leader
+            self.solution.combine_with_local_solution(solution_sender_id, solution);
+            let round_decision = self.solution.get_decision();
+            if round_decision != SyncRoundDecision::None {
+                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
+            }
+            return round_decision;
         } else {
             // Forward the solution
@@ -583,18 +614,19 @@ impl Consensus {
                 content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
             };
             self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
-            return RoundDecision::None;
+            return SyncRoundDecision::None;
         }
     }
 
-    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> RoundDecision {
+    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
         if self.highest_id == comp_ctx.id {
             // We are the leader, combine existing and new solution
             self.solution.combine_with_partial_solution(solution);
             let round_decision = self.solution.get_decision();
 
-
-            return RoundDecision::None;
+            if round_decision != SyncRoundDecision::None {
+                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
+            }
+            return round_decision;
         } else {
             // Forward the partial solution
             let message = SyncMessage{
@@ -602,15 +634,40 @@ impl Consensus {
                 content: SyncMessageContent::PartialSolution(solution),
             };
             self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
-            return RoundDecision::None;
+            return SyncRoundDecision::None;
         }
     }
 
+    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
+        debug_assert_eq!(self.highest_id, comp_ctx.id);
+
+        let is_success = match decision {
+            SyncRoundDecision::None => unreachable!(),
+            SyncRoundDecision::Solution => true,
+            SyncRoundDecision::Failure => false,
+        };
+
+        for (peer, _) in self.solution.solution.submissions_by.iter().copied() {
+            if peer == comp_ctx.id {
+                // Do not send to ourselves
+                continue;
+            }
+
+            let mut handle = sched_ctx.runtime.get_component_public(peer);
+            let message = Message::Sync(SyncMessage{
+                sync_header: self.create_sync_header(comp_ctx),
+                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
+            });
+            handle.send_message(sched_ctx, message, true);
+            let _should_remove = handle.decrement_users();
+            debug_assert!(!_should_remove);
+        }
+    }
+
     fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
         debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
         let leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
-        leader_info.inbox.push(message);
-        wake_up_if_sleeping(sched_ctx, self.highest_id, &leader_info);
+        leader_info.send_message(sched_ctx, message, true);
     }
 
     // -------------------------------------------------------------------------
@@ -620,15 +677,15 @@ impl Consensus {
     fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
         let mut expected_mapping = Vec::with_capacity(self.ports.len());
         let mut port_index = usize::MAX;
-        for (index, port) in self.round.ports.iter().enumerate() {
+        for (index, port) in self.ports.iter().enumerate() {
             if port.id == port_info.self_id {
                 port_index = index;
             }
-            expected_mapping.push((port.id, Some(mapping)));
+            expected_mapping.push((port.id, port.mapping));
         }
 
         let new_mapping = self.take_mapping();
-        self.round.ports[port_index].mapping = Some(new_mapping);
+        self.ports[port_index].mapping = Some(new_mapping);
         debug_assert_eq!(port_info.kind, PortKind::Putter);
         return MessageDataHeader{
             expected_mapping,
@@ -638,14 +695,16 @@ impl Consensus {
         };
     }
 
+    #[inline]
     fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
         return MessageSyncHeader{
-            sync_round: self.round.index,
+            sync_round: self.round_index,
             sending_id: comp_ctx.id,
            highest_id: self.highest_id,
        };
    }
 
+    #[inline]
     fn take_mapping(&mut self) -> u32 {
         let mapping = self.mapping_counter;
         self.mapping_counter = self.mapping_counter.wrapping_add(1);
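
create_data_header_and_update_mapping and take_mapping stamp every put with a fresh number from a wrapping counter and attach the sender's current port mapping as expected_mapping; try_receive_data_message (above) rejects a message when the receiver's own mapping disagrees, which is what filters out diverged speculative branches. A reduced sketch of the compatibility check; ports and annotations are plain tuples here, and ports unknown to the receiver are simply skipped, whereas the real code resolves them through get_annotation.

    // Sketch: a port mapping is a list of (port_id, Option<round counter>).
    fn mapping_is_compatible(
        expected: &[(u32, Option<u32>)],
        local: &[(u32, Option<u32>)],
    ) -> bool {
        expected.iter().all(|&(port, annotation)| {
            match local.iter().find(|&&(p, _)| p == port) {
                Some(&(_, local_annotation)) => local_annotation == annotation,
                None => true, // sketch: unknown ports are ignored here
            }
        })
    }

    fn main() {
        let sender_view = [(10u32, Some(0u32)), (11, None)];
        let receiver_ok = [(10u32, Some(0u32)), (11, None)];
        let receiver_conflict = [(10u32, Some(3u32)), (11, None)];
        assert!(mapping_is_compatible(&sender_view, &receiver_ok));
        assert!(!mapping_is_compatible(&sender_view, &receiver_conflict));
    }
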
src/runtime2/component/control_layer.rs

@@ -206,7 +206,7 @@ impl ControlLayer {
         port.state = PortState::Closed;
 
         if peer_comp_id == comp_ctx.id {
-            // We own the other end of the channel as well
+            // We own the other end of the channel as well.
            return None;
        }
 

	
src/runtime2/runtime.rs

@@ -3,11 +3,12 @@ use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 use std::collections::VecDeque;
 
 use crate::protocol::*;
+use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::communication::Message;
 use super::component::{CompCtx, CompPDL};
 use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
-use super::scheduler::Scheduler;
+use super::scheduler::*;
 
 // -----------------------------------------------------------------------------
 // Component
@@ -58,7 +59,7 @@ pub(crate) struct RuntimeComp {
 pub(crate) struct CompPublic {
     pub sleeping: AtomicBool,
     pub num_handles: AtomicU32, // manually modified (!)
-    pub inbox: QueueDynProducer<Message>,
+    inbox: QueueDynProducer<Message>,
 }
 
 /// Handle to public part of a component. Would be nice if we could
@@ -67,19 +68,29 @@ pub(crate) struct CompPublic {
 /// code to make sure this actually happens.
 pub(crate) struct CompHandle {
     target: *const CompPublic,
+    id: CompId, // TODO: @Remove after debugging
     #[cfg(debug_assertions)] decremented: bool,
 }
 
 impl CompHandle {
-    fn new(public: &CompPublic) -> CompHandle {
+    fn new(id: CompId, public: &CompPublic) -> CompHandle {
         let handle = CompHandle{
             target: public,
+            id,
             #[cfg(debug_assertions)] decremented: false,
         };
         handle.increment_users();
         return handle;
     }
 
+    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
+        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
+        self.inbox.push(message);
+        if try_wake_up {
+            wake_up_if_sleeping(sched_ctx, self.id, self);
+        }
+    }
+
     fn increment_users(&self) {
         let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
         debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
@@ -100,6 +111,7 @@ impl Clone for CompHandle {
         self.increment_users();
         return CompHandle{
             target: self.target,
+            id: self.id,
             #[cfg(debug_assertions)] decremented: false,
         };
     }
@@ -230,7 +242,7 @@ impl RuntimeInner {
 
     pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
         let component = self.components.get(id.0);
-        return CompHandle::new(&component.public);
+        return CompHandle::new(id, &component.public);
     }
 
     pub(crate) fn destroy_component(&self, key: CompKey) {
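
CompHandle is a manually reference-counted pointer to a component's CompPublic: new and Clone bump num_handles, and decrement_users reports when the last reference went away so the caller can destroy the component. A reduced sketch of that counting contract; a safe borrow replaces the raw target pointer, and the runtime's own implicit reference is modeled as the starting count of 1.

    use std::sync::atomic::{AtomicU32, Ordering};

    // Sketch of the manual reference count behind CompHandle.
    struct CompPublic { num_handles: AtomicU32 }

    struct CompHandle<'a> { public: &'a CompPublic }

    impl<'a> CompHandle<'a> {
        fn new(public: &'a CompPublic) -> Self {
            public.num_handles.fetch_add(1, Ordering::AcqRel);
            CompHandle { public }
        }

        // Returns true when this was the last user, i.e. the caller should
        // destroy the component (mirrors decrement_users in the diff).
        fn decrement_users(self) -> bool {
            self.public.num_handles.fetch_sub(1, Ordering::AcqRel) == 1
        }
    }

    fn main() {
        let public = CompPublic { num_handles: AtomicU32::new(1) };
        let a = CompHandle::new(&public);
        let b = CompHandle::new(&public);
        assert!(!a.decrement_users());
        assert!(!b.decrement_users());
        // The runtime's own reference would be the final decrement.
    }
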
src/runtime2/scheduler.rs

@@ -95,9 +95,7 @@ impl Scheduler {
             let port_info = &component.ctx.ports[port_index];
             if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
                 let peer_info = component.ctx.get_peer(peer_id);
-                peer_info.handle.inbox.push(Message::Control(message));
-
-                wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
+                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
             }
         }
 

	
src/runtime2/tests/mod.rs

@@ -17,7 +17,7 @@ fn test_component_creation() {
         let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new()))
             .expect("component creation");
         let comp = CompPDL::new(prompt, 0);
-        let (key, _) = rt.inner.create_pdl_component(comp, true);
+        let (key, _) = rt.inner.create_pdl_component(comp, false);
         rt.inner.enqueue_work(key);
     }
 }
@@ -46,6 +46,6 @@ fn test_component_communication() {
     let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new()))
         .expect("creation");
-    let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), true);
+    let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), false);
     rt.inner.enqueue_work(key);
 }
\ No newline at end of file
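
Both tests now pass false instead of true as the second argument of create_pdl_component; the flag's definition is not visible in this diff, so any reading of its meaning is a guess. For reference, the creation sequence the two tests share, distilled into a hypothetical helper (identifiers exactly as used in the tests; not standalone-runnable, since Runtime, CompPDL and ValueGroup are crate types):

    // Hypothetical helper distilled from the two tests in this changeset.
    fn spawn_component(rt: &Runtime, name: &[u8]) {
        let prompt = rt.inner.protocol
            .new_component(b"", name, ValueGroup::new_stack(Vec::new()))
            .expect("component creation");
        // Second argument now `false`, matching the updated tests.
        let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), false);
        rt.inner.enqueue_work(key);
    }
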