Changeset - 80ce091d67cf
[Not reviewed]
0 2 0
mh - 3 years ago 2022-01-24 13:37:10
contact@maxhenger.nl
WIP: Integrating select, fixing bugs on tests
2 files changed with 25 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -241,7 +241,7 @@ impl CompPDL {
 
        sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, true);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(!_should_remove);
 
            return;
 
@@ -474,6 +474,18 @@ impl CompPDL {
 
    }
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        // Little local utility to send an Ack
 
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) {
 
            let peer_info = comp_ctx.get_peer(peer_comp_id);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
                id: causer_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: None,
 
                content: ControlMessageContent::Ack,
 
            }), true);
 
        }
 

	
 
        // Handle the content of the control message, and optionally Ack it
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                let mut to_ack = message.id;
 
@@ -482,14 +494,18 @@ impl CompPDL {
 
                    match action {
 
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
 
                            // FIX @NoDirectHandle
 
                            let handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.send_message(sched_ctx, Message::Control(message), true);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
                            // FIX @NoDirectHandle
 
                            let handle = sched_ctx.runtime.get_component_public(to_schedule);
 
                            let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 
                            wake_up_if_sleeping(sched_ctx, to_schedule, &handle);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            break;
 
                        },
 
                        AckAction::None => {
 
@@ -513,6 +529,7 @@ impl CompPDL {
 
                // the component handle as well
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
                let port_info = &mut comp_ctx.ports[port_index];
 
                let peer_port_id = port_info.peer_id;
 
                let peer_comp_id = port_info.peer_comp_id;
 
                port_info.state = PortState::Closed;
 

	
 
@@ -529,6 +546,8 @@ impl CompPDL {
 

	
 
                    comp_ctx.peers.remove(peer_index);
 
                }
 

	
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
 
            }
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
@@ -551,11 +570,9 @@ impl CompPDL {
 
                    port_info.state = PortState::Blocked;
 
                }
 

	
 
                let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
 
                // TODO: Continue here. Send ack, but think about whether we
 
                //  always have the peer in our list of peers? Quickly thinking
 
                //  about it, I think so, but we may have a series of port
 
                //  transfers. Does that change things?
 
                let peer_port_id = port_info.peer_id;
 
                let peer_comp_id = port_info.peer_comp_id;
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -96,9 +96,7 @@ impl ControlLayer {
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
                self.entries.remove(entry_index);
 
                self.handle_ack(to_ack, sched_ctx, comp_ctx);
 

	
 
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
 
            },
0 comments (0 inline, 0 general)