Changeset - 2c25a85d3cc2
[Not reviewed]
0 3 0
mh - 3 years ago 2022-01-25 17:01:19
contact@maxhenger.nl
WIP: Fix scheduling and message transfer bug
3 files changed with 40 insertions and 8 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -277,49 +277,49 @@ impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:?}", message));
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(!_should_remove);
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 
@@ -537,49 +537,54 @@ impl CompPDL {
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: None,
 
                content: ControlMessageContent::Ack,
 
            }), true);
 
        }
 

	
 
        // Handle the content of the control message, and optionally Ack it
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                let mut to_ack = message.id;
 
                loop {
 
                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
                    match action {
 
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
 
                            // FIX @NoDirectHandle
 
                            let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.send_message(sched_ctx, Message::Control(message), true);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
                            // FIX @NoDirectHandle
 
                            let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 
                            wake_up_if_sleeping(sched_ctx, to_schedule, &handle);
 

	
 
                            // Note that the component is intentionally not
 
                            // sleeping, so we just wake it up
 
                            debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                            let key = unsafe{ to_schedule.upgrade() };
 
                            sched_ctx.runtime.enqueue_work(key);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            break;
 
                        },
 
                        AckAction::None => {
 
                            break;
 
                        }
 
                    }
 
                }
 
            },
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // On of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state != PortState::Closed {
 
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
 
                    port_info.state = PortState::Blocked;
 
                }
 
            },
 
            ControlMessageContent::ClosePort(port_id) => {
 
                // Request to close the port. We immediately comply and remove
 
                // the component handle as well
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
@@ -759,60 +764,82 @@ impl CompPDL {
 
                }
 
            } else {
 
                // Peer is a different component
 
                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
 
                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let prompt = Prompt::new(
 
            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
            definition_id, monomorph_index, arguments,
 
        );
 
        let component = CompPDL::new(prompt, port_id_pairs.len());
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        let created_ctx = &component.ctx;
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component
 
        // potentially remove the peer component. Here is also where we will
 
        // transfer messages in the main inbox.
 
        for pair in port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
 
            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
 
            if creator_port_info.peer_comp_id != creator_ctx.id {
 
                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
 
            }
 

	
 
            let created_port_info = created_ctx.get_port(pair.created);
 
            // Transfer any messages
 
            let created_port_index = created_ctx.get_port_index(pair.created).unwrap();
 
            let created_port_info = &created_ctx.ports[created_port_index];
 
            debug_assert!(component.code.inbox_main[created_port_index].is_none());
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created;
 
                component.code.inbox_main[created_port_index] = Some(message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created;
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            // Handle potential channel between creator and created component
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // This is the cause where the creator obtains a reference
 
                // to the created component
 
                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
        // By now all ports have been transferred. We'll now do any of the setup
 
        // for rerouting/messaging
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                        pair.creator, pair.created, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -645,50 +645,55 @@ impl Consensus {
 
            SyncRoundDecision::None => unreachable!(),
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        for (peer, _) in self.solution.solution.submissions_by.iter().copied() {
 
            if peer == comp_ctx.id {
 
                // Do not send to ourselves
 
                continue;
 
            }
 

	
 
            let mut handle = sched_ctx.runtime.get_component_public(peer);
 
            let message = Message::Sync(SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(!_should_remove);
 
        }
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        let leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if should_remove {
 
            let key = unsafe{ self.highest_id.upgrade() };
 
            sched_ctx.runtime.destroy_component(key);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // -------------------------------------------------------------------------
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.id == port_info.self_id {
 
                port_index = index;
 
            }
 
            expected_mapping.push((port.id, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
 
        self.ports[port_index].mapping = Some(new_mapping);
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        return MessageDataHeader{
 
            expected_mapping,
 
            new_mapping,
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_id,
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -18,39 +18,39 @@ fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    for i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o) {
 
        print(\"sender\");
 
        sync put(o, 1);
 
    }
 
    primitive receiver(in<u32> i) {
 
        print(\"receiver\");
 
        sync get(i);
 
        sync auto a = get(i);
 
    }
 
    composite constructor() {
 
        channel o -> i;
 
        print(\"creating sender\");
 
        new sender(o);
 
        print(\"creating receiver\");
 
        new receiver(i);
 
        print(\"done\");
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)