Changeset - d7f29db22526
[Not reviewed]
0 4 0
MH - 3 years ago 2022-04-22 12:36:23
contact@maxhenger.nl
Default handling of attempting a 'get' on a port
4 files changed with 182 insertions and 152 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -172,31 +172,29 @@ impl CompExecState {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 
}
 

	
 
/// Generic representation of a component's inbox.
 
// NOTE: It would be nice to remove the `backup` at some point and make the
 
// blocking occur the moment a peer tries to send a message.
 
pub(crate) struct CompInbox {
 
    main: Vec<Option<DataMessage>>,
 
    backup: Vec<DataMessage>,
 
}
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
pub(crate) fn create_component(
 
    protocol: &ProtocolDescription,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId,
 
    arguments: ValueGroup, num_ports: usize
 
) -> Box<dyn Component> {
 
    let definition = &protocol.heap[definition_id];
 
    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);
 
@@ -272,82 +270,144 @@ pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, port_value_slot: &mut Option<DataMessage>,
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            let port_handle = comp_ctx.get_port_handle(target_port_id);
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
        if exec_state.is_blocked_on_get(target_port_id) {
 
            // Return to normal operation
 
            exec_state.mode = CompMode::Sync;
 
            exec_state.mode_port = PortId::new_invalid();
 
            debug_assert!(exec_state.mode_value.values.is_empty());
 
        }
 

	
 
        return IncomingData::PlacedInSlot
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
        // now become closed
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future
 

	
 
        if port_info.state == PortState::Open {
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us.
 
        let port_info = comp_ctx.get_port(port_handle);
 
        let port_is_closed = port_info.state == PortState::Closed;
 
        if port_is_closed {
 
            let peer_id = port_info.peer_comp_id;
 
            return GetResult::Error((
 
                target_port_instruction,
 
                format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0)
 
            ));
 
        }
 

	
 
        // No error ocurred, so enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, port_instruction: PortInstruction,
 
    slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    let slot = &mut inbox_main[port_index];
 
    debug_assert!(slot.is_none()); // because we've just received from it
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    if port_info.state == PortState::Closed {
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot 'get' because the channel is closed"))
 
        );
 
    }
 

	
 
    // Check if there are any more messages in the backup buffer
 
    let port_info = comp_ctx.get_port(port_handle);
 
@@ -489,27 +549,60 @@ pub(crate) fn default_handle_control_message(
 
            port_info.peer_comp_id = new_comp_id;
 
            port_info.peer_port_id = new_port_id;
 
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_start_sync(
 
    exec_state: &mut CompExecState,
 
) {}
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.log("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.log("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    exec_state.mode = CompMode::BusyExit;
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -32,26 +32,26 @@ enum SyncState {
 
    AwaitingCmd,
 
    Getting,
 
    Putting,
 
    FinishSync,
 
    FinishSyncThenQuit,
 
}
 

	
 
pub struct ComponentTcpClient {
 
    // Properties for the tcp socket
 
    socket_state: SocketState,
 
    sync_state: SyncState,
 
    poll_ticket: Option<PollTicket>,
 
    inbox_main: Option<DataMessage>,
 
    inbox_backup: Vec<DataMessage>,
 
    inbox_main: InboxMain,
 
    inbox_backup: InboxBackup,
 
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
 
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
 
    input_union_send_tag_value: i64,
 
    input_union_receive_tag_value: i64,
 
    input_union_finish_tag_value: i64,
 
    input_union_shutdown_tag_value: i64,
 
    // Generic component state
 
    exec_state: CompExecState,
 
    control: ControlLayer,
 
    consensus: Consensus,
 
    // Temporary variables
 
    byte_buffer: Vec<u8>,
 
@@ -81,26 +81,27 @@ impl Component for ComponentTcpClient {
 
            self.poll_ticket = Some(poll_ticket);
 
        }
 
    }
 

	
 
    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {
 
        if let Some(poll_ticket) = self.poll_ticket.take() {
 
            sched_ctx.polling.unregister(poll_ticket)
 
                .expect("unregistering tcp component");
 
        }
 
    }
 

	
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
 
        if self.inbox_main.is_none() {
 
            self.inbox_main = Some(message);
 
        let slot = &mut self.inbox_main[0];
 
        if slot.is_none() {
 
            *slot = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
@@ -129,97 +130,84 @@ impl Component for ComponentTcpClient {
 
                unreachable!();
 
            },
 
            CompMode::NonSync => {
 
                // When in non-sync mode
 
                match &mut self.socket_state {
 
                    SocketState::Connected(_socket) => {
 
                        if self.sync_state == SyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
 
                            self.sync_state = SyncState::AwaitingCmd;
 
                            self.consensus.notify_sync_start(comp_ctx);
 
                            self.exec_state.mode = CompMode::Sync;
 
                            component::default_handle_sync_start(
 
                                &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                            );
 
                        }
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SocketState::Error => {
 
                        // Could potentially send an error message to the
 
                        // connected component.
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                        return CompScheduling::Immediate;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                // When in sync mode: wait for a command to come in
 
                match self.sync_state {
 
                    SyncState::AwaitingCmd => {
 
                        if let Some(message) = &self.inbox_main {
 
                            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
                            if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
 
                                // Check which command we're supposed to execute.
 
                                let message = self.inbox_main.take().unwrap();
 
                                let target_port_id = message.data_header.target_port;
 
                                let receive_result = component::default_handle_received_data_message(
 
                                    target_port_id, PortInstruction::NoSource,
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    comp_ctx, sched_ctx, &mut self.control
 
                                );
 

	
 
                                if let Err(location_and_message) = receive_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
 
                                    if tag_value == self.input_union_send_tag_value {
 
                                        // Retrieve bytes from the message
 
                                        self.byte_buffer.clear();
 
                                        let union_content = &message.content.regions[embedded_heap_pos as usize];
 
                                        debug_assert_eq!(union_content.len(), 1);
 
                                        let array_heap_pos = union_content[0].as_array();
 
                                        let array_values = &message.content.regions[array_heap_pos as usize];
 
                                        self.byte_buffer.reserve(array_values.len());
 
                                        for value in array_values {
 
                                            self.byte_buffer.push(value.as_uint8());
 
                                        }
 

	
 
                                        self.sync_state = SyncState::Putting;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_receive_tag_value {
 
                                        // Component requires a `recv`
 
                                        self.sync_state = SyncState::Getting;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_finish_tag_value {
 
                                        // Component requires us to end the sync round
 
                                        self.sync_state = SyncState::FinishSync;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_shutdown_tag_value {
 
                                        // Component wants to close the connection
 
                                        self.sync_state = SyncState::FinishSyncThenQuit;
 
                                        return CompScheduling::Immediate;
 
                                    } else {
 
                                        unreachable!("got tag_value {}", tag_value)
 
                        match component::default_attempt_get(
 
                            &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource,
 
                            &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                            &mut self.control, &mut self.consensus
 
                        ) {
 
                            GetResult::Received(message) => {
 
                                let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
 
                                if tag_value == self.input_union_send_tag_value {
 
                                    // Retrieve bytes from the message
 
                                    self.byte_buffer.clear();
 
                                    let union_content = &message.content.regions[embedded_heap_pos as usize];
 
                                    debug_assert_eq!(union_content.len(), 1);
 
                                    let array_heap_pos = union_content[0].as_array();
 
                                    let array_values = &message.content.regions[array_heap_pos as usize];
 
                                    self.byte_buffer.reserve(array_values.len());
 
                                    for value in array_values {
 
                                        self.byte_buffer.push(value.as_uint8());
 
                                    }
 

	
 
                                    self.sync_state = SyncState::Putting;
 
                                } else if tag_value == self.input_union_receive_tag_value {
 
                                    // Component requires a `recv`
 
                                    self.sync_state = SyncState::Getting;
 
                                } else if tag_value == self.input_union_finish_tag_value {
 
                                    // Component requires us to end the sync round
 
                                    self.sync_state = SyncState::FinishSync;
 
                                } else if tag_value == self.input_union_shutdown_tag_value {
 
                                    // Component wants to close the connection
 
                                    self.sync_state = SyncState::FinishSyncThenQuit;
 
                                } else {
 
                                    unreachable!("got tag_value {}", tag_value)
 
                                }
 
                            } else {
 
                                todo!("handle sync failure due to message deadlock");
 

	
 
                                return CompScheduling::Immediate;
 
                            },
 
                            GetResult::NoMessage => {
 
                                return CompScheduling::Sleep;
 
                            },
 
                            GetResult::Error(location_and_message) => {
 
                                component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                return CompScheduling::Immediate;
 
                            }
 
                        } else {
 
                            let port_handle = comp_ctx.get_port_handle(self.pdl_input_port_id);
 
                            comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::NoSource;
 
                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
 
                            return CompScheduling::Sleep;
 
                        }
 
                    },
 
                    SyncState::Putting => {
 
                        // We're supposed to send a user-supplied message fully
 
                        // over the socket. But we might end up blocking. In
 
                        // that case the component goes to sleep until it is
 
                        // polled.
 
                        let socket = self.socket_state.get_socket();
 
                        while !self.byte_buffer.is_empty() {
 
                            match socket.send(&self.byte_buffer) {
 
                                Ok(bytes_sent) => {
 
                                    self.byte_buffer.drain(..bytes_sent);
 
@@ -227,28 +215,26 @@ impl Component for ComponentTcpClient {
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                    }
 
                                }
 
                            }
 
                        }
 

	
 
                        // If here then we're done putting the data, we can
 
                        // finish the sync round
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
                        return CompScheduling::Immediate;
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                    SyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
                        // this causes us to end up blocking the component
 
                        // goes to sleep until it is polled.
 
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config
 

	
 
                        let socket = self.socket_state.get_socket();
 
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
 
                        match socket.receive(&mut self.byte_buffer) {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
@@ -264,27 +250,25 @@ impl Component for ComponentTcpClient {
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep; // wait until polled
 
                                } else {
 
                                    todo!("handle socket.receive error {:?}", err)
 
                                }
 
                            }
 
                        }
 
                    },
 
                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                // Entered when awaiting a new command
 
                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut =>
 
                return CompScheduling::Sleep,
 
            CompMode::StartExit =>
 
@@ -318,25 +302,25 @@ impl ComponentTcpClient {
 
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
        let socket = SocketTcpClient::new(ip_address, port);
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 

	
 
        return Self{
 
            socket_state: SocketState::Connected(socket.unwrap()),
 
            sync_state: SyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: None,
 
            inbox_main: vec![None],
 
            inbox_backup: Vec::new(),
 
            input_union_send_tag_value: -1,
 
            input_union_receive_tag_value: -1,
 
            input_union_finish_tag_value: -1,
 
            input_union_shutdown_tag_value: -1,
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            byte_buffer: Vec::new(),
 
        }
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -4,24 +4,25 @@ use crate::protocol::ast::ProcedureDefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::runtime::CompId;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::component::{
 
    self,
 
    InboxMain, InboxBackup, GetResult,
 
    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
 
    port_id_from_eval, port_id_to_eval
 
};
 
use super::component_context::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    PerformedSelectWait(u32),
 
@@ -98,26 +99,24 @@ struct SelectState {
 
    cases: Vec<SelectCase>,
 
    next_case: u32,
 
    num_cases: u32,
 
    random: Random,
 
    candidates_workspace: Vec<usize>,
 
}
 

	
 
enum SelectDecision {
 
    None,
 
    Case(u32), // contains case index, should be passed along to PDL code
 
}
 

	
 
type InboxMain = Vec<Option<DataMessage>>;
 

	
 
impl SelectState {
 
    fn new() -> Self {
 
        return Self{
 
            cases: Vec::new(),
 
            next_case: 0,
 
            num_cases: 0,
 
            random: Random::new(),
 
            candidates_workspace: Vec::new(),
 
        }
 
    }
 

	
 
    fn handle_select_start(&mut self, num_cases: u32) {
 
@@ -299,80 +298,48 @@ impl Component for CompPDL {
 
        if let Err(error) = run_result {
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 

	
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
 
                let port_is_closed = port_info.state == PortState::Closed;
 

	
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        let receive_result = component::default_handle_received_data_message(
 
                            port_id, PortInstruction::SourceLocation(expr_id),
 
                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 
                        if let Err(location_and_message) = receive_result {
 
                            self.handle_generic_component_error(sched_ctx, location_and_message);
 
                            return CompScheduling::Immediate
 
                        } else {
 
                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        let protocol = &sched_ctx.runtime.protocol;
 
                        self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                            &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                            String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s")
 
                        )));
 
                match component::default_attempt_get(
 
                    &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id),
 
                    &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                    &mut self.control, &mut self.consensus
 
                ) {
 
                    GetResult::Received(message) => {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    GetResult::NoMessage => {
 
                        return CompScheduling::Sleep;
 
                    },
 
                    GetResult::Error(location_and_message) => {
 
                        self.handle_generic_component_error(sched_ctx, location_and_message);
 
                        return CompScheduling::Immediate;
 
                    }
 
                } else if port_is_closed {
 
                    // No messages, but getting makes no sense as the port is
 
                    // closed.
 
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0)
 
                    )));
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
@@ -437,26 +404,27 @@ impl Component for CompPDL {
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                component::default_handle_sync_start(
 
                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                );
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
@@ -516,35 +484,24 @@ impl CompPDL {
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        for message in self.inbox_main.iter() {
 
            if let Some(message) = message {
 
                self.consensus.handle_incoming_data_message(comp_ctx, message);
 
            }
 
        }
 
        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
        self.exec_state.mode = CompMode::Sync;
 
    }
 

	
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason));
 
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
 
        self.exec_state.mode = CompMode::BusyExit;
 
        let exit_inside_sync = self.exec_state.exit_reason.is_in_sync();
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
@@ -569,28 +526,26 @@ impl CompPDL {
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        use component::IncomingData;
 

	
 
        // Whatever we do, glean information from headers in message
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
 
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message,
 
            sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {
 
                if self.exec_state.mode == CompMode::BlockedSelect {
 
                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                    if let SelectDecision::Case(case_index) = select_decision {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                        self.exec_state.mode = CompMode::Sync;
 
                    }
 
                }
 
            },
 
            IncomingData::SlotFull(message) => {
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -74,26 +74,27 @@ impl Component for ComponentRandomU32 {
 
                // (at some point in the future, this is just a testing
 
                // component).
 
                if self.random_minimum >= self.random_maximum {
 
                    // Could throw an evaluation error, but lets just panic
 
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
 
                }
 

	
 
                if self.num_sends >= self.max_num_sends {
 
                    self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                } else {
 
                    sched_ctx.log("Entering sync mode");
 
                    self.did_perform_send = false;
 
                    self.consensus.notify_sync_start(comp_ctx);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    component::default_handle_sync_start(
 
                        &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus
 
                    );
 
                }
 

	
 
                return CompScheduling::Immediate;
 
            },
 
            CompMode::Sync => {
 
                // This component just sends a single message, then waits until
 
                // consensus has been reached
 
                if !self.did_perform_send {
 
                    sched_ctx.log("Sending random message");
 
                    let mut random = self.generator.next_u32() - self.random_minimum;
 
                    let random_delta = self.random_maximum - self.random_minimum;
 
                    random %= random_delta;
 
@@ -111,28 +112,25 @@ impl Component for ComponentRandomU32 {
 
                        return CompScheduling::Immediate
 
                    } else {
 
                        // Blocked or not, we set `did_perform_send` to true. If
 
                        // blocked then the moment we become unblocked (and are back
 
                        // at the `Sync` mode) we have sent the message.
 
                        let scheduling = send_result.unwrap();
 
                        self.did_perform_send = true;
 
                        self.num_sends += 1;
 
                        return scheduling
 
                    }
 
                } else {
 
                    // Message was sent, finish this sync round
 
                    sched_ctx.log("Waiting for consensus");
 
                    self.exec_state.mode = CompMode::SyncEnd;
 
                    let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                    component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
                    component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                    return CompScheduling::Requeue;
 
                }
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
0 comments (0 inline, 0 general)