Changeset - 7ce4d293022a
[Not reviewed]
0 6 0
MH - 4 years ago 2021-11-09 18:41:05
contact@maxhenger.nl
remove some debug logging, did basic perf tests
6 files changed with 27 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
@@ -455,14 +455,12 @@ impl Consensus {
 
            };
 
            ctx.submit_message(Message::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index);
 

	
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 
                let mut my_final_branch_id = BranchId::new_invalid();
 
                for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
                    if connector_id == ctx.id {
src/runtime2/mod.rs
Show inline comments
 
@@ -303,37 +303,33 @@ impl RuntimeInner {
 

	
 
    // --- Managing exit condition
 

	
 
    #[inline]
 
    pub(crate) fn increment_active_interfaces(&self) {
 
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
 
        println!("DEBUG: Incremented active interfaces to {}", _old_num + 1);
 
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
 
    }
 

	
 
    pub(crate) fn decrement_active_interfaces(&self) {
 
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
 
        println!("DEBUG: Decremented active interfaces to {}", old_num - 1);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 { // such that active interfaces is now 0
 
            let num_connectors = self.active_connectors.load(Ordering::Acquire);
 
            if num_connectors == 0 {
 
                self.signal_for_shutdown();
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn increment_active_components(&self) {
 
        let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst);
 
        println!("DEBUG: Incremented components to {}", _old_num + 1);
 
    }
 

	
 
    fn decrement_active_components(&self) {
 
        let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst);
 
        println!("DEBUG: Decremented components to {}", old_num - 1);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 { // such that we have no more active connectors (for now!)
 
            let num_interfaces = self.active_interfaces.load(Ordering::Acquire);
 
            if num_interfaces == 0 {
 
                self.signal_for_shutdown();
 
            }
 
@@ -342,20 +338,18 @@ impl RuntimeInner {
 

	
 
    #[inline]
 
    fn signal_for_shutdown(&self) {
 
        debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0);
 
        debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0);
 

	
 
        println!("DEBUG: Signaling for shutdown");
 
        let _lock = self.connector_queue.lock().unwrap();
 
        let should_signal = self.should_exit
 
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_signal {
 
            println!("DEBUG: Notifying all waiting schedulers");
 
            self.scheduler_notifier.notify_all();
 
        }
 
    }
 
}
 

	
 
// TODO: Come back to this at some point
src/runtime2/native.rs
Show inline comments
 
@@ -66,18 +66,16 @@ impl Connector for ConnectorApplication {
 
        // Handle requests coming from the API
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            while let Some(job) = queue.pop_front() {
 
                match job {
 
                    ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                        println!("DEBUG: API adopting ports");
 
                        comp_ctx.push_port(endpoint_a);
 
                        comp_ctx.push_port(endpoint_b);
 
                    }
 
                    ApplicationJob::NewConnector(connector, initial_ports) => {
 
                        println!("DEBUG: API creating connector");
 
                        comp_ctx.push_component(connector, initial_ports);
 
                    },
 
                    ApplicationJob::Shutdown => {
 
                        debug_assert!(queue.is_empty());
 
                        return ConnectorScheduling::Exit;
 
                    }
 
@@ -193,17 +191,14 @@ impl ApplicationInterface {
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            println!("DEBUG: Waking up connector");
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.push_work(key);
 
        } else {
 
            println!("DEBUG: NOT waking up connector");
 
        }
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
src/runtime2/port.rs
Show inline comments
 
@@ -55,13 +55,11 @@ pub struct Port {
 
    pub channel_id: ChannelId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -344,17 +344,17 @@ impl Scheduler {
 

	
 
        return None
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
src/runtime2/tests/mod.rs
Show inline comments
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
const NUM_THREADS: u32 = 10;  // number of threads in runtime
 
const NUM_INSTANCES: u32 = 10;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 10;      // number of loops within a single test (not used by all tests)
 
const NUM_THREADS: u32 = 1;  // number of threads in runtime
 
const NUM_INSTANCES: u32 = 4;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 4;      // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
@@ -24,39 +24,59 @@ fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor:
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                print(\"putting!\");
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                print(\"getting!\");
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("hello");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
0 comments (0 inline, 0 general)