Changeset - fd175763a0f7
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-06-24 13:48:54
christopher.esterhuyse@gmail.com
refactoring
6 files changed with 524 insertions and 528 deletions:
0 comments (0 inline, 0 general)
src/protocol/lexer.rs
Show inline comments
 
@@ -1031,7 +1031,7 @@ impl Lexer<'_> {
 
                self.source.consume();
 
                next = self.source.next();
 
            }
 
            if next != Some(b'\'') || data.len() == 0 {
 
            if next != Some(b'\'') || data.is_empty() {
 
                return Err(self.source.error("Expected character constant"));
 
            }
 
            self.source.consume();
 
@@ -1205,14 +1205,10 @@ impl Lexer<'_> {
 
        }
 
        let backup = self.source.clone();
 
        let mut result = false;
 
        match self.consume_type_annotation_spilled() {
 
            Ok(_) => match self.consume_whitespace(false) {
 
                Ok(_) => {
 
                    result = self.has_identifier();
 
                }
 
                Err(_) => {}
 
            },
 
            Err(_) => {}
 
        if let Ok(_) = self.consume_type_annotation_spilled() {
 
            if let Ok(_) = self.consume_whitespace(false) {
 
                result = self.has_identifier();
 
            }
 
        }
 
        *self.source = backup;
 
        return result;
 
@@ -1231,7 +1227,7 @@ impl Lexer<'_> {
 
            self.consume_whitespace(false)?;
 
        }
 
        self.consume_string(b"}")?;
 
        if statements.len() == 0 {
 
        if statements.is_empty() {
 
            Ok(h.alloc_skip_statement(|this| SkipStatement { this, position, next: None }).upcast())
 
        } else {
 
            Ok(h.alloc_block_statement(|this| BlockStatement {
 
@@ -1341,16 +1337,13 @@ impl Lexer<'_> {
 
        self.consume_whitespace(false)?;
 
        let true_body = self.consume_statement(h)?;
 
        self.consume_whitespace(false)?;
 
        let false_body;
 
        if self.has_keyword(b"else") {
 
        let false_body = if self.has_keyword(b"else") {
 
            self.consume_keyword(b"else")?;
 
            self.consume_whitespace(false)?;
 
            false_body = self.consume_statement(h)?;
 
            self.consume_statement(h)?
 
        } else {
 
            false_body = h
 
                .alloc_skip_statement(|this| SkipStatement { this, position, next: None })
 
                .upcast();
 
        }
 
            h.alloc_skip_statement(|this| SkipStatement { this, position, next: None }).upcast()
 
        };
 
        Ok(h.alloc_if_statement(|this| IfStatement { this, position, test, true_body, false_body }))
 
    }
 
    fn consume_while_statement(&mut self, h: &mut Heap) -> Result<WhileStatementId, ParseError> {
 
@@ -1432,12 +1425,11 @@ impl Lexer<'_> {
 
        let position = self.source.pos();
 
        self.consume_keyword(b"return")?;
 
        self.consume_whitespace(false)?;
 
        let expression;
 
        if self.has_string(b"(") {
 
            expression = self.consume_paren_expression(h)?;
 
        let expression = if self.has_string(b"(") {
 
            self.consume_paren_expression(h)
 
        } else {
 
            expression = self.consume_expression(h)?;
 
        }
 
            self.consume_expression(h)
 
        }?;
 
        self.consume_whitespace(false)?;
 
        self.consume_string(b";")?;
 
        Ok(h.alloc_return_statement(|this| ReturnStatement { this, position, expression }))
 
@@ -1446,12 +1438,11 @@ impl Lexer<'_> {
 
        let position = self.source.pos();
 
        self.consume_keyword(b"assert")?;
 
        self.consume_whitespace(false)?;
 
        let expression;
 
        if self.has_string(b"(") {
 
            expression = self.consume_paren_expression(h)?;
 
        let expression = if self.has_string(b"(") {
 
            self.consume_paren_expression(h)
 
        } else {
 
            expression = self.consume_expression(h)?;
 
        }
 
            self.consume_expression(h)
 
        }?;
 
        self.consume_whitespace(false)?;
 
        self.consume_string(b";")?;
 
        Ok(h.alloc_assert_statement(|this| AssertStatement {
src/protocol/mod.rs
Show inline comments
 
@@ -203,7 +203,7 @@ impl ComponentState {
 
                            }
 
                            Value::Message(MessageValue(Some(buffer))) => {
 
                                // Create a copy of the payload
 
                                payload = buffer.clone();
 
                                payload = buffer;
 
                            }
 
                            _ => unreachable!(),
 
                        }
src/protocol/parser.rs
Show inline comments
 
@@ -999,7 +999,7 @@ impl Visitor for ResolveVariables {
 
        let id = h[var].identifier;
 
        // First check whether variable with same identifier is in scope
 
        let check_duplicate = self.find_variable(h, id);
 
        if !check_duplicate.is_none() {
 
        if check_duplicate.is_some() {
 
            return Err(ParseError::new(h[id].position, "Declared variable clash"));
 
        }
 
        // Then check the expression's variables (this should not refer to own variable)
 
@@ -1018,7 +1018,7 @@ impl Visitor for ResolveVariables {
 
            let var = h[stmt].from;
 
            let id = h[var].identifier;
 
            let check_duplicate = self.find_variable(h, id);
 
            if !check_duplicate.is_none() {
 
            if check_duplicate.is_some() {
 
                return Err(ParseError::new(h[id].position, "Declared variable clash"));
 
            }
 
            let mut block = &mut h[self.scope.unwrap().to_block()];
 
@@ -1029,7 +1029,7 @@ impl Visitor for ResolveVariables {
 
            let var = h[stmt].to;
 
            let id = h[var].identifier;
 
            let check_duplicate = self.find_variable(h, id);
 
            if !check_duplicate.is_none() {
 
            if check_duplicate.is_some() {
 
                return Err(ParseError::new(h[id].position, "Declared variable clash"));
 
            }
 
            let mut block = &mut h[self.scope.unwrap().to_block()];
src/runtime/communication.rs
Show inline comments
 
@@ -42,26 +42,30 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        match &mut self.phased {
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication { round_result, .. } => match round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet),
 
            },
 
            ConnectorPhased::Communication(ConnectorCommunication { round_result, .. }) => {
 
                match round_result {
 
                    Err(_) => Err(PreviousSyncFailed),
 
                    Ok(None) => Err(NoPreviousRound),
 
                    Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                }
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Putter != *self.port_info.polarities.get(&port).unwrap() {
 
        if Putter != *unphased.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match &mut self.phased {
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if batch.to_put.contains_key(&port) {
 
                    return Err(MultipleOpsOnPort);
 
@@ -74,9 +78,10 @@ impl Connector {
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        match &mut self.phased {
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            }
 
@@ -84,15 +89,16 @@ impl Connector {
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Getter != *self.port_info.polarities.get(&port).unwrap() {
 
        if Getter != *unphased.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match &mut self.phased {
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
@@ -101,417 +107,405 @@ impl Connector {
 
            }
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication {
 
                round_index,
 
                neighborhood,
 
                native_batches,
 
                endpoint_manager,
 
                round_result,
 
                ..
 
            } => {
 
                let mut deadline = timeout.map(|to| Instant::now() + to);
 
                let logger: &mut dyn Logger = &mut *self.logger;
 
                // 1. run all proto components to Nonsync blockers
 
                log!(
 
                    logger,
 
                    "~~~ SYNC called with timeout {:?}; starting round {}",
 
                    &timeout,
 
                    round_index
 
                );
 
                let mut branching_proto_components =
 
                    HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
                let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
                    self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
                log!(logger, "Nonsync running {} proto components...", unrun_components.len());
 
                while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
                    // TODO coalesce fields
 
                    log!(
 
                        logger,
 
                        "Nonsync running proto component with ID {:?}. {} to go after this",
 
                        proto_component_id,
 
                        unrun_components.len()
 
                    );
 
                    let mut ctx = NonsyncProtoContext {
 
                        logger: &mut *logger,
 
                        port_info: &mut self.port_info,
 
                        id_manager: &mut self.id_manager,
 
                        proto_component_id,
 
                        unrun_components: &mut unrun_components,
 
                        proto_component_ports: &mut self
 
                            .proto_components
 
                            .get_mut(&proto_component_id)
 
                            .unwrap()
 
                            .ports,
 
                    };
 
                    let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description);
 
                    log!(
 
                        logger,
 
                        "proto component {:?} ran to nonsync blocker {:?}",
 
                        proto_component_id,
 
                        &blocker
 
                    );
 
                    use NonsyncBlocker as B;
 
                    match blocker {
 
                        B::ComponentExit => drop(component),
 
                        B::Inconsistent => {
 
                            return Err(InconsistentProtoComponent(proto_component_id))
 
                        }
 
                        B::SyncBlockStart => {
 
                            branching_proto_components.insert(
 
                                proto_component_id,
 
                                BranchingProtoComponent::initial(component),
 
                            );
 
                        }
 
                    }
 
        let Self { unphased, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.round_result = Self::connected_sync(unphased, comm, timeout);
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
                log!(
 
                    logger,
 
                    "All {} proto components are now done with Nonsync phase",
 
                    branching_proto_components.len(),
 
                );
 
            }
 
        }
 
    }
 

	
 
                // NOTE: all msgs in outbox are of form (Getter, Payload)
 
                let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 
    // TODO make cu immutable
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        use SyncError as Se;
 
        let mut deadline = timeout.map(|to| Instant::now() + to);
 
        // 1. run all proto components to Nonsync blockers
 
        log!(
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                logger: &mut *cu.logger,
 
                port_info: &mut cu.port_info,
 
                id_manager: &mut cu.id_manager,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap()
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 
                log!(logger, "Solution storage initialized");
 
        // NOTE: all msgs in outbox are of form (Getter, Payload)
 
        let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // 2. kick off the native
 
        // create the solution storage
 
        let mut solution_storage = {
 
            let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
            let c = cu
 
                .proto_components
 
                .keys()
 
                .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
            let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
            SolutionStorage::new(n.chain(c).chain(e))
 
        };
 
        log!(cu.logger, "Solution storage initialized");
 

	
 
        // 2. kick off the native
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        for (index, NativeBatch { to_get, to_put }) in comm.native_batches.drain(..).enumerate() {
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.insert(var, true);
 
                }
 
                // assign falses
 
                for &port in cu.native_ports.iter() {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.entry(var).or_insert(false);
 
                }
 
                predicate
 
            };
 
            log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
            // put all messages
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                // rely on invariant: sync batches respect port polarity
 
                let getter = *cu.port_info.peers.get(&putter).unwrap();
 
                payloads_to_get.push((getter, msg));
 
            }
 
            if to_get.is_empty() {
 
                log!(
 
                    logger,
 
                    "Translating {} native batches into branches...",
 
                    native_batches.len()
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                let mut branching_native = BranchingNative { branches: Default::default() };
 
                for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate()
 
                {
 
                    let predicate = {
 
                        let mut predicate = Predicate::default();
 
                        // assign trues
 
                        for &port in to_get.iter().chain(to_put.keys()) {
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.insert(var, true);
 
                        }
 
                        // assign falses
 
                        for &port in self.native_ports.iter() {
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        predicate
 
                    };
 
                    log!(logger, "Native branch {} has pred {:?}", index, &predicate);
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(LocalComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 
        comm.native_batches.push(Default::default());
 

	
 
                    // put all messages
 
                    for (putter, payload) in to_put {
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        log!(logger, "Native branch {} sending msg {:?}", index, &msg);
 
                        // rely on invariant: sync batches respect port polarity
 
                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                        payloads_to_get.push((getter, msg));
 
                    }
 
                    if to_get.is_empty() {
 
                        log!(
 
                            logger,
 
                            "Native submitting solution for batch {} with {:?}",
 
                            index,
 
                            &predicate
 
                        );
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            Route::LocalComponent(LocalComponentId::Native),
 
                            predicate.clone(),
 
                        );
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            );
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        let decision = 'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                match cu.port_info.routes.get(&getter).unwrap() {
 
                    Route::Endpoint { index } => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to(*index, &msg).unwrap();
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        // TODO
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        send_payload_msg,
 
                    ),
 
                    Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                send_payload_msg,
 
                            )
 
                        }
 
                    }
 
                }
 
                log!(logger, "Done translating native batches into branches");
 
                native_batches.push(Default::default());
 
            }
 

	
 
                // run all proto components to their sync blocker
 
                log!(
 
                    logger,
 
                    "Running all {} proto components to their sync blocker...",
 
                    branching_proto_components.len()
 
                );
 
                for (&proto_component_id, proto_component) in branching_proto_components.iter_mut()
 
                {
 
                    let Self { port_info, proto_description, .. } = self;
 
                    let BranchingProtoComponent { ports, branches } = proto_component;
 
                    let mut swap = HashMap::default();
 
                    let mut blocked = HashMap::default();
 
                    // drain from branches --> blocked
 
                    let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
                    BranchingProtoComponent::drain_branches_to_blocked(
 
                        cd,
 
                        logger,
 
                        port_info,
 
                        proto_description,
 
                        &mut solution_storage,
 
                        |putter, m| {
 
                            let getter = *port_info.peers.get(&putter).unwrap();
 
                            payloads_to_get.push((getter, m));
 
                        },
 
                        proto_component_id,
 
                        ports,
 
                    );
 
                    // swap the blocked branches back
 
                    std::mem::swap(&mut blocked, branches);
 
            // check if we have a solution yet
 
            log!(cu.logger, "Check if we have any local decisions...");
 
            for solution in solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger, "New local decision with solution {:?}...", &solution);
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::Suggest { suggestion },
 
                        });
 
                        comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        break 'undecided Decision::Success(solution);
 
                    }
 
                }
 
                log!(logger, "All proto components are blocked");
 
            }
 

	
 
                log!(logger, "Entering decision loop...");
 
                endpoint_manager.undelay_all();
 
                let decision = 'undecided: loop {
 
                    // drain payloads_to_get, sending them through endpoints / feeding them to components
 
                    while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                        assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                        match self.port_info.routes.get(&getter).unwrap() {
 
                            Route::Endpoint { index } => {
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (endpoint_index, msg) = loop {
 
                    match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? {
 
                        None => {
 
                            log!(cu.logger, "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
 
                                log!(
 
                                    cu.logger,
 
                                    "Sending failure request to parent index {}",
 
                                    parent
 
                                );
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::SendPayload(send_payload_msg),
 
                                    round_index: comm.round_index,
 
                                    contents: CommMsgContents::Suggest {
 
                                        suggestion: Decision::Failure,
 
                                    },
 
                                });
 
                                endpoint_manager.send_to(*index, &msg).unwrap();
 
                            }
 
                            Route::LocalComponent(LocalComponentId::Native) => branching_native
 
                                .feed_msg(
 
                                    logger,
 
                                    &self.port_info,
 
                                    &mut solution_storage,
 
                                    getter,
 
                                    send_payload_msg,
 
                                ),
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                                if let Some(branching_component) =
 
                                    branching_proto_components.get_mut(proto_component_id)
 
                                {
 
                                    let proto_component_id = *proto_component_id;
 
                                    let Self { port_info, proto_description, .. } = self;
 
                                    branching_component.feed_msg(
 
                                        logger,
 
                                        port_info,
 
                                        proto_description,
 
                                        &mut solution_storage,
 
                                        proto_component_id,
 
                                        |putter, m| {
 
                                            let getter = *port_info.peers.get(&putter).unwrap();
 
                                            payloads_to_get.push((getter, m));
 
                                        },
 
                                        getter,
 
                                        send_payload_msg,
 
                                    )
 
                                }
 
                                comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                            } else {
 
                                log!(cu.logger, "As the leader, deciding on timeout");
 
                                break 'undecided Decision::Failure;
 
                            }
 
                            deadline = None;
 
                        }
 
                        Some((endpoint_index, msg)) => break (endpoint_index, msg),
 
                    }
 

	
 
                    // check if we have a solution yet
 
                    log!(logger, "Check if we have any local decisions...");
 
                    for solution in solution_storage.iter_new_local_make_old() {
 
                        log!(logger, "New local decision with solution {:?}...", &solution);
 
                        match neighborhood.parent {
 
                            Some(parent) => {
 
                                log!(logger, "Forwarding to my parent {:?}", parent);
 
                                let suggestion = Decision::Success(solution);
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::Suggest { suggestion },
 
                                });
 
                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                            }
 
                            None => {
 
                                log!(logger, "No parent. Deciding on solution {:?}", &solution);
 
                                break 'undecided Decision::Success(solution);
 
                            }
 
                };
 
                log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => {
 
                        log!(cu.logger, "Discarding setup message; that phase is over");
 
                        continue 'undecided;
 
                    }
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Less => {
 
                            log!(
 
                                cu.logger,
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                comm.round_index,
 
                            );
 
                            drop(comm_msg);
 
                            continue 'undecided;
 
                        }
 
                        Ordering::Greater => {
 
                            log!(
 
                                cu.logger,
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                comm.round_index,
 
                            );
 
                            comm.endpoint_manager
 
                                .delayed_messages
 
                                .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                            continue 'undecided;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            comm.endpoint_manager.endpoint_exts[endpoint_index].getter_for_incoming;
 
                        assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
 
                        log!(
 
                            cu.logger,
 
                            "Msg routed to getter port {:?}. Buffer for recv loop",
 
                            getter,
 
                        );
 
                        payloads_to_get.push((getter, send_payload_msg));
 
                    }
 

	
 
                    // stuck! make progress by receiving a msg
 
                    // try recv messages arriving through endpoints
 
                    log!(logger, "No decision yet. Let's recv an endpoint msg...");
 
                    {
 
                        let (endpoint_index, msg) = loop {
 
                            match endpoint_manager.try_recv_any_comms(logger, deadline)? {
 
                                None => {
 
                                    log!(
 
                                        logger,
 
                                        "Reached user-defined deadling without decision..."
 
                    CommMsgContents::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        if comm.neighborhood.children.contains(&endpoint_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
                                    let route = Route::Endpoint { index: endpoint_index };
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.logger,
 
                                        route,
 
                                        predicate,
 
                                    );
 
                                    if let Some(parent) = neighborhood.parent {
 
                                        log!(
 
                                            logger,
 
                                            "Sending failure request to parent index {}",
 
                                            parent
 
                                        );
 
                                        let msg = Msg::CommMsg(CommMsg {
 
                                            round_index: *round_index,
 
                                            contents: CommMsgContents::Suggest {
 
                                                suggestion: Decision::Failure,
 
                                            },
 
                                        });
 
                                        endpoint_manager.send_to(parent, &msg).unwrap();
 
                                    } else {
 
                                        log!(logger, "As the leader, deciding on timeout");
 
                                        break 'undecided Decision::Failure;
 
                                    }
 
                                    deadline = None;
 
                                }
 
                                Some((endpoint_index, msg)) => break (endpoint_index, msg),
 
                            }
 
                        };
 
                        log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                        let comm_msg_contents = match msg {
 
                            Msg::SetupMsg(..) => {
 
                                log!(logger, "Discarding setup message; that phase is over");
 
                                continue 'undecided;
 
                            }
 
                            Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) {
 
                                Ordering::Equal => comm_msg.contents,
 
                                Ordering::Less => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Discard",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    drop(comm_msg);
 
                                    continue 'undecided;
 
                                }
 
                                Ordering::Greater => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Buffer",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    endpoint_manager
 
                                        .delayed_messages
 
                                        .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                                    continue 'undecided;
 
                                }
 
                            },
 
                        };
 
                        match comm_msg_contents {
 
                            CommMsgContents::SendPayload(send_payload_msg) => {
 
                                let getter = endpoint_manager.endpoint_exts[endpoint_index]
 
                                    .getter_for_incoming;
 
                                assert!(self.port_info.polarities.get(&getter) == Some(&Getter));
 
                                log!(
 
                                    logger,
 
                                    "Msg routed to getter port {:?}. Buffer for recv loop",
 
                                    getter,
 
                                );
 
                                payloads_to_get.push((getter, send_payload_msg));
 
                            }
 
                            CommMsgContents::Suggest { suggestion } => {
 
                                // only accept this control msg through a child endpoint
 
                                if neighborhood.children.contains(&endpoint_index) {
 
                                    match suggestion {
 
                                        Decision::Success(predicate) => {
 
                                            // child solution contributes to local solution
 
                                Decision::Failure => {
 
                                    match comm.neighborhood.parent {
 
                                        None => {
 
                                            log!(
 
                                                logger,
 
                                                "Child provided solution {:?}",
 
                                                &predicate
 
                                            );
 
                                            let route = Route::Endpoint { index: endpoint_index };
 
                                            solution_storage.submit_and_digest_subtree_solution(
 
                                                logger, route, predicate,
 
                                                cu.logger,
 
                                                "As sink, I decide on my child's failure"
 
                                            );
 
                                            // I am the sink. Decide on failed
 
                                            break 'undecided Decision::Failure;
 
                                        }
 
                                        Some(parent) => {
 
                                            log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            // I've got a parent. Forward the failure suggestion.
 
                                            let msg = Msg::CommMsg(CommMsg {
 
                                                round_index: comm.round_index,
 
                                                contents: CommMsgContents::Suggest { suggestion },
 
                                            });
 
                                            comm.endpoint_manager.send_to(parent, &msg).unwrap();
 
                                        }
 
                                        Decision::Failure => match neighborhood.parent {
 
                                            None => {
 
                                                log!(
 
                                                    logger,
 
                                                    "As sink, I decide on my child's failure"
 
                                                );
 
                                                // I am the sink. Decide on failed
 
                                                break 'undecided Decision::Failure;
 
                                            }
 
                                            Some(parent) => {
 
                                                log!(logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                                // I've got a parent. Forward the failure suggestion.
 
                                                let msg = Msg::CommMsg(CommMsg {
 
                                                    round_index: *round_index,
 
                                                    contents: CommMsgContents::Suggest {
 
                                                        suggestion,
 
                                                    },
 
                                                });
 
                                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                                            }
 
                                        },
 
                                    }
 
                                } else {
 
                                    log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index);
 
                                }
 
                            }
 
                            CommMsgContents::Announce { decision } => {
 
                                if Some(endpoint_index) == neighborhood.parent {
 
                                    // adopt this decision
 
                                    break 'undecided decision;
 
                                } else {
 
                                    log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                endpoint_index
 
                            );
 
                        }
 
                    }
 
                    CommMsgContents::Announce { decision } => {
 
                        if Some(endpoint_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            break 'undecided decision;
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                endpoint_index
 
                            );
 
                        }
 
                    }
 
                    log!(logger, "Endpoint msg recv done");
 
                };
 
                log!(logger, "Committing to decision {:?}!", &decision);
 

	
 
                // propagate the decision to children
 
                let msg = Msg::CommMsg(CommMsg {
 
                    round_index: *round_index,
 
                    contents: CommMsgContents::Announce { decision: decision.clone() },
 
                });
 
                log!(
 
                    logger,
 
                    "Announcing decision {:?} through child endpoints {:?}",
 
                    &msg,
 
                    &neighborhood.children
 
                );
 
                for &child in neighborhood.children.iter() {
 
                    endpoint_manager.send_to(child, &msg).unwrap();
 
                }
 
            }
 
            log!(cu.logger, "Endpoint msg recv done");
 
        };
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 

	
 
                *round_result = match decision {
 
                    Decision::Failure => Err(RoundFailure),
 
                    Decision::Success(predicate) => {
 
                        // commit changes to component states
 
                        self.proto_components.clear();
 
                        self.proto_components.extend(
 
                            branching_proto_components
 
                                .into_iter()
 
                                .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                        );
 
                        Ok(Some(branching_native.collapse_with(&predicate)))
 
                    }
 
                };
 
                log!(logger, "Updated round_result to {:?}", round_result);
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::Announce { decision: decision.clone() },
 
        });
 
        log!(
 
            cu.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to(child, &msg).unwrap();
 
        }
 

	
 
                let returning = round_result
 
                    .as_ref()
 
                    .map(|option| option.as_ref().unwrap().0)
 
                    .map_err(|sync_error| sync_error.clone());
 
                log!(logger, "Returning {:?}", &returning);
 
                returning
 
        match decision {
 
            Decision::Failure => Err(Se::RoundFailure),
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
            }
 
        }
 
    }
 
@@ -519,21 +513,21 @@ impl Connector {
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        // payloads_to_get: &mut Vec<(PortId, CommMsgContents)>,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        log!(logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(port_info.polarities.get(&getter).copied() == Some(Getter));
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = port_info.firing_var_for(getter);
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
@@ -541,7 +535,7 @@ impl BranchingNative {
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        logger,
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
@@ -550,7 +544,7 @@ impl BranchingNative {
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    logger,
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
@@ -562,7 +556,7 @@ impl BranchingNative {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        logger,
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
@@ -571,7 +565,7 @@ impl BranchingNative {
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(logger, "branch pred covers it! Accept the msg");
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
@@ -580,7 +574,7 @@ impl BranchingNative {
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        logger,
 
                        cu.logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
@@ -593,7 +587,7 @@ impl BranchingNative {
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        logger,
 
                        cu.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
@@ -603,38 +597,40 @@ impl BranchingNative {
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap<PortId, Payload>) {
 
    fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk {
 
        for (branch_predicate, branch) in self.branches {
 
            if solution_predicate.satisfies(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return (index, gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 

	
 
// |putter, m| {
 
//     let getter = *cu.port_info.peers.get(&putter).unwrap();
 
//     payloads_to_get.push((getter, m));
 
// },
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        //
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        proto_description: &ProtocolDescription,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        mut outbox_unqueue: impl FnMut(PortId, SendPayloadMsg),
 
        payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                    logger,
 
                    logger: &mut *cu.logger,
 
                    predicate: &predicate,
 
                    port_info,
 
                    port_info: &cu.port_info,
 
                    inbox: &branch.inbox,
 
                };
 
                let blocker = branch.state.sync_run(&mut ctx, proto_description);
 
                let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
                log!(
 
                    logger,
 
                    cu.logger,
 
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                    proto_component_id,
 
                    &predicate,
 
@@ -649,12 +645,12 @@ impl BranchingProtoComponent {
 
                    B::SyncBlockEnd => {
 
                        // make concrete all variables
 
                        for &port in ports.iter() {
 
                            let var = port_info.firing_var_for(port);
 
                            let var = cu.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        // submit solution for this component
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            &mut *cu.logger,
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)),
 
                            predicate.clone(),
 
                        );
 
@@ -668,7 +664,7 @@ impl BranchingProtoComponent {
 
                    }
 
                    B::CouldntCheckFiring(port) => {
 
                        // sanity check
 
                        let var = port_info.firing_var_for(port);
 
                        let var = cu.port_info.firing_var_for(port);
 
                        assert!(predicate.query(var).is_none());
 
                        // keep forks in "unblocked"
 
                        drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
@@ -676,21 +672,20 @@ impl BranchingProtoComponent {
 
                    }
 
                    B::PutMsg(putter, payload) => {
 
                        // sanity check
 
                        assert_eq!(Some(&Putter), port_info.polarities.get(&putter));
 
                        assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                        // overwrite assignment
 
                        let var = port_info.firing_var_for(putter);
 
                        let var = cu.port_info.firing_var_for(putter);
 
                        let was = predicate.assigned.insert(var, true);
 
                        if was == Some(false) {
 
                            log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                            log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                            // discard forever
 
                            drop((predicate, branch));
 
                        } else {
 
                            // keep in "unblocked"
 
                            log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                            outbox_unqueue(
 
                                putter,
 
                                SendPayloadMsg { predicate: predicate.clone(), payload },
 
                            );
 
                            log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                            let getter = *cu.port_info.peers.get(&putter).unwrap();
 
                            let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
    payloads_to_get.push((getter, msg));
 
                            drainer.add_input(predicate, branch);
 
                        }
 
                    }
 
@@ -699,15 +694,14 @@ impl BranchingProtoComponent {
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        proto_description: &ProtocolDescription,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        proto_component_id: ProtoComponentId,
 
        outbox_unqueue: impl FnMut(PortId, SendPayloadMsg),
 
        payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
@@ -760,17 +754,15 @@ impl BranchingProtoComponent {
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            logger,
 
            port_info,
 
            proto_description,
 
            cu,
 
            solution_storage,
 
            outbox_unqueue,
 
            payloads_to_get,
 
            proto_component_id,
 
            ports,
 
        );
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(logger, "component settles down with branches: {:?}", branches.keys());
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
@@ -957,7 +949,6 @@ impl ProtoComponentBranch {
 
        assert!(was.is_none())
 
    }
 
}
 

	
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
src/runtime/mod.rs
Show inline comments
 
@@ -10,6 +10,11 @@ mod tests;
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Debug)]
 
pub struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Debug)]
 
pub struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
@@ -135,28 +140,31 @@ pub struct PortInfo {
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    mem_inbox: Vec<MemInMsg>,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub enum ConnectorPhased {
 
    Setup {
 
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        round_index: usize,
 
        endpoint_manager: EndpointManager,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
        native_batches: Vec<NativeBatch>,
 
        round_result: Result<Option<(usize, HashMap<PortId, Payload>)>, SyncError>,
 
    },
 
    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
 
    Communication(ConnectorCommunication),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
@@ -225,31 +233,32 @@ impl IdManager {
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.logger, "Connector dropping. Goodbye!");
 
        log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.logger, &mut new_logger);
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.logger
 
        &mut *self.unphased.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
        cu.native_ports.insert(o);
 
        cu.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        cu.port_info.polarities.insert(o, Putter);
 
        cu.port_info.polarities.insert(i, Getter);
 
        cu.port_info.peers.insert(o, i);
 
        cu.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
@@ -260,31 +269,32 @@ impl Connector {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        // 1. check if this is OK
 
        let polarities = self.proto_description.component_polarities(identifier)?;
 
        let cu = &mut self.unphased;
 
        let polarities = cu.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !self.native_ports.contains(port) {
 
            if !cu.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
            }
 
            if expected_polarity != *self.port_info.polarities.get(port).unwrap() {
 
            if expected_polarity != *cu.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        let new_id = cu.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            self.port_info
 
            cu.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
        }
 
        self.native_ports.retain(|port| !ports.contains(port));
 
        cu.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
        self.proto_components.insert(
 
        cu.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
 
                state: self.proto_description.new_main_component(identifier, ports),
 
                state: cu.proto_description.new_main_component(identifier, ports),
 
                ports: ports.iter().copied().collect(),
 
            },
 
        );
src/runtime/setup.rs
Show inline comments
 
@@ -20,12 +20,14 @@ impl Connector {
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        Self {
 
            proto_description,
 
            proto_components: Default::default(),
 
            logger,
 
            id_manager: IdManager::new(connector_id),
 
            native_ports: Default::default(),
 
            port_info: Default::default(),
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                id_manager: IdManager::new(connector_id),
 
                native_ports: Default::default(),
 
                port_info: Default::default(),
 
            },
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
@@ -35,16 +37,17 @@ impl Connector {
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
                let p = self.id_manager.new_port_id();
 
                self.native_ports.insert(p);
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                self.port_info.polarities.insert(p, polarity);
 
                self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                log!(
 
                    self.logger,
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
@@ -58,44 +61,45 @@ impl Connector {
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        match &mut self.phased {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                log!(up.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                log!(up.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *self.logger,
 
                    &mut *up.logger,
 
                    endpoint_setups,
 
                    &mut self.port_info,
 
                    &mut up.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    self.logger,
 
                    up.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.connector_id,
 
                    &mut *self.logger,
 
                    up.id_manager.connector_id,
 
                    &mut *up.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                log!(self.logger, "connect() finished. setup phase complete");
 
                log!(up.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                log!(up.logger, "connect() finished. setup phase complete");
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                self.phased = ConnectorPhased::Communication(ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                });
 
                Ok(())
 
            }
 
        }
 
@@ -144,8 +148,8 @@ fn new_endpoint_manager(
 

	
 
    // 1. Start to construct EndpointManager
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(64);
 
    let mut polled_undrained = IndexSet::<usize>::default();
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
0 comments (0 inline, 0 general)