Changeset - 3f6d45f84e76
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-06-23 09:30:20
christopher.esterhuyse@gmail.com
trivial sync reintegrated. debug improvements. more logging
6 files changed with 269 insertions and 53 deletions:
0 comments (0 inline, 0 general)
src/lib.rs
Show inline comments
 
@@ -10,7 +10,7 @@ mod runtime;
 

	
 
pub use common::Polarity;
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{Connector, EndpointSetup, StringLogger};
 
pub use runtime::{error, Connector, EndpointSetup, StringLogger};
 

	
 
// #[cfg(feature = "ffi")]
 
// pub use runtime::ffi;
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 
use core::marker::PhantomData;
 

	
 
////////////////
 
struct BranchingNative {
 
@@ -11,6 +10,7 @@ struct NativeBranch {
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
@@ -85,6 +85,17 @@ impl SyncProtoContext<'_> {
 
}
 

	
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication { round_result, .. } => match round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
@@ -105,20 +116,42 @@ impl Connector {
 
            }
 
        }
 
    }
 
    pub fn sync(&mut self) -> Result<usize, SyncError> {
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, endpoint_manager, .. } => {
 
            ConnectorPhased::Communication {
 
                round_index,
 
                neighborhood,
 
                native_batches,
 
                endpoint_manager,
 
                round_result,
 
                ..
 
            } => {
 
                let _deadline = Instant::now() + timeout;
 
                let logger: &mut dyn Logger = &mut *self.logger;
 
                // 1. run all proto components to Nonsync blockers
 
                log!(
 
                    logger,
 
                    "~~~ SYNC called with timeout {:?}; starting round {}",
 
                    &timeout,
 
                    round_index
 
                );
 
                let mut branching_proto_components =
 
                    HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
                let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
                    self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
                log!(logger, "Nonsync running {} proto components...", unrun_components.len());
 
                while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
                    // TODO coalesce fields
 
                    log!(
 
                        logger,
 
                        "Nonsync running proto component with ID {:?}. {} to go after this",
 
                        proto_component_id,
 
                        unrun_components.len()
 
                    );
 
                    let mut ctx = NonsyncProtoContext {
 
                        logger: &mut *self.logger,
 
                        logger: &mut *logger,
 
                        port_info: &mut self.port_info,
 
                        id_manager: &mut self.id_manager,
 
                        proto_component_id,
 
@@ -129,8 +162,15 @@ impl Connector {
 
                            .unwrap()
 
                            .ports,
 
                    };
 
                    let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description);
 
                    log!(
 
                        logger,
 
                        "proto component {:?} ran to nonsync blocker {:?}",
 
                        proto_component_id,
 
                        &blocker
 
                    );
 
                    use NonsyncBlocker as B;
 
                    match component.state.nonsync_run(&mut ctx, &self.proto_description) {
 
                    match blocker {
 
                        B::ComponentExit => drop(component),
 
                        B::Inconsistent => {
 
                            return Err(InconsistentProtoComponent(proto_component_id))
 
@@ -143,51 +183,87 @@ impl Connector {
 
                        }
 
                    }
 
                }
 
                log!(
 
                    logger,
 
                    "All {} proto components are now done with Nonsync phase",
 
                    branching_proto_components.len(),
 
                );
 

	
 
                // (Putter, )
 
                // NOTE: all msgs in outbox are of form (Putter, Payload)
 
                let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 
                log!(logger, "Solution storage initialized");
 

	
 
                // 2. kick off the native
 
                log!(
 
                    logger,
 
                    "Translating {} native batches into branches...",
 
                    native_batches.len()
 
                );
 
                let mut branching_native = BranchingNative { branches: Default::default() };
 
                for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate()
 
                {
 
                    let mut predicate = Predicate::default();
 
                    // assign trues
 
                    for &port in to_get.iter().chain(to_put.keys()) {
 
                        predicate.assigned.insert(port, true);
 
                    }
 
                    // assign falses
 
                    for &port in self.native_ports.iter() {
 
                        predicate.assigned.entry(port).or_insert(false);
 
                    }
 
                    let predicate = {
 
                        let mut predicate = Predicate::default();
 
                        // assign trues
 
                        for &port in to_get.iter().chain(to_put.keys()) {
 
                            predicate.assigned.insert(port, true);
 
                        }
 
                        // assign falses
 
                        for &port in self.native_ports.iter() {
 
                            predicate.assigned.entry(port).or_insert(false);
 
                        }
 
                        predicate
 
                    };
 
                    log!(logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        payload_outbox.push((
 
                            port,
 
                            SendPayloadMsg { payload_predicate: predicate.clone(), payload },
 
                        ));
 
                        let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload };
 
                        log!(logger, "Native branch {} sending msg {:?}", index, &msg);
 
                        payload_outbox.push((port, msg));
 
                    }
 
                    if to_get.is_empty() {
 
                        log!(logger, "Native submitting trivial solution for index {}", index);
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            Route::LocalComponent(LocalComponentId::Native),
 
                            Predicate::default(),
 
                        );
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        // TODO
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    }
 
                }
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 
                log!(logger, "Done translating native batches into branches");
 
                native_batches.push(Default::default());
 

	
 
                // run all proto components to their sync blocker
 
                log!(
 
                    logger,
 
                    "Running all {} proto components to their sync blocker...",
 
                    branching_proto_components.len()
 
                );
 
                for (proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
                    // run this component to sync blocker in-place
 
                    log!(
 
                        logger,
 
                        "Running proto component with id {:?} to blocker...",
 
                        proto_component_id
 
                    );
 
                    let blocked = &mut proto_component.branches;
 
                    let [unblocked_from, unblocked_to] = [
 
                        &mut HashMap::<Predicate, ProtoComponentBranch>::default(),
 
@@ -198,15 +274,22 @@ impl Connector {
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger: &mut *self.logger,
 
                                logger,
 
                                predicate: &predicate,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            let blocker = branch.state.sync_run(&mut ctx, &self.proto_description);
 
                            log!(
 
                                logger,
 
                                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                                proto_component_id,
 
                                &predicate,
 
                                &blocker,
 
                            );
 
                            use SyncBlocker as B;
 
                            match branch.state.sync_run(&mut ctx, &self.proto_description) {
 
                            match blocker {
 
                                B::Inconsistent => {
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate);
 
                                    // branch is inconsistent. throw it away
 
                                    drop((predicate, branch));
 
                                }
 
@@ -216,9 +299,8 @@ impl Connector {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} reached SyncBlockEnd", proto_component_id, &predicate);
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *self.logger,
 
                                        logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
                                        )),
 
@@ -249,13 +331,15 @@ impl Connector {
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 

	
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var);
 
                                        payload_outbox.push((
 
                                            port,
 
                                            SendPayloadMsg {
 
@@ -271,24 +355,98 @@ impl Connector {
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                }
 
                // now all components are blocked!
 
                //
 
                log!(logger, "All proto components are blocked");
 

	
 
                log!(logger, "Entering decision loop...");
 
                let decision = 'undecided: loop {
 
                    // check if we already have a solution
 
                    for _solution in solution_storage.iter_new_local_make_old() {
 
                        // todo check if parent, inform children etc. etc.
 
                        break 'undecided Ok(0);
 
                    log!(logger, "Check if we have any local decisions...");
 
                    for solution in solution_storage.iter_new_local_make_old() {
 
                        log!(logger, "New local decision with solution {:?}...", &solution);
 
                        match neighborhood.parent {
 
                            Some(parent) => {
 
                                log!(logger, "Forwarding to my parent {:?}", parent);
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::Elaborate {
 
                                        partial_oracle: solution,
 
                                    },
 
                                });
 
                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                            }
 
                            None => {
 
                                log!(logger, "No parent. Deciding on solution {:?}", &solution);
 
                                break 'undecided Decision::Success(solution);
 
                            }
 
                        }
 
                    }
 

	
 
                    // send / recv messages
 
                    // TODO send / recv messages
 
                };
 
                decision
 
                log!(logger, "Committing to decision {:?}!", &decision);
 

	
 
                // propagate the decision to children
 
                let msg = Msg::CommMsg(CommMsg {
 
                    round_index: *round_index,
 
                    contents: CommMsgContents::Announce { decision: decision.clone() },
 
                });
 
                log!(
 
                    logger,
 
                    "Announcing decision {:?} through child endpoints {:?}",
 
                    &msg,
 
                    &neighborhood.children
 
                );
 
                for &child in neighborhood.children.iter() {
 
                    endpoint_manager.send_to(child, &msg).unwrap();
 
                }
 

	
 
                *round_result = match decision {
 
                    Decision::Failure => Err(DistributedTimeout),
 
                    Decision::Success(predicate) => {
 
                        // commit changes to component states
 
                        self.proto_components.clear();
 
                        self.proto_components.extend(
 
                            branching_proto_components
 
                                .into_iter()
 
                                .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                        );
 
                        Ok(Some(branching_native.collapse_with(&predicate)))
 
                    }
 
                };
 
                log!(logger, "Updated round_result to {:?}", round_result);
 

	
 
                let returning = round_result
 
                    .as_ref()
 
                    .map(|option| option.as_ref().unwrap().0)
 
                    .map_err(|sync_error| sync_error.clone());
 
                log!(logger, "Returning {:?}", &returning);
 
                returning
 
            }
 
        }
 
    }
 
}
 
impl BranchingNative {
 
    fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap<PortId, Payload>) {
 
        for (branch_predicate, branch) in self.branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return (index, gotten);
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
@@ -393,7 +551,7 @@ impl SolutionStorage {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
src/runtime/error.rs
Show inline comments
 
@@ -12,12 +12,13 @@ pub enum TryRecyAnyError {
 
    EndpointError { error: EndpointError, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    Timeout,
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    DistributedTimeout,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
@@ -26,3 +27,9 @@ pub enum PortOpError {
 
    MultipleOpsOnPort,
 
    PortUnavailable,
 
}
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum GottenError {
 
    NoPreviousRound,
 
    PortDidntGet,
 
    PreviousSyncFailed,
 
}
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
mod error;
 
pub mod error;
 
mod setup2;
 

	
 
#[cfg(test)]
 
@@ -144,12 +144,12 @@ pub enum ConnectorPhased {
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
        native_batches: Vec<NativeBatch>,
 
        round_result: Result<Option<usize>, SyncError>,
 
        round_result: Result<Option<(usize, HashMap<PortId, Payload>)>, SyncError>,
 
    },
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Default, Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
}
 
@@ -545,3 +545,24 @@ impl Predicate {
 
        self.assigned.get(&x).copied()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let iter = self.0.assigned.iter().filter_map(|(port, &firing)| {
 
                    if firing == self.1 {
 
                        Some(port)
 
                    } else {
 
                        None
 
                    }
 
                });
 
                f.debug_set().entries(iter).finish()
 
            }
 
        }
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
    }
 
}
src/runtime/my_tests.rs
Show inline comments
 
@@ -113,3 +113,35 @@ fn dup_put_bad() {
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    c.print_state();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_no_round() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_ungotten() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
src/runtime/setup2.rs
Show inline comments
 
@@ -96,7 +96,7 @@ impl Connector {
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
@@ -394,9 +394,7 @@ fn init_neighborhood(
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages);
 
    em.undelay_all();
 
    log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages);
 
    while !awaiting.is_empty() {
 
        log!(logger, "awaiting {:?}", &awaiting);
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
0 comments (0 inline, 0 general)