Changeset - e62f669c1e03
[Not reviewed]
0 3 0
mh - 3 years ago 2022-05-09 13:37:54
contact@maxhenger.nl
WIP on refactoring port transmission code
3 files changed with 105 insertions and 6 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -717,183 +717,249 @@ pub(crate) fn default_handle_sync_decision(
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
/// common) as they don't have any source code.
 
pub(crate) fn default_handle_error_for_builtin(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
 
    location_and_message: (PortInstruction, String)
 
) {
 
    let (_location, message) = location_and_message;
 
    sched_ctx.error(&message);
 

	
 
    let exit_reason = if exec_state.mode.is_in_sync_block() {
 
        ExitReason::ErrorInSync
 
    } else {
 
        ExitReason::ErrorNonSync
 
    };
 

	
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
 
    sending_port_handle: LocalPortHandle, value: ValueGroup,
 
    comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
) {
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
    debug_assert!(port_info.state.can_send());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
fn send_message_with_ports(
 
    sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction,
 
    value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
    }
 

	
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 

	
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::ScheduleComponent(to_schedule) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                // Note that the component is intentionally not
 
                // sleeping, so we just wake it up
 
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                let key = unsafe { to_schedule.upgrade() };
 
                sched_ctx.runtime.enqueue_work(key);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
        match new_to_ack {
 
            Some(new_to_ack) => to_ack = new_to_ack,
 
            None => break,
 
        }
 
    }
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
fn default_send_ack(
 
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
        content: ControlMessageContent::Ack
 
    }), true);
 
}
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 

	
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
// TODO: Optimize double vec
 
type EncounteredPorts = Vec<(Vec<ValueId>, PortId)>;
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) {
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) {
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port.1 {
 
                for prev_port in ports.iter_mut() {
 
                    if prev_port.1 == cur_port {
 
                        // Already added
 
                        prev_port.0.push(value_location);
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push((value_location, cur_port));
 
                ports.push((vec![value_location], cur_port));
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for (value_index, embedded_value) in heap_region.iter().enumerate() {
 
                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
 
                    find_port_in_value(group, embedded_value, value_location, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for (value_index, value) in &value_group.values.iter().enumerate() {
 
        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_context.rs
Show inline comments
 
use std::fmt::{Debug, Formatter, Result as FmtResult};
 

	
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
impl PortInstruction {
 
    pub fn is_none(&self) -> bool {
 
        match self {
 
            PortInstruction::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Bitflags for port
 
// TODO: Incorporate remaining flags from `Port` struct
 
#[repr(u32)]
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted, // Transmitted, so cannot be used anymore
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortState {
 
    flags: u32
 
}
 

	
 
impl PortState {
 
    pub(crate) fn new() -> PortState {
 
        return PortState{ flags: 0 }
 
    }
 

	
 
    // high-level
 

	
 
    #[inline]
 
    pub fn is_open(&self) -> bool {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted);
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked(&self) -> bool {
 
        return
 
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
 
    // lower-level utils
 
    #[inline]
 
    pub fn set(&mut self, flag: PortStateFlag) {
 
        self.flags |= flag as u32;
 
    }
 

	
 
    #[inline]
 
    pub fn clear(&mut self, flag: PortStateFlag) {
 
        self.flags &= !(flag as u32);
 
    }
 

	
 
    #[inline]
 
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
 
        return (self.flags & (flag as u32)) != 0;
 
    }
 
}
 

	
 
impl Debug for PortState {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        use PortStateFlag::*;
 

	
 
        let mut s = f.debug_struct("PortState");
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers)
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 

	
 
        return s.finish();
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
src/runtime2/component/control_layer.rs
Show inline comments
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    ClosedPort(PortId),
 
    UnblockPutWithPorts
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ControlClosedPort {
 
    closed_port: PortId,
 
    exit_entry_id: Option<ControlId>,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
    UnblockPutWithPorts,
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.old_target_port == target_port {
 
                    message.modify_target_port(entry.new_target_port);
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an acknowledgement. The returned action must be performed by the
 
    /// caller. The optionally returned `ControlId` must be used and passed to
 
    /// `handle_ack` again.
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
 
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return (AckAction::None, None);
 
        }
 

	
 
        let entry = self.entries.remove(entry_index);
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.new_target_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
                return (
 
                    AckAction::SendMessage(target_comp_id, message_to_send),
 
                    Some(to_ack)
 
                );
 
            },
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return (AckAction::ScheduleComponent(to_schedule), None);
 
            },
 
            ControlContent::ClosedPort(closed_port) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            },
 
            ControlContent::UnblockPutWithPorts => {
 
                return (AckAction::UnblockPutWithPorts, None);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn has_acks_remaining(&self) -> bool {
 
        return !self.entries.is_empty();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds an entry that returns the similarly named Ack action
 
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::UnblockPutWithPorts,
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                old_target_port: old_target_port_id,
 
                new_target_port: new_target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option<ControlId> {
 
        let port = comp_ctx.get_port(port_handle);
 
        let port_id = port.self_id;
0 comments (0 inline, 0 general)