Changeset - 0c617cc162dd
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-09-18 17:35:22
christopher.esterhuyse@gmail.com
conitnued continuing refactor (safe state) nonsyncprotocontext now correctly uses the rollback-safe current_state structure for modifying port information
2 files changed with 46 insertions and 47 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -45,6 +45,8 @@ impl ReplaceBoolTrue for bool {
 
        !was
 
    }
 
}
 
// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure,
 
// making it harder to accidentally mutate its contents in a way that cannot be rolled back.
 
impl CuUndecided for ConnectorUnphased {
 
    fn logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.inner.logger
 
@@ -161,7 +163,7 @@ impl Connector {
 
            ConnectorPhased::Communication(comm) => {
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.inner.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        log!(cu.logger(), "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
@@ -187,18 +189,18 @@ impl Connector {
 
        use SyncError as Se;
 
        //////////////////////////////////
 

	
 
        log!(@MARK, cu.inner.logger, "sync start {}", comm.round_index);
 
        log!(@MARK, cu.logger(), "sync start {}", comm.round_index);
 
        log!(
 
            cu.inner.logger,
 
            cu.logger(),
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // iterate
 
        let current_state = cu.inner.current_state.clone();
 
        let mut current_state = cu.inner.current_state.clone();
 
        let mut branching_proto_components =
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
 
@@ -206,23 +208,24 @@ impl Connector {
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            log!(
 
                cu.inner.logger,
 
                cu.logger(),
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                current_state: &mut current_state,
 
                logger: &mut *cu.inner.logger,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
            };
 
            let blocker = component.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                cu.logger(),
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
@@ -238,11 +241,11 @@ impl Connector {
 
            }
 
        }
 
        log!(
 
            cu.inner.logger,
 
            cu.logger(),
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
@@ -267,17 +270,17 @@ impl Connector {
 
            payload_inbox: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@BENCH, cu.inner.logger, "");
 
        log!(cu.logger(), "Round context structure initialized");
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            cu.logger(),
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var);
 
        log!(cu.logger(), "Native branch spec var is {:?}", native_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
@@ -289,7 +292,7 @@ impl Connector {
 
                let firing_iter = to_get.iter().chain(to_put.keys()).copied();
 

	
 
                log!(
 
                    cu.inner.logger,
 
                    cu.logger(),
 
                    "New native with firing ports {:?}",
 
                    firing_iter.clone().collect::<Vec<_>>()
 
                );
 
@@ -310,24 +313,19 @@ impl Connector {
 
                    }
 
                    let var = cu.inner.current_state.spec_var_for(*port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_spec_var, branch_spec_val)
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                &predicate
 
            );
 
            log!(cu.logger(), "Native branch index={:?} has consistent {:?}", index, &predicate);
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(
 
                    cu.inner.logger,
 
                    cu.logger(),
 
                    "Native branch {} sending msg {:?} with putter {:?}",
 
                    index,
 
                    &msg,
 
@@ -340,7 +338,7 @@ impl Connector {
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    cu.logger(),
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
@@ -359,8 +357,8 @@ impl Connector {
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(cu.inner.logger, "Searching for decision...");
 
        log!(@BENCH, cu.inner.logger, "");
 
        log!(cu.logger(), "Searching for decision...");
 
        log!(@BENCH, cu.logger(), "");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
@@ -368,10 +366,10 @@ impl Connector {
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(@MARK, cu.inner.logger, "got decision!");
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.inner.logger, "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 
        log!(@MARK, cu.logger(), "got decision!");
 
        log!(cu.logger(), "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.logger(), "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger(), &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
@@ -381,12 +379,12 @@ impl Connector {
 
            }),
 
        });
 
        log!(
 
            cu.inner.logger,
 
            cu.logger(),
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        log!(@MARK, cu.inner.logger, "forwarding decision!");
 
        log!(@MARK, cu.logger(), "forwarding decision!");
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
@@ -412,11 +410,11 @@ impl Connector {
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
                Ok(Some(branching_native.collapse_with(&mut *cu.logger(), &predicate)))
 
            }
 
        };
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.inner.logger, "");
 
        log!(cu.logger(), "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.logger(), "");
 
        ret
 
    }
 

	
 
@@ -1166,7 +1164,7 @@ impl NonsyncProtoContext<'_> {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.cu_inner.logger,
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
@@ -1177,23 +1175,23 @@ impl NonsyncProtoContext<'_> {
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.cu_inner.current_state.port_info.get(port).unwrap().owner
 
                self.current_state.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // 2. create new component
 
        let new_cid = self.cu_inner.current_state.id_manager.new_component_id();
 
        let new_cid = self.current_state.id_manager.new_component_id();
 
        self.unrun_components.push((new_cid, state));
 
        // 3. update ownership of moved ports
 
        for port in moved_ports.iter() {
 
            self.cu_inner.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
 
            self.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
 
        }
 
        // 3. create a new component
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.cu_inner.current_state.id_manager.new_port_id();
 
        let mut new_cid_fn = || self.current_state.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.cu_inner.current_state.port_info.insert(
 
        self.current_state.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -1202,7 +1200,7 @@ impl NonsyncProtoContext<'_> {
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.cu_inner.current_state.port_info.insert(
 
        self.current_state.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -1212,7 +1210,7 @@ impl NonsyncProtoContext<'_> {
 
            },
 
        );
 
        log!(
 
            self.cu_inner.logger,
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
src/runtime/mod.rs
Show inline comments
 
@@ -35,13 +35,14 @@ struct CurrentState {
 
    id_manager: IdManager,
 
}
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    current_state: &'a mut CurrentState,
 
    logger: &'a mut dyn Logger,
 
    // cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
 
    proto_component_id: ComponentId,          // KEY in id->component map
 
    proto_component_id: ComponentId,                              // KEY in id->component map
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    rctx: &'a RoundCtx,
 
    // cu: &'a mut dyn CuUndecided,
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                        // KEY in pred->branch map
 
}
0 comments (0 inline, 0 general)