From d3835a56401dbcceea959b3d4d1386645b6bef43 2020-09-22 16:28:07 From: Christopher Esterhuyse Date: 2020-09-22 16:28:07 Subject: [PATCH] flattened some structures for simplicity. added more internal doc comments --- diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index fd1b4735baca5202d4ead0a21a34176cf8ff76c1..f0ea80da75a74343044a206bdee2eb74f0c96980 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -7,6 +7,8 @@ mod lexer; mod parser; lazy_static::lazy_static! { + /// Conveniently-provided protocol description initialized with a zero-length PDL string. + /// Exposed to minimize repeated initializations of this common protocol description. pub static ref TRIVIAL_PD: std::sync::Arc = { std::sync::Arc::new(ProtocolDescription::parse(b"").unwrap()) }; @@ -18,6 +20,8 @@ use crate::protocol::eval::*; use crate::protocol::inputsource::*; use crate::protocol::parser::*; +/// Description of a protocol object, used to configure new connectors. +/// (De)serializable. #[derive(serde::Serialize, serde::Deserialize)] #[repr(C)] pub struct ProtocolDescription { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b4342f823c424d21082fcd76c5d1cbfa7a5eb750..10510935b5ece0a80c312bdfba36912726dfe43a 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -49,16 +49,16 @@ impl ReplaceBoolTrue for bool { // making it harder to accidentally mutate its contents in a way that cannot be rolled back. 
impl CuUndecided for ConnectorUnphased { fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription) { - (&mut *self.inner.logger, &self.proto_description) + (&mut *self.logger, &self.proto_description) } fn logger(&mut self) -> &mut dyn Logger { - &mut *self.inner.logger + &mut *self.logger } fn proto_description(&self) -> &ProtocolDescription { &self.proto_description } fn native_component_id(&self) -> ComponentId { - self.inner.native_component_id + self.native_component_id } } @@ -127,8 +127,8 @@ impl Connector { ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError as Poe; let Self { unphased: cu, phased } = self; - let info = cu.inner.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?; - if info.owner != cu.inner.native_component_id { + let info = cu.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?; + if info.owner != cu.native_component_id { return Err(Poe::PortUnavailable); } if info.polarity != expect_polarity { @@ -228,7 +228,7 @@ impl Connector { // 1. 
run all proto components to Nonsync blockers // iterate - let mut current_state = cu.inner.current_state.clone(); + let mut current_state = cu.current_state.clone(); let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu @@ -280,7 +280,7 @@ impl Connector { let mut rctx = RoundCtx { current_state, solution_storage: { - let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id)); + let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id)); let c = branching_proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid)); let e = comm @@ -290,13 +290,13 @@ impl Connector { .map(|&index| SubtreeId::NetEndpoint { index }); let subtree_id_iter = n.chain(c).chain(e); log!( - cu.inner.logger, + cu.logger, "Children in subtree are: {:?}", subtree_id_iter.clone().collect::>() ); SolutionStorage::new(subtree_id_iter) }, - spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(), + spec_var_stream: cu.current_state.id_manager.new_spec_var_stream(), payload_inbox: Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; @@ -328,12 +328,12 @@ impl Connector { ); let firing_ports: HashSet = firing_iter.clone().collect(); for port in firing_iter { - let var = cu.inner.current_state.spec_var_for(port); + let var = cu.current_state.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } // all silent ports have SpecVal::SILENT - for (port, port_info) in cu.inner.current_state.port_info.iter() { - if port_info.owner != cu.inner.native_component_id { + for (port, port_info) in cu.current_state.port_info.iter() { + if port_info.owner != cu.native_component_id { // not my port continue; } @@ -341,7 +341,7 @@ impl Connector { // this one is FIRING continue; } - let var = cu.inner.current_state.spec_var_for(*port); + let var = cu.current_state.spec_var_for(*port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, 
SpecVal::SILENT) { log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; @@ -362,7 +362,7 @@ impl Connector { putter ); // sanity check - assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity); + assert_eq!(Putter, cu.current_state.port_info.get(&putter).unwrap().polarity); rctx.putter_push(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -375,7 +375,7 @@ impl Connector { ); rctx.solution_storage.submit_and_digest_subtree_solution( cu, - SubtreeId::LocalComponent(cu.inner.native_component_id), + SubtreeId::LocalComponent(cu.native_component_id), predicate.clone(), ); } @@ -421,7 +421,7 @@ impl Connector { let ret = match decision { Decision::Failure => { // dropping {branching_proto_components, branching_native} - log!(cu.inner.logger, "Failure with {:#?}", &rctx.solution_storage); + log!(cu.logger, "Failure with {:#?}", &rctx.solution_storage); Err(Se::RoundFailure) } Decision::Success(predicate) => { @@ -434,9 +434,9 @@ impl Connector { .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), ); // commit changes to ports and id_manager - cu.inner.current_state = rctx.current_state; + cu.current_state = rctx.current_state; log!( - cu.inner.logger, + cu.logger, "End round with (updated) component states {:?}", cu.proto_components.keys() ); diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 62dbd2cc59e0a488b456401011ed0130ca21342a..36c8a1de749887f8f1c2dbd51eecece32f65b9bf 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -27,7 +27,7 @@ impl NetEndpoint { 'read_loop: loop { let res = self.stream.read_to_end(&mut self.inbox); match res { - Err(e) if would_block(&e) => break 'read_loop, + Err(e) if err_would_block(&e) => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), Err(_e) => return Err(Nee::BrokenNetEndpoint), diff --git a/src/runtime/mod.rs 
b/src/runtime/mod.rs index cd6e88d0692245b159b8d32ab094431f125a0a79..37b1dff24e324a9cd07455480885b1a47880bc06 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -15,7 +15,7 @@ use crate::common::*; use error::*; use mio::net::UdpSocket; -/// Each Connector structure is the interface between the user's application and a communication session, +/// The interface between the user's application and a communication session, /// in which the application plays the part of a (native) component. This structure provides the application /// with functionality available to all components: the ability to add new channels (port pairs), and to /// instantiate new components whose definitions are defined in the connector's configured protocol @@ -47,58 +47,75 @@ pub struct DummyLogger; /// A logger that writes the logged lines to a given file. #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); -#[derive(Debug, Clone)] -struct CurrentState { - port_info: HashMap, - id_manager: IdManager, -} + +// Interface between protocol state and the connector runtime BEFORE all components +// have begun their branching speculation. See ComponentState::nonsync_run. pub(crate) struct NonsyncProtoContext<'a> { current_state: &'a mut CurrentState, logger: &'a mut dyn Logger, - // cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase proto_component_id: ComponentId, // KEY in id->component map } + +// Interface between protocol state and the connector runtime AFTER all components +// have begun their branching speculation. See ComponentState::sync_run. 
pub(crate) struct SyncProtoContext<'a> { rctx: &'a RoundCtx, branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch predicate: &'a Predicate, // KEY in pred->branch map } + +// The data coupled with a particular protocol component branch, but crucially omitting +// the `ComponentState` such that this may be passed by reference to the state with separate +// access control. #[derive(Default, Debug, Clone)] struct ProtoComponentBranchInner { untaken_choice: Option, did_put_or_get: HashSet, inbox: HashMap, } + +// A speculative variable that lives for the duration of the synchronous round. +// Each is assigned a value in domain `SpecVal`. #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] struct SpecVar(PortId); + +// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, +// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] struct SpecVal(u16); + +// Data associated with a successful synchronous round, retained afterwards such that the +// native component can freely reflect on how it went, reading the messages received at their +// inputs, and reflecting on which of their connector's synchronous batches succeeded. #[derive(Debug)] struct RoundOk { batch_index: usize, gotten: HashMap, } + +// Implementation of a set in terms of a vector (optimized for reading, not writing) #[derive(Default)] struct VecSet { // invariant: ordered, deduplicated vec: Vec, } + +// Allows a connector to remember how to forward payloads towards the component that +// owns their destination port. `LocalComponent` corresponds with messages for components +// managed by the connector itself (hinting for it to look it up in a local structure), +// whereas the other variants direct the connector to forward the messages over the network. 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum Route { LocalComponent, NetEndpoint { index: usize }, UdpEndpoint { index: usize }, } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -enum SubtreeId { - LocalComponent(ComponentId), - NetEndpoint { index: usize }, -} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct MyPortInfo { polarity: Polarity, @@ -235,6 +252,13 @@ struct PortInfo { route: Route, } +#[derive(Debug, Clone)] +struct CurrentState { + port_info: HashMap, + id_manager: IdManager, +} + +// A connector's communication-phase-specific data #[derive(Debug)] struct ConnectorCommunication { round_index: usize, @@ -247,36 +271,63 @@ struct ConnectorCommunication { struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, - inner: ConnectorUnphasedInner, -} -#[derive(Debug)] -struct ConnectorUnphasedInner { logger: Box, current_state: CurrentState, native_component_id: ComponentId, } -#[derive(Debug)] -struct ConnectorSetup { - net_endpoint_setups: Vec, - udp_endpoint_setups: Vec, -} + +// A connector's phase-specific data #[derive(Debug)] enum ConnectorPhased { Setup(Box), Communication(Box), } + +// A connector's setup-phase-specific data +#[derive(Debug)] +struct ConnectorSetup { + net_endpoint_setups: Vec, + udp_endpoint_setups: Vec, +} + #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] struct Predicate { assigned: BTreeMap, } + +// Identifies a child of this connector in the _solution tree_. +// Each connector creates its own local solutions for the consensus procedure during `sync`, +// from the solutions of its children. Those children are either locally-managed components, +// (which are leaves in the solution tree), or other connectors reachable through the given +// network endpoint (which are internal nodes in the solution tree). 
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +enum SubtreeId { + LocalComponent(ComponentId), + NetEndpoint { index: usize }, +} + +// An accumulation of the connector's knowledge of all (a) the local solutions its children +// in the solution tree have found, and (b) its own solutions derivable from those of its children. +// This structure starts off each round with an empty set, and accumulates solutions as they are found +// by local components, or received over the network in control messages. +// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to +// say that these sets GROW until the round is over, and all solutions are reset. #[derive(Debug)] struct SolutionStorage { + // invariant: old_local U new_local solutions are those that can be created from + // the UNION of one element from each set in `subtree_solutions`. + // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. old_local: HashSet, new_local: HashSet, // this pair acts as SubtreeId -> HashSet which is friendlier to iteration subtree_solutions: Vec>, subtree_id_to_index: HashMap, } + +// Stores the transient data of a synchronous round. +// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of +// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, +// and can be undone if the round fails. struct RoundCtx { solution_storage: SolutionStorage, spec_var_stream: SpecVarStream, @@ -284,58 +335,51 @@ struct RoundCtx { deadline: Option, current_state: CurrentState, } + +// A trait intended to limit the access of the ConnectorUnphased structure +// such that we don't accidentally modify any important component/port data +// while the results of the round are undecided. Why? Any actions during Connector::sync +// are _speculative_ until the round is decided, and we need a safe way of rolling +// back any changes. 
trait CuUndecided { fn logger(&mut self) -> &mut dyn Logger; fn proto_description(&self) -> &ProtocolDescription; fn native_component_id(&self) -> ComponentId; fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); } + +// Represents a set of synchronous port operations that the native component +// has described as an "option" for completing during the synchronous rounds. +// Operations contained here succeed together or not at all. +// A native with N=2+ batches is expressing an N-way nondeterministic choice #[derive(Debug, Default)] struct NativeBatch { // invariant: putters' and getters' polarities respected to_put: HashMap, to_get: HashSet, } + +// Parallels a mio::Token type, but more clearly communicates +// the way it identifies the evented structure it corresponds to. +// See runtime/setup for methods converting between TokenTarget and mio::Token #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] enum TokenTarget { NetEndpoint { index: usize }, UdpEndpoint { index: usize }, Waker, } + +// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, +// such that it can know when to continue polling, and when to block. 
enum CommRecvOk { TimeoutWithoutNew, NewPayloadMsgs, NewControlMsg { net_index: usize, msg: CommCtrlMsg }, } //////////////// -fn would_block(err: &std::io::Error) -> bool { +fn err_would_block(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::WouldBlock } -impl TokenTarget { - const HALFWAY_INDEX: usize = usize::MAX / 2; - const MAX_INDEX: usize = usize::MAX; - const WAKER_TOKEN: usize = Self::MAX_INDEX; -} -impl From for TokenTarget { - fn from(Token(index): Token) -> Self { - if index == Self::WAKER_TOKEN { - TokenTarget::Waker - } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { - TokenTarget::UdpEndpoint { index: shifted } - } else { - TokenTarget::NetEndpoint { index } - } - } -} -impl Into for TokenTarget { - fn into(self) -> Token { - match self { - TokenTarget::Waker => Token(Self::WAKER_TOKEN), - TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), - TokenTarget::NetEndpoint { index } => Token(index), - } - } -} impl VecSet { fn new(mut vec: Vec) -> Self { vec.sort(); @@ -448,13 +492,13 @@ impl Connector { /// Enables the connector's current logger to be swapped out for another pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger); + std::mem::swap(&mut self.unphased.logger, &mut new_logger); new_logger } /// Access the connector's current logger pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.inner.logger + &mut *self.unphased.logger } /// Create a new synchronous channel, returning its ends as a pair of ports, @@ -464,27 +508,27 @@ impl Connector { pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.inner.current_state.id_manager.new_port_id(); + let mut new_cid = || cu.current_state.id_manager.new_port_id(); let [o, i] = [new_cid(), new_cid()]; - 
cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( o, PortInfo { route: Route::LocalComponent, peer: Some(i), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, polarity: Putter, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( i, PortInfo { route: Route::LocalComponent, peer: Some(o), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, polarity: Getter, }, ); - log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } @@ -515,8 +559,8 @@ impl Connector { return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); } for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; - if info.owner != cu.inner.native_component_id { + let info = cu.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; + if info.owner != cu.native_component_id { return Err(Ace::UnknownPort(port)); } if info.polarity != expected_polarity { @@ -524,12 +568,12 @@ impl Connector { } } // 2. add new component - let new_cid = cu.inner.current_state.id_manager.new_component_id(); + let new_cid = cu.current_state.id_manager.new_component_id(); cu.proto_components .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); // 3. 
update port ownership for port in ports.iter() { - match cu.inner.current_state.port_info.get_mut(port) { + match cu.current_state.port_info.get_mut(port) { Some(port_info) => port_info.owner = new_cid, None => unreachable!(), } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 27be90d80ca6bcc5d220bad3cf0b79947c432449..1f77f0121fde90a0d50510dc6a31881ab46986b5 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -1,6 +1,31 @@ use crate::common::*; use crate::runtime::*; +impl TokenTarget { + const HALFWAY_INDEX: usize = usize::MAX / 2; + const MAX_INDEX: usize = usize::MAX; + const WAKER_TOKEN: usize = Self::MAX_INDEX; +} +impl From for TokenTarget { + fn from(Token(index): Token) -> Self { + if index == Self::WAKER_TOKEN { + TokenTarget::Waker + } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { + TokenTarget::UdpEndpoint { index: shifted } + } else { + TokenTarget::NetEndpoint { index } + } + } +} +impl Into for TokenTarget { + fn into(self) -> Token { + match self { + TokenTarget::Waker => Token(Self::WAKER_TOKEN), + TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), + TokenTarget::NetEndpoint { index } => Token(index), + } + } +} impl Connector { /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing). 
/// The resulting connector will start in the setup phase, and cannot be used for communication until the @@ -25,11 +50,9 @@ impl Connector { unphased: ConnectorUnphased { proto_description, proto_components: Default::default(), - inner: ConnectorUnphasedInner { - logger, - native_component_id, - current_state: CurrentState { id_manager, port_info: Default::default() }, - }, + logger, + native_component_id, + current_state: CurrentState { id_manager, port_info: Default::default() }, }, phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { net_endpoint_setups: Default::default(), @@ -52,29 +75,29 @@ impl Connector { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { let udp_index = setup.udp_endpoint_setups.len(); - let udp_cid = cu.inner.current_state.id_manager.new_component_id(); - let mut npid = || cu.inner.current_state.id_manager.new_port_id(); + let udp_cid = cu.current_state.id_manager.new_component_id(); + let mut npid = || cu.current_state.id_manager.new_port_id(); let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()]; - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( nin, PortInfo { route: Route::LocalComponent, polarity: Getter, peer: Some(uout), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( nout, PortInfo { route: Route::LocalComponent, polarity: Putter, peer: Some(uin), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( uin, PortInfo { route: Route::UdpEndpoint { index: udp_index }, @@ -83,7 +106,7 @@ impl Connector { owner: udp_cid, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( uout, PortInfo { route: Route::UdpEndpoint { index: udp_index }, @@ -115,18 +138,18 @@ impl Connector { match phased { 
ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { - let new_pid = cu.inner.current_state.id_manager.new_port_id(); - cu.inner.current_state.port_info.insert( + let new_pid = cu.current_state.id_manager.new_port_id(); + cu.current_state.port_info.insert( new_pid, PortInfo { route: Route::LocalComponent, peer: None, - owner: cu.inner.native_component_id, + owner: cu.native_component_id, polarity, }, ); log!( - cu.inner.logger, + cu.logger, "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", new_pid, polarity, @@ -156,35 +179,35 @@ impl Connector { let Self { unphased: cu, phased } = self; match &phased { ConnectorPhased::Communication { .. } => { - log!(cu.inner.logger, "Call to connecting in connected state"); + log!(cu.logger, "Call to connecting in connected state"); Err(Ce::AlreadyConnected) } ConnectorPhased::Setup(setup) => { - log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *cu.inner.logger, + &mut *cu.logger, &setup.net_endpoint_setups, &setup.udp_endpoint_setups, - &mut cu.inner.current_state.port_info, + &mut cu.current_state.port_info, &deadline, )?; log!( - cu.inner.logger, + cu.logger, "Successfully connected {} endpoints. 
info now {:#?} {:#?}", endpoint_manager.net_endpoint_store.endpoint_exts.len(), - &cu.inner.current_state.port_info, + &cu.current_state.port_info, &endpoint_manager, ); // leader election and tree construction let neighborhood = init_neighborhood( - cu.inner.current_state.id_manager.connector_id, - &mut *cu.inner.logger, + cu.current_state.id_manager.connector_id, + &mut *cu.logger, &mut endpoint_manager, &deadline, )?; - log!(cu.inner.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); let mut comm = ConnectorCommunication { round_index: 0, endpoint_manager, @@ -195,7 +218,7 @@ impl Connector { if cfg!(feature = "session_optimization") { session_optimize(cu, &mut comm, &deadline)?; } - log!(cu.inner.logger, "connect() finished. setup phase complete"); + log!(cu.logger, "connect() finished. setup phase complete"); *phased = ConnectorPhased::Communication(Box::new(comm)); Ok(()) } @@ -366,7 +389,7 @@ fn new_endpoint_manager( if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint { // FIRST try complete this connection match listener.accept() { - Err(e) if would_block(&e) => continue, // spurious wakeup + Err(e) if err_would_block(&e) => continue, // spurious wakeup Err(_) => { log!(logger, "accept() failure on index {}", index); return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); @@ -781,25 +804,25 @@ fn session_optimize( //////////////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; //////////////////////////////////////// - log!(cu.inner.logger, "Beginning session optimization"); + log!(cu.logger, "Beginning session optimization"); // populate session_info_map from a message per child let mut unoptimized_map: HashMap = Default::default(); let mut awaiting: HashSet = comm.neighborhood.children.iter().copied().collect(); comm.endpoint_manager.undelay_all(); while !awaiting.is_empty() { log!( - cu.inner.logger, + 
cu.logger, "Session gather loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; - log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; + log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { if !awaiting.remove(&recv_index) { log!( - cu.inner.logger, + cu.logger, "Wasn't expecting session info from {:?}. Got {:?}", recv_index, &child_unoptimized_map @@ -812,11 +835,11 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.inner.logger, "discarding old message {:?} during election", msg); + log!(cu.logger, "discarding old message {:?} during election", msg); } msg @ S(Sm::SessionScatter { .. }) => { log!( - cu.inner.logger, + cu.logger, "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", recv_index, &msg @@ -824,18 +847,18 @@ fn session_optimize( return Err(Ce::SetupAlgMisbehavior); } msg @ Msg::CommMsg(..) => { - log!(cu.inner.logger, "delaying msg {:?} during session optimization", msg); + log!(cu.logger, "delaying msg {:?} during session optimization", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } } } log!( - cu.inner.logger, + cu.logger, "Gathered all children's maps. ConnectorId set is... 
{:?}", unoptimized_map.keys() ); let my_session_info = SessionInfo { - port_info: cu.inner.current_state.port_info.clone(), + port_info: cu.current_state.port_info.clone(), proto_components: cu.proto_components.clone(), serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm @@ -846,38 +869,34 @@ fn session_optimize( .map(|ee| ee.getter_for_incoming) .collect(), }; - unoptimized_map.insert(cu.inner.current_state.id_manager.connector_id, my_session_info); - log!( - cu.inner.logger, - "Inserting my own info. Unoptimized subtree map is {:?}", - &unoptimized_map - ); + unoptimized_map.insert(cu.current_state.id_manager.connector_id, my_session_info); + log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); // acquire the optimized info... let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent - log!(cu.inner.logger, "Forwarding gathered info to parent {:?}", parent); + log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); let msg = S(Sm::SessionGather { unoptimized_map }); comm.endpoint_manager.send_to_setup(parent, &msg)?; 'scatter_loop: loop { log!( - cu.inner.logger, + cu.logger, "Session scatter recv loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; - log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; + log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionScatter { optimized_map }) => { if recv_index != parent { - log!(cu.inner.logger, "I expected the scatter from my parent only!"); + log!(cu.logger, "I expected the scatter from my parent only!"); return Err(Ce::SetupAlgMisbehavior); } break 'scatter_loop optimized_map; } msg @ Msg::CommMsg { .. 
} => { - log!(cu.inner.logger, "delaying msg {:?} during scatter recv", msg); + log!(cu.logger, "delaying msg {:?} during scatter recv", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } msg @ S(Sm::SessionGather { .. }) @@ -885,24 +904,24 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.inner.logger, "discarding old message {:?} during election", msg); + log!(cu.logger, "discarding old message {:?} during election", msg); } } } } else { // by computing it myself - log!(cu.inner.logger, "I am the leader! I will optimize this session"); - leader_session_map_optimize(&mut *cu.inner.logger, unoptimized_map)? + log!(cu.logger, "I am the leader! I will optimize this session"); + leader_session_map_optimize(&mut *cu.logger, unoptimized_map)? }; log!( - cu.inner.logger, + cu.logger, "Optimized info map is {:?}. Sending to children {:?}", &optimized_map, comm.neighborhood.children.iter() ); - log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map); + log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = optimized_map - .get(&cu.inner.current_state.id_manager.connector_id) + .get(&cu.current_state.id_manager.connector_id) .expect("HEY NO INFO FOR ME?") .clone(); let msg = S(Sm::SessionScatter { optimized_map }); @@ -910,7 +929,7 @@ fn session_optimize( comm.endpoint_manager.send_to_setup(child, &msg)?; } apply_optimizations(cu, comm, optimized_info)?; - log!(cu.inner.logger, "Session optimizations applied"); + log!(cu.logger, "Session optimizations applied"); Ok(()) } fn leader_session_map_optimize( @@ -933,7 +952,7 @@ fn apply_optimizations( endpoint_incoming_to_getter, } = session_info; // TODO some info which should be read-only can be mutated with the current scheme - cu.inner.current_state.port_info = port_info; + cu.current_state.port_info = port_info; cu.proto_components = proto_components; 
cu.proto_description = serde_proto_description.0; for (ee, getter) in comm