Changeset - e62f669c1e03
[Not reviewed]
0 3 0
mh - 3 years ago 2022-05-09 13:37:54
contact@maxhenger.nl
WIP on refactoring port transmission code
3 files changed with 105 insertions and 6 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -753,24 +753,86 @@ pub(crate) fn default_handle_error_for_builtin(
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
 
    sending_port_handle: LocalPortHandle, value: ValueGroup,
 
    comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
) {
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
    debug_assert!(port_info.state.can_send());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
fn send_message_with_ports(
 
    sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction,
 
    value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
    }
 

	
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 

	
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
@@ -847,41 +909,45 @@ fn default_handle_recently_unblocked_port(
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
// TODO: Optimize double vec
 
type EncounteredPorts = Vec<(Vec<ValueId>, PortId)>;
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) {
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) {
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port.1 {
 
                for prev_port in ports.iter_mut() {
 
                    if prev_port.1 == cur_port {
 
                        // Already added
 
                        prev_port.0.push(value_location);
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push((value_location, cur_port));
 
                ports.push((vec![value_location], cur_port));
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for (value_index, embedded_value) in heap_region.iter().enumerate() {
 
                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
 
                    find_port_in_value(group, embedded_value, value_location, ports);
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -5,59 +5,75 @@ use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
impl PortInstruction {
 
    pub fn is_none(&self) -> bool {
 
        match self {
 
            PortInstruction::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Bitflags for port
 
// TODO: Incorporate remaining flags from `Port` struct
 
#[repr(u32)]
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted, // Transmitted, so cannot be used anymore
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortState {
 
    flags: u32
 
}
 

	
 
impl PortState {
 
    pub(crate) fn new() -> PortState {
 
        return PortState{ flags: 0 }
 
    }
 

	
 
    // high-level
 

	
 
    #[inline]
 
    pub fn is_open(&self) -> bool {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted);
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked(&self) -> bool {
 
        return
 
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -16,44 +16,46 @@ impl ControlId {
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    ClosedPort(PortId),
 
    UnblockPutWithPorts
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ControlClosedPort {
 
    closed_port: PortId,
 
    exit_entry_id: Option<ControlId>,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
    UnblockPutWithPorts,
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
@@ -119,48 +121,63 @@ impl ControlLayer {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return (AckAction::ScheduleComponent(to_schedule), None);
 
            },
 
            ControlContent::ClosedPort(closed_port) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            },
 
            ControlContent::UnblockPutWithPorts => {
 
                return (AckAction::UnblockPutWithPorts, None);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn has_acks_remaining(&self) -> bool {
 
        return !self.entries.is_empty();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds an entry that returns the similarly named Ack action
 
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::UnblockPutWithPorts,
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
0 comments (0 inline, 0 general)