Changeset - 0ec053ef96b5
[Not reviewed]
0 5 0
mh - 3 years ago 2022-02-02 17:25:18
contact@maxhenger.nl
WIP: Fix transfered port ID bug
5 files changed with 9 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -128,25 +128,24 @@ impl CompCtx {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_id);
 
        if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) {
 
            return;
 
        }
 

	
 
        debug_assert!(port.associated_with_peer);
 
        dbg_code!(port.associated_with_peer = false);
 
        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
 
        let peer = &mut self.peers[peer_index];
 
        peer.num_associated_ports -= 1;
 
        println!(" ****** DEBUG: Removed peer {:?} from {:?}, now at {}", peer.id, self_id, peer.num_associated_ports);
 
        if peer.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            if let Some(key) = peer.handle.decrement_users() {
 
                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
 
        let port_info = self.get_port_mut(port_handle);
 
        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -670,25 +670,26 @@ impl CompPDL {
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id);
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component
 
                // Peer is a different component. We'll deal with sending the
 
                // appropriate messages later
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let prompt = Prompt::new(
 
@@ -730,24 +731,25 @@ impl CompPDL {
 
                    message.data_header.target_port = pair.created_id;
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            // Handle potential channel between creator and created component
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                peer_port_info.peer_port_id = created_port_info.self_id;
 
                creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
        // By now all ports have been transferred. We'll now do any of the setup
 
        // for rerouting/messaging
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created_handle);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
src/runtime2/runtime.rs
Show inline comments
 
@@ -104,25 +104,24 @@ impl CompHandle {
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns the `CompKey` to the component if it should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        dbg_code!(self.decremented = true);
 
        println!(" ****** DEBUG [handle]: Decremented count to {} for {:?}", new_count, self.id);
 
        if new_count == 0 {
 
            return Some(unsafe{ self.id.upgrade() });
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Clone for CompHandle {
 
    fn clone(&self) -> Self {
 
        debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'");
 
        self.increment_users();
src/runtime2/scheduler.rs
Show inline comments
 
@@ -19,24 +19,29 @@ pub(crate) struct SchedulerCtx<'a> {
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
 
        return Self {
 
            runtime,
 
            id,
 
            comp: 0,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
    }
 

	
 
    // TODO: Obviously remove, but useful for testing
 
    pub(crate) fn log_special(&self, text: &str) {
 
        println!("[s:{:02}, c:{:03}] *** *** {}", self.id, self.comp, text);
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Scheduler{ runtime, scheduler_id }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);
 

	
 
@@ -96,23 +101,22 @@ impl Scheduler {
 
    /// Marks the component as exiting by removing the reference it holds to
 
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
 
    /// has not yet been destroyed)
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        // If we didn't yet decrement our reference count, do so now
 
        let comp_key = unsafe{ component.ctx.id.upgrade() };
 

	
 
        if !component.exiting {
 
            component.exiting = true;
 

	
 
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
            let new_count = old_count - 1;
 
            println!(" ****** DEBUG [ sched]: Decremented count to {} for {:?}", new_count, component.ctx.id);
 
            if new_count == 0 {
 
                sched_ctx.runtime.destroy_component(comp_key);
 
                return;
 
            }
 
        }
 

	
 
        // Enter "regular" sleeping mode
 
        self.mark_component_as_sleeping(comp_key, component);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -22,25 +22,25 @@ fn test_component_creation() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    for i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication_b() {
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
0 comments (0 inline, 0 general)