Changeset - 0131beae03dc
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-07-09 13:20:42
christopher.esterhuyse@gmail.com
udp sending and receiving
6 files changed with 184 insertions and 78 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -393,109 +393,111 @@ impl Connector {
 
        log!(cu.logger, "Done translating native batches into branches");
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            // initially, no components have .ended==true
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                rctx,
 
                proto_component_id,
 
                ports,
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger, "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.logger, "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                let route = cu.port_info.routes.get(&getter);
 
                log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route);
 
                match route {
 
                    None => {
 
                log!(
 
                    cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
                            &send_payload_msg
 
                    &route
 
                );
 
                    }
 
                match route {
 
                    None => log!(cu.logger, "Delivery failed. Physical route unmapped!"),
 
                    Some(Route::UdpEndpoint { index }) => {
 
                        // TODO UDP RECV
 
                        todo!()
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.logger, "Delivering to udp endpoint index={}", index);
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Some(Route::NetEndpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut rctx.solution_storage,
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
                                proto_component_id,
 
                                getter,
 
                                &send_payload_msg,
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(
 
                                    cu.logger,
 
                                    "{:?} has become inconsistent!",
 
                                    proto_component_id
 
                                );
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
                                        Self::request_failure(cu, comm, parent)?
 
                                    } else {
 
                                        log!(cu.logger, "Already requested failure");
 
                                    }
 
                                } else {
 
                                    log!(cu.logger, "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
src/runtime/endpoints.rs
Show inline comments
 
@@ -325,99 +325,100 @@ impl EndpointManager {
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
    pub(super) fn udp_endpoints_round_start(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        spec_var_stream: &mut SpecVarStream,
 
    ) {
 
        log!(
 
            logger,
 
            "Starting round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
 
            let spec_var = spec_var_stream.next();
 
            log!(logger, "Udp endpoint given {} spec var {:?} for this round", index, spec_var);
 
            ee.incoming_round_spec_var = Some(spec_var);
 
        }
 
    }
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        decision: &Decision,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
            'endpoint_loop: for (index, ee) in
 
                self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate()
 
            {
 
                ee.incoming_round_spec_var = None; // shouldn't be accessed before its overwritten next round; still adding for clarity.
 
                for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
 
                        ee.sock
 
                            .send(payload.as_slice())
 
                            .map_err(|_| Use::BrokenNetEndpoint { index })?;
 
                        ee.sock.send(payload.as_slice()).map_err(|e| {
 
                            println!("{:?}", e);
 
                            Use::BrokenUdpEndpoint { index }
 
                        })?;
 
                        log!(
 
                            logger,
 
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
 
                            &payload,
 
                            &payload_predicate,
 
                            index
 
                        );
 
                        continue 'endpoint_loop; // send at most one payload per endpoint per round
 
                    }
 
                }
 
                log!(logger, "Sent no message through Udp endpoint {}", index);
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
// impl UdpEndpointExt {
 
//     fn try_recv(
 
//         &mut self,
 
//         port_info: &PortInfo,
 
//         udp_in_buffer: &mut UdpInBuffer,
 
//     ) -> Option<SendPayloadMsg> {
 
//         let recv_buffer = udp_in_buffer.as_mut_slice();
 
//         let len = self.sock.recv(recv_buffer).ok()?;
 
//         let payload = Payload::from(&recv_buffer[..len]);
 
//         let branch_spec_var = self
 
//             .incoming_round_spec_var
 
//             .expect("Udp spec var should be Some(..) if recv() is called");
 
//         let branch_spec_val = SpecVal::nth_domain_element(self.incoming_payloads.len());
 
//         self.incoming_payloads.push(payload.clone());
 
//         let predicate = Predicate::default()
 
//             .inserted(branch_spec_var, branch_spec_val)
 
//             .inserted(port_info.spec_var_for(self.getter_for_incoming), SpecVal::FIRING);
 
//         Some(SendPayloadMsg { payload, predicate })
 
//     }
 
// }
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    UdpConnectFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    NetEndpointSetupError(SocketAddr, NetEndpointError),
 
    SetupAlgMisbehavior,
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum UnrecoverableSyncError {
 
    PollFailed,
 
    BrokenNetEndpoint { index: usize },
 
    BrokenUdpEndpoint { index: usize },
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    RoundFailure,
 
    Unrecoverable(UnrecoverableSyncError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum NetEndpointError {
 
    MalformedMessage,
 
    BrokenNetEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    UnknownPolarity,
src/runtime/mod.rs
Show inline comments
 
@@ -109,201 +109,203 @@ enum SetupMsg {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    local_port: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    local_port: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    incoming_round_spec_var: Option<SpecVar>,
 
    getter_for_incoming: PortId,
 
    incoming_payloads: Vec<Payload>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
}
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<(PortId, NetEndpointSetup)>,
 
    udp_endpoint_setups: Vec<(PortId, UdpEndpointSetup)>,
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
    surplus_sockets: u16,
 
}
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 
trait RoundCtxTrait {
 
    fn get_deadline(&self) -> &Option<Instant>;
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg);
 
}
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl TokenTarget {
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::WAKER_TOKEN {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                id_manager: IdManager::new(connector_id),
 
                native_ports: Default::default(),
 
                port_info: Default::default(),
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
                surplus_sockets,
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
        &mut self,
 
        polarity: Polarity,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let [port_nat, port_udp] =
 
                    [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
                cu.native_ports.insert(port_nat);
 
                cu.port_info.peers.insert(port_nat, port_udp);
 
                cu.port_info.peers.insert(port_udp, port_nat);
 
                cu.port_info.routes.insert(port_nat, Route::LocalComponent(ComponentId::Native));
 
                cu.port_info.routes.insert(port_udp, Route::UdpEndpoint { index: udp_index });
 
                cu.port_info.polarities.insert(port_nat, polarity);
 
                cu.port_info.polarities.insert(port_udp, !polarity);
 
                setup.udp_endpoint_setups.push((port_nat, udp_endpoint_setup));
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    local_port: port_nat,
 
                });
 
                Ok(port_nat)
 
            }
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity };
 
                let p = cu.id_manager.new_port_id();
 
                cu.native_ports.insert(p);
 
                let local_port = cu.id_manager.new_port_id();
 
                cu.native_ports.insert(local_port);
 
                // {polarity, route} known. {peer} unknown.
 
                cu.port_info.polarities.insert(p, polarity);
 
                cu.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                cu.port_info.polarities.insert(local_port, polarity);
 
                cu.port_info.routes.insert(local_port, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    local_port,
 
                    polarity,
 
                    &endpoint_setup
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    local_port,
 
                });
 
                Ok(local_port)
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[(PortId, NetEndpointSetup)],
 
    udp_endpoint_setups: &[(PortId, UdpEndpointSetup)],
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut PortInfo,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    struct UdpTodo {
 
        local_port: PortId,
 
        sock: UdpSocket,
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        NetEndpoint(NetEndpoint),
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 
    struct WakerState {
 
        continue_signal: AtomicBool,
 
        waker: mio::Waker,
 
    }
 

	
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(net_endpoint_setups.len() * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                TodoEndpoint::Accepting(listener)
 
            };
 
            Ok(Todo {
 
                todo_endpoint,
 
                local_port: *local_port,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, local_port: *local_port })
 
            Ok(UdpTodo { sock, local_port: endpoint_setup.local_port })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    // TODO register udps, and all them to incomplete list
 

	
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let token_target = TokenTarget::from(token);
 
            if !setup_incomplete.contains(&token_target) {
 
                // spurious wakeup
 
                continue;
 
            }
 
            match token_target {
 
                TokenTarget::Waker => {
 
                    log!(
 
                        logger,
 
                        "Notification from waker. connect_failed is {:?}",
 
                        connect_failed.iter()
 
                    );
 
                    assert!(waker_state.is_some());
 
                    for net_index in connect_failed.drain() {
 
                        let net_todo = &mut net_todos[net_index];
 
                        log!(
 
                            logger,
 
                            "Restarting connection with endpoint {:?} {:?}",
 
                            net_index,
 
                            net_todo.endpoint_setup.sock_addr
 
@@ -312,204 +319,208 @@ fn new_endpoint_manager(
 
                                // right now you cannot retry an acceptor. return failure
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            if connect_failed.insert(index) {
 
                                log!(
 
                                    logger,
 
                                    "Connection failed for {:?}. List is {:?}",
 
                                    index,
 
                                    connect_failed.iter()
 
                                );
 
                                poll.registry().deregister(&mut net_endpoint.stream).unwrap();
 
                            } else {
 
                                // spurious wakeup.
 
                                continue;
 
                            }
 
                            if waker_state.is_none() {
 
                                log!(logger, "First connect failure. Starting waker thread");
 
                                let arc = Arc::new(WakerState {
 
                                    waker: mio::Waker::new(
 
                                        poll.registry(),
 
                                        TokenTarget::Waker.into(),
 
                                    )
 
                                    .unwrap(),
 
                                    continue_signal: true.into(),
 
                                });
 
                                let moved_arc = arc.clone();
 
                                waker_state = Some(arc);
 
                                std::thread::spawn(move || {
 
                                    while moved_arc
 
                                        .continue_signal
 
                                        .load(std::sync::atomic::Ordering::SeqCst)
 
                                    {
 
                                        std::thread::sleep(WAKER_PERIOD);
 
                                        let _ = moved_arc.waker.wake();
 
                                    }
 
                                });
 
                            }
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if connect_failed.contains(&index) {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 
                        let local_polarity =
 
                            *port_info.polarities.get(&net_todo.local_port).unwrap();
 
                            *port_info.polarities.get(&net_todo.endpoint_setup.local_port).unwrap();
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                polarity: local_polarity,
 
                                port: net_todo.local_port,
 
                                port: net_todo.endpoint_setup.local_port,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.local_port,
 
                                            net_todo.endpoint_setup.local_port,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    port_info.peers.insert(net_todo.local_port, peer_info.port);
 
                                    port_info
 
                                        .peers
 
                                        .insert(net_todo.endpoint_setup.local_port, peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                    port_info.peers.insert(peer_info.port, net_todo.local_port);
 
                                    port_info
 
                                        .peers
 
                                        .insert(peer_info.port, net_todo.endpoint_setup.local_port);
 
                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                        // check just for logging purposes
 
                                        log!(
 
                                            logger,
 
                                            "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                            peer_info.port,
 
                                            route
 
                                        );
 
                                    }
 
                                    port_info
 
                                        .routes
 
                                        .entry(peer_info.port)
 
                                        .or_insert(Route::NetEndpoint { index });
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(arc) = waker_state {
 
        log!(logger, "Sending waker the stop signal");
 
        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
 
        // TODO leave the waker registered?
 
    }
 

	
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt {
 
        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
            getter_for_incoming: endpoint_setup.local_port,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, local_port } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                incoming_round_spec_var: None,
 
                getter_for_incoming: local_port,
 
                incoming_payloads: Default::default(),
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{
 
    error::*,
 
    EndpointPolarity::{Active, Passive},
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
const SEC1: Option<Duration> = Some(Duration::from_secs(1));
 
const SEC5: Option<Duration> = Some(Duration::from_secs(5));
 
const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
const MS300: Option<Duration> = Some(Duration::from_millis(300));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(b"hello" as &[u8])
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 

	
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addr = next_test_addr();
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let test_log_path = Path::new("./logs/wrong_polarity_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let test_log_path = Path::new("./logs/dup_put_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let test_log_path = Path::new("./logs/trivial_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let test_log_path = Path::new("./logs/unconnected_gotten_err");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
@@ -190,148 +191,148 @@ fn connected_gotten_err_ungotten() {
 
fn native_polarity_checks() {
 
    let test_log_path = Path::new("./logs/native_polarity_checks");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let test_log_path = Path::new("./logs/native_multiple_gets");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let test_log_path = Path::new("./logs/next_batch");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.next_batch().unwrap_err();
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_self_msg() {
 
    let test_log_path = Path::new("./logs/native_self_msg");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn two_natives_msg() {
 
    let test_log_path = Path::new("./logs/two_natives_msg");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn trivial_nondet() {
 
    let test_log_path = Path::new("./logs/trivial_nondet");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    // getting 0 batch
 
    c.next_batch().unwrap();
 
    // silent 1 batch
 
    assert_eq!(1, c.sync(SEC1).unwrap());
 
    c.gotten(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn connector_pair_nondet() {
 
    let test_log_path = Path::new("./logs/connector_pair_nondet");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(SEC1).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_immediately_inconsistent() {
 
    let test_log_path = Path::new("./logs/native_immediately_inconsistent");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap_err();
 
}
 

	
 
#[test]
 
fn native_recovers() {
 
    let test_log_path = Path::new("./logs/native_recovers");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
    /*
 
@@ -389,206 +390,206 @@ fn distributed_msg_bounce() {
 
            native | sync p|-->
 
                   |      g|<--
 
            */
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p, g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"sync", &[g, p]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [g, p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn local_timeout() {
 
    let test_log_path = Path::new("./logs/local_timeout");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(MS300) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
}
 

	
 
#[test]
 
fn parent_timeout() {
 
    let test_log_path = Path::new("./logs/parent_timeout");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn child_timeout() {
 
    let test_log_path = Path::new("./logs/child_timeout");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // child; times out
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn chain_connect() {
 
    let test_log_path = Path::new("./logs/chain_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn net_self_loop() {
 
    let test_log_path = Path::new("./logs/net_self_loop");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn nobody_connects_active() {
 
    let test_log_path = Path::new("./logs/nobody_connects_active");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
    let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 
#[test]
 
fn nobody_connects_passive() {
 
    let test_log_path = Path::new("./logs/nobody_connects_passive");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 

	
 
#[test]
 
fn together() {
 
    let test_log_path = Path::new("./logs/together");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_batch_distinguish() {
 
    let test_log_path = Path::new("./logs/native_batch_distinguish");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
@@ -613,57 +614,145 @@ fn multirounds() {
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn multi_recover() {
 
    let test_log_path = Path::new("./logs/multi_recover");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let success_iter = [true, false].iter().copied().cycle().take(10);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                if succeeds {
 
                    c.get(p1).unwrap();
 
                }
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
// #[test]
 
// fn udp_self_connect() {
 
//     let test_log_path = Path::new("./logs/udp_self_connect");
 
//     let sock_addrs = [next_test_addr(), next_test_addr()];
 
//     let mut c = file_logged_connector(0, test_log_path);
 
//     c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
//     c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
//     c.connect(SEC1).unwrap();
 
// }
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p0 = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p0 = c.new_udp_port(Getter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
}
 

	
 
#[test]
 
fn reowolf_to_udp() {
 
    let test_log_path = Path::new("./logs/reowolf_to_udp");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = unsafe {
 
                // canonical way to create uninitalized byte buffer
 
                let mut v = Vec::with_capacity(256);
 
                v.set_len(256);
 
                v
 
            };
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_to_reowolf() {
 
    let test_log_path = Path::new("./logs/udp_to_reowolf");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_udp_port(Getter, sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC1).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            for _ in 0..5 {
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
            }
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)