Changeset - 0ec053ef96b5
[Not reviewed]
0 5 0
mh - 3 years ago 2022-02-02 17:25:18
contact@maxhenger.nl
WIP: Fix transfered port ID bug
5 files changed with 9 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -116,49 +116,48 @@ impl CompCtx {
 
                };
 
                self.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer associated with a port.
 
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_id);
 
        if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) {
 
            return;
 
        }
 

	
 
        debug_assert!(port.associated_with_peer);
 
        dbg_code!(port.associated_with_peer = false);
 
        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
 
        let peer = &mut self.peers[peer_index];
 
        peer.num_associated_ports -= 1;
 
        println!(" ****** DEBUG: Removed peer {:?} from {:?}, now at {}", peer.id, self_id, peer.num_associated_ports);
 
        if peer.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            if let Some(key) = peer.handle.decrement_users() {
 
                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
 
        let port_info = self.get_port_mut(port_handle);
 
        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
 
        port_info.state = new_state;
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -658,49 +658,50 @@ impl CompPDL {
 
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Port peer is owned by the creator as well
 
                let created_peer_port_index = port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Peer port moved to the new component as well. So
 
                        // adjust IDs appropriately.
 
                        let peer_pair = &port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id);
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component
 
                // Peer is a different component. We'll deal with sending the
 
                // appropriate messages later
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let prompt = Prompt::new(
 
            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
            definition_id, monomorph_index, arguments,
 
        );
 
        let component = CompPDL::new(prompt, port_id_pairs.len());
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        let created_ctx = &component.ctx;
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component. Here is also where we will
 
        // transfer messages in the main inbox.
 
@@ -718,48 +719,49 @@ impl CompPDL {
 
            debug_assert!(component.code.inbox_main[created_port_index].is_none());
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.code.inbox_main[created_port_index] = Some(message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator_id {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            // Handle potential channel between creator and created component
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                peer_port_info.peer_port_id = created_port_info.self_id;
 
                creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
        // By now all ports have been transferred. We'll now do any of the setup
 
        // for rerouting/messaging
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created_handle);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                        pair.creator_id, pair.created_id, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
 
            sched_ctx.runtime.enqueue_work(created_key);
src/runtime2/runtime.rs
Show inline comments
 
@@ -92,49 +92,48 @@ impl CompHandle {
 
        };
 
        handle.increment_users();
 
        return handle;
 
    }
 

	
 
    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
 
        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
 
        self.inbox.push(message);
 
        if try_wake_up {
 
            wake_up_if_sleeping(sched_ctx, self.id, self);
 
        }
 
    }
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns the `CompKey` to the component if it should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        dbg_code!(self.decremented = true);
 
        println!(" ****** DEBUG [handle]: Decremented count to {} for {:?}", new_count, self.id);
 
        if new_count == 0 {
 
            return Some(unsafe{ self.id.upgrade() });
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Clone for CompHandle {
 
    fn clone(&self) -> Self {
 
        debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'");
 
        self.increment_users();
 
        return CompHandle{
 
            target: self.target,
 
            id: self.id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
src/runtime2/scheduler.rs
Show inline comments
 
@@ -7,48 +7,53 @@ use super::runtime::*;
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a RuntimeInner,
 
    pub id: u32,
 
    pub comp: u32,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
 
        return Self {
 
            runtime,
 
            id,
 
            comp: 0,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
    }
 

	
 
    // TODO: Obviously remove, but useful for testing
 
    pub(crate) fn log_special(&self, text: &str) {
 
        println!("[s:{:02}, c:{:03}] *** *** {}", self.id, self.comp, text);
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Scheduler{ runtime, scheduler_id }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
            let comp_key = self.runtime.take_work();
 
            if comp_key.is_none() {
 
                break 'run_loop;
 
            }
 

	
 
            let comp_key = comp_key.unwrap();
 
            let component = self.runtime.get_component(comp_key);
 
            scheduler_ctx.comp = comp_key.0;
 

	
 
            // Run the component until it no longer indicates that it needs to
 
@@ -84,35 +89,34 @@ impl Scheduler {
 
        component.public.sleeping.store(true, Ordering::Release);
 
        if component.inbox.can_pop() {
 
            let should_reschedule = component.public.sleeping
 
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
 
                .is_ok();
 

	
 
            if should_reschedule {
 
                self.runtime.enqueue_work(key);
 
            }
 
        }
 
    }
 

	
 
    /// Marks the component as exiting by removing the reference it holds to
 
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
 
    /// has not yet been destroyed)
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        // If we didn't yet decrement our reference count, do so now
 
        let comp_key = unsafe{ component.ctx.id.upgrade() };
 

	
 
        if !component.exiting {
 
            component.exiting = true;
 

	
 
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
            let new_count = old_count - 1;
 
            println!(" ****** DEBUG [ sched]: Decremented count to {} for {:?}", new_count, component.ctx.id);
 
            if new_count == 0 {
 
                sched_ctx.runtime.destroy_component(comp_key);
 
                return;
 
            }
 
        }
 

	
 
        // Enter "regular" sleeping mode
 
        self.mark_component_as_sleeping(comp_key, component);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -10,49 +10,49 @@ fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: V
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    for i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication_b() {
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
0 comments (0 inline, 0 general)