Changeset - 968e958c3286
[Not reviewed]
0 6 1
mh - 3 years ago 2022-01-13 20:40:25
contact@maxhenger.nl
WIP: Basic control/data message flow
7 files changed with 715 insertions and 67 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use super::runtime::*;
 
use super::component::*;
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortId(pub u32);
 

	
 

	
 
impl PortId {
 
    /// This value is not significant, it is chosen to make debugging easier: a
 
    /// very large port number is more likely to shine a light on bugs.
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
@@ -16,11 +20,13 @@ pub struct Peer {
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    Blocked,
 
@@ -41,22 +47,36 @@ pub struct Channel {
 
}
 

	
 
pub struct DataMessage {
 
    pub source_port_id: PortId,
 
    pub target_port_id: PortId,
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
pub struct MessageSyncHeader {
 
    pub sync_round: u32,
 
}
 

	
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortId, u32)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
 
}
 

	
 
pub struct ControlMessage {
 
    pub id: u32,
 
    pub id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub content: ControlContent,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
 
}
 

	
 
pub enum ControlContent {
 
#[derive(Copy, Clone)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    Ping,
 
    PortPeerChangedBlock,
 
    PortPeerChangedUnblock,
 
    BlockPort(PortId),
 
    UnblockPort(PortId),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
}
 

	
 
pub enum Message {
 
@@ -64,4 +84,15 @@ pub enum Message {
 
    Control(ControlMessage),
 
}
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
 
            Message::Data(v) =>
 
                return Some(v.data_header.target_port),
 
            Message::Control(v) =>
 
                return v.target_port_id,
 
        }
 
    }
 
}
 

	
 

	
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -10,6 +10,9 @@ use crate::runtime2::runtime::*;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
@@ -51,14 +54,6 @@ impl CompCtx {
 
        return Some(message);
 
    }
 

	
 
    fn find_peer(&self, port_id: PortId) -> (&Port, &Peer) {
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        let port_info = &self.ports[port_index];
 
        let peer_index = self.get_peer_index(port_info.peer_comp_id).unwrap();
 
        let peer_info = &self.peers[peer_index];
 
        return (port_info, peer_info);
 
    }
 

	
 
    fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
@@ -80,7 +75,17 @@ impl CompCtx {
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    fn get_port_index(&self, port_id: PortId) -> Option<usize> {
 
    fn get_port(&self, port_id: PortId) -> &Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == port_id {
 
                return Some(index);
 
@@ -90,7 +95,17 @@ impl CompCtx {
 
        return None;
 
    }
 

	
 
    fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
 
    fn get_peer(&self, peer_id: CompId) -> &Peer {
 
        let index = self.get_peer_index(peer_id).unwrap();
 
        return &self.peers[index];
 
    }
 

	
 
    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
 
        let index = self.get_peer_index(peer_id).unwrap();
 
        return &mut self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == peer_id {
 
                return Some(index);
 
@@ -180,7 +195,16 @@ pub(crate) struct CompPDL {
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: Vec<Option<DataMessage>>,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl CompPDL {
 
@@ -190,13 +214,40 @@ impl CompPDL {
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: u32,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            }
 
            },
 
            inbox_main: Vec::new(),
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        self.inbox.resize(comp_ctx.ports.len(), None);
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        if let Some(new_target) = self.control.should_reroute(&message) {
 
            let target = sched_ctx.runtime.get_component_public(new_target);
 
            target.inbox.push(message);
 

	
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_Ctx, message);
 
            },
 
        }
 
    }
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 
@@ -208,6 +259,7 @@ impl CompPDL {
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
@@ -227,8 +279,15 @@ impl CompPDL {
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = transform_port_id(port_id);
 
                Self::send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_info = comp_ctx.get_port(port_id);
 
                if port_info.state == PortState::Blocked {
 

	
 
                } else {
 

	
 
                }
 
                self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
@@ -238,9 +297,20 @@ impl CompPDL {
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
                let mut ports = Vec::new(); // TODO: Optimize
 
                let protocol = &sched_ctx.runtime.protocol;
 
                find_ports_in_value_group(&arguments, &mut ports);
 
                let prompt = Prompt::new(
 
                    &protocol.types, &protocol.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
@@ -253,8 +323,6 @@ impl CompPDL {
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 

	
 
        return Ok(CompScheduling::Sleep);
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
@@ -270,22 +338,24 @@ impl CompPDL {
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 

	
 
        self.consensus.notify_sync_start(comp_ctx);
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 

	
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 

	
 
        self.consensus.notify_sync_end();
 
        debug_assert_eq!(self.mode, Mode::Sync);
 
        self.mode = Mode::NonSync;
 
    }
 

	
 
    fn send_message_and_wake_up(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, port_id: PortId, value: ValueGroup) {
 
    fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
 
        use std::sync::atomic::Ordering;
 

	
 
        let (port_info, peer_info) = comp_ctx.find_peer(port_id);
 
        peer_info.handle.inbox.push(Message::Data(DataMessage{
 
            source_port_id: port_id,
 
            target_port_id: port_info.peer_id,
 
            content: value,
 
        }));
 
        let port_info = comp_ctx.get_port(source_port_id);
 
        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
 
        let annotated_message = self.consensus.annotate_message_data(port_info, value);
 
        peer_info.handle.inbox.push(Message::Data(annotated_message));
 

	
 
        let should_wake_up = peer_info.handle.sleeping.compare_exchange(
 
            true, false, Ordering::AcqRel, Ordering::Relaxed
 
@@ -297,18 +367,165 @@ impl CompPDL {
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports(sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
 
        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
                self.mode_port = PortId::new_invalid();
 
            }
 
            
 
            return;
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = &mut comp_ctx.ports[port_index];
 
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 

	
 
        if port_info.state == PortState::Open {
 
            let (target_comp_id, block_message) =
 
                self.control.mark_port_blocked(target_port_id, comp_ctx);
 
            debug_assert_eq!(port_info.peer_comp_id, target_comp_id);
 

	
 
            let peer = comp_ctx.get_peer(target_comp_id);
 
            peer.handle.inbox.push(Message::Control(block_message));
 
            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
        self.inbox_backup.push(message);
 
    }
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                let mut to_ack = message.id;
 
                loop {
 
                    let action = self.control.handle_ack(to_ack, comp_ctx);
 
                    match action {
 
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
 
                            // FIX @NoDirectHandle
 
                            let handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.inbox.push(Message::Control(message));
 
                            wake_up_if_sleeping(sched_ctx, target_comp, &handle);
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
                            // FIX @NoDirectHandle
 
                            let handle = sched_ctx.runtime.get_component_public(to_schedule);
 
                            wake_up_if_sleeping(sched_ctx, to_schedule, &handle);
 
                            break;
 
                        },
 
                        AckAction::None => {
 
                            break;
 
                        }
 
                    }
 
                }
 
            },
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // On of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state != PortState::Closed {
 
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
 
                    port_info.state = PortState::Blocked;
 
                }
 
            },
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
                let port_info = comp_ctx.get_port(port_id);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
 
                if port_info.state == PortState::Blocked {
 
                    self.unblock_port(sched_ctx, comp_ctx, port_id);
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
                // The peer of our port has just changed. So we are asked to
 
                // temporarily block the port (while our original recipient is
 
                // potentially rerouting some of the in-flight messages) and
 
                // Ack. Then we wait for the `unblock` call.
 
                debug_assert_eq!(message.target_port_id, port_id);
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 
                if port_info.state == PortState::Open {
 
                    port_info.state = PortState::Blocked;
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
 
                debug_assert_eq!(message.target_port_id, port_id);
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Blocked);
 
                port_info.peer_comp_id = new_comp_id;
 
                self.unblock_port(sched_ctx, comp_ctx, port_id);
 
            }
 
        }
 
    }
 

	
 
    fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
 
            // We were blocked on the port that just became unblocked, so
 
            // send the message.
 
            let mut replacement = ValueGroup::default();
 
            std::mem::swap(&mut replacement, &mut self.mode_value);
 
            self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
 

	
 
            self.mode = Mode::Sync;
 
            self.mode_port = PortId::new_invalid();
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
 
        let component = CompPDL::new(prompt);
 
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
 
        let created_ctx = &mut component.ctx;
 

	
 
        let mut has_reroute_entry = false;
 
        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 

	
 
        for port_id in ports.iter().copied() {
 
            // Transfer port
 
            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            // Create temporary reroute entry if the peer is another component
 
            let port_info = creator_ctx.get_port(port_id);
 
            debug_assert_ne!(port_info.state, PortState::Blocked);
 
            if port_info.peer_comp_id == creator_ctx.id {
 
                // We own the peer port. So retrieve it and modify the peer directly
 
                let port_info = creator_ctx.get_port_mut(port_info.peer_id);
 
                port_info.peer_comp_id = created_ctx.id;
 
            } else {
 
                // We don't own the port, so send the appropriate messages and
 
                // notify the control layer
 
                has_reroute_entry = true;
 
                let message = self.control.add_reroute_entry(
 
                    creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                    port_info.self_id, created_ctx.id, schedule_entry_id
 
                );
 
                let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
 
                peer_info.handle.inbox.push(message);
 
            }
 

	
 
            // Transfer port and create temporary reroute entry
 
            let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            if port_info.state == PortState::Blocked {
 
                todo!("Think about this when you're not tired!");
 
            }
 
            Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 

	
 
            // Maybe remove peer from the creator
 
            if let Some(peer_info) = peer_info {
 
            if let Some(mut peer_info) = peer_info {
 
                let remove_from_runtime = peer_info.handle.decrement_users();
 
                if remove_from_runtime {
 
                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
 
@@ -317,8 +534,11 @@ impl CompPDL {
 
            }
 
        }
 

	
 
        // Start scheduling
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
        if !has_reroute_entry {
 
            // We can schedule the component immediately
 
            self.control.remove_schedule_entry(schedule_entry_id);
 
            sched_ctx.runtime.enqueue_work(comp_key);
 
        } // else: wait for the `Ack`s, they will trigger the scheduling of the component
 
    }
 

	
 
    /// Removes a port from a component. Also decrements the port counter in
 
@@ -365,7 +585,6 @@ impl CompPDL {
 
            },
 
            None => {
 
                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
 
                handle.increment_users();
 
                comp_ctx.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
@@ -374,6 +593,25 @@ impl CompPDL {
 
            }
 
        }
 
    }
 

	
 
    fn change_port_peer_component(
 
        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
        port_id: PortId, new_peer_comp_id: CompId
 
    ) {
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let cur_peer_comp_id = port_info.peer_comp_id;
 
        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
 
        cur_peer_info.num_associated_ports -= 1;
 

	
 
        if cur_peer_info.num_associated_ports == 0 {
 
            let should_remove = cur_peer_info.handle.decrement_users();
 
            if should_remove {
 
                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
 
                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
 

	
 
            }
 
        }
 
    }
 
}
 

	
 
#[inline]
 
@@ -427,4 +665,20 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
/// If the component is sleeping, then that flag will be atomically set to
 
/// false. If we're the ones that made that happen then we add it to the work
 
/// queue.
 
fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
 
    use std::sync::atomic::Ordering;
 

	
 
    let should_wake_up = handle.sleeping
 
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
 
        .is_ok();
 

	
 
    if should_wake_up {
 
        let comp_key = unsafe{ comp_id.upgrade() };
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/consensus.rs
Show inline comments
 
new file 100644
 
use crate::protocol::eval::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_pdl::*;
 

	
 
pub struct PortAnnotation {
 
    id: PortId,
 
    mapping: Option<u32>,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(id: PortId) -> Self {
 
        return Self{ id, mapping: None }
 
    }
 
}
 

	
 
/// Tracking consensus state
 
pub struct Consensus {
 
    round: u32,
 
    mapping_counter: u32,
 
    ports: Vec<PortAnnotation>,
 
}
 

	
 
impl Consensus {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            round: 0,
 
            mapping_counter: 0,
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
 
        // Make sure we locally still have all of the same ports
 
        self.transfer_ports(comp_ctx);
 
        self.mapping_counter = 0;
 
    }
 

	
 
    pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
 
        let data_header = self.create_data_header(port_info);
 
        let sync_header = self.create_sync_header();
 

	
 
        return DataMessage{ data_header, sync_header, content };
 
    }
 

	
 
    pub(crate) fn notify_sync_end(&mut self) {
 
        self.round = self.round.wrapping_add(1);
 
        todo!("implement sync end")
 
    }
 

	
 
    pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.ports.len() != self.ports.len() {
 
            ports_same = true;
 
        } else {
 
            for idx in 0..comp_ctx.ports.len() {
 
                let comp_port_id = comp_ctx.ports[idx].self_id;
 
                let cons_port_id = self.ports[idx].id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        if needs_setting_ports {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.ports.len());
 
            for port in &comp_ctx.ports {
 
                self.ports.push(PortAnnotation::new(port.self_id))
 
            }
 
        }
 
    }
 

	
 
    fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                expected_mapping.push((port.id, mapping));
 
            }
 
        }
 

	
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        return MessageDataHeader{
 
            expected_mapping,
 
            new_mapping: self.take_mapping(),
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
        };
 
    }
 

	
 
    fn create_sync_header(&self) -> MessageSyncHeader {
 
        return MessageSyncHeader{
 
            sync_round: self.round,
 
        };
 
    }
 

	
 
    fn take_mapping(&mut self) -> u32 {
 
        let mapping = self.mapping_counter;
 
        self.mapping_counter += 1;
 
        return mapping;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -2,51 +2,270 @@ use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: u32,
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
    ack_action: ControlAction,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ControlPeerChange),
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(ContentScheduleComponent),
 
    BlockedPort(ContentBlockedPort),
 
}
 

	
 
struct ControlPeerChange {
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    target_port: PortId, // if sent to this port
 
    new_target_comp: CompId, // redirect to this component
 
    source_comp: CompId,
 
    target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ContentScheduleComponent {
 
    to_schedule: CompId,
 
}
 

	
 
/// Action to be taken when the `Ack`s for a control entry has come in.
 
enum ControlAction {
 
    Nothing,
 
    AckOwnEntry(u32), // ack an entry we own ourselves
 
    ScheduleComponent(CompId), // schedule a particular component for execution
 
struct ContentBlockedPort {
 
    blocked_port: PortId,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessageAndAck(CompId, ControlMessage, ControlId),
 
    ScheduleComponent(CompId),
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: u32,
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    fn handle_created_component(&mut self, creator_ctx: &CompCtx, created_ctx: &CompCtx) {
 
        for peer in &created_ctx.peers {
 
            // TODO: Optimize when we ourselves are the peer.
 
    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.target_port == target_port {
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction {
 
        let entry_index = self.get_entry_index(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return AckAction::None;
 
        }
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match &entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.source_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
            // Create entry that will unblock the peer if it confirms that all
 
            // of its ports have been blocked
 
                self.entries.remove(entry_index);
 
                self.handle_ack(to_ack, comp_ctx);
 

	
 
            peer.handle.inbox.push(Message::)
 
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
 
            },
 
            ControlContent::ScheduleComponent(content) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return AckAction::ScheduleComponent(content.to_schedule);
 
            },
 
            ControlContent::BlockedPort(_) => unreachable!(),
 
        }
 
    }
 

	
 
    fn take_id(&mut self) -> u32 {
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(ContentScheduleComponent{
 
                to_schedule: to_schedule_id
 
            }),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                target_port: target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        for entry in &mut self.entries {
 
            if entry.id == schedule_entry_id {
 
                entry.ack_countdown += 1;
 
                break;
 
            }
 
        }
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking and unblocking ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // TODO: Feels like this shouldn't be an entry. Hence this class should
 
        //  be renamed. Lets see where the code ends up being
 
        let entry_id = self.take_id();
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
 
        port_info.state = PortState::Blocked;
 

	
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0,
 
            content: ControlContent::BlockedPort(ContentBlockedPort{
 
                blocked_port: port_id,
 
            }),
 
        });
 

	
 
        return (
 
            port_info.peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_id),
 
                content: ControlMessageContent::BlockPort(port_info.peer_id),
 
            }
 
        );
 
    }
 

	
 
    pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // Find the entry that contains the blocking entry for the port
 
        let mut entry_index = usize::MAX;
 
        let mut entry_id = ControlId::MAX;
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if let ControlContent::BlockedPort(block_entry) = &entry.content {
 
                if block_entry.blocked_port == port_id {
 
                    entry_index = index;
 
                    entry_id = entry.id;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
        return (
 
            port_info.peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_id),
 
                content: ControlMessageContent::UnblockPort(port_info.peer_id),
 
            }
 
        )
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn take_id(&mut self) -> ControlId {
 
        let id = self.id_counter;
 
        self.id_counter += 1;
 
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
 
        return id;
 
    }
 

	
 
    fn get_entry_index(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Default for ControlLayer {
 
    fn default() -> Self {
 
        return ControlLayer{
 
            id_counter: ControlId(0),
 
            entries: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/mod.rs
Show inline comments
 
mod component_pdl;
 
mod control_layer;
 
mod consensus;
 

	
 
pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
 
\ No newline at end of file
 
pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
 
pub(crate) use control_layer::{ControlId};
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
@@ -59,24 +59,48 @@ pub(crate) struct CompPublic {
 

	
 
/// Handle to public part of a component. Would be nice if we could
 
/// automagically manage the `num_handles` counter. But when it reaches zero we
 
/// need to manually remove the handle from the runtime. So be careful.
 
/// need to manually remove the handle from the runtime. So we just have debug
 
/// code to make sure this actually happens.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
    #[cfg(debug_assertions)] decremented: bool,
 
}
 

	
 
impl CompHandle {
 
    pub(crate) fn increment_users(&self) {
 
    fn new(public: &CompPublic) -> CompHandle {
 
        let handle = CompHandle{
 
            target: public,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
        handle.increment_users();
 
        return handle;
 
    }
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns true if the component should be destroyed
 
    pub(crate) fn decrement_users(&self) -> bool {
 
    pub(crate) fn decrement_users(&mut self) -> bool {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        dbg_code!(self.decremented = true);
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        return old_count == 1;
 
    }
 
}
 

	
 
impl Clone for CompHandle {
 
    fn clone(&self) -> Self {
 
        debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'");
 
        self.increment_users();
 
        return CompHandle{
 
            target: self.target,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
@@ -85,6 +109,12 @@ impl std::ops::Deref for CompHandle {
 
    }
 
}
 

	
 
impl Drop for CompHandle {
 
    fn drop(&mut self) {
 
        debug_assert!(self.decremented, "need call to 'decrement_users' before dropping");
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 
@@ -165,7 +195,7 @@ impl Runtime {
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return CompHandle{ target: &component.public };
 
        return CompHandle::new(&component.public);
 
    }
 

	
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
src/runtime2/scheduler.rs
Show inline comments
 
@@ -14,6 +14,14 @@ pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a Runtime,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a Runtime) -> Self {
 
        return Self {
 
            runtime,
 
        }
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
@@ -22,7 +30,7 @@ impl Scheduler {
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
@@ -39,7 +47,7 @@ impl Scheduler {
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                new_scheduling = component.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
0 comments (0 inline, 0 general)