Changeset - dedf89df1602
[Not reviewed]
0 9 0
Christopher Esterhuyse - 5 years ago 2020-05-28 11:14:52
christopher.esterhuyse@gmail.com
retiring `Endpoint Key` term in favor of port
9 files changed with 253 insertions and 258 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -24,7 +24,6 @@ pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
// pub type Payload = Vec<u8>;
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 

	
 
@@ -47,7 +46,6 @@ pub enum Polarity {
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone)]
 
#[repr(C)]
 
pub struct Port(pub usize); // ports are COPY
 
pub type Key = Port;
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
@@ -64,7 +62,7 @@ pub trait ProtocolDescription: Sized {
 

	
 
    fn parse(pdl: &[u8]) -> Result<Self, String>;
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr>;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> Self::S;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> Self::S;
 
}
 

	
 
pub trait ComponentState: Sized + Clone {
 
@@ -93,24 +91,24 @@ pub enum MonoBlocker {
 
pub enum PolyBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(Key),
 
    CouldntCheckFiring(Key),
 
    PutMsg(Key, Payload),
 
    CouldntReadMsg(Port),
 
    CouldntCheckFiring(Port),
 
    PutMsg(Port, Payload),
 
}
 

	
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_keys: HashSet<Key>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Key; 2];
 
    fn new_component(&mut self, moved_ports: HashSet<Port>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Port; 2];
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
    fn is_firing(&mut self, port: Port) -> Option<bool>;
 
    fn read_msg(&mut self, port: Port) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
@@ -152,7 +150,7 @@ impl Debug for Port {
 
        write!(f, "Port({})", self.0)
 
    }
 
}
 
impl Key {
 
impl Port {
 
    pub fn from_raw(raw: usize) -> Self {
 
        Self(raw)
 
    }
src/protocol/eval.rs
Show inline comments
 
@@ -886,7 +886,7 @@ impl Display for Value {
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct InputValue(pub Key);
 
pub struct InputValue(pub Port);
 

	
 
impl Display for InputValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
@@ -911,7 +911,7 @@ impl ValueImpl for InputValue {
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct OutputValue(pub Key);
 
pub struct OutputValue(pub Port);
 

	
 
impl Display for OutputValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
src/protocol/inputsource.rs
Show inline comments
 
@@ -21,10 +21,10 @@ primitive forward(in i, out o) {
 
primitive sync(in i, out o) {
 
    while(true) synchronous() if(fires(i)) put(o, get(i));
 
}
 
primitive alternator_2(in i, out a, out b) {
 
primitive alternator_2(in i, out l, out r) {
 
    while(true) {
 
        synchronous() put(a, get(i));
 
        synchronous() put(b, get(i));
 
        synchronous() put(l, get(i));
 
        synchronous() put(r, get(i));
 
    }
 
}
 
primitive replicator_2(in i, out l, out r) {
 
@@ -43,7 +43,7 @@ primitive merger_2(in l, in r, out o) {
 

	
 
impl InputSource {
 
    // Constructors
 
    pub fn new<A: io::Read, S: ToString>(filename: S, reader: &mut A) -> io::Result<InputSource> {
 
    pub fn new<R: io::Read, S: ToString>(filename: S, reader: &mut R) -> io::Result<InputSource> {
 
        let mut vec = STD_LIB_PDL.to_vec();
 
        reader.read_to_end(&mut vec)?;
 
        Ok(InputSource {
src/protocol/mod.rs
Show inline comments
 
@@ -80,7 +80,7 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
        }
 
        Ok(result)
 
    }
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> ComponentStateImpl {
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> ComponentStateImpl {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) {
 
            match y {
 
@@ -160,31 +160,31 @@ impl ComponentState for ComponentStateImpl {
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        Value::Output(OutputValue(port)) => {
 
                            return PolyBlocker::CouldntCheckFiring(port);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        Value::Input(InputValue(port)) => {
 
                            return PolyBlocker::CouldntCheckFiring(port);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        Value::Output(OutputValue(port)) => {
 
                            return PolyBlocker::CouldntReadMsg(port);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        Value::Input(InputValue(port)) => {
 
                            return PolyBlocker::CouldntReadMsg(port);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let key;
 
                        let value;
 
                        match port {
 
                            Value::Output(OutputValue(the_key)) => {
 
                                key = the_key;
 
                            Value::Output(OutputValue(port_value)) => {
 
                                value = port_value;
 
                            }
 
                            Value::Input(InputValue(the_key)) => {
 
                                key = the_key;
 
                            Value::Input(InputValue(port_value)) => {
 
                                value = port_value;
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
@@ -200,7 +200,7 @@ impl ComponentState for ComponentStateImpl {
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return PolyBlocker::PutMsg(key, payload);
 
                        return PolyBlocker::PutMsg(value, payload);
 
                    }
 
                },
 
            }
 
@@ -214,30 +214,30 @@ pub enum EvalContext<'a> {
 
    None,
 
}
 
impl EvalContext<'_> {
 
    fn random(&mut self) -> LongValue {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => todo!(),
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
    }
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
 
    //         EvalContext::None => unreachable!(),
 
    //         EvalContext::Mono(_context) => todo!(),
 
    //         EvalContext::Poly(_) => unreachable!(),
 
    //     }
 
    // }
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentStateImpl) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => {
 
                let mut moved_keys = HashSet::new();
 
                let mut moved_ports = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
                        Value::Output(OutputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        Value::Output(OutputValue(port)) => {
 
                            moved_ports.insert(*port);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            moved_keys.insert(*key);
 
                        Value::Input(InputValue(port)) => {
 
                            moved_ports.insert(*port);
 
                        }
 
                        _ => {}
 
                    }
 
                }
 
                context.new_component(moved_keys, init_state)
 
                context.new_component(moved_ports, init_state)
 
            }
 
            EvalContext::Poly(_) => unreachable!(),
 
        }
 
@@ -259,8 +259,8 @@ impl EvalContext<'_> {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from),
 
                Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from),
 
                Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from),
 
                Value::Input(InputValue(port)) => context.is_firing(port).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
@@ -270,10 +270,12 @@ impl EvalContext<'_> {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => {
 
                    context.read_msg(key).map(Value::receive_message)
 
                Value::Output(OutputValue(port)) => {
 
                    context.read_msg(port).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(port)) => {
 
                    context.read_msg(port).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message),
 
                _ => unreachable!(),
 
            },
 
        }
src/runtime/actors.rs
Show inline comments
 
@@ -3,37 +3,37 @@ use crate::runtime::{endpoint::*, *};
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct MonoN {
 
    pub ekeys: HashSet<Key>,
 
    pub result: Option<(usize, HashMap<Key, Payload>)>,
 
    pub ports: HashSet<Port>,
 
    pub result: Option<(usize, HashMap<Port, Payload>)>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyN {
 
    pub ekeys: HashSet<Key>,
 
    pub ports: HashSet<Port>,
 
    pub branches: HashMap<Predicate, BranchN>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchN {
 
    pub to_get: HashSet<Key>,
 
    pub gotten: HashMap<Key, Payload>,
 
    pub to_get: HashSet<Port>,
 
    pub gotten: HashMap<Port, Payload>,
 
    pub sync_batch_index: usize,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MonoP {
 
    pub state: ProtocolS,
 
    pub ekeys: HashSet<Key>,
 
    pub ports: HashSet<Port>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyP {
 
    pub incomplete: HashMap<Predicate, BranchP>,
 
    pub complete: HashMap<Predicate, BranchP>,
 
    pub ekeys: HashSet<Key>,
 
    pub ports: HashSet<Port>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchP {
 
    pub blocking_on: Option<Key>,
 
    pub outbox: HashMap<Key, Payload>,
 
    pub inbox: HashMap<Key, Payload>,
 
    pub blocking_on: Option<Port>,
 
    pub outbox: HashMap<Port, Payload>,
 
    pub inbox: HashMap<Port, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
@@ -60,7 +60,7 @@ impl PolyP {
 
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                ports: &self.ports,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
@@ -75,10 +75,10 @@ impl PolyP {
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                Sb::CouldntReadMsg(port) => {
 
                    assert!(self.ports.contains(&port));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
                    log!(
 
                        &mut r_ctx.m_ctx.inner.logger,
 
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
@@ -90,17 +90,17 @@ impl PolyP {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
 
                        branch.blocking_on = Some(ekey);
 
                        branch.blocking_on = Some(port);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                Sb::CouldntCheckFiring(port) => {
 
                    assert!(self.ports.contains(&port));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
                    // split the branch!
 
                    let branch_f = branch.clone();
 
                    let mut predicate_f = predicate.clone();
 
@@ -121,10 +121,10 @@ impl PolyP {
 
                    );
 
                    // come up with the predicate for this local solution
 

	
 
                    for ekey in self.ekeys.iter() {
 
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
 
                    for port in self.ports.iter() {
 
                        let channel_id = endpoint_exts.get(*port).unwrap().info.channel_id;
 
                        let fired =
 
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
 
                            branch.inbox.contains_key(port) || branch.outbox.contains_key(port);
 
                        match predicate.query(channel_id) {
 
                            Some(true) => {
 
                                if !fired {
 
@@ -142,8 +142,8 @@ impl PolyP {
 
                                    println!(
 
                                        "pred {:#?} in {:#?} out {:#?}",
 
                                        &predicate,
 
                                        branch.inbox.get(ekey),
 
                                        branch.outbox.get(ekey)
 
                                        branch.inbox.get(port),
 
                                        branch.outbox.get(port)
 
                                    );
 
                                    panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id)
 
                                }
 
@@ -154,8 +154,8 @@ impl PolyP {
 
                                    println!(
 
                                        "pred {:#?} in {:#?} out {:#?}",
 
                                        &predicate,
 
                                        branch.inbox.get(ekey),
 
                                        branch.outbox.get(ekey)
 
                                        branch.inbox.get(port),
 
                                        branch.outbox.get(port)
 
                                    );
 
                                    panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id)
 
                                }
 
@@ -170,12 +170,12 @@ impl PolyP {
 
                    );
 
                    self.complete.insert(predicate, branch);
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                Sb::PutMsg(port, payload) => {
 
                    assert!(self.ports.contains(&port));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                        m_ctx.inner.endpoint_exts.get_mut(port).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
                        branch.outbox.insert(ekey, payload.clone());
 
                        branch.outbox.insert(port, payload.clone());
 
                        let msg = CommMsgContents::SendPayload {
 
                            payload_predicate: predicate.clone(),
 
                            payload,
 
@@ -185,7 +185,7 @@ impl PolyP {
 
                            &mut m_ctx.inner.logger,
 
                            "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!",
 
                            &msg,
 
                            ekey,
 
                            port,
 
                            (info.channel_id.controller_id, info.channel_id.channel_index),
 
                        );
 
                        endpoint.send(msg)?;
 
@@ -211,7 +211,7 @@ impl PolyP {
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        ekey: Key,
 
        port: Port,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
@@ -233,8 +233,8 @@ impl PolyP {
 
                "... poly_recv_run matched running machine exactly! pred is {:?}",
 
                &payload_predicate
 
            );
 
            branch.inbox.insert(ekey, payload);
 
            if branch.blocking_on == Some(ekey) {
 
            branch.inbox.insert(port, payload);
 
            if branch.blocking_on == Some(port) {
 
                branch.blocking_on = None;
 
                vec![(payload_predicate, branch)]
 
            } else {
 
@@ -260,12 +260,12 @@ impl PolyP {
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 

	
 
                            if let Some(prev_payload) = branch.inbox.get(&ekey) {
 
                            if let Some(prev_payload) = branch.inbox.get(&port) {
 
                                // Incorrect to receive two distinct messages in same branch!
 
                                assert_eq!(prev_payload, &payload);
 
                            }
 
                            branch.inbox.insert(ekey, payload.clone());
 
                            if branch.blocking_on == Some(ekey) {
 
                            branch.inbox.insert(port, payload.clone());
 
                            if branch.blocking_on == Some(port) {
 
                                // run.
 
                                branch.blocking_on = None;
 
                                Some((old_predicate, branch))
 
@@ -285,15 +285,15 @@ impl PolyP {
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            if let Some(prev_payload) = payload_branch.inbox.get(&ekey) {
 
                            if let Some(prev_payload) = payload_branch.inbox.get(&port) {
 
                                // Incorrect to receive two distinct messages in same branch!
 
                                assert_eq!(prev_payload, &payload);
 
                            }
 
                            payload_branch.inbox.insert(ekey, payload.clone());
 
                            payload_branch.inbox.insert(port, payload.clone());
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            if payload_branch.blocking_on == Some(ekey) {
 
                            if payload_branch.blocking_on == Some(port) {
 
                                // run the fork
 
                                payload_branch.blocking_on = None;
 
                                Some((new, payload_branch))
 
@@ -312,15 +312,15 @@ impl PolyP {
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            if let Some(prev_payload) = payload_branch.inbox.get(&ekey) {
 
                            if let Some(prev_payload) = payload_branch.inbox.get(&port) {
 
                                // Incorrect to receive two distinct messages in same branch!
 
                                assert_eq!(prev_payload, &payload);
 
                            }
 
                            payload_branch.inbox.insert(ekey, payload.clone());
 
                            payload_branch.inbox.insert(port, payload.clone());
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate.clone(), branch);
 
                            if payload_branch.blocking_on == Some(ekey) {
 
                            if payload_branch.blocking_on == Some(port) {
 
                                // run the fork
 
                                payload_branch.blocking_on = None;
 
                                Some((payload_predicate.clone(), payload_branch))
 
@@ -359,14 +359,14 @@ impl PolyP {
 
        self.complete
 
            .iter()
 
            .find(|(p, _)| decision.satisfies(p))
 
            .map(|(_, branch)| MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() })
 
            .map(|(_, branch)| MonoP { state: branch.state.clone(), ports: self.ports.clone() })
 
    }
 
}
 

	
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        ekey: Key,
 
        port: Port,
 
        logger: &mut String,
 
        payload: Payload,
 
        payload_predicate: Predicate,
 
@@ -399,16 +399,16 @@ impl PolyN {
 
                Csr::Nonexistant => { /* skip branch */ }
 
                Csr::LatterNotFormer | Csr::Equivalent => {
 
                    // Feed the message to this branch in-place. no need to modify pred.
 
                    if branch.to_get.remove(&ekey) {
 
                        branch.gotten.insert(ekey, payload.clone());
 
                    if branch.to_get.remove(&port) {
 
                        branch.gotten.insert(port, payload.clone());
 
                        report_if_solution(&branch, &old_predicate, logger);
 
                    }
 
                }
 
                Csr::FormerNotLatter => {
 
                    // create a new branch with the payload_predicate.
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                    if forked.to_get.remove(&port) {
 
                        forked.gotten.insert(port, payload.clone());
 
                        report_if_solution(&forked, &payload_predicate, logger);
 
                        branches2.insert(payload_predicate.clone(), forked);
 
                    }
 
@@ -416,8 +416,8 @@ impl PolyN {
 
                Csr::New(new) => {
 
                    // create a new branch with the newly-created predicate
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                    if forked.to_get.remove(&port) {
 
                        forked.gotten.insert(port, payload.clone());
 
                        report_if_solution(&forked, &new, logger);
 
                        branches2.insert(new.clone(), forked);
 
                    }
 
@@ -436,7 +436,7 @@ impl PolyN {
 
            .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p))
 
            .map(|(_, branch)| {
 
                let BranchN { gotten, sync_batch_index, .. } = branch.clone();
 
                MonoN { ekeys: self.ekeys.clone(), result: Some((sync_batch_index, gotten)) }
 
                MonoN { ports: self.ports.clone(), result: Some((sync_batch_index, gotten)) }
 
            })
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -28,16 +28,16 @@ impl Controller {
 
            Decision::Failure => Err(SyncErr::Timeout),
 
        };
 
        let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
        for &child_port in self.inner.family.children_ports.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                "Forwarding {:?} to child with port {:?}",
 
                &announcement,
 
                child_ekey
 
                child_port
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .get_mut(child_port)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
@@ -49,20 +49,20 @@ impl Controller {
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
        if let Some(parent_port) = self.inner.family.parent_port {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
                &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            assert!(self.inner.family.parent_port.is_none());
 
            let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(predicate) = maybe_predicate {
 
                let decision = Decision::Success(predicate);
 
@@ -79,19 +79,19 @@ impl Controller {
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ekeys, .. } = self.inner.mono_n.clone();
 
        let MonoN { ports, .. } = self.inner.mono_n.clone();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            let all_ekeys = ekeys.iter().copied();
 
            let all_channel_ids = all_ekeys.map(ekey_to_channel_id);
 
            let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id;
 
            let all_ports = ports.iter().copied();
 
            let all_channel_ids = all_ports.map(port_to_channel_id);
 

	
 
            let mut predicate = Predicate::new_trivial();
 

	
 
            // assign TRUE for puts and gets
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id);
 
            let true_ports = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ports.clone().map(port_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
@@ -106,7 +106,7 @@ impl Controller {
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
            for (port, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
@@ -117,7 +117,7 @@ impl Controller {
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
                endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
@@ -134,7 +134,7 @@ impl Controller {
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
        Ok(PolyN { ports, branches })
 
    }
 
    pub fn sync_round(
 
        &mut self,
 
@@ -178,7 +178,7 @@ impl Controller {
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                ports: &mut mono_p.ports,
 
                mono_ps: &mut self.ephemeral.mono_ps,
 
                inner: &mut self.inner,
 
            };
 
@@ -197,25 +197,25 @@ impl Controller {
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        // 3. define the mapping from port -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let ekey_to_holder: HashMap<Key, PolyId> = {
 
        let port_to_holder: HashMap<Port, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.ekeys.iter().map(move |&e| (e, N));
 
            let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index })));
 
                .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            &ekey_to_holder
 
            "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
 
            &port_to_holder
 
        );
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
@@ -226,9 +226,9 @@ impl Controller {
 
            let c = self
 
                .inner
 
                .family
 
                .children_ekeys
 
                .children_ports
 
                .iter()
 
                .map(|&ekey| SubtreeId::ChildController { ekey });
 
                .map(|&port| SubtreeId::ChildController { port });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
@@ -309,12 +309,12 @@ impl Controller {
 
                        // timed out! send a FAILURE message to the sink,
 
                        // and henceforth don't time out on polling.
 
                        deadline = None;
 
                        match self.inner.family.parent_ekey {
 
                        match self.inner.family.parent_port {
 
                            None => {
 
                                // I am the sink! announce failure and return.
 
                                return self.end_round_with_decision(Decision::Failure);
 
                            }
 
                            Some(parent_ekey) => {
 
                            Some(parent_port) => {
 
                                // I am not the sink! send a failure message.
 
                                let announcement = Msg::CommMsg(CommMsg {
 
                                    round_index: self.inner.round_index,
 
@@ -322,13 +322,13 @@ impl Controller {
 
                                });
 
                                log!(
 
                                    &mut self.inner.logger,
 
                                    "Forwarding {:?} to parent with ekey {:?}",
 
                                    "Forwarding {:?} to parent with port {:?}",
 
                                    &announcement,
 
                                    parent_ekey
 
                                    parent_port
 
                                );
 
                                self.inner
 
                                    .endpoint_exts
 
                                    .get_mut(parent_ekey)
 
                                    .get_mut(parent_port)
 
                                    .expect("ss")
 
                                    .endpoint
 
                                    .send(announcement.clone())?;
 
@@ -364,7 +364,7 @@ impl Controller {
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "... its a round-appropriate CommMsg with key {:?}",
 
                        "... its a round-appropriate CommMsg with port {:?}",
 
                        received.recipient
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
@@ -372,21 +372,21 @@ impl Controller {
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Failure => match self.inner.family.parent_ekey {
 
                    Some(parent_ekey) => {
 
                CommMsgContents::Failure => match self.inner.family.parent_port {
 
                    Some(parent_port) => {
 
                        let announcement = Msg::CommMsg(CommMsg {
 
                            round_index: self.inner.round_index,
 
                            contents: CommMsgContents::Failure,
 
                        });
 
                        log!(
 
                            &mut self.inner.logger,
 
                            "Forwarding {:?} to parent with ekey {:?}",
 
                            "Forwarding {:?} to parent with port {:?}",
 
                            &announcement,
 
                            parent_ekey
 
                            parent_port
 
                        );
 
                        self.inner
 
                            .endpoint_exts
 
                            .get_mut(parent_ekey)
 
                            .get_mut(parent_port)
 
                            .expect("ss")
 
                            .endpoint
 
                            .send(announcement.clone())?;
 
@@ -395,10 +395,10 @@ impl Controller {
 
                },
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                    if !self.inner.family.children_ports.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    let subtree_id = SubtreeId::ChildController { port: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
@@ -415,7 +415,7 @@ impl Controller {
 
                    }
 
                }
 
                CommMsgContents::Announce { decision } => {
 
                    if self.inner.family.parent_ekey != Some(received.recipient) {
 
                    if self.inner.family.parent_port != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
@@ -435,7 +435,7 @@ impl Controller {
 

	
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = ekey_to_holder.get(&received.recipient);
 
                    let subtree_id = port_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
@@ -530,13 +530,13 @@ impl ControllerEphemeral {
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.mono_ps.is_empty()
 
            && self.ekey_to_holder.is_empty()
 
            && self.port_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        self.ekey_to_holder.clear();
 
        self.port_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
@@ -552,7 +552,7 @@ impl Into<PolyP> for MonoP {
 
                    blocking_on: None,
 
                }
 
            },
 
            ekeys: self.ekeys,
 
            ports: self.ports,
 
        }
 
    }
 
}
 
@@ -566,41 +566,41 @@ impl From<EndpointErr> for SyncErr {
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
    fn new_component(&mut self, moved_ports: HashSet<Port>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ekeys {:?}!",
 
            &moved_ekeys,
 
            "!! MonoContext callback to new_component with ports {:?}!",
 
            &moved_ports,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
 
            self.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys });
 
        if moved_ports.is_subset(self.ports) {
 
            self.ports.retain(|x| !moved_ports.contains(x));
 
            self.mono_ps.push(MonoP { state: init_state, ports: moved_ports });
 
        } else {
 
            panic!("MachineP attempting to move alien ekey!");
 
            panic!("MachineP attempting to move alien port!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Key; 2] {
 
    fn new_channel(&mut self) -> [Port; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 

	
 
        let mut clos = |endpoint, polarity| {
 
            let endpoint_ext =
 
                EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint };
 
            let ekey = self.inner.endpoint_exts.alloc(endpoint_ext);
 
            let endpoint = &self.inner.endpoint_exts.get(ekey).unwrap().endpoint;
 
            let token = Key::to_token(ekey);
 
            let port = self.inner.endpoint_exts.alloc(endpoint_ext);
 
            let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint;
 
            let token = Port::to_token(port);
 
            self.inner
 
                .messenger_state
 
                .poll
 
                .register(endpoint, token, Ready::readable(), PollOpt::edge())
 
                .expect("AAGAGGGGG");
 
            self.ekeys.insert(ekey);
 
            ekey
 
            self.ports.insert(port);
 
            port
 
        };
 
        let [kp, kg] = [clos(a, Putter), clos(b, Getter)];
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            "!! MonoContext callback to new_channel. returning ports {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
@@ -713,9 +713,9 @@ impl SolutionStorage {
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 

	
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
    fn is_firing(&mut self, port: Port) -> Option<bool> {
 
        assert!(self.ports.contains(&port));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
@@ -725,9 +725,9 @@ impl PolyContext for BranchPContext<'_, '_> {
 
        );
 
        val
 
    }
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let val = self.inbox.get(&ekey);
 
    fn read_msg(&mut self, port: Port) -> Option<&Payload> {
 
        assert!(self.ports.contains(&port));
 
        let val = self.inbox.get(&port);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
src/runtime/connector.rs
Show inline comments
 
@@ -113,16 +113,16 @@ impl Connector {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
        let (port, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Putter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.puts.contains_key(&ekey) {
 
        if sync_batch.puts.contains_key(&port) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.puts.insert(ekey, payload);
 
        sync_batch.puts.insert(port, payload);
 
        Ok(())
 
    }
 

	
 
@@ -132,16 +132,16 @@ impl Connector {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
        let (port, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.gets.contains(&ekey) {
 
        if sync_batch.gets.contains(&port) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.gets.insert(ekey);
 
        sync_batch.gets.insert(port);
 
        Ok(())
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, ()> {
src/runtime/mod.rs
Show inline comments
 
@@ -34,8 +34,8 @@ pub(crate) struct Predicate {
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Key, Payload>,
 
    gets: HashSet<Key>,
 
    puts: HashMap<Port, Payload>,
 
    gets: HashSet<Port>,
 
}
 

	
 
#[derive(Debug)]
 
@@ -59,7 +59,7 @@ pub struct Configured {
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    native_interface: Vec<(Port, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 
@@ -78,7 +78,7 @@ struct Arena<T> {
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    recipient: Port,
 
    msg: Msg,
 
}
 

	
 
@@ -88,7 +88,7 @@ struct MessengerState {
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
    polled_undrained: IndexSet<Port>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
@@ -122,13 +122,13 @@ struct ControllerEphemeral {
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    mono_ps: Vec<MonoP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
    port_to_holder: HashMap<Port, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_ekey: Option<Key>,
 
    children_ekeys: Vec<Key>,
 
    parent_port: Option<Port>,
 
    children_ports: Vec<Port>,
 
}
 

	
 
#[derive(Debug)]
 
@@ -149,12 +149,12 @@ enum PolyId {
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { ekey: Key },
 
    ChildController { port: Port },
 
}
 

	
 
pub(crate) struct MonoPContext<'a> {
 
    inner: &'a mut ControllerInner,
 
    ekeys: &'a mut HashSet<Key>,
 
    ports: &'a mut HashSet<Port>,
 
    mono_ps: &'a mut Vec<MonoP>,
 
}
 
pub(crate) struct PolyPContext<'a> {
 
@@ -171,9 +171,9 @@ impl PolyPContext<'_> {
 
}
 
struct BranchPContext<'m, 'r> {
 
    m_ctx: PolyPContext<'m>,
 
    ekeys: &'r HashSet<Key>,
 
    ports: &'r HashSet<Port>,
 
    predicate: &'r Predicate,
 
    inbox: &'r HashMap<Key, Payload>,
 
    inbox: &'r HashMap<Port, Payload>,
 
}
 

	
 
#[derive(Default)]
 
@@ -187,7 +187,7 @@ pub(crate) struct SolutionStorage {
 

	
 
trait Messengerlike {
 
    fn get_state_mut(&mut self) -> &mut MessengerState;
 
    fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint;
 
    fn get_endpoint_mut(&mut self, eport: Port) -> &mut Endpoint;
 

	
 
    fn delay(&mut self, received: ReceivedMsg) {
 
        self.get_state_mut().delayed.push(received);
 
@@ -197,7 +197,7 @@ trait Messengerlike {
 
        undelayed.extend(delayed.drain(..))
 
    }
 

	
 
    fn send(&mut self, to: Key, msg: Msg) -> Result<(), EndpointErr> {
 
    fn send(&mut self, to: Port, msg: Msg) -> Result<(), EndpointErr> {
 
        self.get_endpoint_mut(to).send(msg)
 
    }
 

	
 
@@ -210,15 +210,15 @@ trait Messengerlike {
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
            while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eekey)
 
                    .get_endpoint_mut(eport)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))?
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(Some(ReceivedMsg { recipient: eekey, msg }));
 
                    self.get_state_mut().polled_undrained.insert(eport);
 
                    return Ok(Some(ReceivedMsg { recipient: eport, msg }));
 
                }
 
            }
 

	
 
@@ -226,7 +226,7 @@ trait Messengerlike {
 
            match state.poll_events(deadline) {
 
                Ok(()) => {
 
                    for e in state.events.iter() {
 
                        state.polled_undrained.insert(Key::from_token(e.token()));
 
                        state.polled_undrained.insert(Port::from_token(e.token()));
 
                    }
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
@@ -242,15 +242,15 @@ trait Messengerlike {
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
            while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eekey)
 
                    .get_endpoint_mut(eport)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))?
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(ReceivedMsg { recipient: eekey, msg });
 
                    self.get_state_mut().polled_undrained.insert(eport);
 
                    return Ok(ReceivedMsg { recipient: eport, msg });
 
                }
 
            }
 

	
 
@@ -261,7 +261,7 @@ trait Messengerlike {
 
                .poll(&mut state.events, None)
 
                .map_err(|_| MessengerRecvErr::PollingFailed)?;
 
            for e in state.events.iter() {
 
                state.polled_undrained.insert(Key::from_token(e.token()));
 
                state.polled_undrained.insert(Port::from_token(e.token()));
 
            }
 
        }
 
    }
 
@@ -293,38 +293,33 @@ impl From<MessengerRecvErr> for ConnectErr {
 
        ConnectErr::MessengerRecvErr(e)
 
    }
 
}
 
// impl From<EndpointErr> for MessengerRecvErr {
 
//     fn from(e: EndpointErr) -> MessengerRecvErr {
 
//         MessengerRecvErr::EndpointErr(e)
 
//     }
 
// }
 
impl<T> Default for Arena<T> {
 
    fn default() -> Self {
 
        Self { storage: vec![] }
 
    }
 
}
 
impl<T> Arena<T> {
 
    pub fn alloc(&mut self, t: T) -> Key {
 
    pub fn alloc(&mut self, t: T) -> Port {
 
        self.storage.push(t);
 
        Key::from_raw(self.storage.len() - 1)
 
        Port::from_raw(self.storage.len() - 1)
 
    }
 
    pub fn get(&self, key: Key) -> Option<&T> {
 
    pub fn get(&self, key: Port) -> Option<&T> {
 
        self.storage.get(key.to_raw() as usize)
 
    }
 
    pub fn get_mut(&mut self, key: Key) -> Option<&mut T> {
 
    pub fn get_mut(&mut self, key: Port) -> Option<&mut T> {
 
        self.storage.get_mut(key.to_raw() as usize)
 
    }
 
    pub fn type_convert<X>(self, f: impl FnMut((Key, T)) -> X) -> Arena<X> {
 
    pub fn type_convert<X>(self, f: impl FnMut((Port, T)) -> X) -> Arena<X> {
 
        Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() }
 
    }
 
    pub fn iter(&self) -> impl Iterator<Item = (Key, &T)> {
 
    pub fn iter(&self) -> impl Iterator<Item = (Port, &T)> {
 
        self.keyspace().zip(self.storage.iter())
 
    }
 
    pub fn len(&self) -> usize {
 
        self.storage.len()
 
    }
 
    pub fn keyspace(&self) -> impl Iterator<Item = Key> {
 
        (0..self.storage.len()).map(Key::from_raw)
 
    pub fn keyspace(&self) -> impl Iterator<Item = Port> {
 
        (0..self.storage.len()).map(Port::from_raw)
 
    }
 
}
 

	
src/runtime/setup.rs
Show inline comments
 
@@ -25,7 +25,7 @@ impl Controller {
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        logger: &mut String,
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
    ) -> Result<(Self, Vec<(Port, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 
@@ -33,15 +33,15 @@ impl Controller {
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
        let mut ekeys_network = vec![];
 
        let mut ports_native = vec![];
 
        let mut ports_proto = vec![];
 
        let mut ports_network = vec![];
 

	
 
        let mut native_interface = vec![];
 

	
 
        /*
 
        1.  - allocate an EndpointExtTodo for every native and interface port
 
            - store all the resulting keys in two keylists for the interfaces of the native and proto components
 
            - store all the resulting ports in two portlists for the interfaces of the native and proto components
 
                native: [a, c,    f]
 
                         |  |     |
 
                         |  |     |
 
@@ -53,44 +53,44 @@ impl Controller {
 
            match binding {
 
                PortBinding::Native => {
 
                    let channel_id = channel_id_stream.next();
 
                    let ([ekey_native, ekey_proto], native_polarity) = {
 
                    let ([port_native, port_proto], native_polarity) = {
 
                        let [p, g] = Endpoint::new_memory_pair();
 
                        let mut endpoint_to_key = |endpoint, polarity| {
 
                        let mut endpoint_to_port = |endpoint, polarity| {
 
                            endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt {
 
                                endpoint,
 
                                info: EndpointInfo { polarity, channel_id },
 
                            }))
 
                        };
 
                        let pkey = endpoint_to_key(p, Putter);
 
                        let gkey = endpoint_to_key(g, Getter);
 
                        let key_pair = match polarity {
 
                            Putter => [gkey, pkey],
 
                            Getter => [pkey, gkey],
 
                        let pport = endpoint_to_port(p, Putter);
 
                        let gport = endpoint_to_port(g, Getter);
 
                        let port_pair = match polarity {
 
                            Putter => [gport, pport],
 
                            Getter => [pport, gport],
 
                        };
 
                        (key_pair, !polarity)
 
                        (port_pair, !polarity)
 
                    };
 
                    native_interface.push((ekey_native, native_polarity));
 
                    ekeys_native.push(ekey_native);
 
                    ekeys_proto.push(ekey_proto);
 
                    native_interface.push((port_native, native_polarity));
 
                    ports_native.push(port_native);
 
                    ports_proto.push(port_proto);
 
                }
 
                PortBinding::Passive(addr) => {
 
                    let channel_id = channel_id_stream.next();
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting {
 
                    let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting {
 
                        addr,
 
                        info: EndpointInfo { polarity, channel_id },
 
                        listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?,
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                    ports_network.push(port_proto);
 
                    ports_proto.push(port_proto);
 
                }
 
                PortBinding::Active(addr) => {
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                    let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                        addr,
 
                        polarity,
 
                        stream: TcpStream::connect(&addr).unwrap(),
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                    ports_network.push(port_proto);
 
                    ports_proto.push(port_proto);
 
                }
 
            }
 
        }
 
@@ -100,10 +100,10 @@ impl Controller {
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None };
 
        let n_mono = MonoN { ports: ports_native.into_iter().collect(), result: None };
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(main_component, &ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
            state: protocol_description.new_main_component(main_component, &ports_proto),
 
            ports: ports_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
@@ -112,7 +112,7 @@ impl Controller {
 
            logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            ports_network,
 
            deadline,
 
        )?;
 

	
 
@@ -179,22 +179,22 @@ impl Controller {
 
        };
 

	
 
        // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener}
 
        // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`.
 
        // 3. store the portset of EndpointExtTodos which are not Finished in `to_finish`.
 
        let mut to_finish = HashSet::<_>::default();
 
        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        for (key, t) in endpoint_ext_todos.iter() {
 
            let token = key.to_token();
 
        for (port, t) in endpoint_ext_todos.iter() {
 
            let token = port.to_token();
 
            match t {
 
                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
 
                Finished(EndpointExt { endpoint, .. }) => {
 
                    ms.poll.register(endpoint, token, ready_r, edge)
 
                }
 
                ActiveConnecting { stream, .. } => {
 
                    to_finish.insert(key);
 
                    to_finish.insert(port);
 
                    ms.poll.register(stream, token, ready_w, edge)
 
                }
 
                PassiveAccepting { listener, .. } => {
 
                    to_finish.insert(key);
 
                    to_finish.insert(port);
 
                    ms.poll.register(listener, token, ready_r, edge)
 
                }
 
            }
 
@@ -215,11 +215,11 @@ impl Controller {
 
            for event in ms.events.iter() {
 
                log!(logger, "event {:#?}", event);
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                let port = Port::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(port).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
                        polled_undrained_later.insert(port);
 
                    }
 
                    PassiveAccepting { addr, listener, .. } => {
 
                        log!(logger, "{:03?} start PassiveAccepting...", major);
 
@@ -255,7 +255,7 @@ impl Controller {
 
                        });
 
                        res?;
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                        assert!(to_finish.remove(&port));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
@@ -298,11 +298,11 @@ impl Controller {
 
                                        Finished(EndpointExt { info, endpoint })
 
                                    }]
 
                                });
 
                                ms.polled_undrained.insert(ekey);
 
                                assert!(to_finish.remove(&ekey));
 
                                ms.polled_undrained.insert(port);
 
                                assert!(to_finish.remove(&port));
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                                ms.delayed.push(ReceivedMsg { recipient: port, msg });
 
                            }
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveRecving", major);
 
@@ -310,8 +310,8 @@ impl Controller {
 
                }
 
            }
 
        }
 
        for ekey in polled_undrained_later {
 
            ms.polled_undrained.insert(ekey);
 
        for port in polled_undrained_later {
 
            ms.polled_undrained.insert(port);
 
        }
 
        let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo {
 
            Finished(endpoint_ext) => endpoint_ext,
 
@@ -325,7 +325,7 @@ impl Controller {
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Key>,
 
        neighbors: Vec<Port>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 
@@ -337,12 +337,12 @@ impl Controller {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
            fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint {
 
                &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        // 1. broadcast my ID as the first echo. await reply from all in net_portlist
 
        let echo = S(LeaderEcho { maybe_leader: major });
 
        let mut awaiting = IndexSet::with_capacity(neighbors.len());
 
        for &n in neighbors.iter() {
 
@@ -353,7 +353,7 @@ impl Controller {
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Key> = None;
 
        let mut parent: Option<Port> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
@@ -470,7 +470,7 @@ impl Controller {
 
                _ => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        Ok(ControllerFamily { parent_ekey: parent, children_ekeys: children })
 
        Ok(ControllerFamily { parent_port: parent, children_ports: children })
 
    }
 
}
 

	
 
@@ -478,7 +478,7 @@ impl Messengerlike for Controller {
 
    fn get_state_mut(&mut self) -> &mut MessengerState {
 
        &mut self.inner.messenger_state
 
    }
 
    fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
        &mut self.inner.endpoint_exts.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
    fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint {
 
        &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint
 
    }
 
}
0 comments (0 inline, 0 general)