Changeset - 8ab15200d9a4
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-06-24 18:52:35
christopher.esterhuyse@gmail.com
misc refactoring: (1) more thorough error handling, (2) more modular functions
4 files changed with 103 insertions and 59 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -35,13 +35,18 @@ struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg);
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError>;
 
}
 

	
 
////////////////
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
@@ -54,64 +59,65 @@ impl Connector {
 
                    Ok(None) => Err(NoPreviousRound),
 
                    Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                }
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Putter != *unphased.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if batch.to_put.contains_key(&port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                batch.to_put.insert(port, payload);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Getter != *unphased.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(WrongPolarity),
 
            None => return Err(UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
                let batch = native_batches.last_mut().unwrap(); // length >= invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
@@ -161,13 +167,13 @@ impl Connector {
 
                id_manager: &mut cu.id_manager,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap()
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
@@ -228,15 +234,13 @@ impl Connector {
 
            log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
            // put all messages
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                // rely on invariant: sync batches respect port polarity
 
                let getter = *cu.port_info.peers.get(&putter).unwrap();
 
                payloads_to_get.push((getter, msg));
 
                payloads_to_get.putter_send(cu, putter, msg)?;
 
            }
 
            if to_get.is_empty() {
 
                log!(
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
@@ -273,53 +277,69 @@ impl Connector {
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            );
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        let decision = 'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                match cu.port_info.routes.get(&getter).unwrap() {
 
                    Route::Endpoint { index } => {
 
                match cu.port_info.routes.get(&getter) {
 
                    None => {
 
                        log!(
 
                            cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                            getter,
 
                            &send_payload_msg
 
                        );
 
                    }
 
                    Some(Route::Endpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to(*index, &msg).unwrap();
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg(
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        send_payload_msg,
 
                        &send_payload_msg,
 
                    ),
 
                    Route::LocalComponent(ComponentId::Proto(proto_component_id)) => {
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                send_payload_msg,
 
                            )
 
                                &send_payload_msg,
 
                            )?;
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
                                proto_component_id
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
@@ -331,13 +351,13 @@ impl Connector {
 
                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::Suggest { suggestion },
 
                        });
 
                        comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        break 'undecided Decision::Success(solution);
 
                    }
 
                }
 
@@ -360,13 +380,13 @@ impl Connector {
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: comm.round_index,
 
                                    contents: CommMsgContents::Suggest {
 
                                        suggestion: Decision::Failure,
 
                                    },
 
                                });
 
                                comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                                comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                            } else {
 
                                log!(cu.logger, "As the leader, deciding on timeout");
 
                                break 'undecided Decision::Failure;
 
                            }
 
                            deadline = None;
 
                        }
 
@@ -445,13 +465,13 @@ impl Connector {
 
                                            log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            // I've got a parent. Forward the failure suggestion.
 
                                            let msg = Msg::CommMsg(CommMsg {
 
                                                round_index: comm.round_index,
 
                                                contents: CommMsgContents::Suggest { suggestion },
 
                                            });
 
                                            comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                                            comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                                        }
 
                                    }
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
@@ -490,15 +510,14 @@ impl Connector {
 
            cu.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to(child, &msg).unwrap();
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 

	
 
        match decision {
 
            Decision::Failure => Err(Se::RoundFailure),
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
@@ -514,13 +533,13 @@ impl Connector {
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
@@ -617,13 +636,13 @@ impl BranchingProtoComponent {
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) {
 
    ) -> Result<(), SyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
@@ -681,28 +700,29 @@ impl BranchingProtoComponent {
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                            payload_msg_sender.send(&cu.port_info, &putter, msg);
 
                        payload_msg_sender.putter_send(cu, putter, msg)?;
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
            }
 
        });
 
            Ok(())
 
        })
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        proto_component_id: ProtoComponentId,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
@@ -755,16 +775,17 @@ impl BranchingProtoComponent {
 
            cd,
 
            cu,
 
            solution_storage,
 
            payload_msg_sender,
 
            proto_component_id,
 
            ports,
 
        );
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
@@ -877,15 +898,24 @@ impl SolutionStorage {
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> {
 
    fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg) {
 
        let getter = *port_info.peers.get(putter).unwrap();
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        if let Some(&getter) = cu.port_info.peers.get(&putter) {
 
            self.push((getter, msg));
 
            Ok(())
 
        } else {
 
            Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter }))
 
        }
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
@@ -957,17 +987,21 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainInner { swap, output } }
 
    }
 
    fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) {
 
    fn cylic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainInner { swap, output } } = self;
 
        // assert!(swap.is_empty());
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                func(k, v, CyclicDrainInner { swap, output })
 
                func(k, v, CyclicDrainInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
src/runtime/endpoints.rs
Show inline comments
 
@@ -55,12 +55,16 @@ impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_endpoints()
 
    }
 
    pub(super) fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(&mut self, index: usize, msg: &Msg) -> Result<(), SyncError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|_| SyncError::BrokenEndpoint(index))
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
src/runtime/error.rs
Show inline comments
 
@@ -18,21 +18,28 @@ pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    RoundFailure,
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    UnknownPolarity,
 
    NotConnected,
 
    MultipleOpsOnPort,
 
    PortUnavailable,
 
}
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum GottenError {
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 
use std::io::ErrorKind::WouldBlock;
 

	
 
impl Connector {
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
@@ -244,13 +243,13 @@ fn new_endpoint_manager(
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    waker.wake().expect("unable to wake");
 
                                    let _ = waker.wake();
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
 
                        }
 
                        continue;
 
                    }
0 comments (0 inline, 0 general)