Changeset - 2a1875efc62c
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-07-03 13:46:29
christopher.esterhuyse@gmail.com
more refactoring. more rigor in distinction between recoverable / unrecoverable errors
3 files changed with 57 insertions and 39 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -130,11 +130,18 @@ impl Connector {
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased, phased } = self;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.round_result = Self::connected_sync(unphased, comm, timeout);
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
                }
 
                comm.round_result = Self::connected_sync(cu, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
@@ -151,8 +158,9 @@ impl Connector {
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        // let deadline = timeout.map(|to| Instant::now() + to);
 
        //////////////////////////////////
 
        log!(
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
@@ -166,6 +174,7 @@ impl Connector {
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
@@ -209,6 +218,7 @@ impl Connector {
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
@@ -217,7 +227,8 @@ impl Connector {
 
                    .keys()
 
                    .map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
                let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
                SolutionStorage::new(n.chain(c).chain(e))
 
                let route_iter = n.chain(c).chain(e);
 
                SolutionStorage::new(route_iter)
 
            },
 
            spec_var_stream: cu.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
@@ -225,7 +236,7 @@ impl Connector {
 
        };
 
        log!(cu.logger, "Round context structure initialized");
 

	
 
        // 2. kick off the native
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
@@ -239,8 +250,7 @@ impl Connector {
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate =
 
                    Predicate::default().inserted(native_branch_spec_var, branch_spec_val);
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
@@ -248,7 +258,7 @@ impl Connector {
 
                    let var = cu.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for silent ports
 
                // assign falses for all silent (not firing) ports
 
                for &port in cu.native_ports.difference(&firing_ports) {
 
                    let var = cu.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
@@ -256,17 +266,18 @@ impl Connector {
 
                        continue 'native_branches;
 
                    }
 
                }
 
                predicate
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_branch_spec_var, branch_spec_val)
 
            };
 
            log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate);
 

	
 
            // put all messages
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg)?;
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            if to_get.is_empty() {
 
                // this branch is immediately ready to be part of a solution
 
                log!(
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
@@ -285,8 +296,9 @@ impl Connector {
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
@@ -344,7 +356,7 @@ impl Connector {
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, SyncError> {
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.logger, "Native starts with no branches! Failure!");
 
@@ -628,7 +640,7 @@ impl Connector {
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        parent: usize,
 
    ) -> Result<(), SyncError> {
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
@@ -747,10 +759,6 @@ impl BranchingNative {
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
// |putter, m| {
 
//     let getter = *cu.port_info.peers.get(&putter).unwrap();
 
//     payloads_to_get.push((getter, m));
 
// },
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
@@ -758,7 +766,7 @@ impl BranchingProtoComponent {
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), SyncError> {
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                untaken_choice: &mut branch.untaken_choice,
 
@@ -833,7 +841,7 @@ impl BranchingProtoComponent {
 
                        // keep in "unblocked"
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg)?;
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
@@ -848,7 +856,7 @@ impl BranchingProtoComponent {
 
        proto_component_id: ProtoComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
@@ -1045,17 +1053,12 @@ impl GetterBuffer {
 
    /// Buffers `msg` for `getter` by appending the `(getter, msg)` pair to
    /// `self.getters_and_sends`.
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getters_and_sends.push((getter, msg));
 
    }
 
    fn putter_add(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
    fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(&getter) = cu.port_info.peers.get(&putter) {
 
            self.getter_add(getter, msg);
 
            Ok(())
 
        } else {
 
            Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter }))
 
            log!(cu.logger, "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
src/runtime/endpoints.rs
Show inline comments
 
@@ -75,9 +75,14 @@ impl EndpointManager {
 
    /// Returns the number of entries in `self.endpoint_exts`.
    pub(super) fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    }
 
    /// Sends `msg` on the endpoint stored at `self.endpoint_exts[index]`.
    ///
    /// A failed `endpoint.send` is reported as `BrokenEndpoint(index)`; the
    /// underlying send error value is discarded by the `map_err` closure.
    /// NOTE(review): `index` is used for direct indexing, so an out-of-range
    /// value presumably panics - confirm against the type of `endpoint_exts`.
    pub(super) fn send_to_comms(&mut self, index: usize, msg: &Msg) -> Result<(), SyncError> {
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
        index: usize,
 
        msg: &Msg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|_| SyncError::BrokenEndpoint(index))
 
        endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
@@ -89,13 +94,13 @@ impl EndpointManager {
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<(usize, Msg)>, SyncError> {
 
        use {SyncError as Se, TryRecyAnyError as Trae};
 
    ) -> Result<Option<(usize, Msg)>, UnrecoverableSyncError> {
 
        use {TryRecyAnyError as Trae, UnrecoverableSyncError as Use};
 
        match self.try_recv_any(logger, deadline) {
 
            Ok(tup) => Ok(Some(tup)),
 
            Err(Trae::Timeout) => Ok(None),
 
            Err(Trae::PollFailed) => Err(Se::PollFailed),
 
            Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)),
 
            Err(Trae::PollFailed) => Err(Use::PollFailed),
 
            Err(Trae::EndpointError { error: _, index }) => Err(Use::BrokenEndpoint(index)),
 
        }
 
    }
 
    pub(super) fn try_recv_any_setup(
src/runtime/error.rs
Show inline comments
 
@@ -24,13 +24,17 @@ pub enum AddComponentError {
 
}
 
////////////////////////
 
/// Sync-round errors that are classified as unrecoverable.
///
/// Converted into `SyncError::Unrecoverable` by the `From` impl in this file.
#[derive(Debug, Clone)]
 
pub enum UnrecoverableSyncError {
 
    /// Reported by the endpoint manager when receiving fails with
    /// `TryRecyAnyError::PollFailed`.
    PollFailed,
 
    /// The endpoint at the carried index failed while sending or receiving.
    BrokenEndpoint(usize),
 
    /// Wraps a `MalformedStateError`.
    MalformedStateError(MalformedStateError),
 
}
 
/// Error type returned by `Connector::sync`.
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    /// Returned by `sync` when the connector is still in its `Setup` phase.
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    RoundFailure,
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
    MalformedStateError(MalformedStateError),
 
    /// Wraps an `UnrecoverableSyncError`. When the stored round result holds
    /// this variant, `sync` logs it and returns a clone of the error instead of
    /// starting another round.
    Unrecoverable(UnrecoverableSyncError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
@@ -65,3 +69,9 @@ pub enum NextBatchError {
 
pub enum NewNetPortError {
 
    AlreadyConnected,
 
}
 
/////////////////////
 
/// Lets an `UnrecoverableSyncError` be converted into a `SyncError` (for
/// example by the `?` operator in functions returning `SyncError`).
impl From<UnrecoverableSyncError> for SyncError {
 
    /// Wraps `e` in `SyncError::Unrecoverable`.
    fn from(e: UnrecoverableSyncError) -> Self {
 
        Self::Unrecoverable(e)
 
    }
 
}
0 comments (0 inline, 0 general)