Changeset - 27ba74fbac9f
Christopher Esterhuyse - 2020-08-28 11:58:15
christopher.esterhuyse@gmail.com
made the (static) logging more modular in preparation for benchmarking
3 files changed with 31 insertions and 21 deletions:
src/macros.rs
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
    	if cfg!(feature = "endpoint_logging") {
 
	        if let Some(w) = $logger.line_writer() {
 
	        	let _ = writeln!(w, $($arg)*);
 
	        }
 
	    }
 
    }};
 
}
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    (@COMM_NB, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
    	if let Some(w) = $logger.line_writer() {
 
        	let _ = writeln!(w, $($arg)*);
 
    	}
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
}
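
The new `log!` macro picks a logging level at each call site (`@ENDPT`, `@COMM_NB`, or the default arm), and a level is disabled statically by rewriting its arm so it expands to nothing, as the `@ENDPT` arm above already does. Below is a minimal, self-contained sketch of that idea; the `StringLogger` type is invented purely for illustration, and only the `line_writer()` shape is taken from this changeset.

// Sketch only: arm-per-level logging macro. The silenced arm generates no code,
// so its format arguments are never evaluated, which suits benchmarking builds.
use std::io::Write;

struct StringLogger(Vec<u8>);
impl StringLogger {
    // mirrors the `line_writer()` accessor used by the logger calls in this changeset
    fn line_writer(&mut self) -> Option<&mut Vec<u8>> {
        Some(&mut self.0)
    }
}

macro_rules! log {
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
        let _ = &$logger; // silenced level: no I/O, no formatting
    }};
    (@COMM_NB, $logger:expr, $($arg:tt)*) => {{
        if let Some(w) = $logger.line_writer() {
            let _ = writeln!(w, $($arg)*);
        }
    }};
}

fn main() {
    let mut logger = StringLogger(Vec::new());
    log!(@ENDPT, logger, "this endpoint-level line generates no code: {}", 42);
    log!(@COMM_NB, logger, "round {} started", 7);
    assert_eq!(String::from_utf8(logger.0).unwrap(), "round 7 started\n");
}

Re-enabling a level is then a one-line edit (give the `@ENDPT` arm the same body as `@COMM_NB`), rather than toggling a cargo feature as `endptlog!` did with `cfg!(feature = "endpoint_logging")`.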
src/runtime/communication.rs
 
@@ -183,48 +183,49 @@ impl Connector {
 
                    }
 
                    _ => {}
 
                }
 
                comm.round_result = Self::connected_sync(cu, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 
    // Private helper: mutates state, but returns the round result as soon as
 
    // possible (so the ? operator can be used for convenient early error returns).
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.inner.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
@@ -233,79 +234,80 @@ impl Connector {
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
                    .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id)));
 
                let e = comm
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@COMM_NB, cu.inner.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for all silent (not firing) ports
 
@@ -332,98 +334,99 @@ impl Connector {
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(@COMM_NB, cu.inner.logger, "Searching for decision...");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@COMM_NB, cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.inner.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.inner.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@COMM_NB, cu.inner.logger, "Sync round ending! Cleaning up");
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.inner.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.inner.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.inner.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
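
For reference, the timeout handling above follows a convert-once pattern: `connected_sync` turns the caller's optional `timeout` into an optional absolute deadline (`deadline: timeout.map(|to| Instant::now() + to)` in `RoundCtx`), and later polling code derives the remaining budget from that deadline (see `poll_and_populate` in src/runtime/endpoints.rs below). A small self-contained sketch of the pattern, with a plain string error standing in for the crate's own timeout error:

// Sketch only: optional timeout -> optional absolute deadline -> remaining poll budget.
use std::time::{Duration, Instant};

fn remaining(deadline: &Option<Instant>) -> Result<Option<Duration>, &'static str> {
    match deadline {
        None => Ok(None), // no timeout configured: polling may block indefinitely
        Some(d) => d
            .checked_duration_since(Instant::now())
            .map(Some)
            .ok_or("timeout elapsed"), // deadline already passed
    }
}

fn main() {
    let timeout: Option<Duration> = Some(Duration::from_millis(200));
    let deadline = timeout.map(|to| Instant::now() + to); // as in RoundCtx above
    assert!(remaining(&deadline).unwrap().unwrap() <= Duration::from_millis(200));
    assert!(remaining(&None).unwrap().is_none()); // no deadline: never times out
}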
src/runtime/endpoints.rs
 
@@ -12,61 +12,63 @@ struct TryRecvAnyNetError {
 
    error: NetEndpointError,
 
    index: usize,
 
}
 
/////////////////////
 
impl NetEndpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        bincode::config::DefaultOptions::default()
 
    }
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, NetEndpointError> {
 
        use NetEndpointError as Nee;
 
        // populate inbox as much as possible
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
 
            }
 
        }
 
        endptlog!(
 
        log!(
 
            @ENDPT,
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                endptlog!(
 
                log!(
 
                    @ENDPT,
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
        &mut self,
 
        msg: &T,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
@@ -102,49 +104,49 @@ impl EndpointManager {
 
    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
 
    /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!)
 
    pub(super) fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        ///////////////////////////////////////////
 
        fn map_trane(
 
            trane: TryRecvAnyNetError,
 
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
 
        ) -> ConnectError {
 
            ConnectError::NetEndpointSetupError(
 
                net_endpoint_store.endpoint_exts[trane.index]
 
                    .net_endpoint
 
                    .stream
 
                    .local_addr()
 
                    .unwrap(), // stream must already be connected
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &tup);
 
            log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup);
 
            return Ok(tup);
 
        }
 
        loop {
 
            // try recv from some polled undrained NET endpoint
 
            if let Some(tup) = self
 
                .try_recv_undrained_net(logger)
 
                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
 
            {
 
                return Ok(tup);
 
            }
 
            // poll if time remains
 
            self.poll_and_populate(logger, deadline)?;
 
        }
 
    }
 

	
 
    // drops all Setup messages,
 
    // buffers all future round messages,
 
    // drops all previous round messages,
 
    // enqueues all current round SendPayload messages using round_ctx.getter_add
 
    // returns the first comm_ctrl_msg encountered
 
    // only polls until SOME message is enqueued
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
@@ -252,90 +254,92 @@ impl EndpointManager {
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(logger, round_ctx.get_deadline()) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
 
            }
 
        }
 
    }
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
        Ok(None)
 
    }
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(), PollAndPopulateError> {
 
        use PollAndPopulateError as Pape;
 
        // No message yet. Do we have enough time to poll?
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
 
        } else {
 
            None
 
        };
 
        // Yes we do! Poll with remaining time as poll deadline
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
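
Throughout `NetEndpoint::try_recv` above, the logging change leaves the existing inbox discipline intact: bytes read from the stream accumulate in `self.inbox`, bincode attempts to decode one message from the front, an `UnexpectedEof` error means the message is still incomplete, and on success exactly the consumed prefix is drained. A self-contained sketch of that discipline follows; `CountingReader` stands in for the crate's `MonitoredReader`, the `String` message type is purely illustrative, and the bincode 1.x crate is assumed as a dependency.

// Sketch only: decode-from-growing-buffer pattern as used by NetEndpoint::try_recv.
use bincode::config::Options;
use std::io::Read;

struct CountingReader<R> {
    inner: R,
    bytes_read: usize, // how many bytes the deserializer actually consumed
}
impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.bytes_read += n;
        Ok(n)
    }
}

// Ok(Some(msg)): one message decoded and drained. Ok(None): need more bytes.
// Err(_): the buffered prefix cannot be a valid message.
fn try_decode(inbox: &mut Vec<u8>) -> Result<Option<String>, bincode::Error> {
    let mut reader = CountingReader { inner: &inbox[..], bytes_read: 0 };
    match bincode::config::DefaultOptions::default().deserialize_from(&mut reader) {
        Ok(msg) => {
            let consumed = reader.bytes_read; // like MonitoredReader::bytes_read()
            inbox.drain(0..consumed); // drop only the consumed prefix
            Ok(Some(msg))
        }
        Err(e) => match *e {
            bincode::ErrorKind::Io(ref k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
                Ok(None) // message not fully received yet
            }
            _ => Err(e),
        },
    }
}

fn main() {
    let opts = bincode::config::DefaultOptions::default();
    let encoded = opts.serialize(&"hello".to_string()).unwrap();
    let mut inbox = encoded[..encoded.len() - 1].to_vec(); // partial arrival
    assert!(try_decode(&mut inbox).unwrap().is_none());
    inbox.push(*encoded.last().unwrap()); // final byte arrives
    assert_eq!(try_decode(&mut inbox).unwrap().unwrap(), "hello");
    assert!(inbox.is_empty());
}

Draining only the consumed prefix is what lets a second message that arrived in the same read stay in the inbox, which is why `try_recv_undrained_net` re-inserts the endpoint into `polled_undrained` whenever the inbox is non-empty.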