From 0131beae03dc36095b69355c982479bbee592036 2020-07-09 13:20:42 From: Christopher Esterhuyse Date: 2020-07-09 13:20:42 Subject: [PATCH] udp sending and receiving --- diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 041bf67a6c91d33b2767a569bf078662251bc90c..a0c32dd3d83a1c33a769e3a677a9158ccfbb37a3 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -438,19 +438,21 @@ impl Connector { while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); let route = cu.port_info.routes.get(&getter); - log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route); + log!( + cu.logger, + "Routing msg {:?} to {:?} via {:?}", + &send_payload_msg, + getter, + &route + ); match route { - None => { - log!( - cu.logger, - "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!", - getter, - &send_payload_msg - ); - } + None => log!(cu.logger, "Delivery failed. Physical route unmapped!"), Some(Route::UdpEndpoint { index }) => { - // TODO UDP RECV - todo!() + let udp_endpoint_ext = + &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index]; + let SendPayloadMsg { predicate, payload } = send_payload_msg; + log!(cu.logger, "Delivering to udp endpoint index={}", index); + udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Some(Route::NetEndpoint { index }) => { let msg = Msg::CommMsg(CommMsg { diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index f1d2e7103e76070572e1b49e53009b95054f5358..50dab213add6688c1e367ef99fa2954af8f2c65d 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -370,9 +370,10 @@ impl EndpointManager { ee.incoming_round_spec_var = None; // shouldn't be accessed before its overwritten next round; still adding for clarity. for (payload_predicate, payload) in ee.outgoing_payloads.drain() { if payload_predicate.assigns_subset(solution_predicate) { - ee.sock - .send(payload.as_slice()) - .map_err(|_| Use::BrokenNetEndpoint { index })?; + ee.sock.send(payload.as_slice()).map_err(|e| { + println!("{:?}", e); + Use::BrokenUdpEndpoint { index } + })?; log!( logger, "Sent payload {:?} with pred {:?} through Udp endpoint {}", diff --git a/src/runtime/error.rs b/src/runtime/error.rs index c21fa7b9801fbbb80fa7542541210a70c5fa3a2f..16322977191072c35e6259929d78cf68be9fb515 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -3,6 +3,7 @@ use crate::common::*; #[derive(Debug)] pub enum ConnectError { BindFailed(SocketAddr), + UdpConnectFailed(SocketAddr), PollInitFailed, Timeout, PollFailed, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 46ad59c08164ff1e9a1619faa0bacc41bf51f6f9..f856116712da79ab851b3033074a25c5568e7c12 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -154,12 +154,14 @@ struct ProtoComponent { } #[derive(Debug, Clone)] struct NetEndpointSetup { + local_port: PortId, sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, } #[derive(Debug, Clone)] struct UdpEndpointSetup { + local_port: PortId, local_addr: SocketAddr, peer_addr: SocketAddr, } @@ -239,8 +241,8 @@ struct ConnectorUnphased { } #[derive(Debug)] struct ConnectorSetup { - net_endpoint_setups: Vec<(PortId, NetEndpointSetup)>, - udp_endpoint_setups: Vec<(PortId, UdpEndpointSetup)>, + net_endpoint_setups: Vec, + udp_endpoint_setups: Vec, surplus_sockets: u16, } #[derive(Debug)] @@ -258,7 +260,7 @@ struct NativeBatch { to_put: HashMap, to_get: HashSet, } -#[derive(Copy, Clone, Eq, PartialEq, Hash)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] enum TokenTarget { NetEndpoint { index: usize }, UdpEndpoint { index: usize }, diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index b3cf1418b1f02f71110d7a05086d21bf5e47f9b2..76c48fc1c28f5a64bb63beb42edfdb9489df94d0 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -35,7 +35,6 @@ impl Connector { match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { - let udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr }; let udp_index = setup.udp_endpoint_setups.len(); let [port_nat, port_udp] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()]; @@ -46,7 +45,11 @@ impl Connector { cu.port_info.routes.insert(port_udp, Route::UdpEndpoint { index: udp_index }); cu.port_info.polarities.insert(port_nat, polarity); cu.port_info.polarities.insert(port_udp, !polarity); - setup.udp_endpoint_setups.push((port_nat, udp_endpoint_setup)); + setup.udp_endpoint_setups.push(UdpEndpointSetup { + local_addr, + peer_addr, + local_port: port_nat, + }); Ok(port_nat) } } @@ -61,21 +64,25 @@ impl Connector { match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { - let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity }; - let p = cu.id_manager.new_port_id(); - cu.native_ports.insert(p); + let local_port = cu.id_manager.new_port_id(); + cu.native_ports.insert(local_port); // {polarity, route} known. {peer} unknown. - cu.port_info.polarities.insert(p, polarity); - cu.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native)); + cu.port_info.polarities.insert(local_port, polarity); + cu.port_info.routes.insert(local_port, Route::LocalComponent(ComponentId::Native)); log!( cu.logger, - "Added net port {:?} with polarity {:?} and endpoint setup {:?} ", - p, + "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", + local_port, polarity, - &endpoint_setup + &sock_addr, + endpoint_polarity ); - setup.net_endpoint_setups.push((p, endpoint_setup)); - Ok(p) + setup.net_endpoint_setups.push(NetEndpointSetup { + sock_addr, + endpoint_polarity, + local_port, + }); + Ok(local_port) } } } @@ -130,8 +137,8 @@ impl Connector { } fn new_endpoint_manager( logger: &mut dyn Logger, - net_endpoint_setups: &[(PortId, NetEndpointSetup)], - udp_endpoint_setups: &[(PortId, UdpEndpointSetup)], + net_endpoint_setups: &[NetEndpointSetup], + udp_endpoint_setups: &[UdpEndpointSetup], port_info: &mut PortInfo, deadline: &Option, ) -> Result { @@ -142,7 +149,6 @@ fn new_endpoint_manager( struct Todo { todo_endpoint: TodoEndpoint, endpoint_setup: NetEndpointSetup, - local_port: PortId, sent_local_port: bool, // true <-> I've sent my local port recv_peer_port: Option, // Some(..) <-> I've received my peer's port } @@ -173,7 +179,7 @@ fn new_endpoint_manager( let mut net_todos = net_endpoint_setups .iter() .enumerate() - .map(|(index, (local_port, endpoint_setup))| { + .map(|(index, endpoint_setup)| { let token = TokenTarget::NetEndpoint { index }.into(); let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity { let mut stream = TcpStream::connect(endpoint_setup.sock_addr) @@ -188,7 +194,6 @@ fn new_endpoint_manager( }; Ok(Todo { todo_endpoint, - local_port: *local_port, sent_local_port: false, recv_peer_port: None, endpoint_setup: endpoint_setup.clone(), @@ -198,13 +203,15 @@ fn new_endpoint_manager( let udp_todos = udp_endpoint_setups .iter() .enumerate() - .map(|(index, (local_port, endpoint_setup))| { + .map(|(index, endpoint_setup)| { let mut sock = UdpSocket::bind(endpoint_setup.local_addr) .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?; + sock.connect(endpoint_setup.peer_addr) + .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?; poll.registry() .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE) .unwrap(); - Ok(UdpTodo { sock, local_port: *local_port }) + Ok(UdpTodo { sock, local_port: endpoint_setup.local_port }) }) .collect::, ConnectError>>()?; @@ -357,12 +364,12 @@ fn new_endpoint_manager( continue; } let local_polarity = - *port_info.polarities.get(&net_todo.local_port).unwrap(); + *port_info.polarities.get(&net_todo.endpoint_setup.local_port).unwrap(); if event.is_writable() && !net_todo.sent_local_port { // can write and didn't send setup msg yet? Do so! let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { polarity: local_polarity, - port: net_todo.local_port, + port: net_todo.endpoint_setup.local_port, })); net_endpoint .send(&msg) @@ -398,15 +405,19 @@ fn new_endpoint_manager( ); if peer_info.polarity == local_polarity { return Err(ConnectError::PortPeerPolarityMismatch( - net_todo.local_port, + net_todo.endpoint_setup.local_port, )); } net_todo.recv_peer_port = Some(peer_info.port); // 1. finally learned the peer of this port! - port_info.peers.insert(net_todo.local_port, peer_info.port); + port_info + .peers + .insert(net_todo.endpoint_setup.local_port, peer_info.port); // 2. learned the info of this peer port port_info.polarities.insert(peer_info.port, peer_info.polarity); - port_info.peers.insert(peer_info.port, net_todo.local_port); + port_info + .peers + .insert(peer_info.port, net_todo.endpoint_setup.local_port); if let Some(route) = port_info.routes.get(&peer_info.port) { // check just for logging purposes log!( @@ -453,7 +464,7 @@ fn new_endpoint_manager( let net_endpoint_exts = net_todos .into_iter() .enumerate() - .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt { + .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt { net_endpoint: match todo_endpoint { TodoEndpoint::NetEndpoint(mut net_endpoint) => { let token = TokenTarget::NetEndpoint { index }.into(); @@ -464,7 +475,7 @@ fn new_endpoint_manager( } _ => unreachable!(), }, - getter_for_incoming: local_port, + getter_for_incoming: endpoint_setup.local_port, }) .collect(); let udp_endpoint_exts = udp_todos diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 42c242a477d027fb1cb76d13951eb4ce2d3724d6..8211d6b180ad89472f0924eaa670a80b9910867f 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -43,9 +43,10 @@ lazy_static::lazy_static! { Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap()) }; } +static TEST_MSG_BYTES: &'static [u8] = b"hello"; lazy_static::lazy_static! { static ref TEST_MSG: Payload = { - Payload::from(b"hello" as &[u8]) + Payload::from(TEST_MSG_BYTES) }; } @@ -82,9 +83,9 @@ fn new_sync() { fn new_net_port() { let test_log_path = Path::new("./logs/new_net_port"); let mut c = file_logged_connector(0, test_log_path); - let sock_addr = next_test_addr(); - let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); + let sock_addrs = [next_test_addr()]; + let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); } #[test] @@ -96,27 +97,27 @@ fn trivial_connect() { #[test] fn single_node_connect() { - let sock_addr = next_test_addr(); let test_log_path = Path::new("./logs/single_node_connect"); + let sock_addrs = [next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); + let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); c.connect(SEC1).unwrap(); } #[test] fn minimal_net_connect() { - let sock_addr = next_test_addr(); let test_log_path = Path::new("./logs/minimal_net_connect"); + let sock_addrs = [next_test_addr()]; scope(|s| { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); - let _ = c.new_net_port(Getter, sock_addr, Active).unwrap(); + let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.connect(SEC1).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); - let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); c.connect(SEC1).unwrap(); }); }) @@ -235,11 +236,11 @@ fn native_self_msg() { #[test] fn two_natives_msg() { let test_log_path = Path::new("./logs/two_natives_msg"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; scope(|s| { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); - let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); + let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.connect(SEC1).unwrap(); c.get(g).unwrap(); c.sync(SEC1).unwrap(); @@ -247,7 +248,7 @@ fn two_natives_msg() { }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); - let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); + let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.sync(SEC1).unwrap(); @@ -273,11 +274,11 @@ fn trivial_nondet() { #[test] fn connector_pair_nondet() { let test_log_path = Path::new("./logs/connector_pair_nondet"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; scope(|s| { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); - let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); + let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.connect(SEC1).unwrap(); c.next_batch().unwrap(); c.get(g).unwrap(); @@ -286,7 +287,7 @@ fn connector_pair_nondet() { }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); - let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); + let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.sync(SEC1).unwrap(); @@ -434,19 +435,19 @@ fn local_timeout() { #[test] fn parent_timeout() { let test_log_path = Path::new("./logs/parent_timeout"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; scope(|s| { s.spawn(|_| { // parent; times out let mut c = file_logged_connector(999, test_log_path); - let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); c.connect(SEC1).unwrap(); c.sync(MS300).unwrap_err(); // timeout }); s.spawn(|_| { // child let mut c = file_logged_connector(000, test_log_path); - let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); + let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); c.connect(SEC1).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout @@ -458,19 +459,19 @@ fn parent_timeout() { #[test] fn child_timeout() { let test_log_path = Path::new("./logs/child_timeout"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; scope(|s| { s.spawn(|_| { // child; times out let mut c = file_logged_connector(000, test_log_path); - let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); c.connect(SEC1).unwrap(); c.sync(MS300).unwrap_err(); // timeout }); s.spawn(|_| { // parent let mut c = file_logged_connector(999, test_log_path); - let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); + let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); c.connect(SEC1).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout @@ -520,10 +521,10 @@ fn chain_connect() { #[test] fn net_self_loop() { let test_log_path = Path::new("./logs/net_self_loop"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - let p = c.new_net_port(Putter, sock_addr, Active).unwrap(); - let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); + let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); @@ -533,17 +534,17 @@ fn net_self_loop() { #[test] fn nobody_connects_active() { let test_log_path = Path::new("./logs/nobody_connects_active"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - let _g = c.new_net_port(Getter, sock_addr, Active).unwrap(); + let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.connect(Some(Duration::from_secs(5))).unwrap_err(); } #[test] fn nobody_connects_passive() { let test_log_path = Path::new("./logs/nobody_connects_passive"); - let sock_addr = next_test_addr(); + let sock_addrs = [next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - let _g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); + let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); c.connect(Some(Duration::from_secs(5))).unwrap_err(); } @@ -658,12 +659,100 @@ fn multi_recover() { .unwrap(); } -// #[test] -// fn udp_self_connect() { -// let test_log_path = Path::new("./logs/udp_self_connect"); -// let sock_addrs = [next_test_addr(), next_test_addr()]; -// let mut c = file_logged_connector(0, test_log_path); -// c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap(); -// c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap(); -// c.connect(SEC1).unwrap(); -// } +#[test] +fn udp_self_connect() { + let test_log_path = Path::new("./logs/udp_self_connect"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap(); + c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap(); + c.connect(SEC1).unwrap(); +} + +#[test] +fn solo_udp_put_success() { + let test_log_path = Path::new("./logs/solo_udp_put_success"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(MS300).unwrap(); +} + +#[test] +fn solo_udp_get_fail() { + let test_log_path = Path::new("./logs/solo_udp_get_fail"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_udp_port(Getter, sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.get(p0).unwrap(); + c.sync(MS300).unwrap_err(); +} + +#[test] +fn reowolf_to_udp() { + let test_log_path = Path::new("./logs/reowolf_to_udp"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let barrier = std::sync::Barrier::new(2); + scope(|s| { + s.spawn(|_| { + barrier.wait(); + // reowolf thread + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(MS300).unwrap(); + barrier.wait(); + }); + s.spawn(|_| { + barrier.wait(); + // udp thread + let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); + udp.connect(sock_addrs[0]).unwrap(); + let mut buf = unsafe { + // canonical way to create uninitalized byte buffer + let mut v = Vec::with_capacity(256); + v.set_len(256); + v + }; + let len = udp.recv(&mut buf).unwrap(); + assert_eq!(TEST_MSG_BYTES, &buf[0..len]); + barrier.wait(); + }); + }) + .unwrap(); +} + +#[test] +fn udp_to_reowolf() { + let test_log_path = Path::new("./logs/udp_to_reowolf"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let barrier = std::sync::Barrier::new(2); + scope(|s| { + s.spawn(|_| { + barrier.wait(); + // reowolf thread + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_udp_port(Getter, sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.get(p0).unwrap(); + c.sync(SEC1).unwrap(); + assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES); + barrier.wait(); + }); + s.spawn(|_| { + barrier.wait(); + // udp thread + let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); + udp.connect(sock_addrs[0]).unwrap(); + for _ in 0..5 { + udp.send(TEST_MSG_BYTES).unwrap(); + } + barrier.wait(); + }); + }) + .unwrap(); +}