Changeset - c4a5132773ce
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-09-23 14:06:29
christopher.esterhuyse@gmail.com
cleanup and even more doc comments
4 files changed with 191 insertions and 109 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -152,7 +152,7 @@ impl Connector {
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
        let info = cu.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        let info = cu.ips.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        if info.owner != cu.native_component_id {
 
            return Err(Poe::PortUnavailable);
 
        }
 
@@ -261,7 +261,7 @@ impl Connector {
 
        // while kicking off the branching of components until the set of
 
        // components entering their synchronous block is finalized in `branching_proto_components`.
 
        // This is the last time cu's components and ports are accessed until the round is decided.
 
        let mut current_state = cu.current_state.clone();
 
        let mut ips = cu.ips.clone();
 
        let mut branching_proto_components =
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
 
@@ -282,7 +282,7 @@ impl Connector {
 
            );
 
            let (logger, proto_description) = cu.logger_and_protocol_description();
 
            let mut ctx = NonsyncProtoContext {
 
                current_state: &mut current_state,
 
                ips: &mut ips,
 
                logger,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
@@ -312,7 +312,7 @@ impl Connector {
 

	
 
        // Create temporary structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            current_state, // already used previously, now moved into RoundCtx
 
            ips, // already used previously, now moved into RoundCtx
 
            solution_storage: {
 
                let subtree_id_iter = {
 
                    // Create an iterator over the identifiers of this
 
@@ -337,7 +337,7 @@ impl Connector {
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.current_state.id_manager.new_spec_var_stream(),
 
            spec_var_stream: cu.ips.id_manager.new_spec_var_stream(),
 
            payload_inbox: Default::default(), // buffer for in-memory payloads to be handled
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
@@ -376,16 +376,16 @@ impl Connector {
 
                );
 
                let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
 
                for port in firing_iter {
 
                    let var = cu.current_state.spec_var_for(port);
 
                    let var = cu.ips.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // all silent ports have SpecVal::SILENT
 
                for port in cu.current_state.ports_owned_by(cu.native_component_id) {
 
                for port in cu.ips.ports_owned_by(cu.native_component_id) {
 
                    if firing_ports.contains(port) {
 
                        // this one is FIRING
 
                        continue;
 
                    }
 
                    let var = cu.current_state.spec_var_for(*port);
 
                    let var = cu.ips.spec_var_for(*port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
@@ -406,7 +406,7 @@ impl Connector {
 
                    putter
 
                );
 
                // sanity check
 
                assert_eq!(Putter, cu.current_state.port_info.get(&putter).unwrap().polarity);
 
                assert_eq!(Putter, cu.ips.port_info.get(&putter).unwrap().polarity);
 
                rctx.putter_push(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
@@ -481,7 +481,7 @@ impl Connector {
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))),
 
                );
 
                // commit changes to ports and id_manager
 
                cu.current_state = rctx.current_state;
 
                cu.ips = rctx.ips;
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
@@ -580,7 +580,7 @@ impl Connector {
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = rctx.current_state.port_info.get(&getter).unwrap();
 
                let getter_info = rctx.ips.port_info.get(&getter).unwrap();
 
                let cid = getter_info.owner; // the id of the component owning `getter` port
 
                assert_eq!(Getter, getter_info.polarity); // sanity check
 
                log!(
 
@@ -832,7 +832,7 @@ impl BranchingNative {
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert_eq!(Getter, rctx.current_state.port_info.get(&getter).unwrap().polarity);
 
        assert_eq!(Getter, rctx.ips.port_info.get(&getter).unwrap().polarity);
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
@@ -840,7 +840,7 @@ impl BranchingNative {
 
        // consistent with that of the received message.
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            let var = rctx.current_state.spec_var_for(getter);
 
            let var = rctx.ips.spec_var_for(getter);
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch,
 
                // because the resulting branch would have an inconsistent predicate.
 
@@ -1046,7 +1046,7 @@ impl BranchingProtoComponent {
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
                    let var = rctx.current_state.spec_var_for(port);
 
                    let var = rctx.ips.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
@@ -1054,9 +1054,9 @@ impl BranchingProtoComponent {
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check: The given port indeed has `Putter` polarity
 
                    assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity);
 
                    assert_eq!(Putter, rctx.ips.port_info.get(&putter).unwrap().polarity);
 
                    // assign FIRING to this port's associated firing variable
 
                    let var = rctx.current_state.spec_var_for(putter);
 
                    let var = rctx.ips.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        // Discard the branch, as it clearly has contradictory requirements for this value.
 
@@ -1079,8 +1079,8 @@ impl BranchingProtoComponent {
 
                B::SyncBlockEnd => {
 
                    // This branch reached the end of it's synchronous block
 
                    // assign all variables of owned ports that DIDN'T fire to SILENT
 
                    for port in rctx.current_state.ports_owned_by(proto_component_id) {
 
                        let var = rctx.current_state.spec_var_for(*port);
 
                    for port in rctx.ips.ports_owned_by(proto_component_id) {
 
                        let var = rctx.ips.spec_var_for(*port);
 
                        let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let speculated_to_fire = val == SpecVal::FIRING;
 
@@ -1362,11 +1362,11 @@ impl NonsyncProtoContext<'_> {
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.current_state.port_info.get(port).unwrap().owner
 
                self.ips.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // Create the new component, and schedule it to be run
 
        let new_cid = self.current_state.id_manager.new_component_id();
 
        let new_cid = self.ips.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
@@ -1378,7 +1378,7 @@ impl NonsyncProtoContext<'_> {
 
        self.unrun_components.push((new_cid, state));
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
            self.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
 
            self.ips.port_info.get_mut(port).unwrap().owner = new_cid;
 
        }
 
    }
 

	
 
@@ -1386,9 +1386,9 @@ impl NonsyncProtoContext<'_> {
 
    // creating a new port-pair connected by an memory channel
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.current_state.id_manager.new_port_id();
 
        let mut new_cid_fn = || self.ips.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.current_state.port_info.insert(
 
        self.ips.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -1397,7 +1397,7 @@ impl NonsyncProtoContext<'_> {
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.current_state.port_info.insert(
 
        self.ips.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -1420,7 +1420,7 @@ impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether it's associated
 
    // preidcate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.current_state.spec_var_for(port);
 
        let var = self.rctx.ips.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 

	
src/runtime/endpoints.rs
Show inline comments
 
@@ -237,7 +237,7 @@ impl EndpointManager {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = rctx.current_state.spec_var_for(ee.getter_for_incoming);
 
                        let port_spec_var = rctx.ips.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        rctx.getter_push(
 
                            ee.getter_for_incoming,
src/runtime/mod.rs
Show inline comments
 
@@ -51,7 +51,7 @@ pub struct FileLogger(ConnectorId, std::fs::File);
 
// Interface between protocol state and the connector runtime BEFORE all components
 
// ave begun their branching speculation. See ComponentState::nonsync_run.
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    current_state: &'a mut CurrentState,
 
    ips: &'a mut IdAndPortState,
 
    logger: &'a mut dyn Logger,
 
    unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
 
    proto_component_id: ComponentId,                              // KEY in id->component map
 
@@ -116,22 +116,22 @@ enum Route {
 
    UdpEndpoint { index: usize },
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 
// The outcome of a synchronous round, representing the distributed consensus.
 
// In the success case, the attached predicate encodes a row in the session's trace table.
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
enum Decision {
 
    Failure,
 
    Failure, // some connector timed out!
 
    Success(Predicate),
 
}
 

	
 
// The type of control messages exchanged between connectors over the network
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 

	
 
// Control messages exchanged during the setup phase only
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
@@ -141,6 +141,9 @@ enum SetupMsg {
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
// A data structure encoding the state of a connector, passed around
 
// during the session optimization procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
@@ -148,28 +151,44 @@ struct SessionInfo {
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
}
 

	
 
// Newtype wrapper for an Arc<ProtocolDescription>,
 
// such that it can be (de)serialized for transmission over the network.
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 

	
 
// Control message particular to the communication phase.
 
// as such, it's annotated with a round_index
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
}
 

	
 
// Connector <-> connector control messages for use in the communication phase
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
    Suggest { suggestion: Decision }, // child->parent
 
    Announce { decision: Decision },  // parent->child
 
}
 

	
 
// Speculative payload message, communicating the value for the given
 
// port's message predecated on the given speculative variable assignments.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 

	
 
// Return result of `Predicate::assignment_union`, communicating the contents
 
// of the predicate which represents the (consistent) union of their mappings,
 
// if it exists (no variable mapped distinctly by the input predicates)
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
@@ -178,10 +197,16 @@ enum AssignmentUnionResult {
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
// One of two endpoints for a control channel with a connector on either end.
 
// The underlying transport is TCP, so we use an inbox buffer to allow
 
// discrete payload receipt.
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 

	
 
// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    getter_for_incoming: PortId,
 
@@ -189,17 +214,29 @@ struct NetEndpointSetup {
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    getter_for_incoming: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 

	
 
// NetEndpoint annotated with the ID of the port that receives payload
 
// messages received through the endpoint. This approach assumes that NetEndpoints
 
// DO NOT multiplex port->port channels, and so a mapping such as this is possible.
 
// As a result, the messages themselves don't need to carry the PortID with them.
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component"
 
// described in the literature.
 
// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager),
 
// It acts as a native component by managing a (speculative) set of payload messages (an outbox,
 
//  protecting the peer on the other side of the network).
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
@@ -207,43 +244,59 @@ struct UdpEndpointExt {
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Meta-data for the connector: its role in the consensus tree.
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 

	
 
// Manages the connector's ID, and manages allocations for connector/port IDs.
 
#[derive(Debug, Clone)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    component_suffix_stream: U32Stream,
 
}
 

	
 
// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams.
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 

	
 
// A generator of speculative variables. Created on-demand during the synchronous round
 
// by the IdManager.
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 

	
 
// Manages the messy state of the various endpoints, pollers, buffers, etc.
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll. Poll token computed with TargetToken::into
 
    // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>, // ready to yield
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 

	
 
// A storage of endpoints, which keeps track of which components have raised
 
// an event during poll(), signifying that they need to be checked for new incoming data
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 

	
 
// The information associated with a port identifier, designed for local storage.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    owner: ComponentId,
 
@@ -252,8 +305,19 @@ struct PortInfo {
 
    route: Route,
 
}
 

	
 
// Similar to `PortInfo`, but designed for communication during the setup procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 

	
 
// A convenient substructure for containing port info and the ID manager.
 
// Houses the bulk of the connector's persistent state between rounds.
 
// It turns out several situations require access to both things.
 
#[derive(Debug, Clone)]
 
struct CurrentState {
 
struct IdAndPortState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    id_manager: IdManager,
 
}
 
@@ -267,12 +331,14 @@ struct ConnectorCommunication {
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundEndedNative>, SyncError>,
 
}
 

	
 
// A component's data common to both setup and communication phases
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
    logger: Box<dyn Logger>,
 
    current_state: CurrentState,
 
    ips: IdAndPortState,
 
    native_component_id: ComponentId,
 
}
 

	
 
@@ -290,6 +356,8 @@ struct ConnectorSetup {
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 

	
 
// A newtype wrapper for a map from speculative variable to speculative value
 
// A missing mapping corresponds with "unspecified".
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
@@ -333,7 +401,7 @@ struct RoundCtx {
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    current_state: CurrentState,
 
    ips: IdAndPortState,
 
}
 

	
 
// A trait intended to limit the access of the ConnectorUnphased structure
 
@@ -382,6 +450,7 @@ fn err_would_block(err: &std::io::Error) -> bool {
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        // establish the invariant
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
@@ -389,6 +458,7 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    // Insert the given element. Returns whether it was already present.
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
@@ -405,7 +475,7 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl CurrentState {
 
impl IdAndPortState {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.port_info
 
            .iter()
 
@@ -413,6 +483,9 @@ impl CurrentState {
 
            .map(|(port, _)| port)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        // Every port maps to a speculative variable
 
        // Two distinct ports map to the same variable
 
        // IFF they are two ends of the same logical channel.
 
        let info = self.port_info.get(&port).unwrap();
 
        SpecVar(match info.polarity {
 
            Getter => port,
 
@@ -458,7 +531,7 @@ impl Drop for Connector {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 

	
 
// Given a slice of ports, return the first, if any, port is present repeatedly
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
@@ -514,9 +587,14 @@ impl Connector {
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let mut new_cid = || cu.current_state.id_manager.new_port_id();
 
        let mut new_cid = || cu.ips.id_manager.new_port_id();
 
        // allocate two fresh port identifiers
 
        let [o, i] = [new_cid(), new_cid()];
 
        cu.current_state.port_info.insert(
 
        // store info for each:
 
        // - they are each others' peers
 
        // - they are owned by a local component with id `cid`
 
        // - polarity putter, getter respectively
 
        cu.ips.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -525,7 +603,7 @@ impl Connector {
 
                polarity: Putter,
 
            },
 
        );
 
        cu.current_state.port_info.insert(
 
        cu.ips.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -553,19 +631,18 @@ impl Connector {
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        // Check for error cases first before modifying `cu`
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        let cu = &self.unphased;
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let cu = &mut self.unphased;
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            let info = cu.ips.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
@@ -573,13 +650,15 @@ impl Connector {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // 2. add new component
 
        let new_cid = cu.current_state.id_manager.new_component_id();
 
        // No errors! Time to modify `cu`
 
        // create a new component and identifier
 
        let cu = &mut self.unphased;
 
        let new_cid = cu.ips.id_manager.new_component_id();
 
        cu.proto_components
 
            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
 
        // 3. update port ownership
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.current_state.port_info.get_mut(port) {
 
            match cu.ips.port_info.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
@@ -598,6 +677,7 @@ impl Predicate {
 
        self
 
    }
 

	
 
    // Return true whether `self` is a subset of `maybe_superset`
 
    pub fn assigns_subset(&self, maybe_superset: &Self) -> bool {
 
        for (var, val) in self.assigned.iter() {
 
            match maybe_superset.assigned.get(var) {
 
@@ -605,41 +685,35 @@ impl Predicate {
 
                _ => return false, // var unmapped, or mapped differently
 
            }
 
        }
 
        // `maybe_superset` mirrored all my assignments!
 
        true
 
    }
 

	
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    // pub fn consistent_with(&self, other: &Self) -> bool {
 
    //     let [larger, smaller] =
 
    //         if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 

	
 
    //     for (var, val) in smaller.assigned.iter() {
 
    //         match larger.assigned.get(var) {
 
    //             Some(val2) if val2 != val => return false,
 
    //             _ => {}
 
    //         }
 
    //     }
 
    //     true
 
    // }
 

	
 
    /// Given self and other, two predicates, return the predicate whose
 
    /// assignments are the union of those of self and other.
 
    /// Given the two predicates {self, other}, return that whose
 
    /// assignments are the union of those of both.
 
    fn assignment_union(&self, other: &Self) -> AssignmentUnionResult {
 
        use AssignmentUnionResult as Aur;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        // populate lists of assignments in self but not other and vice versa.
 
        // do this by incrementally unfolding the iterators, keeping an eye
 
        // on the ordering between the head elements [s, o].
 
        // whenever s<o, other is certainly missing element 's', etc.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, None] => break, // both iterators are empty
 
                [None, Some(x)] => {
 
                    // self's iterator is empty.
 
                    // all remaning elements are in other but not self
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    // other's iterator is empty.
 
                    // all remaning elements are in self but not other
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
@@ -656,6 +730,7 @@ impl Predicate {
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        // No predicate exists which satisfies both!
 
                        return Aur::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
@@ -681,6 +756,9 @@ impl Predicate {
 
            }
 
        }
 
    }
 

	
 
    // Compute the union of the assignments of the two given predicates, if it exists.
 
    // It doesn't exist if there is some value which the predicates assign to different values.
 
    pub(crate) fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
@@ -695,6 +773,30 @@ impl Predicate {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 

	
 
impl RoundCtx {
 
    // remove an arbitrary buffered message, along with the ID of the getter who receives it
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 

	
 
    // buffer a message along with the ID of the getter who receives it
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 

	
 
    // buffer a message along with the ID of the putter who sent it
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.ips.port_info.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 

	
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
 
@@ -778,21 +880,3 @@ impl Debug for UdpInBuffer {
 
        write!(f, "UdpInBuffer")
 
    }
 
}
 

	
 
impl RoundCtx {
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.current_state.port_info.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -52,7 +52,7 @@ impl Connector {
 
                proto_components: Default::default(),
 
                logger,
 
                native_component_id,
 
                current_state: CurrentState { id_manager, port_info: Default::default() },
 
                ips: IdAndPortState { id_manager, port_info: Default::default() },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
@@ -75,11 +75,11 @@ impl Connector {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.current_state.id_manager.new_component_id();
 
                let mut npid = || cu.current_state.id_manager.new_port_id();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 

	
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
@@ -88,7 +88,7 @@ impl Connector {
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
@@ -97,7 +97,7 @@ impl Connector {
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
@@ -106,7 +106,7 @@ impl Connector {
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
@@ -138,8 +138,8 @@ impl Connector {
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let new_pid = cu.current_state.id_manager.new_port_id();
 
                cu.current_state.port_info.insert(
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
@@ -190,19 +190,19 @@ impl Connector {
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.current_state.port_info,
 
                    &mut cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.current_state.port_info,
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.current_state.id_manager.connector_id,
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
@@ -858,7 +858,7 @@ fn session_optimize(
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {
 
        port_info: cu.current_state.port_info.clone(),
 
        port_info: cu.ips.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
@@ -869,7 +869,7 @@ fn session_optimize(
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.current_state.id_manager.connector_id, my_session_info);
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
@@ -920,10 +920,8 @@ fn session_optimize(
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info = optimized_map
 
        .get(&cu.current_state.id_manager.connector_id)
 
        .expect("HEY NO INFO FOR ME?")
 
        .clone();
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
@@ -952,7 +950,7 @@ fn apply_optimizations(
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    cu.current_state.port_info = port_info;
 
    cu.ips.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
0 comments (0 inline, 0 general)