Changeset - f924af193e2a
[Not reviewed]
0 5 0
MH - 3 years ago 2022-05-09 17:10:30
contact@maxhenger.nl
Initial port transmission protocol
5 files changed with 205 insertions and 36 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -65,10 +65,12 @@ pub struct MessageDataHeader {
 

	
 
#[derive(Debug)]
 
pub struct TransmittedPort {
 
    location: ValueId, // within `content`
 
    messages: Vec<DataMessage>, // owned by previous component
 
    peer_comp: CompId,
 
    peer_port: PortId,
 
    pub locations: Vec<ValueId>, // within `content`
 
    pub messages: Vec<DataMessage>, // owned by previous component
 
    pub peer_comp: CompId,
 
    pub peer_port: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
}
 

	
 
// -----------------------------------------------------------------------------
src/runtime2/component/component.rs
Show inline comments
 
@@ -363,7 +363,7 @@ pub(crate) enum GetResult {
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
@@ -382,13 +382,14 @@ pub(crate) fn default_attempt_get(
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let message = inbox_main[port_index].take().unwrap();
 
            let mut message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction, inbox_main, inbox_backup,
 
                target_port, target_port_instruction,
 
                &mut message, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
@@ -415,8 +416,8 @@ pub(crate) fn default_attempt_get(
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup,
 
    targeted_port: PortId, port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
@@ -424,6 +425,59 @@ pub(crate) fn default_handle_received_data_message(
 
    let slot = &mut inbox_main[port_index];
 
    debug_assert!(slot.is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
        }
 
        inbox_backup.extend(received_port.messages.drain(..));
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        let new_port = comp_ctx.get_port(new_port_handle);
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(new_port.peer_comp_id));
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations {
 
            let value = match message_location {
 
                ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize],
 
                ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize],
 
            };
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
@@ -463,11 +517,12 @@ pub(crate) fn default_handle_received_data_message(
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, inbox_main, inbox_backup);
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
@@ -500,7 +555,7 @@ pub(crate) fn default_handle_control_message(
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, inbox_main, inbox_backup);
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
@@ -777,14 +832,17 @@ fn send_message_without_ports(
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
fn send_message_with_ports(
 
    sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction,
 
    value: ValueGroup,
 
/// Prepares sending a message that contains ports. Only once a particular
 
/// protocol has completed (where we notify all the peers that the ports will
 
/// be transferred) will we actually send the message to the recipient.
 
fn prepare_send_message_with_ports(
 
    sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, consensus: &mut Consensus
 
    control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    let sending_port_handle = comp_ctx.get_port_handle(sending_port_id);
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
@@ -797,37 +855,98 @@ fn send_message_with_ports(
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 
        let peer_comp_id = transmit_port_info.peer_comp_id;
 
        let peer_port_id = transmit_port_info.peer_port_id;
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as it is used in this sync round")
 
                String::from("Cannot transmit one of the ports in this message, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as that port is already transmitted")
 
                String::from("Cannot transmit one of the ports in this message, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        // Set the flag for transmission
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
    }
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.mode = CompMode::BlockedPutPorts;
 
    exec_state.mode_port = sending_port_id;
 
    exec_state.mode_value = value;
 

	
 
    return Ok(());
 
}
 

	
 
/// Performs the transmission of a data message that contains ports. These were
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function.
 
fn perform_send_message_with_ports(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPorts);
 

	
 
    // Find all ports again
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Annotate the data message
 
    let message_value = exec_state.mode_value.take();
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
    // being transferred
 
    for (port_locations, port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(port_id);
 
        let transmit_port_info = comp_ctx.get_port(transmit_port_handle);
 

	
 
        let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup);
 

	
 
        let mut transmit_port_state = transmit_port_info.state;
 
        debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted));
 
        transmit_port_state.clear(PortStateFlag::Transmitted);
 

	
 
        annotated_message.ports.push(TransmittedPort{
 
            locations: port_locations,
 
            messages: transmit_messages,
 
            peer_comp: transmit_port_info.peer_comp_id,
 
            peer_port: transmit_port_info.peer_port_id,
 
            kind: transmit_port_info.kind,
 
            state: transmit_port_state
 
        })
 
    }
 

	
 
    // And finally, send the message to the peer
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain,
 
    inbox_backup: &mut InboxBackup
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
@@ -853,6 +972,12 @@ fn default_handle_ack(
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) to the recipient
 
                perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
                exec_state.mode = CompMode::Sync;
 
                exec_state.mode_port = PortId::new_invalid();
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
@@ -963,3 +1088,31 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut En
 
        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
 
    }
 
}
 

	
 
/// Goes through the inbox of a component and takes out all the messages that
 
/// are targeted at a specific port
 
pub(crate) fn take_port_messages(
 
    comp_ctx: &CompCtx, port_id: PortId,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Vec<DataMessage> {
 
    let mut messages = Vec::new();
 
    let port_handle = comp_ctx.get_port_handle(port_id);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    if let Some(message) = inbox_main[port_index].take() {
 
        messages.push(message);
 
    }
 

	
 
    let mut message_index = 0;
 
    while message_index < inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == port_id {
 
            let message = inbox_backup.remove(message_index);
 
            messages.push(message);
 
        } else {
 
            message_index += 1;
 
        }
 
    }
 

	
 
    return messages;
 
}
 
\ No newline at end of file
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -39,6 +39,7 @@ pub enum PortStateFlag {
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
    Received = 0x10, // Received, so cannot be used yet, only after the sync round
 
}
 

	
 
#[derive(Copy, Clone)]
 
@@ -62,7 +63,8 @@ impl PortState {
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted);
 
            !self.is_set(PortStateFlag::Transmitted) &&
 
            !self.is_set(PortStateFlag::Received);
 
    }
 

	
 
    #[inline]
 
@@ -102,7 +104,8 @@ impl Debug for PortState {
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers)
 
            ("blocked_full_buffers", BlockedDueToFullBuffers),
 
            ("transmitted", Transmitted),
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -10,7 +10,7 @@ pub(crate) struct ControlId(u32);
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
    pub(crate) fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 
@@ -142,7 +142,7 @@ impl ControlLayer {
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // Port transfer
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
@@ -169,15 +169,8 @@ impl ControlLayer {
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    /// Adds a rerouting entry (used to ensure all messages will end up at a
 
    /// newly created component). Used when creating a new component.
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
@@ -210,6 +203,24 @@ impl ControlLayer {
 
        })
 
    }
 

	
 
    /// Creates a PortPeerChanged message (and increments ack-counter on a
 
    /// pre-created control entry) that is used as a preliminary step before
 
    /// transferring a port over a channel.
 
    pub(crate) fn create_port_transfer_message(
 
        &mut self, associated_control_id: ControlId,
 
        sender_comp_id: CompId, target_port_id: PortId
 
    ) -> Message {
 
        let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: associated_control_id,
 
            sender_comp_id,
 
            target_port_id: Some(target_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
src/runtime2/component/mod.rs
Show inline comments
 
@@ -8,7 +8,7 @@ mod component_internet;
 

	
 
pub(crate) use component::{Component, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL};
 
pub(crate) use component_context::{CompCtx, PortInstruction};
 
pub(crate) use component_context::{CompCtx, PortInstruction, PortKind, PortState, PortStateFlag};
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
0 comments (0 inline, 0 general)