Changeset - 8ab15200d9a4
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-06-24 18:52:35
christopher.esterhuyse@gmail.com
misc refactoring: (1) more thorough error handling, (2) more modular functions
4 files changed with 167 insertions and 123 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -38,7 +38,12 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg);
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError>;
 
}
 

	
 
////////////////
 
@@ -57,27 +62,6 @@ impl Connector {
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Putter != *unphased.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if batch.to_put.contains_key(&port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                batch.to_put.insert(port, payload);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
@@ -90,26 +74,48 @@ impl Connector {
 
            }
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Getter != *unphased.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(WrongPolarity),
 
            None => return Err(UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                Ok(())
 
                let batch = native_batches.last_mut().unwrap(); // length >= invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased, phased } = self;
 
@@ -164,7 +170,7 @@ impl Connector {
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap()
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
@@ -231,9 +237,7 @@ impl Connector {
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                // rely on invariant: sync batches respect port polarity
 
                let getter = *cu.port_info.peers.get(&putter).unwrap();
 
                payloads_to_get.push((getter, msg));
 
                payloads_to_get.putter_send(cu, putter, msg)?;
 
            }
 
            if to_get.is_empty() {
 
                log!(
 
@@ -276,7 +280,7 @@ impl Connector {
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            );
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
        }
 
@@ -288,22 +292,30 @@ impl Connector {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                match cu.port_info.routes.get(&getter).unwrap() {
 
                    Route::Endpoint { index } => {
 
                match cu.port_info.routes.get(&getter) {
 
                    None => {
 
                        log!(
 
                            cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                            getter,
 
                            &send_payload_msg
 
                        );
 
                    }
 
                    Some(Route::Endpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to(*index, &msg).unwrap();
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg(
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        send_payload_msg,
 
                        &send_payload_msg,
 
                    ),
 
                    Route::LocalComponent(ComponentId::Proto(proto_component_id)) => {
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
@@ -315,8 +327,16 @@ impl Connector {
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                send_payload_msg,
 
                            )
 
                                &send_payload_msg,
 
                            )?;
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
                                proto_component_id
 
                            );
 
                        }
 
                    }
 
                }
 
@@ -334,7 +354,7 @@ impl Connector {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::Suggest { suggestion },
 
                        });
 
                        comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
@@ -363,7 +383,7 @@ impl Connector {
 
                                        suggestion: Decision::Failure,
 
                                    },
 
                                });
 
                                comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                                comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                            } else {
 
                                log!(cu.logger, "As the leader, deciding on timeout");
 
                                break 'undecided Decision::Failure;
 
@@ -448,7 +468,7 @@ impl Connector {
 
                                                round_index: comm.round_index,
 
                                                contents: CommMsgContents::Suggest { suggestion },
 
                                            });
 
                                            comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                                            comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                                        }
 
                                    }
 
                                }
 
@@ -493,9 +513,8 @@ impl Connector {
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to(child, &msg).unwrap();
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 

	
 
        match decision {
 
            Decision::Failure => Err(Se::RoundFailure),
 
            Decision::Success(predicate) => {
 
@@ -517,7 +536,7 @@ impl BranchingNative {
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
@@ -620,76 +639,77 @@ impl BranchingProtoComponent {
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) {
 
    ) -> Result<(), SyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                    logger: &mut *cu.logger,
 
                    predicate: &predicate,
 
                    port_info: &cu.port_info,
 
                    inbox: &branch.inbox,
 
                };
 
                let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
                log!(
 
                    cu.logger,
 
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                    proto_component_id,
 
                    &predicate,
 
                    &blocker,
 
                );
 
                use SyncBlocker as B;
 
                match blocker {
 
                    B::Inconsistent => {
 
                        // branch is inconsistent. throw it away
 
                        drop((predicate, branch));
 
                    }
 
                    B::SyncBlockEnd => {
 
                        // make concrete all variables
 
                        for &port in ports.iter() {
 
                            let var = cu.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        // submit solution for this component
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            &mut *cu.logger,
 
                            Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                            predicate.clone(),
 
                        );
 
                        // move to "blocked"
 
                        drainer.add_output(predicate, branch);
 
                    }
 
                    B::CouldntReadMsg(port) => {
 
                        // move to "blocked"
 
                        assert!(!branch.inbox.contains_key(&port));
 
                        drainer.add_output(predicate, branch);
 
                    }
 
                    B::CouldntCheckFiring(port) => {
 
                        // sanity check
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.firing_var_for(port);
 
                        assert!(predicate.query(var).is_none());
 
                        // keep forks in "unblocked"
 
                        drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                        drainer.add_input(predicate.inserted(var, true), branch);
 
                        predicate.assigned.entry(var).or_insert(false);
 
                    }
 
                    B::PutMsg(putter, payload) => {
 
                        // sanity check
 
                        assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                        // overwrite assignment
 
                        let var = cu.port_info.firing_var_for(putter);
 
                        let was = predicate.assigned.insert(var, true);
 
                        if was == Some(false) {
 
                            log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                            // discard forever
 
                            drop((predicate, branch));
 
                        } else {
 
                            // keep in "unblocked"
 
                            log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                            let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                            payload_msg_sender.send(&cu.port_info, &putter, msg);
 
                            drainer.add_input(predicate, branch);
 
                        }
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
                    );
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.firing_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, true), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.firing_var_for(putter);
 
                    let was = predicate.assigned.insert(var, true);
 
                    if was == Some(false) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        payload_msg_sender.putter_send(cu, putter, msg)?;
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
        });
 
            }
 
            Ok(())
 
        })
 
    }
 
    fn feed_msg(
 
        &mut self,
 
@@ -698,8 +718,8 @@ impl BranchingProtoComponent {
 
        proto_component_id: ProtoComponentId,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
@@ -758,10 +778,11 @@ impl BranchingProtoComponent {
 
            payload_msg_sender,
 
            proto_component_id,
 
            ports,
 
        );
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
@@ -880,9 +901,18 @@ impl SolutionStorage {
 
    }
 
}
 
impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> {
 
    fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg) {
 
        let getter = *port_info.peers.get(putter).unwrap();
 
        self.push((getter, msg));
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        if let Some(&getter) = cu.port_info.peers.get(&putter) {
 
            self.push((getter, msg));
 
            Ok(())
 
        } else {
 
            Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter }))
 
        }
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
@@ -960,14 +990,18 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainInner { swap, output } }
 
    }
 
    fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) {
 
    fn cylic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainInner { swap, output } } = self;
 
        // assert!(swap.is_empty());
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                func(k, v, CyclicDrainInner { swap, output })
 
                func(k, v, CyclicDrainInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
src/runtime/endpoints.rs
Show inline comments
 
@@ -58,6 +58,10 @@ impl EndpointManager {
 
    pub(super) fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(&mut self, index: usize, msg: &Msg) -> Result<(), SyncError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|_| SyncError::BrokenEndpoint(index))
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
src/runtime/error.rs
Show inline comments
 
@@ -21,6 +21,12 @@ pub enum SyncError {
 
    RoundFailure,
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
@@ -30,6 +36,7 @@ pub enum EndpointError {
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    UnknownPolarity,
 
    NotConnected,
 
    MultipleOpsOnPort,
 
    PortUnavailable,
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 
use std::io::ErrorKind::WouldBlock;
 

	
 
impl Connector {
 
    pub fn new(
 
@@ -247,7 +246,7 @@ fn new_endpoint_manager(
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    waker.wake().expect("unable to wake");
 
                                    let _ = waker.wake();
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
0 comments (0 inline, 0 general)