Changeset - 2113bc7bdbc3
[Not reviewed]
0 4 0
MH - 3 years ago 2022-05-14 19:10:45
contact@maxhenger.nl
WIP on implementation of TCP listener component
4 files changed with 253 insertions and 60 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -472,7 +472,7 @@ pub(crate) fn default_handle_received_data_message(
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx);
 
        consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
@@ -825,7 +825,113 @@ pub(crate) fn default_handle_sync_decision(
 
    }
 
}
 

	
 
/// Special component creation function. This function assumes that the
 
/// transferred ports are NOT blocked, and that the channels to whom the ports
 
/// belong are fully owned by the creating component. This will be checked in
 
/// debug mode.
 
pub(crate) fn special_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    component_instance: Box<dyn Component>, component_ports: Vec<PortId>,
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    let reservation = sched_ctx.runtime.start_create_component();
 
    let mut created_ctx = CompCtx::new(&reservation);
 
    let mut port_pairs = Vec::new();
 

	
 
    // Retrieve ports
 
    for instantiator_port_id in component_ports.iter() {
 
        let instantiator_port_id = *instantiator_port_id;
 
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
 
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);
 

	
 
        // Check if conditions for calling this function are valid
 
        debug_assert!(!instantiator_port.state.is_blocked_due_to_port_change());
 
        debug_assert_eq!(instantiator_port.peer_comp_id, instantiator_ctx.id);
 

	
 
        // Create port at new component
 
        let created_port_handle = created_ctx.add_port(
 
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
 
            instantiator_port.kind, instantiator_port.state
 
        );
 
        let created_port = created_ctx.get_port(created_port_handle);
 
        let created_port_id = created_port.self_id;
 

	
 
        // Store in port pairs
 
        let is_open = instantiator_port.state.is_open();
 
        port_pairs.push(PortPair{
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 
    }
 

	
 
    // Set peer of the port for the new component
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        // Note: we checked above (in debug mode) that the peer of the port is
 
        // owned by the creator as well, now check if the peer is transferred
 
        // as well.
 
        let created_port_peer_index = port_pairs.iter()
 
            .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
 

	
 
        match created_port_peer_index {
 
            Some(created_port_peer_index) => {
 
                // Both ends of the channel are moving to the new component
 
                let peer_pair = &port_pairs[created_port_peer_index];
 
                created_port_info.peer_port_id = peer_pair.created_id;
 
                created_port_info.peer_comp_id = reservation.id();
 
            },
 
            None => {
 
                created_port_info.peer_comp_id = instantiator_ctx.id;
 
                if pair.is_open {
 
                    created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Store component in runtime storage and retrieve component fields in their
 
    // name memory location
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component_instance, created_ctx, false
 
    );
 

	
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
 
    created_component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
    // Transfer messages and link instantiator to created component
 
    for pair in port_pairs.iter() {
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 
        transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut());
 

	
 
        let created_port_info = created_ctx.get_port(pair.created_handle);
 
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // Set up channel between instantiator component port, and its peer,
 
            // which is owned by the new component
 
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
 
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
 
            instantiator_port_info.peer_port_id = created_port_info.self_id;
 
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
 
        }
 
    }
 

	
 
    // By definition we did not have any remote peers for the transferred ports,
 
    // so we can schedule the new component immediately
 
    sched_ctx.runtime.enqueue_work(created_key);
 
}
 

	
 
/// Puts the component in an execution state where the specified component will
 
/// end up being created. The component goes through state changes (driven by
 
/// incoming control messages) to make sure that all of the ports that are going
 
/// to be transferred are not in a blocked state. Once finished the component
 
/// returns to the `NonSync` mode.
 
pub(crate) fn default_start_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
@@ -850,15 +956,6 @@ pub(crate) fn perform_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Small internal utilities
 
    struct PortPair {
 
        instantiator_id: PortId,
 
        instantiator_handle: LocalPortHandle,
 
        created_id: PortId,
 
        created_handle: LocalPortHandle,
 
        is_open: bool,
 
    }
 

	
 
    // Retrieve ports from the arguments
 
    debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked);
 

	
 
@@ -977,29 +1074,9 @@ pub(crate) fn perform_create_component(
 
    for pair in port_pairs.iter() {
 
        // Transferring the messages and removing the port from the
 
        // instantiator component
 
        let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle);
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 

	
 
        if let Some(mut message) = inbox_main.remove(instantiator_port_index) {
 
            message.data_header.target_port = pair.created_id;
 
            created_component.adopt_message(created_ctx, message);
 
        }
 

	
 
        let mut message_index = 0;
 
        while message_index < inbox_backup.len() {
 
            let message = &inbox_backup[message_index];
 
            if message.data_header.target_port == pair.instantiator_id {
 
                // Transfer the message
 
                let mut message = inbox_backup.remove(message_index);
 
                message.data_header.target_port = pair.created_id;
 
                created_component.adopt_message(created_ctx, message);
 
            } else {
 
                // Message does not belong to the port pair that we're
 
                // transferring to the new component.
 
                message_index += 1;
 
            }
 
        }
 
        transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut());
 

	
 
        // Here we take care of the case where the instantiator previously owned
 
        // both ends of the channel, but has transferred one port to the new
 
@@ -1046,6 +1123,24 @@ pub(crate) fn perform_create_component(
 
    exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid());
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
struct PortPair {
 
    instantiator_id: PortId,
 
    instantiator_handle: LocalPortHandle,
 
    created_id: PortId,
 
    created_handle: LocalPortHandle,
 
    is_open: bool,
 
}
 

	
 
pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool {
 
    for (_port_locations, port_id) in ports {
 
        let port_handle = comp_ctx.get_port_handle(*port_id);
 
@@ -1059,6 +1154,32 @@ pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) ->
 
    return true;
 
}
 

	
 
fn transfer_messages(
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, port_pair: &PortPair,
 
    instantiator_ctx: &mut CompCtx, created_ctx: &mut CompCtx, created_component: &mut dyn Component
 
) {
 
    let instantiator_port_index = instantiator_ctx.get_port_index(port_pair.instantiator_handle);
 
    if let Some(mut message) = inbox_main.remove(instantiator_port_index) {
 
        message.data_header.target_port = port_pair.created_id;
 
        created_component.adopt_message(created_ctx, message);
 
    }
 

	
 
    let mut message_index = 0;
 
    while message_index < inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == port_pair.instantiator_id {
 
            // Transfer the message
 
            let mut message = inbox_backup.remove(message_index);
 
            message.data_header.target_port = port_pair.created_id;
 
            created_component.adopt_message(created_ctx, message);
 
        } else {
 
            // Message does not belong to the port pair that we're
 
            // transferring to the new component.
 
            message_index += 1;
 
        }
 
    }
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
@@ -1079,16 +1200,6 @@ pub(crate) fn default_handle_error_for_builtin(
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -330,7 +330,7 @@ impl ComponentTcpClient {
 

	
 
    pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self {
 
        return Self{
 
            socket_state: ClientSocketState::Connected(socket);,
 
            socket_state: ClientSocketState::Connected(socket),
 
            sync_state: ClientSyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
@@ -413,13 +413,33 @@ impl ListenerSocketState {
 
    }
 
}
 

	
 
#[derive(PartialEq, Debug)]
 
enum ListenerSyncState {
 
    AwaitingCmd,
 
    Accept,
 
    AcceptCommandReceived,
 
    AcceptChannelGenerated{ client: SocketTcpClient, cmd_rx: PortId, data_tx: PortId },
 
    FinishSyncThenQuit,
 
}
 

	
 
impl ListenerSyncState {
 
    // Bit of a hacky solution: keeps the listener sync state intact, except
 
    // if it is `AcceptChannelGenerated`, that will be replaced with
 
    // `AwaitingCmd`. The reason is to move the `client` out.
 
    fn take(&mut self) -> ListenerSyncState {
 
        use ListenerSyncState::*;
 

	
 
        match self {
 
            AwaitingCmd => return AwaitingCmd,
 
            AcceptCommandReceived => return AcceptCommandReceived,
 
            FinishSyncThenQuit => return FinishSyncThenQuit,
 
            AcceptChannelGenerated{ .. } => {
 
                let mut swapped = ListenerSyncState::AwaitingCmd;
 
                std::mem::swap(self, &mut swapped);
 
                return swapped;
 
            }
 
        }
 
    }
 
}
 

	
 
pub struct ComponentTcpListener {
 
    // Properties for the tcp socket
 
    socket_state: ListenerSocketState,
 
@@ -516,14 +536,29 @@ impl Component for ComponentTcpListener {
 
            CompMode::NonSync => {
 
                match &self.socket_state {
 
                    ListenerSocketState::Connected(_socket) => {
 
                        if self.sync_state == ListenerSyncState::FinishSyncThenQuit {
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            self.sync_state = ListenerSyncState::AwaitingCmd;
 
                            component::default_handle_sync_start(
 
                                &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                            );
 
                        match self.sync_state.take() {
 
                            ListenerSyncState::AwaitingCmd => {
 
                                component::default_handle_sync_start(
 
                                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                                );
 
                            },
 
                            ListenerSyncState::AcceptCommandReceived => unreachable!(),
 
                            ListenerSyncState::AcceptChannelGenerated{ client, cmd_rx, data_tx } => {
 
                                // Now that we're outside the sync round, create the tcp client
 
                                // component
 
                                let socket_component: Box<dyn Component> = Box::new(ComponentTcpClient::new_with_existing_connection(client, cmd_rx, data_tx));
 
                                component::special_create_component(
 
                                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                                    &mut self.inbox_main, &mut self.inbox_backup, socket_component,
 
                                    vec![cmd_rx, data_tx]
 
                                );
 
                                self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take()
 
                            },
 
                            ListenerSyncState::FinishSyncThenQuit => {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                            },
 
                        }
 

	
 
                        return CompScheduling::Immediate;
 
                    },
 
                    ListenerSocketState::Error => {
 
@@ -543,7 +578,7 @@ impl Component for ComponentTcpListener {
 
                            GetResult::Received(message) => {
 
                                let (tag_value, _) = message.content.values[0].as_union();
 
                                if tag_value == self.input_union_accept_tag {
 
                                    self.sync_state = ListenerSyncState::Accept;
 
                                    self.sync_state = ListenerSyncState::AcceptCommandReceived;
 
                                } else if tag_value == self.input_union_shutdown_tag {
 
                                    self.sync_state = ListenerSyncState::FinishSyncThenQuit;
 
                                } else {
 
@@ -561,33 +596,74 @@ impl Component for ComponentTcpListener {
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::Accept => {
 
                    ListenerSyncState::AcceptCommandReceived => {
 
                        let socket = self.socket_state.get_socket();
 
                        match socket.accept() {
 
                            Ok(client) => {
 
                                // TODO: Continue here, somehow precreate component, but do the transfer correctly
 
                                // Create the channels (and the inbox entries, to stay consistent
 
                                // with the expectations from the `component` module's functions)
 
                                let client = client.unwrap();
 
                                component::default_start_create_component()
 
                                todo!("actually create the component")
 
                                let cmd_channel = comp_ctx.create_channel();
 
                                let data_channel = comp_ctx.create_channel();
 

	
 
                                let port_ids = [
 
                                    cmd_channel.putter_id, cmd_channel.getter_id,
 
                                    data_channel.putter_id, data_channel.getter_id,
 
                                ];
 
                                for port_id in port_ids {
 
                                    let expected_port_index = self.inbox_main.len();
 
                                    let port_handle = comp_ctx.get_port_handle(port_id);
 
                                    self.inbox_main.push(None);
 
                                    self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx);
 
                                }
 

	
 
                                // Construct the message containing the appropriate ports that will
 
                                // be sent to the component commanding this listener.
 
                                let mut values = ValueGroup::new_stack(vec![
 
                                    Value::Unassigned, Value::Unassigned
 
                                ]);
 
                                values.values[self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id));
 
                                values.values[self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id));
 
                                if let Err(location_and_message) = component::default_send_data_message(
 
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values,
 
                                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                ) {
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx, location_and_message
 
                                    );
 
                                }
 

	
 
                                // And finish the consensus round
 
                                component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 

	
 
                                // Enter a state such that when we leave the consensus round and
 
                                // go back to the nonsync state, that we will actually create the
 
                                // tcp client component.
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated {
 
                                    client,
 
                                    cmd_rx: cmd_channel.getter_id,
 
                                    data_tx: data_channel.putter_id
 
                                };
 

	
 
                                return CompScheduling::Requeue;
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep;
 
                                } else {
 
                                    todo!("handle listener.accept error {:?}", err);
 
                                    todo!("handle listener.accept error {:?}", err)
 
                                }
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::AcceptChannelGenerated{ .. } => unreachable!(),
 
                    ListenerSyncState::FinishSyncThenQuit => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    }
 
                }
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::BlockedGet => {
 
                debug_assert_eq!(self.sync_state, ListenerSyncState::AwaitingCmd);
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -355,7 +355,7 @@ impl Consensus {
 
        self.solution.clear();
 
    }
 

	
 
    pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
 
    pub(crate) fn notify_of_new_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(_expected_index, self.ports.len());
 
        let port_info = comp_ctx.get_port(port_handle);
 
        self.ports.push(PortAnnotation{
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -154,6 +154,12 @@ impl Drop for SocketTcpListener {
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpListener {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// Raw socket receiver. Essentially a listener that accepts a single connection
 
struct SocketRawRx {
 
    listen_handle: c_int,
0 comments (0 inline, 0 general)