Changeset - d5080183f27a
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-06-22 18:20:04
christopher.esterhuyse@gmail.com
sync rewrite
2 files changed with 354 insertions and 232 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 
use core::marker::PhantomData;
 

	
 
////////////////
 
struct BranchingNative {
 
@@ -10,6 +11,22 @@ struct NativeBranch {
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<Route, usize>,
 
}
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Clone)]
 
struct ProtoComponentBranch {
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 

	
 
////////////////
 
impl NonsyncProtoContext<'_> {
 
@@ -72,32 +89,43 @@ impl Connector {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
            ConnectorPhased::Communication { native_batches, endpoint_manager, .. } => {
 
                // 1. run all proto components to Nonsync blockers
 
                let mut branching_proto_components =
 
                    HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
                let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
                    self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
                while let Some((proto_component_id, mut proto_component)) = unrun_components.pop() {
 
                while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
                    // TODO coalesce fields
 
                    let mut ctx = NonsyncProtoContext {
 
                        logger: &mut *self.logger,
 
                        port_info: &mut self.port_info,
 
                        id_manager: &mut self.id_manager,
 
                        proto_component_id,
 
                        unrun_components: &mut unrun_components,
 
                        proto_component_ports: &mut proto_component.ports,
 
                        proto_component_ports: &mut self
 
                            .proto_components
 
                            .get_mut(&proto_component_id)
 
                            .unwrap()
 
                            .ports,
 
                    };
 
                    match proto_component.state.nonsync_run(&mut ctx, &self.proto_description) {
 
                        NonsyncBlocker::ComponentExit => drop(proto_component),
 
                        NonsyncBlocker::Inconsistent => {
 
                    use NonsyncBlocker as B;
 
                    match component.state.nonsync_run(&mut ctx, &self.proto_description) {
 
                        B::ComponentExit => drop(component),
 
                        B::Inconsistent => {
 
                            return Err(InconsistentProtoComponent(proto_component_id))
 
                        }
 
                        NonsyncBlocker::SyncBlockStart => {
 
                            self.proto_components.insert(proto_component_id, proto_component);
 
                        B::SyncBlockStart => {
 
                            branching_proto_components.insert(
 
                                proto_component_id,
 
                                BranchingProtoComponent::initial(component),
 
                            );
 
                        }
 
                    }
 
                }
 

	
 
                // all ports are GETTER
 
                let mut mem_inbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 
                // (Putter, )
 
                let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // 2. kick off the native
 
                let mut branching_native = BranchingNative { branches: Default::default() };
 
@@ -114,9 +142,8 @@ impl Connector {
 
                    }
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        let getter = *self.port_info.peers.get(&port).unwrap();
 
                        mem_inbox.push((
 
                            getter,
 
                        payload_outbox.push((
 
                            port,
 
                            SendPayloadMsg { payload_predicate: predicate.clone(), payload },
 
                        ));
 
                    }
 
@@ -125,14 +152,299 @@ impl Connector {
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    }
 
                }
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 

	
 
                // run all proto components to their sync blocker
 
                for (proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
                    let blocked = &mut proto_component.branches;
 
                    let [unblocked_from, unblocked_to] = [
 
                        &mut HashMap::<Predicate, ProtoComponentBranch>::default(),
 
                        &mut Default::default(),
 
                    ];
 
                    std::mem::swap(unblocked_from, blocked);
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger: &mut *self.logger,
 
                                predicate: &predicate,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            use SyncBlocker as B;
 
                            match branch.state.sync_run(&mut ctx, &self.proto_description) {
 
                                B::Inconsistent => {
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate);
 
                                    // discard forever
 
                                    drop((predicate, branch));
 
                                }
 
                                B::SyncBlockEnd => {
 
                                    // todo falsify
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *self.logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
                                        )),
 
                                        predicate.clone(),
 
                                    );
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                    }
 
                                    // move to "blocked"
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    assert!(predicate.query(port).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
                                        branch.clone(),
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(port, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        payload_outbox.push((
 
                                            port,
 
                                            SendPayloadMsg {
 
                                                payload_predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
                                        unblocked_to.insert(predicate, branch);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                }
 
                todo!()
 
            }
 
        }
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = Route>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 

	
 
    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
        self.new_local.iter()
 
    }
 

	
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        subtree_id: Route,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 

	
 
// impl ControllerEphemeral {
 
//     fn is_clear(&self) -> bool {
 
//         self.solution_storage.is_clear()
 
//             && self.poly_n.is_none()
 
//             && self.poly_ps.is_empty()
 
//             && self.mono_ps.is_empty()
 
//             && self.port_to_holder.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.solution_storage.clear();
 
//         self.poly_n.take();
 
//         self.poly_ps.clear();
 
//         self.port_to_holder.clear();
 
//     }
 
// }
 
// impl Into<PolyP> for MonoP {
 
//     fn into(self) -> PolyP {
 
//         PolyP {
 
//             complete: Default::default(),
 
//             incomplete: hashmap! {
 
//                 Predicate::new_trivial() =>
 
//                 BranchP {
 
//                     state: self.state,
 
//                     inbox: Default::default(),
 
//                     outbox: Default::default(),
 
//                     blocking_on: None,
 
//                 }
 
//             },
 
//             ports: self.ports,
 
//         }
 
//     }
 
// }
 

	
 
// impl From<EndpointError> for SyncError {
 
//     fn from(e: EndpointError) -> SyncError {
 
//         SyncError::EndpointError(e)
 
//     }
 
// }
 

	
 
// impl ProtoSyncContext<'_> {
 
//     fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
//         todo!()
 
//     }
 
//     fn new_channel(&mut self) -> [PortId; 2] {
 
//         todo!()
 
//     }
 
// }
 

	
 
// impl PolyContext for BranchPContext<'_, '_> {
 
//     type D = ProtocolD;
 

	
 
//     fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
//         assert!(self.ports.contains(&port));
 
//         let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let val = self.predicate.query(channel_id);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
//     fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
//         assert!(self.ports.contains(&port));
 
//         let val = self.inbox.get(&port);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
// }
 

	
 
//////////////
 

	
 
// impl Connector {
 
//     fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> {
 
// fn end_round_with_decision(&mut self, decision: Decision) -> Result<usize, SyncError> {
 
//     log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
//     let ret = match &decision {
 
//         Decision::Success(predicate) => {
 
@@ -165,8 +477,7 @@ impl Connector {
 
//             &announcement,
 
//             child_port
 
//         );
 
//             self
 
//                 .endpoint_exts
 
//         self.endpoint_exts
 
//             .get_mut(child_port)
 
//             .expect("eefef")
 
//             .endpoint
 
@@ -184,8 +495,7 @@ impl Connector {
 
//         let parent_endpoint =
 
//             &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
//         for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
//                 let msg =
 
//                     CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index);
 
//             let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index);
 
//             log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
//             parent_endpoint.send(msg)?;
 
//         }
 
@@ -258,7 +568,7 @@ impl Connector {
 
//         if branch.to_get.is_empty() {
 
//             self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                 &mut self.logger,
 
//                     SubtreeId::PolyN,
 
//                 Route::PolyN,
 
//                 predicate.clone(),
 
//             );
 
//         }
 
@@ -293,11 +603,7 @@ impl Connector {
 
//     mut deadline: Option<Instant>,
 
//     sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
// ) -> Result<(), SyncError> {
 
//         log!(
 
//             &mut self.logger,
 
//             "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
//             self.round_index
 
//         );
 
//     log!(&mut self.logger, "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", self.round_index);
 
//     assert!(self.ephemeral.is_clear());
 
//     assert!(self.unrecoverable_error.is_none());
 

	
 
@@ -351,14 +657,9 @@ impl Connector {
 
//     // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
//     //    of the controller in the overlay tree.
 
//     self.ephemeral.solution_storage.reset({
 
//             let n = std::iter::once(SubtreeId::PolyN);
 
//             let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
//             let c = self
 
//
 
//                 .family
 
//                 .children_ports
 
//                 .iter()
 
//                 .map(|&port| SubtreeId::ChildController { port });
 
//         let n = std::iter::once(Route::PolyN);
 
//         let m = (0..self.ephemeral.poly_ps.len()).map(|index| Route::PolyP { index });
 
//         let c = self.family.children_ports.iter().map(|&port| Route::ChildController { port });
 
//         let subtree_id_iter = n.chain(m).chain(c);
 
//         log!(
 
//             &mut self.logger,
 
@@ -391,7 +692,7 @@ impl Connector {
 
//     //    TODO distinguish between completed and not completed poly_p's?
 
//     log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
//     for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
//             let my_subtree_id = SubtreeId::PolyP { index };
 
//         let my_subtree_id = Route::PolyP { index };
 
//         let m_ctx = PolyPContext {
 
//             my_subtree_id,
 
//             inner: &mut self,
 
@@ -456,8 +757,7 @@ impl Connector {
 
//                                 &announcement,
 
//                                 parent_port
 
//                             );
 
//                                 self
 
//                                     .endpoint_exts
 
//                             self.endpoint_exts
 
//                                 .get_mut(parent_port)
 
//                                 .expect("ss")
 
//                                 .endpoint
 
@@ -475,17 +775,13 @@ impl Connector {
 
//                 println!("WASNT EXPECTING {:?}", s);
 
//                 return Err(SyncError::UnexpectedSetupMsg);
 
//             }
 
//                 Msg::CommMsg(CommMsg { round_index, .. })
 
//                     if round_index < self.round_index =>
 
//                 {
 
//             Msg::CommMsg(CommMsg { round_index, .. }) if round_index < self.round_index => {
 
//                 // Old message! Can safely discard
 
//                 log!(&mut self.logger, "...and its OLD! :(");
 
//                 drop(received);
 
//                 continue 'recv_loop;
 
//             }
 
//                 Msg::CommMsg(CommMsg { round_index, .. })
 
//                     if round_index > self.round_index =>
 
//                 {
 
//             Msg::CommMsg(CommMsg { round_index, .. }) if round_index > self.round_index => {
 
//                 // Message from a next round. Keep for later!
 
//                 log!(&mut self.logger, "... DELAY! :(");
 
//                 self.delay(received);
 
@@ -514,8 +810,7 @@ impl Connector {
 
//                         &announcement,
 
//                         parent_port
 
//                     );
 
//                         self
 
//                             .endpoint_exts
 
//                     self.endpoint_exts
 
//                         .get_mut(parent_port)
 
//                         .expect("ss")
 
//                         .endpoint
 
@@ -528,7 +823,7 @@ impl Connector {
 
//                 if !self.family.children_ports.contains(&received.recipient) {
 
//                     return Err(SyncError::ElaborateFromNonChild);
 
//                 }
 
//                     let subtree_id = SubtreeId::ChildController { port: received.recipient };
 
//                 let subtree_id = Route::ChildController { port: received.recipient };
 
//                 log!(
 
//                     &mut self.logger,
 
//                     "Received elaboration from child for subtree {:?}: {:?}",
 
@@ -573,13 +868,8 @@ impl Connector {
 
//                     &payload_predicate,
 
//                     &payload
 
//                 );
 
//                     let channel_id = self
 
//
 
//                         .endpoint_exts
 
//                         .get(received.recipient)
 
//                         .expect("UEHFU")
 
//                         .info
 
//                         .channel_id;
 
//                 let channel_id =
 
//                     self.endpoint_exts.get(received.recipient).expect("UEHFU").info.channel_id;
 
//                 if payload_predicate.query(channel_id) != Some(true) {
 
//                     // sender didn't preserve the invariant
 
//                     return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id));
 
@@ -608,7 +898,7 @@ impl Connector {
 
//                         let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
//                         let m_ctx = PolyPContext {
 
//                                 my_subtree_id: SubtreeId::PolyP { index: *index },
 
//                             my_subtree_id: Route::PolyP { index: *index },
 
//                             inner: &mut self,
 
//                             solution_storage: &mut self.ephemeral.solution_storage,
 
//                         };
 
@@ -654,169 +944,3 @@ impl Connector {
 
//     }
 
// }
 
// }
 
// impl ControllerEphemeral {
 
//     fn is_clear(&self) -> bool {
 
//         self.solution_storage.is_clear()
 
//             && self.poly_n.is_none()
 
//             && self.poly_ps.is_empty()
 
//             && self.mono_ps.is_empty()
 
//             && self.port_to_holder.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.solution_storage.clear();
 
//         self.poly_n.take();
 
//         self.poly_ps.clear();
 
//         self.port_to_holder.clear();
 
//     }
 
// }
 
// impl Into<PolyP> for MonoP {
 
//     fn into(self) -> PolyP {
 
//         PolyP {
 
//             complete: Default::default(),
 
//             incomplete: hashmap! {
 
//                 Predicate::new_trivial() =>
 
//                 BranchP {
 
//                     state: self.state,
 
//                     inbox: Default::default(),
 
//                     outbox: Default::default(),
 
//                     blocking_on: None,
 
//                 }
 
//             },
 
//             ports: self.ports,
 
//         }
 
//     }
 
// }
 

	
 
// impl From<EndpointError> for SyncError {
 
//     fn from(e: EndpointError) -> SyncError {
 
//         SyncError::EndpointError(e)
 
//     }
 
// }
 

	
 
// impl ProtoSyncContext<'_> {
 
//     fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
//         todo!()
 
//     }
 
//     fn new_channel(&mut self) -> [PortId; 2] {
 
//         todo!()
 
//     }
 
// }
 

	
 
// impl SolutionStorage {
 
//     fn is_clear(&self) -> bool {
 
//         self.subtree_id_to_index.is_empty()
 
//             && self.subtree_solutions.is_empty()
 
//             && self.old_local.is_empty()
 
//             && self.new_local.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.subtree_id_to_index.clear();
 
//         self.subtree_solutions.clear();
 
//         self.old_local.clear();
 
//         self.new_local.clear();
 
//     }
 
//     pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
//         self.subtree_id_to_index.clear();
 
//         self.subtree_solutions.clear();
 
//         self.old_local.clear();
 
//         self.new_local.clear();
 
//         for key in subtree_ids {
 
//             self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
//             self.subtree_solutions.push(Default::default())
 
//         }
 
//     }
 

	
 
//     pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
//         self.new_local.iter()
 
//     }
 

	
 
//     pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
//         let Self { old_local, new_local, .. } = self;
 
//         new_local.drain().map(move |local| {
 
//             old_local.insert(local.clone());
 
//             local
 
//         })
 
//     }
 

	
 
//     pub(crate) fn submit_and_digest_subtree_solution(
 
//         &mut self,
 
//         logger: &mut String,
 
//         subtree_id: SubtreeId,
 
//         predicate: Predicate,
 
//     ) {
 
//         log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
//         let index = self.subtree_id_to_index[&subtree_id];
 
//         let left = 0..index;
 
//         let right = (index + 1)..self.subtree_solutions.len();
 

	
 
//         let Self { subtree_solutions, new_local, old_local, .. } = self;
 
//         let was_new = subtree_solutions[index].insert(predicate.clone());
 
//         if was_new {
 
//             let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
//             Self::elaborate_into_new_local_rec(
 
//                 logger,
 
//                 predicate,
 
//                 set_visitor,
 
//                 old_local,
 
//                 new_local,
 
//             );
 
//         }
 
//     }
 

	
 
//     fn elaborate_into_new_local_rec<'a, 'b>(
 
//         logger: &mut String,
 
//         partial: Predicate,
 
//         mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
//         old_local: &'b HashSet<Predicate>,
 
//         new_local: &'a mut HashSet<Predicate>,
 
//     ) {
 
//         if let Some(set) = set_visitor.next() {
 
//             // incomplete solution. keep traversing
 
//             for pred in set.iter() {
 
//                 if let Some(elaborated) = pred.union_with(&partial) {
 
//                     Self::elaborate_into_new_local_rec(
 
//                         logger,
 
//                         elaborated,
 
//                         set_visitor.clone(),
 
//                         old_local,
 
//                         new_local,
 
//                     )
 
//                 }
 
//             }
 
//         } else {
 
//             // recursive stop condition. `partial` is a local subtree solution
 
//             if !old_local.contains(&partial) {
 
//                 // ... and it hasn't been found before
 
//                 log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
//                 new_local.insert(partial);
 
//             }
 
//         }
 
//     }
 
// }
 
// impl PolyContext for BranchPContext<'_, '_> {
 
//     type D = ProtocolD;
 

	
 
//     fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
//         assert!(self.ports.contains(&port));
 
//         let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let val = self.predicate.query(channel_id);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
//     fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
//         assert!(self.ports.contains(&port));
 
//         let val = self.inbox.get(&port);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
// }
src/runtime/mod.rs
Show inline comments
 
@@ -8,12 +8,12 @@ mod my_tests;
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Clone, Copy, Debug)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    Endpoint { index: usize },
 
@@ -172,9 +172,10 @@ pub struct NonsyncProtoContext<'a> {
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    proto_component_id: ProtoComponentId,
 
    inbox: HashMap<PortId, Payload>,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
 
// pub struct MonoPContext<'a> {
 
@@ -201,14 +202,6 @@ pub struct SyncProtoContext<'a> {
 
//     inbox: &'r HashMap<PortId, Payload>,
 
// }
 

	
 
// #[derive(Default)]
 
// pub struct SolutionStorage {
 
//     old_local: HashSet<Predicate>,
 
//     new_local: HashSet<Predicate>,
 
//     // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
//     subtree_solutions: Vec<HashSet<Predicate>>,
 
//     subtree_id_to_index: HashMap<SubtreeId, usize>,
 
// }
 
// #[derive(Debug)]
 
// pub enum SyncRunResult {
 
//     BlockingForRecv,
 
@@ -432,6 +425,11 @@ impl Connector {
 
// }
 

	
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: PortId, v: bool) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
@@ -530,9 +528,9 @@ impl Predicate {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option<bool> {
 
    //     self.assigned.insert(channel_id, value)
 
    // }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
0 comments (0 inline, 0 general)