Changeset - 4ef99b52a75d
[Not reviewed]
Christopher Esterhuyse - 2020-07-23 15:37:37
christopher.esterhuyse@gmail.com
tightened visibility. cleaned up and simplified UDP mediator components
5 files changed with 11 insertions and 24 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
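For context on how the [features] entries above take effect: Cargo features are compiled in or out with cfg attributes at the locations the comments point to (src/ffi/mod.rs, src/macros.rs, src/runtime/setup.rs). The following is a minimal illustrative sketch of that gating pattern, not code from this changeset; the item names are placeholders.

// Illustrative only: how a Cargo feature flag gates code at compile time.
// Compiled only when built with the "ffi" feature (enabled by `default` above).
#[cfg(feature = "ffi")]
pub mod ffi_sketch {
    // Stand-in for an exported C-ABI symbol in a cdylib build.
    #[no_mangle]
    pub extern "C" fn connector_version() -> u32 {
        1
    }
}

// Compiled only with `--features endpoint_logging`; otherwise a no-op.
#[cfg(feature = "endpoint_logging")]
fn endpoint_log(line: &str) {
    eprintln!("[endpoint] {}", line);
}

#[cfg(not(feature = "endpoint_logging"))]
fn endpoint_log(_line: &str) {}

Dropping ffi_pseudo_socket_api from the default set therefore only changes what a plain `cargo build` compiles; the feature can still be requested explicitly with `--features ffi_pseudo_socket_api` as long as its definition remains under [features].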
src/runtime/communication.rs
 
@@ -307,98 +307,96 @@ impl Connector {
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for all silent (not firing) ports
 
                for &port in cu.inner.native_ports.difference(&firing_ports) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_spec_var, branch_spec_val)
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 

	
 
        comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.inner.logger);
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.inner.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.inner.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
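A reading aid for the hunk above: every native batch becomes a branch whose identity is its predicate, a map from speculative variables to FIRING/SILENT values plus one batch-specific assignment on native_spec_var, which is why a collision on insertion into branches can be treated as unreachable. Below is a simplified, self-contained sketch of that idea with placeholder types standing in for the crate's Predicate/SpecVal; it is not code from this changeset.

use std::collections::BTreeMap;

// Placeholder stand-ins for SpecVar / SpecVal / Predicate.
type Var = u32;
type Val = u32;
const FIRING: Val = 0;
type Predicate = BTreeMap<Var, Val>;

fn main() {
    let native_spec_var: Var = 999; // reserved variable that distinguishes batches
    let mut branches: BTreeMap<Predicate, usize> = BTreeMap::new();

    // Two batches that happen to use exactly the same ports.
    let batches: &[&[Var]] = &[&[1, 2], &[1, 2]];
    for (index, firing_ports) in batches.iter().enumerate() {
        let mut predicate = Predicate::new();
        for &port in *firing_ports {
            predicate.insert(port, FIRING); // ports this batch fires
        }
        // Without this line both predicates would be identical; with it,
        // each batch maps to a distinct key, so the insert never collides.
        predicate.insert(native_spec_var, index as Val);
        assert!(branches.insert(predicate, index).is_none());
    }
    assert_eq!(branches.len(), 2);
}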
src/runtime/endpoints.rs
 
@@ -189,240 +189,229 @@ impl EndpointManager {
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        round_ctx.getter_add(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
                    }
 
                }
 
            }
 
        }
 
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) = self.handle_msg(
 
                logger,
 
                round_ctx,
 
                net_index,
 
                msg,
 
                round_index,
 
                &mut some_message_enqueued,
 
            ) {
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    logger,
 
                    round_ctx,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
-                if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() {
+                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
-                    if !ee.received_from_this_round.is_none() {
+                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        round_ctx.getter_add(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
-                        ee.received_from_this_round = Some(from);
+                        ee.received_this_round = true;
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(logger, round_ctx.get_deadline()) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
 
            }
 
        }
 
    }
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
        Ok(None)
 
    }
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(), PollAndPopulateError> {
 
        use PollAndPopulateError as Pape;
 
        // No message yet. Do we have enough time to poll?
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
 
        } else {
 
            None
 
        };
 
        // Yes we do! Poll with remaining time as poll deadline
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                        logger,
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
    pub(super) fn udp_endpoints_round_start(&mut self, logger: &mut dyn Logger) {
 
        log!(
 
            logger,
 
            "Starting round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() {
 
            ee.received_from_this_round = None;
 
        }
 
    }
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        decision: &Decision,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        // retain received_from_this_round for use in pseudo_socket_api::recv_from
 
        log!(
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
-            'endpoint_loop: for (index, ee) in
-                self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate()
-            {
-                for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
+            for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
+                'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
 
                        ee.sock.send(payload.as_slice()).map_err(|e| {
 
                            println!("{:?}", e);
 
                            Use::BrokenUdpEndpoint { index }
 
                        })?;
 
                        log!(
 
                            logger,
 
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
 
                            &payload,
 
                            &payload_predicate,
 
                            index
 
                        );
 
-                        continue 'endpoint_loop; // send at most one payload per endpoint per round
+                        // send at most one payload per endpoint per round
+                        break 'outgoing_loop;
 
                    }
 
                }
 
                log!(logger, "Sent no message through Udp endpoint {}", index);
 
                ee.received_this_round = false;
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl From<PollAndPopulateError> for ConnectError {
 
    fn from(pape: PollAndPopulateError) -> ConnectError {
 
        use {ConnectError as Ce, PollAndPopulateError as Pape};
 
        match pape {
 
            Pape::PollFailed => Ce::PollFailed,
 
            Pape::Timeout => Ce::Timeout,
 
        }
 
    }
 
}
 
impl From<TryRecvAnyNetError> for UnrecoverableSyncError {
 
    fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError {
 
        let TryRecvAnyNetError { index, .. } = trane;
 
        UnrecoverableSyncError::BrokenNetEndpoint { index }
 
    }
 
}
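The core simplification in the endpoints.rs hunk is that each UDP socket is already bound and connected when a round runs (see the sock field comment in mod.rs below), so receiving can use recv instead of recv_from and the per-round bookkeeping shrinks from a remembered peer address to a received_this_round flag. Here is a minimal standalone illustration of that API difference using std::net::UdpSocket; the runtime itself uses mio's non-blocking sockets, and the addresses are placeholders.

use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    // Two loopback sockets standing in for the connector and its UDP peer.
    let peer = UdpSocket::bind("127.0.0.1:0")?;
    let sock = UdpSocket::bind("127.0.0.1:0")?;

    // Connecting fixes the remote address once, at setup time ...
    sock.connect(peer.local_addr()?)?;

    peer.send_to(b"hello", sock.local_addr()?)?;
    let mut buf = [0u8; 1500];
    // ... so the receive path no longer needs to learn who sent the datagram:
    // recv replaces recv_from, and the SocketAddr bookkeeping disappears.
    let n = sock.recv(&mut buf)?;
    assert_eq!(&buf[..n], b"hello");

    // Sending on a connected socket likewise uses send instead of send_to.
    sock.send(b"world")?;
    let (n, from) = peer.recv_from(&mut buf)?;
    assert_eq!((&buf[..n], from), (&b"world"[..], sock.local_addr()?));
    Ok(())
}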
src/runtime/mod.rs
 
@@ -126,97 +126,97 @@ enum CommMsgContents {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    getter_for_incoming: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    getter_for_incoming: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
-    received_from_this_round: Option<SocketAddr>,
+    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll. Poll token computed with TokenTarget::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
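The EndpointManager invariant above says every net and udp endpoint is registered with poll under a token computed from TokenTarget. The conversion itself is not part of this changeset, so the following is only a sketch of how such an index-to-token mapping is commonly written; the offsets are arbitrary and chosen purely for illustration.

use mio::Token;

// Illustrative only; the crate's real TokenTarget conversions are not shown here.
#[derive(Debug, PartialEq)]
enum TokenTarget {
    Waker,
    NetEndpoint { index: usize },
    UdpEndpoint { index: usize },
}

const WAKER: usize = 0;
const NET_BASE: usize = 1;              // net endpoints occupy the low range
const UDP_BASE: usize = usize::MAX / 2; // udp endpoints occupy the high range

impl From<TokenTarget> for Token {
    fn from(target: TokenTarget) -> Token {
        match target {
            TokenTarget::Waker => Token(WAKER),
            TokenTarget::NetEndpoint { index } => Token(NET_BASE + index),
            TokenTarget::UdpEndpoint { index } => Token(UDP_BASE + index),
        }
    }
}

impl From<Token> for TokenTarget {
    fn from(Token(raw): Token) -> TokenTarget {
        if raw == WAKER {
            TokenTarget::Waker
        } else if raw < UDP_BASE {
            TokenTarget::NetEndpoint { index: raw - NET_BASE }
        } else {
            TokenTarget::UdpEndpoint { index: raw - UDP_BASE }
        }
    }
}

With a mapping like this, the poll loop's match on TokenTarget::from(event.token()) (seen in endpoints.rs above) can route each readiness event back to the correct endpoint store.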
src/runtime/setup.rs
 
@@ -466,97 +466,97 @@ fn new_endpoint_manager(
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(ws) = waker_state.take() {
 
        ws.waker_stop();
 
    }
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: endpoint_setup.getter_for_incoming,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, getter_for_incoming } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
-                received_from_this_round: None,
+                received_this_round: false,
 
                getter_for_incoming,
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
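do_wave above performs one step of the leader-election wave during setup: broadcast the current leader candidate to every neighbor except the parent the wave arrived from, and remember which echoes are still awaited. Below is a simplified, self-contained sketch of that step, with the endpoint manager and message types replaced by placeholders; it is not code from this changeset.

use std::collections::HashSet;

type ConnectorId = u32;

struct WaveState {
    parent: Option<usize>, // neighbor index the wave arrived from, if any
    leader: ConnectorId,   // best leader candidate seen so far
}

fn do_wave(
    neighbor_count: usize,
    awaiting: &mut HashSet<usize>,
    ws: &WaveState,
    mut send: impl FnMut(usize, ConnectorId), // stands in for em.send_to_setup
) {
    awaiting.clear();
    for index in 0..neighbor_count {
        if Some(index) != ws.parent {
            send(index, ws.leader);
            awaiting.insert(index);
        }
    }
}

fn main() {
    let ws = WaveState { parent: Some(1), leader: 7 };
    let mut awaiting = HashSet::new();
    do_wave(4, &mut awaiting, &ws, |to, leader| {
        println!("LeaderWave {{ wave_leader: {} }} -> neighbor {}", leader, to)
    });
    assert_eq!(awaiting.len(), 3); // everyone except the parent
}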