From cf50a74eafd7e7dd5ca3f346b2876299f60bb0a7 2020-07-08 14:46:27
From: Christopher Esterhuyse
Date: 2020-07-08 14:46:27
Subject: [PATCH] setup of udp endpoints partially done

---
diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs
index dcbff96cf8c2fe45ed3090bd16d785ea96886f89..041bf67a6c91d33b2767a569bf078662251bc90c 100644
--- a/src/runtime/communication.rs
+++ b/src/runtime/communication.rs
@@ -534,46 +534,46 @@ impl Connector {
         // try recv messages arriving through endpoints
         log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
         {
-            let (endpoint_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
-                .endpoint_manager
-                .try_recv_any_comms(&mut *cu.logger, &cu.port_info, rctx, comm.round_index)?
-            {
-                CommRecvOk::NewControlMsg { net_endpoint_index, msg } => {
-                    (net_endpoint_index, msg)
-                }
-                CommRecvOk::NewPayloadMsgs => continue 'undecided,
-                CommRecvOk::TimeoutWithoutNew => {
-                    log!(cu.logger, "Reached user-defined deadling without decision...");
-                    if let Some(parent) = comm.neighborhood.parent {
-                        if already_requested_failure.replace_with_true() {
-                            Self::request_failure(cu, comm, parent)?
+            let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) =
+                match comm.endpoint_manager.try_recv_any_comms(
+                    &mut *cu.logger,
+                    &cu.port_info,
+                    rctx,
+                    comm.round_index,
+                )? {
+                    CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
+                    CommRecvOk::NewPayloadMsgs => continue 'undecided,
+                    CommRecvOk::TimeoutWithoutNew => {
+                        log!(cu.logger, "Reached user-defined deadling without decision...");
+                        if let Some(parent) = comm.neighborhood.parent {
+                            if already_requested_failure.replace_with_true() {
+                                Self::request_failure(cu, comm, parent)?
+                            } else {
+                                log!(cu.logger, "Already requested failure");
+                            }
                         } else {
-                            log!(cu.logger, "Already requested failure");
+                            log!(cu.logger, "As the leader, deciding on timeout");
+                            return Ok(Decision::Failure);
                         }
-                    } else {
-                        log!(cu.logger, "As the leader, deciding on timeout");
-                        return Ok(Decision::Failure);
+                        rctx.deadline = None;
+                        continue 'undecided;
                     }
-                    rctx.deadline = None;
-                    continue 'undecided;
-                }
-            };
+                };
             log!(
                 cu.logger,
                 "Received from endpoint {} ctrl msg {:?}",
-                endpoint_index,
+                net_index,
                 &comm_ctrl_msg
             );
             match comm_ctrl_msg {
                 CommCtrlMsg::Suggest { suggestion } => {
                     // only accept this control msg through a child endpoint
-                    if comm.neighborhood.children.contains(&endpoint_index) {
+                    if comm.neighborhood.children.contains(&net_index) {
                         match suggestion {
                             Decision::Success(predicate) => {
                                 // child solution contributes to local solution
                                 log!(cu.logger, "Child provided solution {:?}", &predicate);
-                                let subtree_id =
-                                    SubtreeId::NetEndpoint { index: endpoint_index };
+                                let subtree_id = SubtreeId::NetEndpoint { index: net_index };
                                 rctx.solution_storage.submit_and_digest_subtree_solution(
                                     &mut *cu.logger,
                                     subtree_id,
@@ -602,12 +602,12 @@ impl Connector {
                                 cu.logger,
                                 "Discarding suggestion {:?} from non-child endpoint idx {:?}",
                                 &suggestion,
-                                endpoint_index
+                                net_index
                             );
                         }
                     }
                 }
                 CommCtrlMsg::Announce { decision } => {
-                    if Some(endpoint_index) == comm.neighborhood.parent {
+                    if Some(net_index) == comm.neighborhood.parent {
                         // adopt this decision
                         return Ok(decision);
                     } else {
@@ -615,7 +615,7 @@ impl Connector {
                             cu.logger,
                             "Discarding announcement {:?} from non-parent endpoint idx {:?}",
                             &decision,
-                            endpoint_index
+                            net_index
                         );
                     }
                 }
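Note: the timeout arm above leans on `already_requested_failure.replace_with_true()` to make sure a failure request is sent to the parent at most once per round. That helper is not part of this patch; judging purely from the call site, it presumably flips the flag and reports whether this call was the one that flipped it. A minimal sketch of such a helper, with a hypothetical name and type (not the crate's API):

    // Hypothetical stand-in for the flag behind `already_requested_failure`.
    #[derive(Default)]
    struct OnceFlag(bool);
    impl OnceFlag {
        /// Sets the flag, returning true only if it was previously unset.
        fn replace_with_true(&mut self) -> bool {
            !std::mem::replace(&mut self.0, true)
        }
    }

    fn main() {
        let mut already_requested_failure = OnceFlag::default();
        assert!(already_requested_failure.replace_with_true());  // first timeout: request failure
        assert!(!already_requested_failure.replace_with_true()); // later timeouts: already requested
    }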
@@ -684,9 +684,9 @@ impl BranchingNative {
                 finished.insert(predicate, branch);
                 continue;
             }
-            use AllMapperResult as Amr;
-            match predicate.all_mapper(&send_payload_msg.predicate) {
-                Amr::Nonexistant => {
+            use AssignmentUnionResult as Aur;
+            match predicate.assignment_union(&send_payload_msg.predicate) {
+                Aur::Nonexistant => {
                     // this branch does not receive the message
                     log!(
                         cu.logger,
@@ -695,13 +695,13 @@
                     );
                     finished.insert(predicate, branch);
                 }
-                Amr::Equivalent | Amr::FormerNotLatter => {
+                Aur::Equivalent | Aur::FormerNotLatter => {
                     // retain the existing predicate, but add this payload
                     feed_branch(&mut branch, &predicate);
                     log!(cu.logger, "branch pred covers it! Accept the msg");
                     finished.insert(predicate, branch);
                 }
-                Amr::LatterNotFormer => {
+                Aur::LatterNotFormer => {
                     // fork branch, give fork the message and payload predicate. original branch untouched
                     let mut branch2 = branch.clone();
                     let predicate2 = send_payload_msg.predicate.clone();
@@ -715,7 +715,7 @@ impl BranchingNative {
                     finished.insert(predicate, branch);
                     finished.insert(predicate2, branch2);
                 }
-                Amr::New(predicate2) => {
+                Aur::New(predicate2) => {
                     // fork branch, give fork the message and the new predicate. original branch untouched
                     let mut branch2 = branch.clone();
                     feed_branch(&mut branch2, &predicate2);
@@ -865,21 +865,21 @@ impl BranchingProtoComponent {
                 blocked.insert(predicate, branch);
                 continue;
             }
-            use AllMapperResult as Amr;
+            use AssignmentUnionResult as Aur;
             log!(logger, "visiting branch with pred {:?}", &predicate);
-            match predicate.all_mapper(&send_payload_msg.predicate) {
-                Amr::Nonexistant => {
+            match predicate.assignment_union(&send_payload_msg.predicate) {
+                Aur::Nonexistant => {
                     // this branch does not receive the message
                     log!(logger, "skipping branch");
                     blocked.insert(predicate, branch);
                 }
-                Amr::Equivalent | Amr::FormerNotLatter => {
+                Aur::Equivalent | Aur::FormerNotLatter => {
                     // retain the existing predicate, but add this payload
                     log!(logger, "feeding this branch without altering its predicate");
                     branch.feed_msg(getter, send_payload_msg.payload.clone());
                     unblocked.insert(predicate, branch);
                 }
-                Amr::LatterNotFormer => {
+                Aur::LatterNotFormer => {
                     // fork branch, give fork the message and payload predicate. original branch untouched
                     log!(logger, "Forking this branch, giving it the predicate of the msg");
                     let mut branch2 = branch.clone();
@@ -888,7 +888,7 @@ impl BranchingProtoComponent {
                     blocked.insert(predicate, branch);
                     unblocked.insert(predicate2, branch2);
                 }
-                Amr::New(predicate2) => {
+                Aur::New(predicate2) => {
                     // fork branch, give fork the message and the new predicate. original branch untouched
                     log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
                     let mut branch2 = branch.clone();
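Note: BranchingNative and BranchingProtoComponent now drive the same four-way dispatch on AssignmentUnionResult. A condensed, self-contained sketch of that shared pattern, with placeholder types standing in for Predicate and the branch types (illustrative only, not the crate's API):

    // `P` stands in for Predicate, `B` for a branch; `feed` for feed_branch / feed_msg.
    enum Union<P> {
        Nonexistant,
        Equivalent,
        FormerNotLatter,
        LatterNotFormer,
        New(P),
    }

    fn deliver<P: Clone, B: Clone>(
        pred: P,
        mut branch: B,
        msg_pred: &P,
        union: Union<P>,
        mut feed: impl FnMut(&mut B),
        out: &mut Vec<(P, B)>,
    ) {
        match union {
            // The message's predicate contradicts this branch: leave it untouched.
            Union::Nonexistant => out.push((pred, branch)),
            // The branch predicate already covers the message: feed it in place.
            Union::Equivalent | Union::FormerNotLatter => {
                feed(&mut branch);
                out.push((pred, branch));
            }
            // The message predicate is strictly stronger: fork, feeding the fork under it.
            Union::LatterNotFormer => {
                let mut fork = branch.clone();
                feed(&mut fork);
                out.push((msg_pred.clone(), fork));
                out.push((pred, branch));
            }
            // Overlapping but incomparable predicates: fork under their union.
            Union::New(union_pred) => {
                let mut fork = branch.clone();
                feed(&mut fork);
                out.push((union_pred, fork));
                out.push((pred, branch));
            }
        }
    }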
diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs
index 551f2604b1075e9b52f53715c175f3b815d10dc1..f1d2e7103e76070572e1b49e53009b95054f5358 100644
--- a/src/runtime/endpoints.rs
+++ b/src/runtime/endpoints.rs
@@ -158,7 +158,7 @@ impl EndpointManager {
         &mut self,
         logger: &mut dyn Logger,
         round_ctx: &mut impl RoundCtxTrait,
-        net_endpoint_index: usize,
+        net_index: usize,
         msg: Msg,
         round_index: usize,
         some_message_enqueued: &mut bool,
@@ -183,19 +183,16 @@ impl EndpointManager {
                         comm_msg.round_index,
                         round_index,
                     );
-                    self.delayed_messages
-                        .push((net_endpoint_index, Msg::CommMsg(comm_msg)));
+                    self.delayed_messages.push((net_index, Msg::CommMsg(comm_msg)));
                     return None;
                 }
             },
         };
         match comm_msg_contents {
-            CommMsgContents::CommCtrl(comm_ctrl_msg) => {
-                Some((net_endpoint_index, comm_ctrl_msg))
-            }
+            CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
             CommMsgContents::SendPayload(send_payload_msg) => {
-                let getter = self.net_endpoint_store.endpoint_exts[net_endpoint_index]
-                    .getter_for_incoming;
+                let getter =
+                    self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
                 round_ctx.getter_add(getter, send_payload_msg);
                 *some_message_enqueued = true;
                 None
@@ -207,30 +204,30 @@ impl EndpointManager {
         ///////////////////////////////////////////
         let mut some_message_enqueued = false;
         // try yield undelayed net message
-        while let Some((net_endpoint_index, msg)) = self.undelayed_messages.pop() {
-            if let Some((net_endpoint_index, msg)) = self.handle_msg(
+        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
+            if let Some((net_index, msg)) = self.handle_msg(
                 logger,
                 round_ctx,
-                net_endpoint_index,
+                net_index,
                 msg,
                 round_index,
                 &mut some_message_enqueued,
            ) {
-                return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
+                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
            }
        }
        loop {
            // try receive a net message
-            while let Some((net_endpoint_index, msg)) = self.try_recv_undrained_net(logger)? {
-                if let Some((net_endpoint_index, msg)) = self.handle_msg(
+            while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? {
+                if let Some((net_index, msg)) = self.handle_msg(
                     logger,
                     round_ctx,
-                    net_endpoint_index,
+                    net_index,
                     msg,
                     round_index,
                     &mut some_message_enqueued,
                 ) {
-                    return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
+                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
                 }
             }
             // try receive a udp message
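Note: the renames above do not change handle_msg's round gating: comm messages stamped with a later round are parked in delayed_messages and only replayed once the local round catches up, while undelayed messages are drained first. A stripped-down sketch of that buffering idea, with simplified types (illustrative only; the on_new_round step is an assumption about how delayed messages get replayed):

    use std::collections::VecDeque;

    struct RoundInbox<M> {
        undelayed: VecDeque<(usize, M)>, // (net_index, msg): deliverable in the current round
        delayed: Vec<(usize, usize, M)>, // (msg_round, net_index, msg): from future rounds
    }

    impl<M> RoundInbox<M> {
        fn accept(&mut self, current_round: usize, msg_round: usize, net_index: usize, msg: M) {
            if msg_round <= current_round {
                self.undelayed.push_back((net_index, msg));
            } else {
                // counterpart of delayed_messages.push(..) above
                self.delayed.push((msg_round, net_index, msg));
            }
        }

        // When the round counter advances, matured messages become deliverable again.
        fn on_new_round(&mut self, current_round: usize) {
            for (msg_round, net_index, msg) in std::mem::take(&mut self.delayed) {
                if msg_round <= current_round {
                    self.undelayed.push_back((net_index, msg));
                } else {
                    self.delayed.push((msg_round, net_index, msg));
                }
            }
        }
    }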
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 6ae66019b5754d73bf9fd9c1e5d992d8304c83c4..46ad59c08164ff1e9a1619faa0bacc41bf51f6f9 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -136,7 +136,7 @@ struct SendPayloadMsg {
     payload: Payload,
 }
 #[derive(Debug, PartialEq)]
-enum AllMapperResult {
+enum AssignmentUnionResult {
     FormerNotLatter,
     LatterNotFormer,
     Equivalent,
@@ -258,6 +258,7 @@ struct NativeBatch {
     to_put: HashMap,
     to_get: HashSet,
 }
+#[derive(Copy, Clone, Eq, PartialEq, Hash)]
 enum TokenTarget {
     NetEndpoint { index: usize },
     UdpEndpoint { index: usize },
@@ -270,7 +271,7 @@ trait RoundCtxTrait {
 enum CommRecvOk {
     TimeoutWithoutNew,
     NewPayloadMsgs,
-    NewControlMsg { net_endpoint_index: usize, msg: CommCtrlMsg },
+    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 }
 ////////////////
 fn would_block(err: &std::io::Error) -> bool {
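Note: the new derives on TokenTarget are what allow the setup code (further below) to track pending endpoints in a HashSet keyed by token target, and to remove entries by value, instead of tracking raw indices. A self-contained illustration; the Waker variant is inferred from its use in setup.rs, and the derive list mirrors the patch:

    use std::collections::HashSet;

    #[derive(Copy, Clone, Eq, PartialEq, Hash)]
    enum TokenTarget {
        NetEndpoint { index: usize },
        UdpEndpoint { index: usize },
        Waker,
    }

    fn main() {
        // Mirrors how setup.rs now seeds setup_incomplete.
        let mut setup_incomplete: HashSet<TokenTarget> = (0..2)
            .map(|index| TokenTarget::NetEndpoint { index })
            .chain((0..1).map(|index| TokenTarget::UdpEndpoint { index }))
            .collect();
        // A finished endpoint is removed by value once its setup completes.
        setup_incomplete.remove(&TokenTarget::UdpEndpoint { index: 0 });
        assert_eq!(setup_incomplete.len(), 2);
    }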
@@ -456,34 +457,31 @@ impl Predicate {
         for (var, val) in self.assigned.iter() {
             match maybe_superset.assigned.get(var) {
                 Some(val2) if val2 == val => {}
-                _ => return false,
+                _ => return false, // var unmapped, or mapped differently
             }
         }
         true
     }

     // returns true IFF self.unify would return Equivalent OR FormerNotLatter
-    pub fn consistent_with(&self, other: &Self) -> bool {
-        let [larger, smaller] =
-            if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
+    // pub fn consistent_with(&self, other: &Self) -> bool {
+    //     let [larger, smaller] =
+    //         if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };

-        for (var, val) in smaller.assigned.iter() {
-            match larger.assigned.get(var) {
-                Some(val2) if val2 != val => return false,
-                _ => {}
-            }
-        }
-        true
-    }
+    //     for (var, val) in smaller.assigned.iter() {
+    //         match larger.assigned.get(var) {
+    //             Some(val2) if val2 != val => return false,
+    //             _ => {}
+    //         }
+    //     }
+    //     true
+    // }

-    /// Given self and other, two predicates, return the most general Predicate possible, N
-    /// such that n.satisfies(self) && n.satisfies(other).
-    /// If none exists Nonexistant is returned.
-    /// If the resulting predicate is equivlanet to self, other, or both,
-    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
-    /// otherwise New(N) is returned.
-    fn all_mapper(&self, other: &Self) -> AllMapperResult {
-        use AllMapperResult as Amr;
+    /// Given self and other, two predicates, return the predicate whose
+    /// assignments are the union of those of self and other.
+    ///
+    fn assignment_union(&self, other: &Self) -> AssignmentUnionResult {
+        use AssignmentUnionResult as Aur;
         // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
         let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
         let [mut s, mut o] = [s_it.next(), o_it.next()];
@@ -514,7 +512,7 @@ impl Predicate {
             } else if sb != ob {
                 assert_eq!(sid, oid);
                 // both predicates assign the variable but differ on the value
-                return Amr::Nonexistant;
+                return Aur::Nonexistant;
             } else {
                 // both predicates assign the variable to the same value
                 s = s_it.next();
@@ -525,9 +523,9 @@ impl Predicate {
         }
         // Observed zero inconsistencies. A unified predicate exists...
         match [s_not_o.is_empty(), o_not_s.is_empty()] {
-            [true, true] => Amr::Equivalent,       // ... equivalent to both.
-            [false, true] => Amr::FormerNotLatter, // ... equivalent to self.
-            [true, false] => Amr::LatterNotFormer, // ... equivalent to other.
+            [true, true] => Aur::Equivalent,       // ... equivalent to both.
+            [false, true] => Aur::FormerNotLatter, // ... equivalent to self.
+            [true, false] => Aur::LatterNotFormer, // ... equivalent to other.
             [false, false] => {
                 // ... which is the union of the predicates' assignments but
                 // is equivalent to neither self nor other.
@@ -535,7 +533,7 @@ impl Predicate {
                 for (&id, &b) in o_not_s {
                     new.assigned.insert(id, b);
                 }
-                Amr::New(new)
+                Aur::New(new)
             }
         }
     }
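Note: a toy model of what assignment_union computes, using u32 keys and bool values in place of the crate's variable and value types. The real method additionally reports which of the two inputs the union coincides with (Equivalent / FormerNotLatter / LatterNotFormer / New); this sketch collapses those cases into Some(union):

    use std::collections::BTreeMap;

    fn union(a: &BTreeMap<u32, bool>, b: &BTreeMap<u32, bool>) -> Option<BTreeMap<u32, bool>> {
        let mut out = a.clone();
        for (k, v) in b {
            match out.insert(*k, *v) {
                // Both predicates assign the variable but disagree: Nonexistant.
                Some(prev) if prev != *v => return None,
                _ => {}
            }
        }
        Some(out)
    }

    fn main() {
        let a: BTreeMap<u32, bool> = vec![(1, true)].into_iter().collect();
        let b: BTreeMap<u32, bool> = vec![(2, false)].into_iter().collect();
        let c: BTreeMap<u32, bool> = vec![(1, false)].into_iter().collect();
        assert_eq!(union(&a, &b).unwrap().len(), 2); // disjoint assignments: a proper union (New)
        assert!(union(&a, &c).is_none());            // variable 1 assigned true vs false: Nonexistant
    }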
diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
index 32ea1ac6d99fd1954189264e22daf4605ce0545c..3a45844f1cdcf75dcb342fd0192e990713a05e3f 100644
--- a/src/runtime/setup.rs
+++ b/src/runtime/setup.rs
@@ -94,6 +94,7 @@ impl Connector {
         let mut endpoint_manager = new_endpoint_manager(
             &mut *cu.logger,
             &setup.net_endpoint_setups,
+            &setup.udp_endpoint_setups,
             &mut cu.port_info,
             &deadline,
         )?;
@@ -129,7 +130,8 @@ impl Connector {
 }
 fn new_endpoint_manager(
     logger: &mut dyn Logger,
-    endpoint_setups: &[(PortId, NetEndpointSetup)],
+    net_endpoint_setups: &[(PortId, NetEndpointSetup)],
+    udp_endpoint_setups: &[(PortId, UdpEndpointSetup)],
     port_info: &mut PortInfo,
     deadline: &Option,
 ) -> Result {
@@ -144,35 +146,14 @@ fn new_endpoint_manager(
         sent_local_port: bool,  // true <-> I've sent my local port
         recv_peer_port: Option, // Some(..) <-> I've received my peer's port
     }
+    struct UdpTodo {
+        local_port: PortId,
+        sock: UdpSocket,
+    }
     enum TodoEndpoint {
         Accepting(TcpListener),
         NetEndpoint(NetEndpoint),
     }
-    fn init_todo(
-        token: Token,
-        local_port: PortId,
-        endpoint_setup: &NetEndpointSetup,
-        poll: &mut Poll,
-    ) -> Result {
-        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
-            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
-                .expect("mio::TcpStream connect should not fail!");
-            poll.registry().register(&mut stream, token, BOTH).unwrap();
-            TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
-        } else {
-            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
-                .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
-            poll.registry().register(&mut listener, token, BOTH).unwrap();
-            TodoEndpoint::Accepting(listener)
-        };
-        Ok(Todo {
-            todo_endpoint,
-            local_port,
-            sent_local_port: false,
-            recv_peer_port: None,
-            endpoint_setup: endpoint_setup.clone(),
-        })
-    }
     ////////////////////////////////////////////

     // 1. Start to construct EndpointManager
@@ -184,24 +165,49 @@ fn new_endpoint_manager(
     let mut waker_state: Option> = None;
     let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
-    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
+    let mut events = Events::with_capacity(net_endpoint_setups.len() * 2 + 4);
     let mut net_polled_undrained = VecSet::default();
     let udp_polled_undrained = VecSet::default();
     let mut delayed_messages = vec![];

     // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
-    let mut todos = endpoint_setups
+    let mut todos = net_endpoint_setups
         .iter()
         .enumerate()
         .map(|(index, (local_port, endpoint_setup))| {
-            init_todo(
-                TokenTarget::NetEndpoint { index }.into(),
-                *local_port,
-                endpoint_setup,
-                &mut poll,
-            )
+            let token = TokenTarget::NetEndpoint { index }.into();
+            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
+                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
+                    .expect("mio::TcpStream connect should not fail!");
+                poll.registry().register(&mut stream, token, BOTH).unwrap();
+                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
+            } else {
+                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
+                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
+                poll.registry().register(&mut listener, token, BOTH).unwrap();
+                TodoEndpoint::Accepting(listener)
+            };
+            Ok(Todo {
+                todo_endpoint,
+                local_port: *local_port,
+                sent_local_port: false,
+                recv_peer_port: None,
+                endpoint_setup: endpoint_setup.clone(),
+            })
         })
         .collect::, ConnectError>>()?;
+    let udp_todos = udp_endpoint_setups
+        .iter()
+        .enumerate()
+        .map(|(index, (local_port, endpoint_setup))| {
+            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
+                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
+            poll.registry()
+                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
+                .unwrap();
+            Ok(UdpTodo { sock, local_port: *local_port })
+        })
+        .collect::, ConnectError>>()?;

     // 3. Using poll to drive progress:
     // - accept an incoming connection for each TcpListener (turning them into endpoints too)
@@ -210,8 +216,12 @@
     // all in connect_failed are NOT registered with Poll
     let mut connect_failed: HashSet = Default::default();
+    // TODO register udps, and all them to incomplete list

-    let mut setup_incomplete: HashSet = (0..todos.len()).collect();
+    let mut setup_incomplete: HashSet = (0..todos.len())
+        .map(|index| TokenTarget::NetEndpoint { index })
+        .chain((0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index }))
+        .collect();
     while !setup_incomplete.is_empty() {
         let remaining = if let Some(deadline) = deadline {
             Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
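Note: the UDP todos above are bound and registered with the same mio 0.7 Poll that drives the TCP setup, but with Interest::WRITABLE: a freshly bound UDP socket normally becomes writable right away, so its first event serves as the "socket is ready" signal during setup, and error conditions surface through that same event (checked further down with event.is_error()). A minimal, self-contained sketch of that registration pattern; the address and token value are examples, not taken from the crate:

    use mio::{net::UdpSocket, Events, Interest, Poll, Token};
    use std::time::Duration;

    fn main() -> std::io::Result<()> {
        let mut poll = Poll::new()?;
        let mut events = Events::with_capacity(8);

        // Bind first, then register for WRITABLE, mirroring the udp_todos loop above.
        let mut sock = UdpSocket::bind("127.0.0.1:0".parse().unwrap())?;
        poll.registry().register(&mut sock, Token(0), Interest::WRITABLE)?;

        poll.poll(&mut events, Some(Duration::from_millis(100)))?;
        for event in events.iter() {
            if event.token() == Token(0) && !event.is_error() {
                // Setup for this socket is complete; the real code removes its
                // TokenTarget from setup_incomplete at this point.
            }
        }
        Ok(())
    }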
@@ -222,6 +232,10 @@
         for event in events.iter() {
             let token = event.token();
             let token_target = TokenTarget::from(token);
+            if !setup_incomplete.contains(&token_target) {
+                // spurious wakeup
+                continue;
+            }
             match token_target {
                 TokenTarget::Waker => {
                     log!(
@@ -230,12 +244,12 @@
                         connect_failed.iter()
                     );
                     assert!(waker_state.is_some());
-                    for net_endpoint_index in connect_failed.drain() {
-                        let todo: &mut Todo = &mut todos[net_endpoint_index];
+                    for net_index in connect_failed.drain() {
+                        let todo: &mut Todo = &mut todos[net_index];
                         log!(
                             logger,
                             "Restarting connection with endpoint {:?} {:?}",
-                            net_endpoint_index,
+                            net_index,
                             todo.endpoint_setup.sock_addr
                         );
                         match &mut todo.todo_endpoint {
@@ -244,8 +258,7 @@
                                     TcpStream::connect(todo.endpoint_setup.sock_addr)
                                         .expect("mio::TcpStream connect should not fail!");
                                 std::mem::swap(&mut endpoint.stream, &mut new_stream);
-                                let token =
-                                    TokenTarget::NetEndpoint { index: net_endpoint_index }.into();
+                                let token = TokenTarget::NetEndpoint { index: net_index }.into();
                                 poll.registry()
                                     .register(&mut endpoint.stream, token, BOTH)
                                     .unwrap();
@@ -254,7 +267,13 @@
                         }
                     }
                 }
-                TokenTarget::UdpEndpoint { index: _ } => unreachable!(),
+                TokenTarget::UdpEndpoint { index } => {
+                    let udp_todo: &UdpTodo = &udp_todos[index];
+                    if event.is_error() {
+                        return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap()));
+                    }
+                    setup_incomplete.remove(&token_target);
+                }
                 TokenTarget::NetEndpoint { index } => {
                     let todo: &mut Todo = &mut todos[index];
                     if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
@@ -333,10 +352,6 @@
                             // spurious wakeup
                             continue;
                         }
-                        if !setup_incomplete.contains(&index) {
-                            // spurious wakeup
-                            continue;
-                        }
                         let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
                         if event.is_writable() && !todo.sent_local_port {
                             // can write and didn't send setup msg yet? Do so!
@@ -414,7 +429,7 @@
                         // is the setup for this net_endpoint now complete?
                         if todo.sent_local_port && todo.recv_peer_port.is_some() {
                             // yes! connected, sent my info and received peer's info
-                            setup_incomplete.remove(&index);
+                            setup_incomplete.remove(&token_target);
                             log!(logger, "endpoint[{}] is finished!", index);
                         }
                     }
@@ -429,7 +444,6 @@
         arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
         // TODO leave the waker registered?
     }
-    let udp_endpoint_exts = vec![];

     let net_endpoint_exts = todos
         .into_iter()
@@ -448,6 +462,22 @@
             getter_for_incoming: local_port,
         })
         .collect();
+    let udp_endpoint_exts = udp_todos
+        .into_iter()
+        .enumerate()
+        .map(|(index, udp_todo)| {
+            let UdpTodo { mut sock, local_port } = udp_todo;
+            let token = TokenTarget::UdpEndpoint { index }.into();
+            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
+            UdpEndpointExt {
+                sock,
+                outgoing_payloads: Default::default(),
+                incoming_round_spec_var: None,
+                getter_for_incoming: local_port,
+                incoming_payloads: Default::default(),
+            }
+        })
+        .collect();
     Ok(EndpointManager {
         poll,
         events,
diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs
index 731c9c83b9b04b5889bf040029c6aa04391ceaaa..42c242a477d027fb1cb76d13951eb4ce2d3724d6 100644
--- a/src/runtime/tests.rs
+++ b/src/runtime/tests.rs
@@ -658,12 +658,12 @@ fn multi_recover() {
         .unwrap();
 }

-#[test]
-fn udp_self_connect() {
-    let test_log_path = Path::new("./logs/udp_self_connect");
-    let sock_addrs = [next_test_addr(), next_test_addr()];
-    let mut c = file_logged_connector(0, test_log_path);
-    c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
-    c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
-    c.connect(SEC1).unwrap();
-}
+// #[test]
+// fn udp_self_connect() {
+//     let test_log_path = Path::new("./logs/udp_self_connect");
+//     let sock_addrs = [next_test_addr(), next_test_addr()];
+//     let mut c = file_logged_connector(0, test_log_path);
+//     c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
+//     c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
+//     c.connect(SEC1).unwrap();
+// }
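Note: the udp_self_connect test stays commented out because, as the subject line says, UDP endpoint setup is only partially done. For reference, the socket-level wiring that test will eventually exercise (two local UDP sockets addressing each other) can be demonstrated with std alone; this sketch does not touch the connector API:

    use std::net::UdpSocket;

    fn main() -> std::io::Result<()> {
        let a = UdpSocket::bind("127.0.0.1:0")?;
        let b = UdpSocket::bind("127.0.0.1:0")?;
        a.connect(b.local_addr()?)?;
        b.connect(a.local_addr()?)?;

        a.send(b"hello")?;
        let mut buf = [0u8; 16];
        let n = b.recv(&mut buf)?;
        assert_eq!(&buf[..n], b"hello");
        Ok(())
    }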