Changeset - 2c25a85d3cc2
mh - 2022-01-25 17:01:19
contact@maxhenger.nl
WIP: Fix scheduling and message transfer bug
3 files changed with 40 insertions and 8 deletions:
src/runtime2/component/component_pdl.rs
 
@@ -253,97 +253,97 @@ impl Mode {
 
            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
 
                return false,
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: Vec<Option<DataMessage>>,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
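
The TODO above sketches the intended design: one reserved message slot per port, with any overflow parked in a backup queue. Below is a minimal, self-contained model of that scheme; `DataMessage` with a `usize` port index and the `deliver` method are invented stand-ins for illustration, not the runtime's real types.

```rust
// Hypothetical, simplified model of the per-port inbox: one "main" slot per
// port, overflow spilling into a shared backup queue.
#[derive(Debug)]
struct DataMessage { target_port: usize, payload: u32 }

struct Inbox {
    main: Vec<Option<DataMessage>>, // index == port index
    backup: Vec<DataMessage>,       // overflow, for any port
}

impl Inbox {
    fn new(num_ports: usize) -> Self {
        Inbox { main: (0..num_ports).map(|_| None).collect(), backup: Vec::new() }
    }

    /// Returns true if the port's main slot was free; false means the message
    /// went to the backup queue (the real runtime would block the sender).
    fn deliver(&mut self, msg: DataMessage) -> bool {
        let slot = &mut self.main[msg.target_port];
        if slot.is_none() { *slot = Some(msg); true }
        else { self.backup.push(msg); false }
    }
}

fn main() {
    let mut inbox = Inbox::new(2);
    assert!(inbox.deliver(DataMessage { target_port: 0, payload: 1 }));
    assert!(!inbox.deliver(DataMessage { target_port: 0, payload: 2 })); // spills to backup
}
```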
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:?}", message));
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(!_should_remove);
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            }
 
        }
 
    }
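
For context on the `should_reroute` branch above: while a port transfer is in flight, the old owner keeps a reroute entry and forwards any message addressed to the moved port to its new owner. A hedged sketch of that lookup, with invented `PortId`/`CompId`/`RerouteEntry` types standing in for the real control-layer state:

```rust
#[derive(Clone, Copy, PartialEq)]
struct PortId(u32);
#[derive(Clone, Copy, PartialEq, Debug)]
struct CompId(u32);

struct RerouteEntry { port: PortId, new_owner: CompId }

struct ControlLayer { reroutes: Vec<RerouteEntry> }

impl ControlLayer {
    /// Mirrors the spirit of `should_reroute`: if the message targets a
    /// transferred port, return the component to forward it to.
    fn should_reroute(&self, target_port: PortId) -> Option<CompId> {
        self.reroutes.iter()
            .find(|e| e.port == target_port)
            .map(|e| e.new_owner)
    }
}

fn main() {
    let control = ControlLayer {
        reroutes: vec![RerouteEntry { port: PortId(7), new_owner: CompId(2) }],
    };
    assert_eq!(control.should_reroute(PortId(7)), Some(CompId(2)));
    assert_eq!(control.should_reroute(PortId(8)), None);
}
```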
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        let can_run = self.mode.can_run();
 
        sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
 
        if !can_run {
 
            return Ok(CompScheduling::Sleep);
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
@@ -513,97 +513,102 @@ impl CompPDL {
 
                // One more message for this port
 
                let message = self.inbox_backup.remove(message_index);
 
                debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port
 
                self.inbox_main[port_index] = Some(message);
 
                return;
 
            }
 
        }
 

	
 
        // Did not have any more messages. So if we were blocked, then we need
 
        // to send the "unblock" message.
 
        let port_info = &comp_ctx.ports[port_index];
 
        if port_info.state == PortState::Blocked {
 
            let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer_comp_id);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
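
The tail of this function implements the unblock handshake: pull at most one backlogged message back into the port's main slot, and only when the backlog is empty and the port was marked blocked do we tell the peer to resume sending. A simplified sketch under those assumptions (types invented, actual message delivery elided):

```rust
#[derive(PartialEq)]
enum PortState { Open, Blocked }

enum ControlMsg { UnblockPort(u32) }

/// Consume one queued message for a port; returns an unblock notification
/// only when the backlog has drained and the port was blocked.
fn drain_one(state: &mut PortState, backlog: &mut Vec<u32>, port_id: u32) -> Option<ControlMsg> {
    if let Some(_next) = backlog.pop() {
        // Another message moves into the main slot; the port stays blocked.
        return None;
    }
    if *state == PortState::Blocked {
        *state = PortState::Open;
        return Some(ControlMsg::UnblockPort(port_id)); // peer may send again
    }
    None
}

fn main() {
    let mut state = PortState::Blocked;
    let mut backlog = vec![42];
    assert!(drain_one(&mut state, &mut backlog, 1).is_none()); // backlog not yet empty
    assert!(matches!(drain_one(&mut state, &mut backlog, 1), Some(ControlMsg::UnblockPort(1))));
}
```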
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        // Little local utility to send an Ack
 
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) {
 
            let peer_info = comp_ctx.get_peer(peer_comp_id);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
                id: causer_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: None,
 
                content: ControlMessageContent::Ack,
 
            }), true);
 
        }
 

	
 
        // Handle the content of the control message, and optionally Ack it
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                let mut to_ack = message.id;
 
                loop {
 
                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
                    match action {
 
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
 
                            // FIX @NoDirectHandle
 
                            let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.send_message(sched_ctx, Message::Control(message), true);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
                            // FIX @NoDirectHandle
 
                            let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 
                            // Note that the component was created in the
                            // not-sleeping state, so a wake-up would be a
                            // no-op; we enqueue it for scheduling directly.
 
                            debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                            let key = unsafe{ to_schedule.upgrade() };
 
                            sched_ctx.runtime.enqueue_work(key);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            break;
 
                        },
 
                        AckAction::None => {
 
                            break;
 
                        }
 
                    }
 
                }
 
            },
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // One of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state != PortState::Closed {
 
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
 
                    port_info.state = PortState::Blocked;
 
                }
 
            },
 
            ControlMessageContent::ClosePort(port_id) => {
 
                // Request to close the port. We immediately comply and remove
 
                // the component handle as well
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
                let port_info = &mut comp_ctx.ports[port_index];
 
                let peer_port_id = port_info.peer_id;
 
                let peer_comp_id = port_info.peer_comp_id;
 
                port_info.state = PortState::Closed;
 

	
 
                let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap();
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports -= 1;
 
                if peer_info.num_associated_ports == 0 {
 
                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
 
                    let should_remove = peer_info.handle.decrement_users();
 
                    if should_remove {
 
                        let comp_key = unsafe{ peer_info.id.upgrade() };
 
                        sched_ctx.runtime.destroy_component(comp_key);
 
                    }
 

	
 
                    comp_ctx.peers.remove(peer_index);
 
                }
 

	
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
 
            }
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
                let port_info = comp_ctx.get_port(port_id);
 
@@ -735,108 +740,130 @@ impl CompPDL {
 
        // the new component.
 
        let mut created_component_has_remote_peers = false;
 

	
 
        for pair in port_id_pairs.iter() {
 
            let creator_port_info = creator_ctx.get_port(pair.creator);
 
            let created_port_info = created_ctx.get_port_mut(pair.created);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Port peer is owned by the creator as well
 
                let created_peer_port_index = port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator == creator_port_info.peer_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Peer port moved to the new component as well
 
                        let peer_pair = &port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_id = peer_pair.created;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(sched_ctx, creator_ctx.id, None);
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component
 
                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
 
                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let prompt = Prompt::new(
 
            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
            definition_id, monomorph_index, arguments,
 
        );
 
        let component = CompPDL::new(prompt, port_id_pairs.len());
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        let created_ctx = &component.ctx;
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component
 
        // potentially remove the peer component. Here is also where we will
 
        // transfer messages in the main inbox.
 
        for pair in port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
 
            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
 
            if creator_port_info.peer_comp_id != creator_ctx.id {
 
                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
 
            }
 

	
 
            // Transfer any messages
            let created_port_index = created_ctx.get_port_index(pair.created).unwrap();
            let created_port_info = &created_ctx.ports[created_port_index];
 
            debug_assert!(component.code.inbox_main[created_port_index].is_none());
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created;
 
                component.code.inbox_main[created_port_index] = Some(message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created;
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            // Handle potential channel between creator and created component
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // This is the case where the creator obtains a reference
 
                // to the created component
 
                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
        // By now all ports have been transferred. We'll now do any of the setup
 
        // for rerouting/messaging
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                        pair.creator, pair.created, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
 
            sched_ctx.runtime.enqueue_work(created_key);
 
        }
 
    }
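
The message-transfer half of this fix reduces to: any message still queued for a transferred port must follow the port to the created component, with its `target_port` rewritten to the new port id (both the main-slot move and the backup-queue loop above do this). A self-contained sketch of the backup-queue pass, with invented stand-in types:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
struct PortId(u32);

#[derive(Debug)]
struct DataMessage { target_port: PortId, payload: u32 }

/// Move every message addressed to `old_port` from the creator's backup
/// queue into the created component's queue, retargeting it on the way.
fn transfer_messages(
    creator_backup: &mut Vec<DataMessage>,
    created_backup: &mut Vec<DataMessage>,
    old_port: PortId,
    new_port: PortId,
) {
    let mut index = 0;
    while index < creator_backup.len() {
        if creator_backup[index].target_port == old_port {
            let mut message = creator_backup.remove(index);
            message.target_port = new_port; // retarget for the new owner
            created_backup.push(message);
        } else {
            index += 1;
        }
    }
}

fn main() {
    let mut creator = vec![
        DataMessage { target_port: PortId(1), payload: 10 },
        DataMessage { target_port: PortId(2), payload: 20 },
    ];
    let mut created = Vec::new();
    transfer_messages(&mut creator, &mut created, PortId(1), PortId(9));
    assert_eq!(created[0].target_port, PortId(9));
    assert_eq!(creator.len(), 1);
}
```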
 

	
 
    /// Removes a port from a component. Also decrements the port counter in
 
    /// the peer component's entry. If that hits 0 then it will be removed and
 
    /// returned. If returned then the caller is responsible for decrementing
 
    /// the atomic counters of the peer component's handle.
 
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        let port_info = comp_ctx.ports.remove(port_index);
 

	
 
        // If the component owns the peer, then we don't have to decrement the
 
        // number of peers (because we don't have an entry for ourselves)
 
        if port_info.peer_comp_id == comp_ctx.id {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
 
        let peer_info = &mut comp_ctx.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        // Check if we still have other ports referencing this peer
 
        if peer_info.num_associated_ports != 0 {
src/runtime2/component/consensus.rs
 
@@ -621,93 +621,98 @@ impl Consensus {
 
    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader, combine existing and new solution
 
            self.solution.combine_with_partial_solution(solution);
 
            let round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the partial solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::PartialSolution(solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
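
As a rough model of the leader logic above: the component with the highest id acts as leader and combines incoming partial solutions into a round decision, while every other component forwards its partial solution toward that leader. The counting rule below is invented for illustration; the real combination logic in `combine_with_partial_solution` is richer.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
struct CompId(u32);

#[derive(Clone, Copy, PartialEq, Debug)]
enum Decision { None, Solution }

struct Consensus {
    highest_id: CompId, // current leader, as agreed via sync headers
    gathered: usize,    // partial solutions combined so far
    expected: usize,    // participants in the round
}

impl Consensus {
    fn handle_partial_solution(&mut self, own_id: CompId) -> Decision {
        if self.highest_id == own_id {
            // Leader: combine the new partial solution, check for a decision.
            self.gathered += 1;
            if self.gathered == self.expected { Decision::Solution } else { Decision::None }
        } else {
            // Not the leader: forward the solution to `self.highest_id`
            // (message sending elided in this sketch).
            Decision::None
        }
    }
}

fn main() {
    let mut c = Consensus { highest_id: CompId(3), gathered: 0, expected: 2 };
    assert_eq!(c.handle_partial_solution(CompId(3)), Decision::None);
    assert_eq!(c.handle_partial_solution(CompId(3)), Decision::Solution);
}
```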
 

	
 
    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
 
        debug_assert_eq!(self.highest_id, comp_ctx.id);
 

	
 
        let is_success = match decision {
 
            SyncRoundDecision::None => unreachable!(),
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        for (peer, _) in self.solution.solution.submissions_by.iter().copied() {
 
            if peer == comp_ctx.id {
 
                // Do not send to ourselves
 
                continue;
 
            }
 

	
 
            let mut handle = sched_ctx.runtime.get_component_public(peer);
 
            let message = Message::Sync(SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(!_should_remove);
 
        }
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if should_remove {
 
            let key = unsafe{ self.highest_id.upgrade() };
 
            sched_ctx.runtime.destroy_component(key);
 
        }
 
    }
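
The fix in `send_to_leader` is twofold: the handle must be `mut` to send through it, and the `decrement_users` result now feeds into `destroy_component` when the last reference drops. A small sketch of the handle lifecycle this assumes (invented `CompHandle`; the real type carries more state):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

struct CompHandle { users: AtomicU32 }

impl CompHandle {
    /// Taking a public handle bumps the user count.
    fn acquire(&self) { self.users.fetch_add(1, Ordering::AcqRel); }

    /// Returns true when the caller dropped the last reference and is
    /// therefore responsible for destroying the component.
    fn decrement_users(&self) -> bool {
        self.users.fetch_sub(1, Ordering::AcqRel) == 1
    }
}

fn main() {
    let handle = CompHandle { users: AtomicU32::new(1) };
    handle.acquire();                   // e.g. get_component_public
    assert!(!handle.decrement_users()); // someone else still holds it
    assert!(handle.decrement_users());  // last user: destroy the component
}
```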
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // -------------------------------------------------------------------------
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.id == port_info.self_id {
 
                port_index = index;
 
            }
 
            expected_mapping.push((port.id, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
 
        self.ports[port_index].mapping = Some(new_mapping);
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        return MessageDataHeader{
 
            expected_mapping,
 
            new_mapping,
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
        };
 
    }
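
To make the header construction concrete: the sender snapshots every port's current mapping as `expected_mapping`, draws a fresh value from the wrapping counter (see `take_mapping` below), and records it on the sending port. A simplified, hypothetical model of that bookkeeping:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
struct PortId(u32);

struct PortMapping { id: PortId, mapping: Option<u32> }

struct Consensus { ports: Vec<PortMapping>, mapping_counter: u32 }

struct DataHeader {
    expected_mapping: Vec<(PortId, Option<u32>)>, // snapshot of all ports
    new_mapping: u32,                             // fresh value for the sender
}

impl Consensus {
    fn take_mapping(&mut self) -> u32 {
        let mapping = self.mapping_counter;
        self.mapping_counter = self.mapping_counter.wrapping_add(1);
        mapping
    }

    fn create_data_header(&mut self, sending_port: PortId) -> DataHeader {
        let expected_mapping = self.ports.iter().map(|p| (p.id, p.mapping)).collect();
        let new_mapping = self.take_mapping();
        // Record the new mapping on the sending port for the next message.
        let port = self.ports.iter_mut().find(|p| p.id == sending_port).unwrap();
        port.mapping = Some(new_mapping);
        DataHeader { expected_mapping, new_mapping }
    }
}

fn main() {
    let mut c = Consensus {
        ports: vec![PortMapping { id: PortId(1), mapping: None }],
        mapping_counter: 0,
    };
    let header = c.create_data_header(PortId(1));
    assert_eq!(header.new_mapping, 0);
    assert_eq!(c.ports[0].mapping, Some(0));
}
```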
 

	
 
    #[inline]
 
    fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
 
        return MessageSyncHeader{
 
            sync_round: self.round_index,
 
            sending_id: comp_ctx.id,
 
            highest_id: self.highest_id,
 
        };
 
    }
 

	
 
    #[inline]
 
    fn take_mapping(&mut self) -> u32 {
 
        let mapping = self.mapping_counter;
 
        self.mapping_counter = self.mapping_counter.wrapping_add(1);
 
        return mapping;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    for _ in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o) {
 
        print(\"sender\");
 
        sync put(o, 1);
 
    }
 
    primitive receiver(in<u32> i) {
 
        print(\"receiver\");
 
        sync auto a = get(i);
 
    }
 
    composite constructor() {
 
        channel o -> i;
 
        print(\"creating sender\");
 
        new sender(o);
 
        print(\"creating receiver\");
 
        new receiver(i);
 
        print(\"done\");
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file