Changeset - 9972a4c3928b
[Not reviewed]
MH - 3 years ago 2022-05-13 19:26:19
contact@maxhenger.nl
Add extra transfer test, fix bug related to delayed update of consensus manager
3 files changed with 85 insertions and 12 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -367,154 +367,158 @@ pub(crate) fn default_handle_incoming_data_message(
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 
    if port_info.state.is_closed() {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let mut message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction,
 
                &mut message, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
                comp_ctx, sched_ctx, control, consensus
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer,
 
    consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = message.content.get_value_mut(message_location);
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -310,96 +310,110 @@ impl Consensus {
 
            // Make sure that we consider all peers as undiscovered again
 
            for annotation in self.ports.iter_mut() {
 
                annotation.peer_discovered = false;
 
            }
 
        }
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, false);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false);
 
        return decision;
 
    }
 

	
 
    /// Notifies the consensus management that the component has encountered a
 
    /// critical error during the synchronous round. Hence we should report that
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
    /// to the consensus layer.
 
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
 
        // Reset everything for the next round
 
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
        self.mode = Mode::NonSync;
 
        self.round_index = self.round_index.wrapping_add(1);
 

	
 
        for port in self.ports.iter_mut() {
 
            port.mapping = None;
 
        }
 

	
 
        self.solution.clear();
 
    }
 

	
 
    pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(_expected_index, self.ports.len());
 
        let port_info = comp_ctx.get_port(port_handle);
 
        self.ports.push(PortAnnotation{
 
            self_comp_id: comp_ctx.id,
 
            self_port_id: port_info.self_id,
 
            peer_comp_id: port_info.peer_comp_id,
 
            peer_port_id: port_info.peer_port_id,
 
            peer_discovered: false,
 
            mapping: None,
 
            kind: port_info.kind,
 
        });
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a set of values to be sent of a channel.
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
        return DataMessage{
 
            data_header, sync_header, content,
 
            ports: Vec::new()
 
        };
 
    }
 

	
 
    /// Handles the arrival of a new data message (needs to be called for every
 
    /// new data message, even though it might not end up being received). This
 
    /// is used to determine peers of `get`ter ports.
 
    // TODO: The use of this function is rather ugly. Find a more robust
 
    //  scheme about owners of `get`ter ports not knowing about their peers.
 
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
        let annotation = &mut self.ports[target_index];
 
        debug_assert!(
 
            !annotation.peer_discovered || (
 
                annotation.peer_comp_id == message.sync_header.sending_id &&
 
                annotation.peer_port_id == message.data_header.source_port
 
            )
 
        );
 
        annotation.peer_comp_id = message.sync_header.sending_id;
 
        annotation.peer_port_id = message.data_header.source_port;
 
        annotation.peer_discovered = true;
 
    }
 

	
 
    /// Checks if the data message can be received (due to port annotations), if
 
    /// it can then `true` is returned and the caller is responsible for handing
 
    /// the message of to the PDL code. Otherwise the message cannot be
 
    /// received.
 
    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));
 

	
 
        // Make sure the expected mapping matches the currently stored mapping
 
        for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping {
 
            // Determine our annotation, in order to do so we need to find the
src/runtime2/tests/transfer_ports.rs
Show inline comments
 
@@ -17,110 +17,165 @@ fn test_transfer_precreated_port_with_owned_peer() {
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx, in<u32> to_send) {
 
        sync put(tx, to_send);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel tx -> rx;
 
        channel forgotten -> to_send;
 
        new port_sender(tx, to_send);
 
        new port_receiver(rx);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_synccreated_port() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        sync {
 
            channel a -> b;
 
            put(tx, b);
 
        }
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_back_and_forth() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_back_and_forth() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_and_communication() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        channel a -> b;
 
        sync put(tx, b);
 
        sync put(a, 1337);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        channel a -> b; // this is stupid, but we need to have a variable to use
 
        sync b = get(rx);
 
        u32 value = 0;
 
        sync value = get(b);
 
        while (value != 1337) {}
 
    }
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_and_communication() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx, in<u32> to_send) {
 
        sync put(tx, to_send);
 
    }
 

	
 
    primitive message_transmitter(out<u32> tx) {
 
        sync put(tx, 1337);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        channel unused -> b;
 
        sync b = get(rx);
 
        u32 value = 0;
 
        sync value = get(b);
 
        while (value != 1337) {}
 
    }
 

	
 
    composite constructor() {
 
        channel port_tx -> port_rx;
 
        channel value_tx -> value_rx;
 
        new port_sender(port_tx, value_rx);
 
        new port_receiver(port_rx);
 
        new message_transmitter(value_tx);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_back_and_forth() {
 
    compile_and_create_component("
 
    primitive port_send_and_receive(out<in<u32>> tx, in<in<u32>> rx) {
 
        channel a -> b;
 
        sync {
 
            put(tx, b);
 
            b = get(rx);
 
        }
 
    }
 

	
 
    primitive port_receive_and_send(in<in<u32>> rx, out<in<u32>> tx) {
 
        channel unused -> transferred; // same problem as in different tests
 
        sync {
 
            transferred = get(rx);
 
            put(tx, transferred);
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel port_tx_forward -> port_rx_forward;
 
        channel port_tx_backward -> port_rx_backward;
 

	
 
        new port_send_and_receive(port_tx_forward, port_rx_backward);
 
        new port_receive_and_send(port_rx_forward, port_tx_backward);
 
    }", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communication() {
 
    compile_and_create_component("
 
    primitive port_send_and_receive(out<in<u32>> tx, in<in<u32>> rx, in<u32> to_transfer) {
 
        sync {
 
            put(tx, to_transfer);
 
            to_transfer = get(rx);
 
        }
 
        sync {
 
            auto value = get(to_transfer);
 
            while (value != 1337) {}
 
        }
 
    }
 

	
 
    primitive port_receive_and_send(in<in<u32>> rx, out<in<u32>> tx) {
 
        channel unused -> transferred;
 
        sync {
 
            transferred = get(rx);
 
            put(tx, transferred);
 
        }
 
    }
 

	
 
    primitive value_sender(out<u32> tx) {
 
        sync put(tx, 1337);
 
    }
 

	
 
    composite constructor() {
 
        channel port_tx_forward -> port_rx_forward;
 
        channel port_tx_backward -> port_rx_backward;
 
        channel message_tx -> message_rx;
 
        new port_send_and_receive(port_tx_forward, port_rx_backward, message_rx);
 
        new port_receive_and_send(port_rx_forward, port_tx_backward);
 
        new value_sender(message_tx);
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)