Changeset - a34c55297ac2
[Not reviewed]
1 3 1
Christopher Esterhuyse - 5 years ago 2020-06-23 12:24:08
christopher.esterhuyse@gmail.com
native -> native messaging working
4 files changed with 58 insertions and 20 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -228,14 +228,13 @@ impl Connector {
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    let e = neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 
                log!(logger, "Solution storage initialized");
 

	
 
                // 2. kick off the native
 
                log!(
 
@@ -268,17 +267,22 @@ impl Connector {
 
                        log!(logger, "Native branch {} sending msg {:?}", index, &msg);
 
                        // rely on invariant: sync batches respect port polarity
 
                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                        payloads_to_get.push((getter, msg));
 
                    }
 
                    if to_get.is_empty() {
 
                        log!(logger, "Native submitting trivial solution for index {}", index);
 
                        log!(
 
                            logger,
 
                            "Native submitting solution for batch {} with {:?}",
 
                            index,
 
                            &predicate
 
                        );
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            Route::LocalComponent(LocalComponentId::Native),
 
                            Predicate::default(),
 
                            predicate.clone(),
 
                        );
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        // TODO
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
@@ -611,14 +615,14 @@ impl BranchingNative {
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        log!(logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(port_info.polarities.get(&getter).copied() == Some(Getter));
 
        println!("BEFORE {:#?}", &self.branches);
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            // check if this branch expects to receive it
 
            let var = port_info.firing_var_for(getter);
 
@@ -634,46 +638,65 @@ impl BranchingNative {
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(new_predicate) => {
 
                    // fork branch, give fork the message and the new predicate
 
                Csr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    finished.insert(predicate, branch);
 
                    feed_branch(&mut branch2, &new_predicate);
 
                    finished.insert(new_predicate, branch2);
 
                    finished.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        println!("AFTER {:#?}", &self.branches);
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap<PortId, Payload>) {
 
        for (branch_predicate, branch) in self.branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return (index, gotten);
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
pub mod error;
 
mod setup2;
 

	
 
#[cfg(test)]
 
mod my_tests;
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(
 
    Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
@@ -349,12 +349,20 @@ impl Into<Msg> for SetupMsg {
 
}
 
impl StringLogger {
 
    pub fn new(controller_id: ControllerId) -> Self {
 
        Self(controller_id, String::default())
 
    }
 
}
 
impl Drop for StringLogger {
 
    fn drop(&mut self) {
 
        let stdout = std::io::stdout();
 
        let mut lock = stdout.lock();
 
        writeln!(lock, "--- DROP LOG DUMP ---").unwrap();
 
        self.dump_log(&mut lock);
 
    }
 
}
 
impl Logger for StringLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write {
 
        use std::fmt::Write;
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        self
 
    }
src/runtime/setup2.rs
Show inline comments
 
@@ -42,13 +42,19 @@ impl Connector {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.new_port_id();
 
                self.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                self.port_info.polarities.insert(p, polarity);
 
                self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup);
 
                log!(
 
                    self.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
src/runtime/tests.rs
Show inline comments
 
file renamed from src/runtime/my_tests.rs to src/runtime/tests.rs
 
@@ -195,12 +195,13 @@ fn native_message_pass() {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
0 comments (0 inline, 0 general)