Changeset - 0131beae03dc
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-07-09 13:20:42
christopher.esterhuyse@gmail.com
udp sending and receiving
6 files changed with 184 insertions and 78 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -429,37 +429,39 @@ impl Connector {
 
            }
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                let route = cu.port_info.routes.get(&getter);
 
                log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route);
 
                match route {
 
                    None => {
 
                log!(
 
                    cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
                            &send_payload_msg
 
                    &route
 
                );
 
                    }
 
                match route {
 
                    None => log!(cu.logger, "Delivery failed. Physical route unmapped!"),
 
                    Some(Route::UdpEndpoint { index }) => {
 
                        // TODO UDP RECV
 
                        todo!()
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.logger, "Delivering to udp endpoint index={}", index);
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Some(Route::NetEndpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut rctx.solution_storage,
 
                        getter,
src/runtime/endpoints.rs
Show inline comments
 
@@ -361,27 +361,28 @@ impl EndpointManager {
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
            'endpoint_loop: for (index, ee) in
 
                self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate()
 
            {
 
                ee.incoming_round_spec_var = None; // shouldn't be accessed before its overwritten next round; still adding for clarity.
 
                for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
 
                        ee.sock
 
                            .send(payload.as_slice())
 
                            .map_err(|_| Use::BrokenNetEndpoint { index })?;
 
                        ee.sock.send(payload.as_slice()).map_err(|e| {
 
                            println!("{:?}", e);
 
                            Use::BrokenUdpEndpoint { index }
 
                        })?;
 
                        log!(
 
                            logger,
 
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
 
                            &payload,
 
                            &payload_predicate,
 
                            index
 
                        );
 
                        continue 'endpoint_loop; // send at most one payload per endpoint per round
 
                    }
 
                }
 
                log!(logger, "Sent no message through Udp endpoint {}", index);
 
            }
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    UdpConnectFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    NetEndpointSetupError(SocketAddr, NetEndpointError),
 
    SetupAlgMisbehavior,
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
src/runtime/mod.rs
Show inline comments
 
@@ -145,30 +145,32 @@ enum AssignmentUnionResult {
 
}
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    local_port: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    local_port: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
@@ -230,44 +232,44 @@ struct ConnectorCommunication {
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
}
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<(PortId, NetEndpointSetup)>,
 
    udp_endpoint_setups: Vec<(PortId, UdpEndpointSetup)>,
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
    surplus_sockets: u16,
 
}
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 
trait RoundCtxTrait {
 
    fn get_deadline(&self) -> &Option<Instant>;
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg);
 
}
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
src/runtime/setup.rs
Show inline comments
 
@@ -26,65 +26,72 @@ impl Connector {
 
        }
 
    }
 
    pub fn new_udp_port(
 
        &mut self,
 
        polarity: Polarity,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let [port_nat, port_udp] =
 
                    [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
                cu.native_ports.insert(port_nat);
 
                cu.port_info.peers.insert(port_nat, port_udp);
 
                cu.port_info.peers.insert(port_udp, port_nat);
 
                cu.port_info.routes.insert(port_nat, Route::LocalComponent(ComponentId::Native));
 
                cu.port_info.routes.insert(port_udp, Route::UdpEndpoint { index: udp_index });
 
                cu.port_info.polarities.insert(port_nat, polarity);
 
                cu.port_info.polarities.insert(port_udp, !polarity);
 
                setup.udp_endpoint_setups.push((port_nat, udp_endpoint_setup));
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    local_port: port_nat,
 
                });
 
                Ok(port_nat)
 
            }
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity };
 
                let p = cu.id_manager.new_port_id();
 
                cu.native_ports.insert(p);
 
                let local_port = cu.id_manager.new_port_id();
 
                cu.native_ports.insert(local_port);
 
                // {polarity, route} known. {peer} unknown.
 
                cu.port_info.polarities.insert(p, polarity);
 
                cu.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                cu.port_info.polarities.insert(local_port, polarity);
 
                cu.port_info.routes.insert(local_port, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    local_port,
 
                    polarity,
 
                    &endpoint_setup
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    local_port,
 
                });
 
                Ok(local_port)
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
@@ -121,37 +128,36 @@ impl Connector {
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[(PortId, NetEndpointSetup)],
 
    udp_endpoint_setups: &[(PortId, UdpEndpointSetup)],
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut PortInfo,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    struct UdpTodo {
 
        local_port: PortId,
 
        sock: UdpSocket,
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        NetEndpoint(NetEndpoint),
 
    }
 
    ////////////////////////////////////////////
 
@@ -164,56 +170,57 @@ fn new_endpoint_manager(
 
    }
 

	
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(net_endpoint_setups.len() * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                TodoEndpoint::Accepting(listener)
 
            };
 
            Ok(Todo {
 
                todo_endpoint,
 
                local_port: *local_port,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, local_port: *local_port })
 
            Ok(UdpTodo { sock, local_port: endpoint_setup.local_port })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    // TODO register udps, and all them to incomplete list
 

	
 
@@ -348,30 +355,30 @@ fn new_endpoint_manager(
 
                                        let _ = moved_arc.waker.wake();
 
                                    }
 
                                });
 
                            }
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if connect_failed.contains(&index) {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 
                        let local_polarity =
 
                            *port_info.polarities.get(&net_todo.local_port).unwrap();
 
                            *port_info.polarities.get(&net_todo.endpoint_setup.local_port).unwrap();
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                polarity: local_polarity,
 
                                port: net_todo.local_port,
 
                                port: net_todo.endpoint_setup.local_port,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
@@ -389,33 +396,37 @@ fn new_endpoint_manager(
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.local_port,
 
                                            net_todo.endpoint_setup.local_port,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    port_info.peers.insert(net_todo.local_port, peer_info.port);
 
                                    port_info
 
                                        .peers
 
                                        .insert(net_todo.endpoint_setup.local_port, peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                    port_info.peers.insert(peer_info.port, net_todo.local_port);
 
                                    port_info
 
                                        .peers
 
                                        .insert(peer_info.port, net_todo.endpoint_setup.local_port);
 
                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                        // check just for logging purposes
 
                                        log!(
 
                                            logger,
 
                                            "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                            peer_info.port,
 
                                            route
 
                                        );
 
                                    }
 
                                    port_info
 
                                        .routes
 
                                        .entry(peer_info.port)
 
@@ -444,36 +455,36 @@ fn new_endpoint_manager(
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(arc) = waker_state {
 
        log!(logger, "Sending waker the stop signal");
 
        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
 
        // TODO leave the waker registered?
 
    }
 

	
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt {
 
        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
            getter_for_incoming: endpoint_setup.local_port,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, local_port } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
src/runtime/tests.rs
Show inline comments
 
@@ -34,27 +34,28 @@ primitive together(in ia, in ib, out oa, out ob){
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(b"hello" as &[u8])
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 

	
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
@@ -73,59 +74,59 @@ fn new_port_pair() {
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addr = next_test_addr();
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
@@ -226,37 +227,37 @@ fn native_self_msg() {
 
    let test_log_path = Path::new("./logs/native_self_msg");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn two_natives_msg() {
 
    let test_log_path = Path::new("./logs/two_natives_msg");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn trivial_nondet() {
 
    let test_log_path = Path::new("./logs/trivial_nondet");
 
    let mut c = file_logged_connector(0, test_log_path);
 
@@ -264,38 +265,38 @@ fn trivial_nondet() {
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    // getting 0 batch
 
    c.next_batch().unwrap();
 
    // silent 1 batch
 
    assert_eq!(1, c.sync(SEC1).unwrap());
 
    c.gotten(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn connector_pair_nondet() {
 
    let test_log_path = Path::new("./logs/connector_pair_nondet");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(SEC1).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_immediately_inconsistent() {
 
    let test_log_path = Path::new("./logs/native_immediately_inconsistent");
 
    let mut c = file_logged_connector(0, test_log_path);
 
@@ -425,61 +426,61 @@ fn local_timeout() {
 
    let [_, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(MS300) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
}
 

	
 
#[test]
 
fn parent_timeout() {
 
    let test_log_path = Path::new("./logs/parent_timeout");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn child_timeout() {
 
    let test_log_path = Path::new("./logs/child_timeout");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // child; times out
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn chain_connect() {
 
    let test_log_path = Path::new("./logs/chain_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
@@ -511,48 +512,48 @@ fn chain_connect() {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn net_self_loop() {
 
    let test_log_path = Path::new("./logs/net_self_loop");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn nobody_connects_active() {
 
    let test_log_path = Path::new("./logs/nobody_connects_active");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
    let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 
#[test]
 
fn nobody_connects_passive() {
 
    let test_log_path = Path::new("./logs/nobody_connects_passive");
 
    let sock_addr = next_test_addr();
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 

	
 
#[test]
 
fn together() {
 
    let test_log_path = Path::new("./logs/together");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
@@ -649,21 +650,109 @@ fn multi_recover() {
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
// #[test]
 
// fn udp_self_connect() {
 
//     let test_log_path = Path::new("./logs/udp_self_connect");
 
//     let sock_addrs = [next_test_addr(), next_test_addr()];
 
//     let mut c = file_logged_connector(0, test_log_path);
 
//     c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
//     c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
//     c.connect(SEC1).unwrap();
 
// }
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p0 = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p0 = c.new_udp_port(Getter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
}
 

	
 
#[test]
 
fn reowolf_to_udp() {
 
    let test_log_path = Path::new("./logs/reowolf_to_udp");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = unsafe {
 
                // canonical way to create uninitalized byte buffer
 
                let mut v = Vec::with_capacity(256);
 
                v.set_len(256);
 
                v
 
            };
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_to_reowolf() {
 
    let test_log_path = Path::new("./logs/udp_to_reowolf");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_udp_port(Getter, sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC1).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            for _ in 0..5 {
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
            }
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)