Changeset - 557148ec2325
[Not reviewed]
1 5 1
Christopher Esterhuyse - 5 years ago 2020-04-30 10:03:19
christopher.esterhuyse@gmail.com
removed accidentally pushed dead code
7 files changed with 94 insertions and 38 deletions:
0 comments (0 inline, 0 general)
src/runtime/batches.rs
Show inline comments
 
deleted file
src/runtime/connector.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{errors::*, *};
 

	
 
pub fn random_controller_id() -> ControllerId {
 
    type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ControllerId>(bytes) }
 
}
 

	
 
impl Default for Unconfigured {
 
    fn default() -> Self {
 
        let controller_id = random_controller_id();
 
        Self { controller_id }
 
    }
 
}
 
impl Default for Connector {
 
    fn default() -> Self {
 
        Self::Unconfigured(Unconfigured::default())
 
    }
 
}
 
impl Connector {
 
    /// Configure the Connector with the given Pdl description.
 
    pub fn configure(&mut self, pdl: &[u8], main_component: &[u8]) -> Result<(), ConfigErr> {
 
        use ConfigErr::*;
 
        let controller_id = match self {
 
            Connector::Configured(_) => return Err(AlreadyConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id,
 
        };
 
        let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?);
 
        let polarities = protocol_description.component_polarities(main_component)?;
 
        let configured = Configured {
 
            controller_id,
 
            protocol_description,
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
            logger: "Logger created!\n".into(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
    }
 

	
 
    /// Bind the (configured) connector's port corresponding to the
 
    pub fn bind_port(
 
        &mut self,
 
        proto_port_index: usize,
 
        binding: PortBinding,
 
    ) -> Result<(), PortBindErr> {
 
        use PortBindErr::*;
 
        match self {
 
            Connector::Unconfigured { .. } => Err(NotConfigured),
 
            Connector::Connected(_) => Err(AlreadyConnected),
 
            Connector::Configured(configured) => {
 
                if configured.polarities.len() <= proto_port_index {
 
                    return Err(IndexOutOfBounds);
 
                }
 
                configured.bindings.insert(proto_port_index, binding);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ConnectErr> {
 
        let deadline = Instant::now() + timeout;
 
        use ConnectErr::*;
 
        let configured = match self {
 
            Connector::Unconfigured { .. } => return Err(NotConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Configured(configured) => configured,
 
        };
 
        // 1. Unwrap bindings or err
 
        let bound_proto_interface: Vec<(_, _)> = configured
 
            .polarities
 
            .iter()
 
            .copied()
 
            .enumerate()
 
            .map(|(native_index, polarity)| {
 
                let binding = configured
 
                    .bindings
 
                    .get(&native_index)
 
                    .copied()
 
                    .ok_or(PortNotBound { native_index })?;
 
                Ok((binding, polarity))
 
            })
 
            .collect::<Result<Vec<(_, _)>, ConnectErr>>()?;
 
        let (controller, native_interface) = Controller::connect(
 
            configured.controller_id,
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            &mut configured.logger,
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Configured(configured) => Some(&mut configured.logger),
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Putter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.puts.contains_key(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.puts.insert(ekey, payload);
 
        Ok(())
 
    }
 

	
 
    pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.gets.contains(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.gets.insert(ekey);
 
        Ok(())
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, ()> {
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(()),
 
        };
 
        connected.sync_batches.push(SyncBatch::default());
 
        Ok(connected.sync_batches.len() - 2)
 
    }
 

	
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncErr> {
 
        let deadline = Instant::now() + timeout;
 
        use SyncErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 

	
 
        // do the synchronous round!
 
        let res =
 
            connected.controller.sync_round(Some(deadline), Some(connected.sync_batches.drain(..)));
 
        connected.sync_batches.push(SyncBatch::default());
 
        res?;
 
        Ok(connected.controller.inner.mono_n.result.as_mut().expect("qqqs").0)
 
    }
 

	
 
    pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> {
 
        use ReadGottenErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let &(key, polarity) =
 
            connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
    }
 
}
src/runtime/endpoint.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{errors::*, Predicate};
 
use mio::{Evented, PollOpt, Ready};
 

	
 
pub(crate) enum Endpoint {
 
    Memory { s: mio_extras::channel::Sender<Msg>, r: mio_extras::channel::Receiver<Msg> },
 
    Network(NetworkEndpoint),
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct EndpointExt {
 
    pub endpoint: Endpoint,
 
    pub info: EndpointInfo,
 
}
 
#[derive(Debug, Copy, Clone)]
 
pub struct EndpointInfo {
 
    pub polarity: Polarity,
 
    pub channel_id: ChannelId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 

	
 
#[derive(Clone, Debug)]
 
pub(crate) enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug)]
 
pub(crate) enum SetupMsg {
 
    // sent by the passive endpoint to the active endpoint
 
    ChannelSetup { info: EndpointInfo },
 
    LeaderEcho { maybe_leader: ControllerId },
 
    LeaderAnnounce { leader: ControllerId },
 
    YouAreMyParent,
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 

	
 
#[derive(Clone, Debug)]
 
pub(crate) struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug)]
 
pub(crate) enum CommMsgContents {
 
    SendPayload { payload_predicate: Predicate, payload: Payload },
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Announce { decision: Decision },         // SINKAWAYS
 
}
 

	
 
pub struct NetworkEndpoint {
 
    stream: mio::net::TcpStream,
 
    inbox: Vec<u8>,
 
    outbox: Vec<u8>,
 
}
 

	
 
impl std::fmt::Debug for Endpoint {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
 
        let s = match self {
 
            Endpoint::Memory { .. } => "Memory",
 
            Endpoint::Network(..) => "Network",
 
        };
 
        f.write_fmt(format_args!("Endpoint::{}", s))
 
    }
 
}
 

	
 
impl CommMsgContents {
 
    pub fn into_msg(self, round_index: usize) -> Msg {
 
        Msg::CommMsg(CommMsg { round_index, contents: self })
 
    }
 
}
 

	
 
impl From<EndpointErr> for ConnectErr {
 
    fn from(e: EndpointErr) -> Self {
 
        match e {
 
            EndpointErr::Disconnected => ConnectErr::Disconnected,
 
            EndpointErr::MetaProtocolDeviation => ConnectErr::MetaProtocolDeviation,
 
        }
 
    }
 
}
 
impl Endpoint {
 
    // asymmetric
 
    pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
    //     Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // }
 
    pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec<u8>) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] })
 
    }
 

	
 
    // symmetric
 
    pub fn new_memory_pair() -> [Self; 2] {
 
        let (s1, r1) = mio_extras::channel::channel::<Msg>();
 
        let (s2, r2) = mio_extras::channel::channel::<Msg>();
 
        [Self::Memory { s: s1, r: r2 }, Self::Memory { s: s2, r: r1 }]
 
    }
 
    pub fn send(&mut self, msg: Msg) -> Result<(), EndpointErr> {
 
        match self {
 
            Self::Memory { s, .. } => s.send(msg).map_err(|_| EndpointErr::Disconnected),
 
            Self::Network(NetworkEndpoint { stream, outbox, .. }) => {
 
                use crate::runtime::serde::Ser;
 
                outbox.ser(&msg).expect("ser failed");
 
                loop {
 
                    use std::io::Write;
 
                    match stream.write(outbox) {
 
                        Ok(0) => return Ok(()),
 
                        Ok(bytes_written) => {
 
                            outbox.drain(0..bytes_written);
 
                        }
 
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
 
                            panic!("sending shouldn't WouldBlock")
 
                        }
 
                        Err(_e) => return Err(EndpointErr::Disconnected),
 
                    }
 
                }
 
            }
 
        }
 
    }
 
    pub fn recv(&mut self) -> Result<Option<Msg>, EndpointErr> {
 
        match self {
 
            Self::Memory { r, .. } => match r.try_recv() {
 
                Ok(msg) => Ok(Some(msg)),
 
                Err(std::sync::mpsc::TryRecvError::Empty) => Ok(None),
 
                Err(std::sync::mpsc::TryRecvError::Disconnected) => Err(EndpointErr::Disconnected),
 
            },
 
            Self::Network(NetworkEndpoint { stream, inbox, .. }) => {
 
                // populate inbox as much as possible
 
                'read_loop: loop {
 
                    use std::io::Read;
 
                    match stream.read_to_end(inbox) {
 
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop,
 
                        Ok(0) => break 'read_loop,
 
                        Ok(_) => (),
 
                        Err(_e) => return Err(EndpointErr::Disconnected),
 
                    }
 
                }
 
                use crate::runtime::serde::{De, MonitoredReader};
 
                let mut monitored = MonitoredReader::from(&inbox[..]);
 
                match De::<Msg>::de(&mut monitored) {
 
                    Ok(msg) => {
 
                        let msg_size2 = monitored.bytes_read();
 
                        inbox.drain(0..(msg_size2.try_into().unwrap()));
 
                        Ok(Some(msg))
 
                    }
 
                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
 
                    Err(_) => Err(EndpointErr::MetaProtocolDeviation),
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
impl Evented for Endpoint {
 
    fn register(
 
        &self,
 
        poll: &Poll,
 
        token: Token,
 
        interest: Ready,
 
        opts: PollOpt,
 
    ) -> Result<(), std::io::Error> {
 
        match self {
 
            Self::Memory { r, .. } => r.register(poll, token, interest, opts),
 
            Self::Network(n) => n.register(poll, token, interest, opts),
 
        }
 
    }
 

	
 
    fn reregister(
 
        &self,
 
        poll: &Poll,
 
        token: Token,
 
        interest: Ready,
 
        opts: PollOpt,
 
    ) -> Result<(), std::io::Error> {
 
        match self {
 
            Self::Memory { r, .. } => r.reregister(poll, token, interest, opts),
 
            Self::Network(n) => n.reregister(poll, token, interest, opts),
 
        }
 
    }
 

	
 
    fn deregister(&self, poll: &Poll) -> Result<(), std::io::Error> {
 
        match self {
 
            Self::Memory { r, .. } => r.deregister(poll),
 
            Self::Network(n) => n.deregister(poll),
 
        }
src/runtime/mod.rs
Show inline comments
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
// pub mod experimental;
 
mod serde;
 
pub(crate) mod setup;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Key, Payload>,
 
    gets: HashSet<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
    logger: String,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
    unrecoverable_error: Option<SyncErr>, // prevents future calls to Sync
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: MonoN,       // state at next round start
 
    mono_ps: Vec<MonoP>, // state at next round start
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    mono_ps: Vec<MonoP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_ekey: Option<Key>,
 
    children_ekeys: Vec<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { ekey: Key },
 
}
 

	
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{
 
    actors::{MonoN, MonoP},
 
    endpoint::*,
 
    errors::*,
 
    *,
 
};
 

	
 
#[derive(Debug)]
 
enum EndpointExtTodo {
 
    Finished(EndpointExt),
 
    ActiveConnecting { addr: SocketAddr, polarity: Polarity, stream: TcpStream },
 
    ActiveRecving { addr: SocketAddr, polarity: Polarity, endpoint: Endpoint },
 
    PassiveAccepting { addr: SocketAddr, info: EndpointInfo, listener: TcpListener },
 
    PassiveConnecting { addr: SocketAddr, info: EndpointInfo, stream: TcpStream },
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Controller {
 
    // Given port bindings and a protocol config, create a connector with 1 native node
 
    pub fn connect(
 
        major: ControllerId,
 
        main_component: &[u8],
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        logger: &mut String,
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 
        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
        let mut ekeys_network = vec![];
 

	
 
        let mut native_interface = vec![];
 

	
 
        /*
 
        1.  - allocate an EndpointExtTodo for every native and interface port
 
            - store all the resulting keys in two keylists for the interfaces of the native and proto components
 
                native: [a, c,    f]
 
                         |  |     |
 
                         |  |     |
 
                proto:  [b, d, e, g]
 
                               ^todo
 
                arena: <A,B,C,D,E,F,G>
 
        */
 
        for &(binding, polarity) in bound_proto_interface.iter() {
 
            match binding {
 
                PortBinding::Native => {
 
                    let channel_id = channel_id_stream.next();
 
                    let ([ekey_native, ekey_proto], native_polarity) = {
 
                        let [p, g] = Endpoint::new_memory_pair();
 
                        let mut endpoint_to_key = |endpoint, polarity| {
 
                            endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt {
 
                                endpoint,
 
                                info: EndpointInfo { polarity, channel_id },
 
                            }))
 
                        };
 
                        let pkey = endpoint_to_key(p, Putter);
 
                        let gkey = endpoint_to_key(g, Getter);
 
                        let key_pair = match polarity {
 
                            Putter => [gkey, pkey],
 
                            Getter => [pkey, gkey],
 
                        };
 
                        (key_pair, !polarity)
 
                    };
 
                    native_interface.push((ekey_native, native_polarity));
 
                    ekeys_native.push(ekey_native);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
                PortBinding::Passive(addr) => {
 
                    let channel_id = channel_id_stream.next();
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting {
 
                        addr,
 
                        info: EndpointInfo { polarity, channel_id },
 
                        listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?,
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
                PortBinding::Active(addr) => {
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                        addr,
 
                        polarity,
 
                        stream: TcpStream::connect(&addr).unwrap(),
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        log!(&mut logger, "{:03?} setup todos...", major);
 
        log!(logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 
            Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None };
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(main_component, &ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        log!(logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
            logger: {
 
                let mut l = String::default();
 
                std::mem::swap(&mut l, logger);
 
                l
 
            },
 
        };
 
        let controller = Self {
 
            protocol_description,
 
            inner,
 
            ephemeral: Default::default(),
 
            // round_histories: vec![],
 
            unrecoverable_error: None,
 
        };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    // with mio v0.6 attempting to read bytes into a nonempty buffer appears to
 
    // be the only reliably platform-independent means of testing the connectivity of
 
    // a mio::TcpStream (see Self::connection_testing_read).
 
    // as this unavoidably MAY read some crucial payload bytes, we have to be careful
 
    // to pass these potentially populated buffers into the Endpoint, or bytes may be lost.
 
    // This is done with Endpoint::from_fresh_stream_and_inbox.
 
    fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec<u8>) -> std::io::Result<()> {
 
        inbox.clear();
 
        use std::io::Read;
 
        match stream.read_to_end(inbox) {
 
            Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
            Ok(_) => Ok(()),
 
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
 
            Err(e) => Err(e),
 
        }
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(|_| PollInitFailed)?,
 
            events: Events::with_capacity(endpoint_ext_todos.len()),
 
            delayed: vec![],
 
            undelayed: vec![],
 
            polled_undrained: Default::default(),
 
        };
 

	
 
        // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener}
 
        // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`.
 
        let mut to_finish = HashSet::<_>::default();
 
        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        for (key, t) in endpoint_ext_todos.iter() {
 
            let token = key.to_token();
 
            match t {
 
                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
 
                Finished(EndpointExt { endpoint, .. }) => {
 
                    ms.poll.register(endpoint, token, ready_r, edge)
 
                }
 
                ActiveConnecting { stream, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(stream, token, ready_w, edge)
 
                }
 
                PassiveAccepting { listener, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(listener, token, ready_r, edge)
 
                }
 
            }
 
            .expect("register first");
 
        }
 
        // invariant: every EndpointExtTodo has one thing registered with mio
 

	
 
        // 4. until all in endpoint_ext_todos are Finished variant, handle events
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        // see Self::connection_testing_read for why we populate Endpoint inboxes here.
 
        let mut next_inbox = vec![];
 
        while !to_finish.is_empty() {
 
            ms.poll_events(deadline)?;
 
            ms.poll_events(deadline).map_err(|e| {
 
                log!(logger, "{:03?} timing out", major);
 
                e
 
            })?;
 
            for event in ms.events.iter() {
 
                log!(logger, "event {:#?}", event);
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
                    }
 
                    PassiveAccepting { addr, listener, .. } => {
 
                        log!(logger, "{:03?} start PassiveAccepting...", major);
 
                        assert!(event.readiness().is_readable());
 
                        let (stream, _peer_addr) =
 
                            listener.accept().map_err(|_| AcceptFailed(*addr))?;
 
                        ms.poll.deregister(listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveAccepting { addr, info, .. } = e => {
 
                                PassiveConnecting { addr, info, stream }
 
                            }]
 
                        });
 
                        log!(logger, "{:03?} ... end PassiveAccepting", major);
 
                    }
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_err() {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_ok() {
 
                            // connect successful
 
                            log!(logger, "CONNECT SUCCESS");
 
                            log!(logger, "Connectivity test passed");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                    let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let mut new_stream = TcpStream::connect(addr).unwrap();
 
                            ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3");
 
                            std::mem::swap(stream, &mut new_stream);
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveConnecting", major);
 
                    }
 
                    ActiveRecving { addr, polarity, endpoint } => {
 
                        log!(logger, "{:03?} start ActiveRecving...", major);
 
                        assert!(event.readiness().is_readable());
 
                        'recv_loop: while let Some(msg) = endpoint.recv()? {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == *polarity {
 
                                    return Err(PolarityMatched(*addr));
 
                                }
 
                                take_mut::take(entry, |e| {
 
                                    assert_let![ActiveRecving { polarity, endpoint, .. } = e => {
 
                                        let info = EndpointInfo { polarity, channel_id: info.channel_id };
 
                                        Finished(EndpointExt { info, endpoint })
 
                                    }]
 
                                });
 
                                ms.polled_undrained.insert(ekey);
 
                                assert!(to_finish.remove(&ekey));
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveRecving", major);
 
                    }
 
                }
 
            }
 
        }
 
        for ekey in polled_undrained_later {
 
            ms.polled_undrained.insert(ekey);
 
        }
 
        let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo {
 
            Finished(endpoint_ext) => endpoint_ext,
 
            _ => unreachable!(),
 
        });
 
        Ok((ms, endpoint_exts))
 
    }
 

	
 
    fn setup_sink_tree_family(
 
        major: ControllerId,
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Key>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 

	
 
        log!(logger, "neighbors {:?}", &neighbors);
 

	
 
        let mut messenger = (messenger_state, endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut Arena<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let echo = S(LeaderEcho { maybe_leader: major });
 
        let mut awaiting = IndexSet::with_capacity(neighbors.len());
 
        for &n in neighbors.iter() {
 
            log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            messenger.send(n, echo.clone())?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Key> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
                    awaiting.clear();
 
                    break 'echo_loop;
 
                }
src/test/mod.rs
Show inline comments
 
use crate::common::ControllerId;
 
use crate::runtime::Connector;
 
use crate::runtime::Unconfigured;
 
use core::fmt::Debug;
 
use std::net::SocketAddr;
 

	
 
mod connector;
 
mod net;
 
mod setup;
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(7_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into()
 
}
 

	
 
struct Panicked(Box<dyn std::any::Any>);
 
impl Debug for Panicked {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        if let Some(str_slice) = self.0.downcast_ref::<&'static str>() {
 
            f.pad(str_slice)
 
        } else if let Some(string) = self.0.downcast_ref::<String>() {
 
            f.pad(string)
 
        } else {
 
            f.pad("Box<Any>")
 
        }
 
    }
 
}
 

	
 
// Given a set of tasks (where each is some function that interacts with a connector)
 
// run each task in in its own thread.
 
// print the log and OK/PANIC result of each thread
 
// then finally, return true IFF no threads panicked
 
fn run_connector_set(i: &[&(dyn Fn(&mut Connector) + Sync)]) -> bool {
 
    let cid_iter = 0..(i.len() as ControllerId);
 
    let mut connectors = cid_iter
 
        .clone()
 
        .map(|controller_id| Connector::Unconfigured(Unconfigured { controller_id }))
 
        .collect::<Vec<_>>();
 

	
 
    let mut results = vec![];
 
    crossbeam_utils::thread::scope(|s| {
 
        let handles: Vec<_> = i
 
            .iter()
 
            .zip(connectors.iter_mut())
 
            .map(|(func, connector)| s.spawn(move |_| func(connector)))
 
            .collect();
 
        for h in handles {
 
            results.push(h.join());
 
        }
 
    })
 
    .unwrap();
 

	
 
    let mut alright = true;
 

	
 
    for ((controller_id, connector), res) in
 
        cid_iter.zip(connectors.iter_mut()).zip(results.into_iter())
 
    {
 
        println!("\n\n====================\n CID {:?} ...", controller_id);
 
        match connector.get_mut_logger() {
 
            Some(logger) => println!("{}", logger),
 
            None => println!("<No Log>"),
 
        }
 
        match res {
 
            Ok(()) => println!("CID {:?} OK!", controller_id),
 
            Err(e) => {
 
                alright = false;
 
                println!("CI {:?} PANIC! {:?}", controller_id, Panicked(e));
 
            }
 
        };
 
    }
 
    alright
 
}
src/test/net.rs
Show inline comments
 
new file 100644
 
use mio::*;
 
use std::io::ErrorKind::WouldBlock;
 
use std::net::SocketAddr;
 

	
 
fn connection_testing_read(
 
    stream: &mut mio::net::TcpStream,
 
    inbox: &mut Vec<u8>,
 
) -> std::io::Result<()> {
 
    assert!(inbox.is_empty());
 
    use std::io::Read;
 
    match stream.read_to_end(inbox) {
 
        Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
        Ok(_) => Ok(()),
 
        Err(e) if e.kind() == WouldBlock => Ok(()),
 
        Err(e) => Err(e),
 
    }
 
}
 

	
 
#[test]
 
fn mio_tcp_connect_err() {
 
    let poll = Poll::new().unwrap();
 
    let mut events = Events::with_capacity(64);
 

	
 
    let addr: SocketAddr = "127.0.0.1:12000".parse().unwrap();
 
    let mut stream = mio::net::TcpStream::connect(&addr).unwrap();
 
    poll.register(&stream, Token(0), Ready::all(), PollOpt::edge()).unwrap();
 

	
 
    let mut v = vec![];
 
    loop {
 
        poll.poll(&mut events, Some(std::time::Duration::from_secs(2))).unwrap();
 
        for event in events.iter() {
 
            assert_eq!(event.token(), Token(0));
 
            println!("readiness {:?}", event.readiness());
 
            // assert_eq!(event.readiness(), Ready::writable());
 

	
 
            v.clear();
 
            println!("{:?}", connection_testing_read(&mut stream, &mut v));
 
            println!("----------- {:?}", &v);
 
            std::thread::sleep(std::time::Duration::from_secs(1));
 
        }
 
    }
 
}
0 comments (0 inline, 0 general)