Changeset - f11006c1092d
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-09-21 08:58:26
christopher.esterhuyse@gmail.com
sequencer 3 primitive and composite unit tests added
3 files changed with 67 insertions and 105 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -39,24 +39,27 @@ trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 
// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure,
 
// making it harder to accidentally mutate its contents in a way that cannot be rolled back.
 
impl CuUndecided for ConnectorUnphased {
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription) {
 
        (&mut *self.inner.logger, &self.proto_description)
 
    }
 
    fn logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.inner.logger
 
    }
 
    fn proto_description(&self) -> &ProtocolDescription {
 
        &self.proto_description
 
    }
 
    fn native_component_id(&self) -> ComponentId {
 
        self.inner.native_component_id
 
    }
 
}
 

	
 
////////////////
 
@@ -208,31 +211,32 @@ impl Connector {
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            log!(
 
                cu.logger(),
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let (logger, proto_description) = cu.logger_and_protocol_description();
 
            let mut ctx = NonsyncProtoContext {
 
                current_state: &mut current_state,
 
                logger: &mut *cu.inner.logger,
 
                logger,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
            };
 
            let blocker = component.nonsync_run(&mut ctx, &cu.proto_description);
 
            let blocker = component.nonsync_run(&mut ctx, proto_description);
 
            log!(
 
                cu.logger(),
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
@@ -243,25 +247,26 @@ impl Connector {
 
        log!(
 
            cu.logger(),
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            current_state,
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id));
 
                let c = cu.proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid));
 
                let c =
 
                    branching_proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid));
 
                let e = comm
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
@@ -1161,25 +1166,24 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        println!("MOVED PORTS {:#?}", &moved_ports);
 
        // sanity check
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.current_state.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // 2. create new component
 
        let new_cid = self.current_state.id_manager.new_component_id();
 
        self.unrun_components.push((new_cid, state));
 
        // 3. update ownership of moved ports
 
        for port in moved_ports.iter() {
 
@@ -1239,55 +1243,12 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainInner { swap, output } } = self;
 
        // assert!(swap.is_empty());
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                func(k, v, CyclicDrainInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
 

	
 
// struct ConnectorComm {
 
//     logger: Box<dyn Logger>,
 
//     pd: Arc<ProtocolDescription>,
 
//     current_state: CurrentState,
 
//     components: HashMap<ComponentId, ComponentState>,
 
//     ports: HashMap<PortId, PortInfo>,
 
//     endpoint_manager: EndpointManager,
 
//     neighborhood: Neighborhood,
 
//     native_batches: Vec<NativeBatch>,
 
//     round_result: Result<Option<RoundOk>, SyncError>,
 
// }
 

	
 
// struct RoundTemp<'a> {
 
//     deadline: Option<Instant>,
 
//     msg_buf: Vec<(PortId, SendPayloadMsg)>,
 
//     solution_storage: SolutionStorage,
 
//     override_ports: HashMap<PortId, PortInfo>,
 
//     spec_var_stream: SpecVarStream,
 
//     branching_proto: HashMap<ComponentId, BranchingProtoComponent>,
 
//     branching_native: BranchingNative,
 
//     comm: &'a mut ConnectorComm,
 
// }
 
// impl ConnectorComm {
 
//     fn sync(&mut self, deadline: Option<Duration>) -> Result<usize, ()> {
 
//         RoundTemp {
 
//             msg_buf: Default::default(),
 
//             deadline: todo!(),
 
//             solution_storage: todo!(),
 
//             override_ports: Default::default(),
 
//             spec_var_stream: todo!(),
 
//             branching_proto: Default::default(),
 
//             branching_native: todo!(),
 
//             comm: self,
 
//         }
 
//         .sync()
 
//     }
 
// }
 
// impl RoundTemp<'_> {
 
//     fn sync(&mut self) -> Result<usize, ()> {
 
//         todo!()
 
//     }
 
// }
src/runtime/mod.rs
Show inline comments
 
@@ -261,24 +261,25 @@ struct SolutionStorage {
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    current_state: CurrentState,
 
}
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
src/runtime/tests.rs
Show inline comments
 
@@ -1050,74 +1050,74 @@ fn sequencer3_prim() {
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(None).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        assert_eq!(expected_batch_idx, which_of_three());
 
    }
 
}
 

	
 
// #[test]
 
// fn sequencer3_comp() {
 
//     let test_log_path = Path::new("./logs/sequencer3_comp");
 
//     let pdl = b"
 
//     primitive fifo1_init(msg m, in a, out b) {
 
//         while(true) synchronous {
 
//             if(m != null && fires(b)) {
 
//                 put(b, m);
 
//                 m = null;
 
//             } else if (m == null && fires(a)) {
 
//                 m = get(a);
 
//             }
 
//         }
 
//     }
 
//     composite fifo1_full(in a, out b) {
 
//         new fifo1_init(create(0), a, b);
 
//     }
 
//     composite fifo1(in a, out b) {
 
//         new fifo1_init(null, a, b);
 
//     }
 
//     composite seq3composite(out a, out b, out c) {
 
//         channel d -> e;
 
//         channel f -> g;
 
//         channel h -> i;
 
//         channel j -> k;
 
//         channel l -> m;
 
//         channel n -> o;
 
#[test]
 
fn sequencer3_comp() {
 
    let test_log_path = Path::new("./logs/sequencer3_comp");
 
    let pdl = b"
 
    primitive fifo1_init(msg m, in a, out b) {
 
        while(true) synchronous {
 
            if(m != null && fires(b)) {
 
                put(b, m);
 
                m = null;
 
            } else if (m == null && fires(a)) {
 
                m = get(a);
 
            }
 
        }
 
    }
 
    composite fifo1_full(in a, out b) {
 
        new fifo1_init(create(0), a, b);
 
    }
 
    composite fifo1(in a, out b) {
 
        new fifo1_init(null, a, b);
 
    }
 
    composite seq3composite(out a, out b, out c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 

	
 
//         new fifo1_full(o, d);
 
//         new replicator2(e, f, a);
 
//         new fifo1(g, h);
 
//         new replicator2(i, j, b);
 
//         new fifo1(k, l);
 
//         new replicator2(m, n, c);
 
//     }
 
//     ";
 
//     let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
//     let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
        new fifo1_full(o, d);
 
        new replicator2(e, f, a);
 
        new fifo1(g, h);
 
        new replicator2(i, j, b);
 
        new fifo1(k, l);
 
        new replicator2(m, n, c);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
//     // setup a session between (a) native, and (b) composite sequencer3, connected by 3 ports.
 
//     let [p0, g0] = c.new_port_pair();
 
//     let [p1, g1] = c.new_port_pair();
 
//     let [p2, g2] = c.new_port_pair();
 
//     c.add_component(b"seq3composite", &[p0, p1, p2]).unwrap();
 
//     c.connect(None).unwrap();
 
    // setup a session between (a) native, and (b) composite sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"seq3composite", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
//     let mut which_of_three = move || {
 
//         // setup three sync batches. sync. return which succeeded
 
//         c.get(g0).unwrap();
 
//         c.next_batch().unwrap();
 
//         c.get(g1).unwrap();
 
//         c.next_batch().unwrap();
 
//         c.get(g2).unwrap();
 
//         c.sync(None).unwrap()
 
//     };
 
    let mut which_of_three = move || {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(None).unwrap()
 
    };
 

	
 
//     const TEST_ROUNDS: usize = 50;
 
//     // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
//     for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
//         assert_eq!(expected_batch_idx, which_of_three());
 
//     }
 
// }
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        assert_eq!(expected_batch_idx, which_of_three());
 
    }
 
}
0 comments (0 inline, 0 general)