Changeset - d3835a56401d
[Not reviewed]
Files: 0 added, 5 changed, 0 removed
Christopher Esterhuyse - 2020-09-22 16:28:07
christopher.esterhuyse@gmail.com
Flattened some structures for simplicity; added more internal doc comments.
5 files changed with 203 insertions and 136 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
@@ -7,6 +7,8 @@ mod lexer;
 mod parser;
 
 lazy_static::lazy_static! {
+    /// Conveniently-provided protocol description initialized with a zero-length PDL string.
+    /// Exposed to minimize repeated initializations of this common protocol description.
     pub static ref TRIVIAL_PD: std::sync::Arc<ProtocolDescription> = {
         std::sync::Arc::new(ProtocolDescription::parse(b"").unwrap())
     };
@@ -18,6 +20,8 @@ use crate::protocol::eval::*;
 use crate::protocol::inputsource::*;
 use crate::protocol::parser::*;
 
+/// Description of a protocol object, used to configure new connectors.
+/// (De)serializable.
 #[derive(serde::Serialize, serde::Deserialize)]
 #[repr(C)]
 pub struct ProtocolDescription {
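The new doc comments above explain why TRIVIAL_PD exists. A minimal usage sketch, assuming only what this hunk shows (the `TRIVIAL_PD` static and `ProtocolDescription::parse`); function names are illustrative:

    use std::sync::Arc;

    fn cheap_default_pd() -> Arc<ProtocolDescription> {
        // Clones the Arc built once by lazy_static; no re-parsing.
        TRIVIAL_PD.clone()
    }

    fn wasteful_default_pd() -> Arc<ProtocolDescription> {
        // Re-parses the empty PDL string on every call -- exactly what TRIVIAL_PD avoids.
        Arc::new(ProtocolDescription::parse(b"").unwrap())
    }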
src/runtime/communication.rs
@@ -49,16 +49,16 @@ impl ReplaceBoolTrue for bool {
 // making it harder to accidentally mutate its contents in a way that cannot be rolled back.
 impl CuUndecided for ConnectorUnphased {
     fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription) {
-        (&mut *self.inner.logger, &self.proto_description)
+        (&mut *self.logger, &self.proto_description)
     }
     fn logger(&mut self) -> &mut dyn Logger {
-        &mut *self.inner.logger
+        &mut *self.logger
     }
     fn proto_description(&self) -> &ProtocolDescription {
         &self.proto_description
     }
     fn native_component_id(&self) -> ComponentId {
-        self.inner.native_component_id
+        self.native_component_id
     }
 }
 
@@ -127,8 +127,8 @@ impl Connector {
     ) -> Result<&mut NativeBatch, PortOpError> {
         use PortOpError as Poe;
         let Self { unphased: cu, phased } = self;
-        let info = cu.inner.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
-        if info.owner != cu.inner.native_component_id {
+        let info = cu.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
+        if info.owner != cu.native_component_id {
             return Err(Poe::PortUnavailable);
         }
         if info.polarity != expect_polarity {
@@ -228,7 +228,7 @@ impl Connector {
 
         // 1. run all proto components to Nonsync blockers
        // iterate
-        let mut current_state = cu.inner.current_state.clone();
+        let mut current_state = cu.current_state.clone();
         let mut branching_proto_components =
             HashMap::<ComponentId, BranchingProtoComponent>::default();
         let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
@@ -280,7 +280,7 @@ impl Connector {
         let mut rctx = RoundCtx {
             current_state,
             solution_storage: {
-                let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id));
+                let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id));
                 let c =
                     branching_proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid));
                 let e = comm
@@ -290,13 +290,13 @@ impl Connector {
                     .map(|&index| SubtreeId::NetEndpoint { index });
                 let subtree_id_iter = n.chain(c).chain(e);
                 log!(
-                    cu.inner.logger,
+                    cu.logger,
                     "Children in subtree are: {:?}",
                     subtree_id_iter.clone().collect::<Vec<_>>()
                 );
                 SolutionStorage::new(subtree_id_iter)
             },
-            spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(),
+            spec_var_stream: cu.current_state.id_manager.new_spec_var_stream(),
             payload_inbox: Default::default(),
             deadline: timeout.map(|to| Instant::now() + to),
         };
@@ -328,12 +328,12 @@ impl Connector {
                 );
                 let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
                 for port in firing_iter {
-                    let var = cu.inner.current_state.spec_var_for(port);
+                    let var = cu.current_state.spec_var_for(port);
                     predicate.assigned.insert(var, SpecVal::FIRING);
                 }
                 // all silent ports have SpecVal::SILENT
-                for (port, port_info) in cu.inner.current_state.port_info.iter() {
-                    if port_info.owner != cu.inner.native_component_id {
+                for (port, port_info) in cu.current_state.port_info.iter() {
+                    if port_info.owner != cu.native_component_id {
                         // not my port
                         continue;
                     }
@@ -341,7 +341,7 @@ impl Connector {
                         // this one is FIRING
                         continue;
                     }
-                    let var = cu.inner.current_state.spec_var_for(*port);
+                    let var = cu.current_state.spec_var_for(*port);
                     if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
                         log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
                         continue 'native_branches;
@@ -362,7 +362,7 @@ impl Connector {
                     putter
                 );
                 // sanity check
-                assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity);
+                assert_eq!(Putter, cu.current_state.port_info.get(&putter).unwrap().polarity);
                 rctx.putter_push(cu, putter, msg);
             }
             let branch = NativeBranch { index, gotten: Default::default(), to_get };
@@ -375,7 +375,7 @@ impl Connector {
                 );
                 rctx.solution_storage.submit_and_digest_subtree_solution(
                     cu,
-                    SubtreeId::LocalComponent(cu.inner.native_component_id),
+                    SubtreeId::LocalComponent(cu.native_component_id),
                     predicate.clone(),
                 );
             }
@@ -421,7 +421,7 @@ impl Connector {
         let ret = match decision {
             Decision::Failure => {
                 // dropping {branching_proto_components, branching_native}
-                log!(cu.inner.logger, "Failure with {:#?}", &rctx.solution_storage);
+                log!(cu.logger, "Failure with {:#?}", &rctx.solution_storage);
                 Err(Se::RoundFailure)
             }
             Decision::Success(predicate) => {
@@ -434,9 +434,9 @@ impl Connector {
                         .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
                 );
                 // commit changes to ports and id_manager
-                cu.inner.current_state = rctx.current_state;
+                cu.current_state = rctx.current_state;
                 log!(
-                    cu.inner.logger,
+                    cu.logger,
                     "End round with (updated) component states {:?}",
                     cu.proto_components.keys()
                 );
src/runtime/endpoints.rs
@@ -27,7 +27,7 @@ impl NetEndpoint {
         'read_loop: loop {
             let res = self.stream.read_to_end(&mut self.inbox);
             match res {
-                Err(e) if would_block(&e) => break 'read_loop,
+                Err(e) if err_would_block(&e) => break 'read_loop,
                 Ok(0) => break 'read_loop,
                 Ok(_) => (),
                 Err(_e) => return Err(Nee::BrokenNetEndpoint),
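Context for the rename: `err_would_block` recognizes the `WouldBlock` error a nonblocking socket returns when it has no more data to offer. A standalone sketch of the same read-loop pattern, using std only; names other than `err_would_block` are hypothetical:

    use std::io::{ErrorKind, Read};

    fn err_would_block(err: &std::io::Error) -> bool {
        err.kind() == ErrorKind::WouldBlock
    }

    // Drain whatever is currently available from a nonblocking stream into `inbox`.
    // Returns Ok(true) if the peer may still send more, Ok(false) on EOF.
    fn drain_nonblocking(stream: &mut impl Read, inbox: &mut Vec<u8>) -> std::io::Result<bool> {
        loop {
            match stream.read_to_end(inbox) {
                Err(e) if err_would_block(&e) => return Ok(true), // no more data for now
                Ok(0) => return Ok(false),                        // peer closed (EOF)
                Ok(_) => (),                                      // got bytes; keep reading
                Err(e) => return Err(e),                          // genuinely broken endpoint
            }
        }
    }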
src/runtime/mod.rs
@@ -15,7 +15,7 @@ use crate::common::*;
 use error::*;
 use mio::net::UdpSocket;
 
-/// Each Connector structure is the interface between the user's application and a communication session,
+/// The interface between the user's application and a communication session,
 /// in which the application plays the part of a (native) component. This structure provides the application
 /// with functionality available to all components: the ability to add new channels (port pairs), and to
 /// instantiate new components whose definitions are defined in the connector's configured protocol
@@ -47,58 +47,75 @@ pub struct DummyLogger;
 /// A logger that writes the logged lines to a given file.
 #[derive(Debug)]
 pub struct FileLogger(ConnectorId, std::fs::File);
-#[derive(Debug, Clone)]
-struct CurrentState {
-    port_info: HashMap<PortId, PortInfo>,
-    id_manager: IdManager,
-}
 
+// Interface between protocol state and the connector runtime BEFORE all components
+// have begun their branching speculation. See ComponentState::nonsync_run.
 pub(crate) struct NonsyncProtoContext<'a> {
+    current_state: &'a mut CurrentState,
+    logger: &'a mut dyn Logger,
+    // cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
     unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
     proto_component_id: ComponentId,                              // KEY in id->component map
 }
 
+// Interface between protocol state and the connector runtime AFTER all components
+// have begun their branching speculation. See ComponentState::sync_run.
 pub(crate) struct SyncProtoContext<'a> {
+    rctx: &'a RoundCtx,
     branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
     predicate: &'a Predicate,                        // KEY in pred->branch map
 }
 
+// The data coupled with a particular protocol component branch, but crucially omitting
+// the `ComponentState` such that this may be passed by reference to the state with separate
+// access control.
 #[derive(Default, Debug, Clone)]
 struct ProtoComponentBranchInner {
     untaken_choice: Option<u16>,
     did_put_or_get: HashSet<PortId>,
     inbox: HashMap<PortId, Payload>,
 }
 
+// A speculative variable that lives for the duration of the synchronous round.
+// Each is assigned a value in domain `SpecVal`.
 #[derive(
     Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 )]
 struct SpecVar(PortId);
 
+// The codomain of SpecVar. Has two associated constants for values FIRING and SILENT,
+// but may also enumerate many more values to facilitate finer-grained nondeterministic branching.
 #[derive(
     Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 )]
 struct SpecVal(u16);
 
+// Data associated with a successful synchronous round, retained afterwards such that the
+// native component can freely reflect on how it went, reading the messages received at their
+// inputs, and reflecting on which of their connector's synchronous batches succeeded.
 #[derive(Debug)]
 struct RoundOk {
     batch_index: usize,
     gotten: HashMap<PortId, Payload>,
 }
 
+// Implementation of a set in terms of a vector (optimized for reading, not writing)
 #[derive(Default)]
 struct VecSet<T: std::cmp::Ord> {
     // invariant: ordered, deduplicated
     vec: Vec<T>,
 }
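A standalone sketch of the VecSet idea documented above: a read-optimized set stored as a sorted, deduplicated Vec. `new` mirrors the impl shown near the end of this file; `contains` is an assumed accessor, not necessarily the crate's:

    #[derive(Default)]
    struct VecSet<T: Ord> {
        vec: Vec<T>, // invariant: sorted, deduplicated
    }
    impl<T: Ord> VecSet<T> {
        fn new(mut vec: Vec<T>) -> Self {
            vec.sort();
            vec.dedup();
            Self { vec }
        }
        fn contains(&self, e: &T) -> bool {
            // Reads are cheap (binary search); inserts would cost O(n) shifting.
            self.vec.binary_search(e).is_ok()
        }
    }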
 

	
 
+// Allows a connector to remember how to forward payloads towards the component that
+// owns their destination port. `LocalComponent` corresponds with messages for components
+// managed by the connector itself (hinting for it to look it up in a local structure),
+// whereas the other variants direct the connector to forward the messages over the network.
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 enum Route {
     LocalComponent,
     NetEndpoint { index: usize },
     UdpEndpoint { index: usize },
 }
-#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
-enum SubtreeId {
-    LocalComponent(ComponentId),
-    NetEndpoint { index: usize },
-}
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 struct MyPortInfo {
     polarity: Polarity,
@@ -235,6 +252,13 @@ struct PortInfo {
     route: Route,
 }
 
+#[derive(Debug, Clone)]
+struct CurrentState {
+    port_info: HashMap<PortId, PortInfo>,
+    id_manager: IdManager,
+}
+
 // A connector's communication-phase-specific data
 #[derive(Debug)]
 struct ConnectorCommunication {
     round_index: usize,
@@ -247,36 +271,63 @@ struct ConnectorCommunication {
 struct ConnectorUnphased {
     proto_description: Arc<ProtocolDescription>,
     proto_components: HashMap<ComponentId, ComponentState>,
-    inner: ConnectorUnphasedInner,
-}
-#[derive(Debug)]
-struct ConnectorUnphasedInner {
     logger: Box<dyn Logger>,
     current_state: CurrentState,
     native_component_id: ComponentId,
 }
-#[derive(Debug)]
-struct ConnectorSetup {
-    net_endpoint_setups: Vec<NetEndpointSetup>,
-    udp_endpoint_setups: Vec<UdpEndpointSetup>,
-}
 
 // A connector's phase-specific data
 #[derive(Debug)]
 enum ConnectorPhased {
     Setup(Box<ConnectorSetup>),
     Communication(Box<ConnectorCommunication>),
 }
 
+// A connector's setup-phase-specific data
+#[derive(Debug)]
+struct ConnectorSetup {
+    net_endpoint_setups: Vec<NetEndpointSetup>,
+    udp_endpoint_setups: Vec<UdpEndpointSetup>,
+}
 
 #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 struct Predicate {
     assigned: BTreeMap<SpecVar, SpecVal>,
 }
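A Predicate is a partial assignment of speculative variables to values. A hypothetical, simplified sketch of the consistency check that consensus relies on; plain integers stand in for SpecVar and SpecVal, and this is not the crate's actual API:

    use std::collections::BTreeMap;

    // Union two partial assignments; fails if they disagree on a shared variable.
    fn union(a: &BTreeMap<u32, u16>, b: &BTreeMap<u32, u16>) -> Option<BTreeMap<u32, u16>> {
        let mut out = a.clone();
        for (var, val) in b {
            if let Some(prev) = out.insert(*var, *val) {
                if prev != *val {
                    return None; // conflicting assignment: predicates are incompatible
                }
            }
        }
        Some(out)
    }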
 

	
 
+// Identifies a child of this connector in the _solution tree_.
+// Each connector creates its own local solutions for the consensus procedure during `sync`,
+// from the solutions of its children. Those children are either locally-managed components
+// (which are leaves in the solution tree), or other connectors reachable through the given
+// network endpoint (which are internal nodes in the solution tree).
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
+enum SubtreeId {
+    LocalComponent(ComponentId),
+    NetEndpoint { index: usize },
+}
 
+// An accumulation of the connector's knowledge of all (a) the local solutions its children
+// in the solution tree have found, and (b) its own solutions derivable from those of its children.
+// This structure starts off each round with an empty set, and accumulates solutions as they are found
+// by local components, or received over the network in control messages.
+// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to
+// say that these sets GROW until the round is over, and all solutions are reset.
 #[derive(Debug)]
 struct SolutionStorage {
     // invariant: old_local U new_local solutions are those that can be created from
     // the UNION of one element from each set in `subtree_solutions`.
     // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated.
     old_local: HashSet<Predicate>,
     new_local: HashSet<Predicate>,
     // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
     subtree_solutions: Vec<HashSet<Predicate>>,
     subtree_id_to_index: HashMap<SubtreeId, usize>,
 }
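Per the invariant comments above, a connector-local solution is any conflict-free union of one predicate drawn from each child's solution set. A simplified, hypothetical illustration for two children, reusing the `union` sketch from the Predicate note above:

    type Pred = std::collections::BTreeMap<u32, u16>; // stand-in for Predicate

    fn local_solutions(child_a: &[Pred], child_b: &[Pred]) -> Vec<Pred> {
        let mut out = Vec::new();
        for a in child_a {
            for b in child_b {
                if let Some(u) = union(a, b) {
                    out.push(u); // one candidate connector-local solution
                }
            }
        }
        out
    }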
 

	
 
+// Stores the transient data of a synchronous round.
+// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of
+// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx,
+// and can be undone if the round fails.
 struct RoundCtx {
     solution_storage: SolutionStorage,
     spec_var_stream: SpecVarStream,
@@ -284,58 +335,51 @@ struct RoundCtx {
     deadline: Option<Instant>,
     current_state: CurrentState,
 }
 
+// A trait intended to limit the access of the ConnectorUnphased structure
+// such that we don't accidentally modify any important component/port data
+// while the results of the round are undecided. Why? Any actions during Connector::sync
+// are _speculative_ until the round is decided, and we need a safe way of rolling
+// back any changes.
 trait CuUndecided {
     fn logger(&mut self) -> &mut dyn Logger;
     fn proto_description(&self) -> &ProtocolDescription;
     fn native_component_id(&self) -> ComponentId;
     fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 }
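A sketch of the access-limiting pattern described above: round logic takes the narrow trait rather than `&mut ConnectorUnphased`, so it cannot reach fields like `proto_components` or `current_state` that must survive a failed, rolled-back round. `round_step` is a hypothetical function, not the crate's API:

    fn round_step(cu: &mut impl CuUndecided) {
        let (logger, pd) = cu.logger_and_protocol_description();
        // May log and read the protocol description during speculation...
        let _ = (logger, pd);
        // ...but the concrete struct's other fields are simply unreachable here,
        // so nothing needing rollback can be mutated by mistake.
    }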
 

	
 
+// Represents a set of synchronous port operations that the native component
+// has described as an "option" for completing during the synchronous rounds.
+// Operations contained here succeed together or not at all.
+// A native with N>=2 batches is expressing an N-way nondeterministic choice.
 #[derive(Debug, Default)]
 struct NativeBatch {
     // invariant: putters' and getters' polarities respected
     to_put: HashMap<PortId, Payload>,
     to_get: HashSet<PortId>,
 }
 
+// Parallels a mio::Token type, but more clearly communicates
+// the way it identifies the evented structure it corresponds to.
+// See runtime/setup for methods converting between TokenTarget and mio::Token
 #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 enum TokenTarget {
     NetEndpoint { index: usize },
     UdpEndpoint { index: usize },
     Waker,
 }
 
+// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
+// such that it can know when to continue polling, and when to block.
 enum CommRecvOk {
     TimeoutWithoutNew,
     NewPayloadMsgs,
     NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 }
 ////////////////
-fn would_block(err: &std::io::Error) -> bool {
+fn err_would_block(err: &std::io::Error) -> bool {
     err.kind() == std::io::ErrorKind::WouldBlock
 }
-impl TokenTarget {
-    const HALFWAY_INDEX: usize = usize::MAX / 2;
-    const MAX_INDEX: usize = usize::MAX;
-    const WAKER_TOKEN: usize = Self::MAX_INDEX;
-}
-impl From<Token> for TokenTarget {
-    fn from(Token(index): Token) -> Self {
-        if index == Self::WAKER_TOKEN {
-            TokenTarget::Waker
-        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
-            TokenTarget::UdpEndpoint { index: shifted }
-        } else {
-            TokenTarget::NetEndpoint { index }
-        }
-    }
-}
-impl Into<Token> for TokenTarget {
-    fn into(self) -> Token {
-        match self {
-            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
-            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
-            TokenTarget::NetEndpoint { index } => Token(index),
-        }
-    }
-}
 impl<T: std::cmp::Ord> VecSet<T> {
     fn new(mut vec: Vec<T>) -> Self {
         vec.sort();
 
@@ -448,13 +492,13 @@ impl Connector {
 
     /// Enables the connector's current logger to be swapped out for another
     pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
-        std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger);
+        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
         new_logger
     }
 
     /// Access the connector's current logger
     pub fn get_logger(&mut self) -> &mut dyn Logger {
-        &mut *self.unphased.inner.logger
+        &mut *self.unphased.logger
     }
 
     /// Create a new synchronous channel, returning its ends as a pair of ports,
@@ -464,27 +508,27 @@ impl Connector {
     pub fn new_port_pair(&mut self) -> [PortId; 2] {
         let cu = &mut self.unphased;
         // adds two new associated ports, related to each other, and exposed to the native
-        let mut new_cid = || cu.inner.current_state.id_manager.new_port_id();
+        let mut new_cid = || cu.current_state.id_manager.new_port_id();
         let [o, i] = [new_cid(), new_cid()];
-        cu.inner.current_state.port_info.insert(
+        cu.current_state.port_info.insert(
             o,
             PortInfo {
                 route: Route::LocalComponent,
                 peer: Some(i),
-                owner: cu.inner.native_component_id,
+                owner: cu.native_component_id,
                 polarity: Putter,
             },
         );
-        cu.inner.current_state.port_info.insert(
+        cu.current_state.port_info.insert(
             i,
             PortInfo {
                 route: Route::LocalComponent,
                 peer: Some(o),
-                owner: cu.inner.native_component_id,
+                owner: cu.native_component_id,
                 polarity: Getter,
             },
         );
-        log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
+        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
         [o, i]
     }
 
@@ -515,8 +559,8 @@ impl Connector {
             return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
         }
         for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
-            let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
-            if info.owner != cu.inner.native_component_id {
+            let info = cu.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
+            if info.owner != cu.native_component_id {
                 return Err(Ace::UnknownPort(port));
             }
             if info.polarity != expected_polarity {
@@ -524,12 +568,12 @@ impl Connector {
             }
         }
         // 2. add new component
-        let new_cid = cu.inner.current_state.id_manager.new_component_id();
+        let new_cid = cu.current_state.id_manager.new_component_id();
         cu.proto_components
             .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
         // 3. update port ownership
         for port in ports.iter() {
-            match cu.inner.current_state.port_info.get_mut(port) {
+            match cu.current_state.port_info.get_mut(port) {
                 Some(port_info) => port_info.owner = new_cid,
                 None => unreachable!(),
             }
src/runtime/setup.rs
 use crate::common::*;
 use crate::runtime::*;
 
+impl TokenTarget {
+    const HALFWAY_INDEX: usize = usize::MAX / 2;
+    const MAX_INDEX: usize = usize::MAX;
+    const WAKER_TOKEN: usize = Self::MAX_INDEX;
+}
+impl From<Token> for TokenTarget {
+    fn from(Token(index): Token) -> Self {
+        if index == Self::WAKER_TOKEN {
+            TokenTarget::Waker
+        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
+            TokenTarget::UdpEndpoint { index: shifted }
+        } else {
+            TokenTarget::NetEndpoint { index }
+        }
+    }
+}
+impl Into<Token> for TokenTarget {
+    fn into(self) -> Token {
+        match self {
+            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
+            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
+            TokenTarget::NetEndpoint { index } => Token(index),
+        }
+    }
+}
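A quick standalone check of the Token <-> TokenTarget mapping being moved here: net endpoints occupy token values [0, HALFWAY_INDEX), udp endpoints occupy [HALFWAY_INDEX, MAX_INDEX), and the waker claims the single maximum value. Constants and `classify` are restated locally for illustration only:

    const HALFWAY_INDEX: usize = usize::MAX / 2;
    const WAKER_TOKEN: usize = usize::MAX;

    fn classify(token: usize) -> &'static str {
        if token == WAKER_TOKEN {
            "waker"
        } else if token >= HALFWAY_INDEX {
            "udp endpoint" // udp index = token - HALFWAY_INDEX
        } else {
            "net endpoint" // net index = token
        }
    }

    fn main() {
        assert_eq!(classify(0), "net endpoint");
        assert_eq!(classify(HALFWAY_INDEX + 7), "udp endpoint");
        assert_eq!(classify(WAKER_TOKEN), "waker");
    }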
 
 impl Connector {
     /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
     /// The resulting connector will start in the setup phase, and cannot be used for communication until the
@@ -25,11 +50,9 @@ impl Connector {
             unphased: ConnectorUnphased {
                 proto_description,
                 proto_components: Default::default(),
-                inner: ConnectorUnphasedInner {
-                    logger,
-                    native_component_id,
-                    current_state: CurrentState { id_manager, port_info: Default::default() },
-                },
+                logger,
+                native_component_id,
+                current_state: CurrentState { id_manager, port_info: Default::default() },
             },
             phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
                 net_endpoint_setups: Default::default(),
 
@@ -52,29 +75,29 @@ impl Connector {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
                 let udp_index = setup.udp_endpoint_setups.len();
-                let udp_cid = cu.inner.current_state.id_manager.new_component_id();
-                let mut npid = || cu.inner.current_state.id_manager.new_port_id();
+                let udp_cid = cu.current_state.id_manager.new_component_id();
+                let mut npid = || cu.current_state.id_manager.new_port_id();
                 let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
-                cu.inner.current_state.port_info.insert(
+                cu.current_state.port_info.insert(
                     nin,
                     PortInfo {
                         route: Route::LocalComponent,
                         polarity: Getter,
                         peer: Some(uout),
-                        owner: cu.inner.native_component_id,
+                        owner: cu.native_component_id,
                     },
                 );
-                cu.inner.current_state.port_info.insert(
+                cu.current_state.port_info.insert(
                     nout,
                     PortInfo {
                         route: Route::LocalComponent,
                         polarity: Putter,
                         peer: Some(uin),
-                        owner: cu.inner.native_component_id,
+                        owner: cu.native_component_id,
                     },
                 );
-                cu.inner.current_state.port_info.insert(
+                cu.current_state.port_info.insert(
                     uin,
                     PortInfo {
                         route: Route::UdpEndpoint { index: udp_index },
@@ -83,7 +106,7 @@ impl Connector {
                         owner: udp_cid,
                     },
                 );
-                cu.inner.current_state.port_info.insert(
+                cu.current_state.port_info.insert(
                     uout,
                     PortInfo {
                         route: Route::UdpEndpoint { index: udp_index },
 
@@ -115,18 +138,18 @@ impl Connector {
         match phased {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
-                let new_pid = cu.inner.current_state.id_manager.new_port_id();
-                cu.inner.current_state.port_info.insert(
+                let new_pid = cu.current_state.id_manager.new_port_id();
+                cu.current_state.port_info.insert(
                     new_pid,
                     PortInfo {
                         route: Route::LocalComponent,
                         peer: None,
-                        owner: cu.inner.native_component_id,
+                        owner: cu.native_component_id,
                         polarity,
                     },
                 );
                 log!(
-                    cu.inner.logger,
+                    cu.logger,
                     "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
                     new_pid,
                     polarity,
 
@@ -156,35 +179,35 @@ impl Connector {
         let Self { unphased: cu, phased } = self;
         match &phased {
             ConnectorPhased::Communication { .. } => {
-                log!(cu.inner.logger, "Call to connecting in connected state");
+                log!(cu.logger, "Call to connecting in connected state");
                 Err(Ce::AlreadyConnected)
             }
             ConnectorPhased::Setup(setup) => {
-                log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout);
+                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
                 let deadline = timeout.map(|to| Instant::now() + to);
                 // connect all endpoints in parallel; send and receive peer ids through ports
                 let mut endpoint_manager = new_endpoint_manager(
-                    &mut *cu.inner.logger,
+                    &mut *cu.logger,
                     &setup.net_endpoint_setups,
                     &setup.udp_endpoint_setups,
-                    &mut cu.inner.current_state.port_info,
+                    &mut cu.current_state.port_info,
                     &deadline,
                 )?;
                 log!(
-                    cu.inner.logger,
+                    cu.logger,
                     "Successfully connected {} endpoints. info now {:#?} {:#?}",
                     endpoint_manager.net_endpoint_store.endpoint_exts.len(),
-                    &cu.inner.current_state.port_info,
+                    &cu.current_state.port_info,
                     &endpoint_manager,
                 );
                 // leader election and tree construction
                 let neighborhood = init_neighborhood(
-                    cu.inner.current_state.id_manager.connector_id,
-                    &mut *cu.inner.logger,
+                    cu.current_state.id_manager.connector_id,
+                    &mut *cu.logger,
                     &mut endpoint_manager,
                     &deadline,
                 )?;
-                log!(cu.inner.logger, "Successfully created neighborhood {:?}", &neighborhood);
+                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
                 let mut comm = ConnectorCommunication {
                     round_index: 0,
                     endpoint_manager,
@@ -195,7 +218,7 @@ impl Connector {
                 if cfg!(feature = "session_optimization") {
                     session_optimize(cu, &mut comm, &deadline)?;
                 }
-                log!(cu.inner.logger, "connect() finished. setup phase complete");
+                log!(cu.logger, "connect() finished. setup phase complete");
                 *phased = ConnectorPhased::Communication(Box::new(comm));
                 Ok(())
             }
@@ -366,7 +389,7 @@ fn new_endpoint_manager(
                     if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
                         // FIRST try complete this connection
                         match listener.accept() {
-                            Err(e) if would_block(&e) => continue, // spurious wakeup
+                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
                             Err(_) => {
                                 log!(logger, "accept() failure on index {}", index);
                                 return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
@@ -781,25 +804,25 @@ fn session_optimize(
     ////////////////////////////////////////
     use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
     ////////////////////////////////////////
-    log!(cu.inner.logger, "Beginning session optimization");
+    log!(cu.logger, "Beginning session optimization");
     // populate session_info_map from a message per child
     let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
     let mut awaiting: HashSet<usize> = comm.neighborhood.children.iter().copied().collect();
     comm.endpoint_manager.undelay_all();
     while !awaiting.is_empty() {
         log!(
-            cu.inner.logger,
+            cu.logger,
             "Session gather loop. awaiting info from children {:?}...",
             awaiting.iter()
         );
         let (recv_index, msg) =
-            comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?;
-        log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
+            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
+        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
         match msg {
             S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => {
                 if !awaiting.remove(&recv_index) {
                     log!(
-                        cu.inner.logger,
+                        cu.logger,
                         "Wasn't expecting session info from {:?}. Got {:?}",
                         recv_index,
                         &child_unoptimized_map
@@ -812,11 +835,11 @@ fn session_optimize(
             | msg @ S(Sm::MyPortInfo(..))
             | msg @ S(Sm::LeaderAnnounce { .. })
             | msg @ S(Sm::LeaderWave { .. }) => {
-                log!(cu.inner.logger, "discarding old message {:?} during election", msg);
+                log!(cu.logger, "discarding old message {:?} during election", msg);
             }
             msg @ S(Sm::SessionScatter { .. }) => {
                 log!(
-                    cu.inner.logger,
+                    cu.logger,
                     "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
                     recv_index,
                     &msg
@@ -824,18 +847,18 @@ fn session_optimize(
                 return Err(Ce::SetupAlgMisbehavior);
             }
             msg @ Msg::CommMsg(..) => {
-                log!(cu.inner.logger, "delaying msg {:?} during session optimization", msg);
+                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
                 comm.endpoint_manager.delayed_messages.push((recv_index, msg));
             }
         }
     }
     log!(
-        cu.inner.logger,
+        cu.logger,
         "Gathered all children's maps. ConnectorId set is... {:?}",
         unoptimized_map.keys()
     );
     let my_session_info = SessionInfo {
-        port_info: cu.inner.current_state.port_info.clone(),
+        port_info: cu.current_state.port_info.clone(),
         proto_components: cu.proto_components.clone(),
         serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
         endpoint_incoming_to_getter: comm
@@ -846,38 +869,34 @@ fn session_optimize(
             .map(|ee| ee.getter_for_incoming)
             .collect(),
     };
-    unoptimized_map.insert(cu.inner.current_state.id_manager.connector_id, my_session_info);
-    log!(
-        cu.inner.logger,
-        "Inserting my own info. Unoptimized subtree map is {:?}",
-        &unoptimized_map
-    );
+    unoptimized_map.insert(cu.current_state.id_manager.connector_id, my_session_info);
+    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 
     // acquire the optimized info...
     let optimized_map = if let Some(parent) = comm.neighborhood.parent {
         // ... as a message from my parent
-        log!(cu.inner.logger, "Forwarding gathered info to parent {:?}", parent);
+        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
         let msg = S(Sm::SessionGather { unoptimized_map });
         comm.endpoint_manager.send_to_setup(parent, &msg)?;
         'scatter_loop: loop {
             log!(
-                cu.inner.logger,
+                cu.logger,
                 "Session scatter recv loop. awaiting info from children {:?}...",
                 awaiting.iter()
             );
             let (recv_index, msg) =
-                comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?;
-            log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
+                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
+            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
             match msg {
                 S(Sm::SessionScatter { optimized_map }) => {
                     if recv_index != parent {
-                        log!(cu.inner.logger, "I expected the scatter from my parent only!");
+                        log!(cu.logger, "I expected the scatter from my parent only!");
                         return Err(Ce::SetupAlgMisbehavior);
                     }
                     break 'scatter_loop optimized_map;
                 }
                 msg @ Msg::CommMsg { .. } => {
-                    log!(cu.inner.logger, "delaying msg {:?} during scatter recv", msg);
+                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
                     comm.endpoint_manager.delayed_messages.push((recv_index, msg));
                 }
                 msg @ S(Sm::SessionGather { .. })
@@ -885,24 +904,24 @@ fn session_optimize(
                 | msg @ S(Sm::MyPortInfo(..))
                 | msg @ S(Sm::LeaderAnnounce { .. })
                 | msg @ S(Sm::LeaderWave { .. }) => {
-                    log!(cu.inner.logger, "discarding old message {:?} during election", msg);
+                    log!(cu.logger, "discarding old message {:?} during election", msg);
                 }
             }
        }
     } else {
         // by computing it myself
-        log!(cu.inner.logger, "I am the leader! I will optimize this session");
-        leader_session_map_optimize(&mut *cu.inner.logger, unoptimized_map)?
+        log!(cu.logger, "I am the leader! I will optimize this session");
+        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
     };
     log!(
-        cu.inner.logger,
+        cu.logger,
         "Optimized info map is {:?}. Sending to children {:?}",
         &optimized_map,
         comm.neighborhood.children.iter()
     );
-    log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map);
+    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
     let optimized_info = optimized_map
-        .get(&cu.inner.current_state.id_manager.connector_id)
+        .get(&cu.current_state.id_manager.connector_id)
         .expect("HEY NO INFO FOR ME?")
         .clone();
     let msg = S(Sm::SessionScatter { optimized_map });
@@ -910,7 +929,7 @@ fn session_optimize(
         comm.endpoint_manager.send_to_setup(child, &msg)?;
     }
     apply_optimizations(cu, comm, optimized_info)?;
-    log!(cu.inner.logger, "Session optimizations applied");
+    log!(cu.logger, "Session optimizations applied");
     Ok(())
 }
 fn leader_session_map_optimize(
@@ -933,7 +952,7 @@ fn apply_optimizations(
         endpoint_incoming_to_getter,
     } = session_info;
     // TODO some info which should be read-only can be mutated with the current scheme
-    cu.inner.current_state.port_info = port_info;
+    cu.current_state.port_info = port_info;
     cu.proto_components = proto_components;
     cu.proto_description = serde_proto_description.0;
     for (ee, getter) in comm