Changeset - 330b9c117fa5
[Not reviewed]
0 11 0
Christopher Esterhuyse - 5 years ago 2020-09-17 14:27:27
christopher.esterhuyse@gmail.com
started refactor (safe state). goals: clarify ownership. make cu immutable while round is fallable. distinguish routing from ownership
11 files changed with 213 insertions and 93 deletions:
0 comments (0 inline, 0 general)
examples/bench_3/getter.c
Show inline comments
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./3_16_getter.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 0);
 
	rw_err_peek(c);
 
	
 
	PortId getter;
 
	FfiSocketAddr addr = {{192, 168, 1, 124}, 8009};
 
	FfiSocketAddr addr = {{127, 0, 0, 1}, 8001};
 
	rw_err_peek(c);
 
	connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
examples/bench_3/putter.c
Show inline comments
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./3_16_putter.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 1);
 
	rw_err_peek(c);
 
	
 
	PortId putter;
 
	FfiSocketAddr addr = {{192, 168, 1, 124}, 8009};
 
	FfiSocketAddr addr = {{127, 0, 0, 1}, 8001};
 
	rw_err_peek(c);
 
	connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
src/common.rs
Show inline comments
 
@@ -25,26 +25,26 @@ pub(crate) use std::{
 
    time::Instant,
 
};
 
pub(crate) use Polarity::*;
 

	
 
pub(crate) trait IdParts {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix);
 
}
 
pub type ConnectorId = u32;
 
pub type U32Suffix = u32;
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
// acquired via error in the Rust API
 
pub struct ProtoComponentId(Id);
 
// pub, because it can be acquired via error in the Rust API
 
pub struct ComponentId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: U32Suffix,
 
}
 
#[derive(Clone, Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 
@@ -90,49 +90,49 @@ pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
 
    }
 
}
 
impl IdParts for PortId {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl IdParts for ProtoComponentId {
 
impl IdParts for ComponentId {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl U32Stream {
 
    pub(crate) fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
    pub(crate) fn n_skipped(mut self, n: u32) -> Self {
 
        self.next = self.next.saturating_add(n);
 
        self
 
    }
 
}
 
impl From<Id> for PortId {
 
    fn from(id: Id) -> PortId {
 
        Self(id)
 
    }
 
}
 
impl From<Id> for ProtoComponentId {
 
    fn from(id: Id) -> ProtoComponentId {
 
impl From<Id> for ComponentId {
 
    fn from(id: Id) -> Self {
 
        Self(id)
 
    }
 
}
 
impl From<&[u8]> for Payload {
 
    fn from(s: &[u8]) -> Payload {
 
        Payload(Arc::new(s.to_vec()))
 
    }
 
}
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
@@ -175,25 +175,25 @@ impl<'de> serde::Deserialize<'de> for Payload {
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let (a, b) = self.id_parts();
 
        write!(f, "pid{}_{}", a, b)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
impl Debug for ComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let (a, b) = self.id_parts();
 
        write!(f, "cid{}_{}", a, b)
 
    }
 
}
 
impl Debug for Payload {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Payload[{:?}]", DenseDebugHex(self.as_slice()))
 
    }
 
}
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
src/macros.rs
Show inline comments
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@BENCH, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    (@MARK, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
src/protocol/eval.rs
Show inline comments
 
@@ -1524,24 +1524,25 @@ impl Store {
 
                    }
 
                    _ => {}
 
                }
 
                right = self.eval(h, ctx, expr.right)?;
 
                match expr.operation {
 
                    BinaryOperator::Equality => Ok(left.eq(&right)),
 
                    BinaryOperator::Inequality => Ok(left.neq(&right)),
 
                    BinaryOperator::LessThan => Ok(left.lt(&right)),
 
                    BinaryOperator::LessThanEqual => Ok(left.lte(&right)),
 
                    BinaryOperator::GreaterThan => Ok(left.gt(&right)),
 
                    BinaryOperator::GreaterThanEqual => Ok(left.gte(&right)),
 
                    BinaryOperator::Remainder => Ok(left.modulus(&right)),
 
                    BinaryOperator::Add => Ok(left.plus(&right)),
 
                    _ => unimplemented!("{:?}", expr.operation),
 
                }
 
            }
 
            Expression::Unary(expr) => {
 
                let mut value = self.eval(h, ctx, expr.expression)?;
 
                match expr.operation {
 
                    UnaryOperation::PostIncrement => {
 
                        self.update(h, ctx, expr.expression, value.plus(&ONE))?;
 
                    }
 
                    UnaryOperation::PreIncrement => {
 
                        value = value.plus(&ONE);
 
                        self.update(h, ctx, expr.expression, value.clone())?;
src/runtime/communication.rs
Show inline comments
 
@@ -194,42 +194,46 @@ impl Connector {
 
        }
 
    }
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 

	
 
        log!(@MARK, cu.inner.logger, "sync start {}", comm.round_index);
 
        log!(
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        // iterate
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ComponentId, ProtoComponent)> = cu
 
            .proto_components
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.inner.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
@@ -255,29 +259,26 @@ impl Connector {
 
            }
 
        }
 
        log!(
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
                    .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id)));
 
                let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id));
 
                let c = cu.proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid));
 
                let e = comm
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
@@ -341,62 +342,64 @@ impl Connector {
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    SubtreeId::LocalComponent(cu.inner.native_component_id),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(cu.inner.logger, "Searching for decision...");
 
        log!(@BENCH, cu.inner.logger, "");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(@MARK, cu.inner.logger, "got decision!");
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.inner.logger, "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.inner.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        log!(@MARK, cu.inner.logger, "forwarding decision!");
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
@@ -414,46 +417,45 @@ impl Connector {
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.inner.logger, "");
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        log!(@MARK, cu.inner.logger, "decide start");
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.inner.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.inner.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.inner.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.inner.logger, "Done translating native batches into branches");
 

	
 
        let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.inner.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
@@ -489,75 +491,73 @@ impl Connector {
 
        log!(cu.inner.logger, "All proto components are blocked");
 

	
 
        log!(cu.inner.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(
 
                cu.inner.logger,
 
                "Decision loop! have {} messages to recv",
 
                rctx.getter_buffer.len()
 
            );
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                log!(@MARK, cu.inner.logger, "handling payload msg");
 
                assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                let route = cu.inner.port_info.routes.get(&getter);
 
                log!(
 
                    cu.inner.logger,
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
                    &route
 
                );
 
                match route {
 
                    None => log!(cu.inner.logger, "Delivery failed. Physical route unmapped!"),
 
                    Some(Route::UdpEndpoint { index }) => {
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.inner.logger, "Delivering to udp endpoint index={}", index);
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Some(Route::NetEndpoint { index }) => {
 
                        log!(@MARK, cu.inner.logger, "sending payload");
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                    Some(Route::LocalComponent(cid)) if *cid == cu.inner.native_component_id => {
 
                        branching_native.feed_msg(
 
                            cu,
 
                            rctx,
 
                            getter,
 
                            &send_payload_msg,
 
                            MapTempGuard::new(&mut bn_temp_owner),
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                        )
 
                    }
 
                    Some(Route::LocalComponent(cid)) => {
 
                        if let Some(branching_component) = branching_proto_components.get_mut(cid) {
 
                            let cid = *cid;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
                                proto_component_id,
 
                                cid,
 
                                getter,
 
                                &send_payload_msg,
 
                                pcb_temps.reborrow(),
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(
 
                                    cu.inner.logger,
 
                                    "{:?} has become inconsistent!",
 
                                    proto_component_id
 
                                );
 
                                log!(cu.inner.logger, "{:?} has become inconsistent!", cid);
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
                                        Self::request_failure(cu, comm, parent)?
 
                                    } else {
 
                                        log!(cu.inner.logger, "Already requested failure");
 
                                    }
 
                                } else {
 
                                    log!(cu.inner.logger, "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
@@ -568,24 +568,25 @@ impl Connector {
 
                                &send_payload_msg,
 
                                proto_component_id
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.inner.logger, "Check if we have any local decisions...");
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
                log!(cu.inner.logger, "New local decision with solution {:?}...", &solution);
 
                log!(@MARK, cu.inner.logger, "local solution");
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.inner.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest {
 
                                suggestion,
 
                            }),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
@@ -734,25 +735,25 @@ impl BranchingNative {
 
            let var = cu.inner.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                if branch.is_ended() {
 
                    log!(
 
                        cu.inner.logger,
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Native);
 
                    let subtree_id = SubtreeId::LocalComponent(cu.inner.native_component_id);
 
                    round_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.inner.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    log!(
 
                        cu.inner.logger,
 
                        "Fed native {:?} still has to_get {:?}",
 
                        &predicate,
 
                        &branch.to_get
 
                    );
 
@@ -860,25 +861,25 @@ impl BranchingNative {
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        proto_component_id: ComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
@@ -909,52 +910,55 @@ impl BranchingProtoComponent {
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.inner.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.inner.did_put_or_get.insert(putter);
 
                        log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for port in ports.iter() {
 
                        let var = cu.inner.port_info.spec_var_for(*port);
 
                        let should_have_fired = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
 
                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}"
 
                                port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
 
                    let subtree_id = SubtreeId::LocalComponent(proto_component_id);
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.inner.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
            }
 
            Ok(())
 
        })
 
@@ -964,25 +968,25 @@ impl BranchingProtoComponent {
 
    //     b: &mut ProtoComponentBranch,
 
    // ) -> ProtoComponentBranch {
 
    //     if b.ended && !a.ended {
 
    //         a.ended = true;
 
    //         std::mem::swap(&mut a, b);
 
    //     }
 
    //     a
 
    // }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        proto_component_id: ComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        let logger = &mut *cu.inner.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
@@ -1209,77 +1213,62 @@ impl SyncProtoContext<'_> {
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.branch_inner.did_put_or_get.insert(port);
 
        self.branch_inner.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    // fn merge_input_with<F: FnMut(V, &mut V) -> V>(&mut self, k: K, v: V, mut func: F) {
 
    //     use std::collections::hash_map::Entry;
 
    //     let e = self.swap.entry(k);
 
    //     match e {
 
    //         Entry::Vacant(ev) => {
 
    //             ev.insert(v);
 
    //         }
 
    //         Entry::Occupied(mut eo) => {
 
    //             let old = eo.get_mut();
 
    //             *old = func(v, old);
 
    //         }
 
    //     }
 
    // }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.cu_inner.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        println!("MOVED PORTS {:#?}. got {:#?}", &moved_ports, &self.proto_component_ports);
 
        assert!(self.proto_component_ports.is_superset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.cu_inner.id_manager.new_proto_component_id();
 
        let new_id = self.cu_inner.id_manager.new_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.cu_inner
 
                .port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
            self.cu_inner.port_info.routes.insert(*port, Route::LocalComponent(new_id));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let [o, i] =
 
            [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()];
 
        self.proto_component_ports.insert(o);
 
        self.proto_component_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.cu_inner.port_info.polarities.insert(o, Putter);
 
        self.cu_inner.port_info.polarities.insert(i, Getter);
 
        self.cu_inner.port_info.peers.insert(o, i);
 
        self.cu_inner.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id));
 
        let route = Route::LocalComponent(self.proto_component_id);
 
        self.cu_inner.port_info.routes.insert(o, route);
 
        self.cu_inner.port_info.routes.insert(i, route);
 
        log!(
 
            self.cu_inner.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
src/runtime/endpoints.rs
Show inline comments
 
@@ -64,25 +64,27 @@ impl NetEndpoint {
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
        &mut self,
 
        msg: &T,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| Nee::BrokenNetEndpoint)
 
            .map_err(|_| Nee::BrokenNetEndpoint)?;
 
        let _ = self.stream.flush();
 
        Ok(())
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_net_endpoints()
 
    }
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_store.endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
@@ -370,25 +372,37 @@ impl EndpointManager {
 
                        // send at most one payload per endpoint per round
 
                        break 'outgoing_loop;
 
                    }
 
                }
 
                ee.received_this_round = false;
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
        struct DebugStream<'a>(&'a TcpStream);
 
        impl Debug for DebugStream<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                f.debug_struct("Endpoint")
 
                    .field("local_addr", &self.0.local_addr())
 
                    .field("peer_addr", &self.0.peer_addr())
 
                    .finish()
 
            }
 
        }
 
        f.debug_struct("Endpoint")
 
            .field("inbox", &self.inbox)
 
            .field("stream", &DebugStream(&self.stream))
 
            .finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
src/runtime/error.rs
Show inline comments
 
@@ -25,25 +25,25 @@ pub enum AddComponentError {
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum UnrecoverableSyncError {
 
    PollFailed,
 
    BrokenNetEndpoint { index: usize },
 
    BrokenUdpEndpoint { index: usize },
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    InconsistentProtoComponent(ComponentId),
 
    RoundFailure,
 
    Unrecoverable(UnrecoverableSyncError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum NetEndpointError {
 
    MalformedMessage,
 
    BrokenNetEndpoint,
src/runtime/mod.rs
Show inline comments
 
@@ -23,26 +23,26 @@ pub struct Connector {
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    proto_component_ports: &'a mut HashSet<PortId>, // sub-structure of component
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, // lives for Nonsync phase
 
    proto_component_id: ProtoComponentId,     // KEY in id->component map
 
    unrun_components: &'a mut Vec<(ComponentId, ProtoComponent)>, // lives for Nonsync phase
 
    proto_component_id: ComponentId,          // KEY in id->component map
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                 // KEY in pred->branch map
 
}
 
#[derive(Default, Debug, Clone)]
 
struct ProtoComponentBranchInner {
 
    untaken_choice: Option<u16>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
}
 
@@ -56,29 +56,24 @@ struct SpecVar(PortId);
 
struct SpecVal(u16);
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Default)]
 
struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum Route {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
@@ -100,25 +95,25 @@ enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    proto_components: HashMap<ComponentId, ProtoComponent>,
 
}
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
@@ -175,27 +170,26 @@ struct UdpEndpointExt {
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
    component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll. Poll token computed with TargetToken::into
 
@@ -206,48 +200,50 @@ struct EndpointManager {
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    owners: HashMap<PortId, ComponentId>,
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    proto_components: HashMap<ComponentId, ProtoComponent>,
 
    inner: ConnectorUnphasedInner,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphasedInner {
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
    native_component_id: ComponentId,
 
}
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
@@ -341,44 +337,41 @@ impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
            component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N.
 
        // This gap is entirely unnecessary (i.e. 0 is fine)
 
        // It's purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
    fn new_component_id(&mut self) -> ComponentId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() }
 
            .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    pub fn is_connected(&self) -> bool {
 
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
 
        // consequently, this "phased" business would also include connector variants and this would
 
@@ -409,25 +402,25 @@ impl Connector {
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()];
 
        cu.inner.native_ports.insert(o);
 
        cu.inner.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.inner.port_info.polarities.insert(o, Putter);
 
        cu.inner.port_info.polarities.insert(i, Getter);
 
        cu.inner.port_info.peers.insert(o, i);
 
        cu.inner.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        let route = Route::LocalComponent(cu.inner.native_component_id);
 
        cu.inner.port_info.routes.insert(o, route);
 
        cu.inner.port_info.routes.insert(i, route);
 
        log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError as Ace;
 
@@ -437,35 +430,32 @@ impl Connector {
 
        if polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !cu.inner.native_ports.contains(port) {
 
                return Err(Ace::UnknownPort(*port));
 
            }
 
            if expected_polarity != *cu.inner.port_info.polarities.get(port).unwrap() {
 
                return Err(Ace::WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = cu.inner.id_manager.new_proto_component_id();
 
        let new_cid = cu.inner.id_manager.new_component_id();
 
        for port in ports.iter() {
 
            cu.inner
 
                .port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
            cu.inner.port_info.routes.insert(*port, Route::LocalComponent(new_cid));
 
        }
 
        cu.inner.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
        cu.proto_components.insert(
 
            new_id,
 
            new_cid,
 
            ProtoComponent {
 
                state: cu.proto_description.new_main_component(identifier, ports),
 
                ports: ports.iter().copied().collect(),
 
            },
 
        );
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn singleton(k: SpecVar, v: SpecVal) -> Self {
 
        Self::default().inserted(k, v)
 
@@ -641,12 +631,18 @@ impl Default for UdpInBuffer {
 
            // safe! this vector is guaranteed to have sufficient capacity
 
            byte_vec.set_len(Self::CAPACITY);
 
        }
 
        Self { byte_vec }
 
    }
 
}
 
impl UdpInBuffer {
 
    const CAPACITY: usize = u16::MAX as usize;
 
    fn as_mut_slice(&mut self) -> &mut [u8] {
 
        self.byte_vec.as_mut_slice()
 
    }
 
}
 

	
 
impl Debug for UdpInBuffer {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "UdpInBuffer")
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                inner: ConnectorUnphasedInner {
 
                    logger,
 
                    id_manager: IdManager::new(connector_id),
 
                    native_component_id: id_manager.new_component_id(),
 
                    id_manager,
 
                    native_ports: Default::default(),
 
                    port_info: Default::default(),
 
                },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
@@ -43,26 +45,32 @@ impl Connector {
 
                let mut npid = || cu.inner.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
                cu.inner.native_ports.insert(nin);
 
                cu.inner.native_ports.insert(nout);
 
                cu.inner.port_info.polarities.insert(nin, Getter);
 
                cu.inner.port_info.polarities.insert(nout, Putter);
 
                cu.inner.port_info.polarities.insert(uin, Getter);
 
                cu.inner.port_info.polarities.insert(uout, Putter);
 
                cu.inner.port_info.peers.insert(nin, uout);
 
                cu.inner.port_info.peers.insert(nout, uin);
 
                cu.inner.port_info.peers.insert(uin, nout);
 
                cu.inner.port_info.peers.insert(uout, nin);
 
                cu.inner.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native));
 
                cu.inner.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native));
 
                cu.inner
 
                    .port_info
 
                    .routes
 
                    .insert(nin, Route::LocalComponent(cu.inner.native_component_id));
 
                cu.inner
 
                    .port_info
 
                    .routes
 
                    .insert(nout, Route::LocalComponent(cu.inner.native_component_id));
 
                cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index });
 
                cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index });
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 
    pub fn new_net_port(
 
@@ -73,25 +81,25 @@ impl Connector {
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let local_port = cu.inner.id_manager.new_port_id();
 
                cu.inner.native_ports.insert(local_port);
 
                // {polarity, route} known. {peer} unknown.
 
                cu.inner.port_info.polarities.insert(local_port, polarity);
 
                cu.inner
 
                    .port_info
 
                    .routes
 
                    .insert(local_port, Route::LocalComponent(ComponentId::Native));
 
                    .insert(local_port, Route::LocalComponent(cu.inner.native_component_id));
 
                log!(
 
                    cu.inner.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    local_port,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: local_port,
src/runtime/tests.rs
Show inline comments
 
@@ -1005,12 +1005,119 @@ fn pdl_msg_consensus() {
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"msgconsensus", &[g0, g1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap();
 

	
 
    c.put(p0, Payload::from(b"HEY" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap_err();
 
}
 

	
 
#[test]
 
fn sequencer3_prim() {
 
    let test_log_path = Path::new("./logs/sequencer3_prim");
 
    let pdl = b"
 
    primitive seq3primitive(out a, out b, out c) {
 
        int i = 0;
 
        while(true) synchronous {
 
            out to = a;
 
            if     (i==1) to = b;
 
            else if(i==2) to = c;
 
            if(fires(to)) {
 
                put(to, create(0));
 
                i = (i + 1)%3;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) primitive sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"seq3primitive", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let mut which_of_three = move || {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(None).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        assert_eq!(expected_batch_idx, which_of_three());
 
    }
 
}
 

	
 
// #[test]
 
// fn sequencer3_comp() {
 
//     let test_log_path = Path::new("./logs/sequencer3_comp");
 
//     let pdl = b"
 
//     primitive fifo1_init(msg m, in a, out b) {
 
//         while(true) synchronous {
 
//             if(m != null && fires(b)) {
 
//                 put(b, m);
 
//                 m = null;
 
//             } else if (m == null && fires(a)) {
 
//                 m = get(a);
 
//             }
 
//         }
 
//     }
 
//     composite fifo1_full(in a, out b) {
 
//         new fifo1_init(create(0), a, b);
 
//     }
 
//     composite fifo1(in a, out b) {
 
//         new fifo1_init(null, a, b);
 
//     }
 
//     composite seq3composite(out a, out b, out c) {
 
//         channel d -> e;
 
//         channel f -> g;
 
//         channel h -> i;
 
//         channel j -> k;
 
//         channel l -> m;
 
//         channel n -> o;
 

	
 
//         new fifo1_full(o, d);
 
//         new replicator2(e, f, a);
 
//         new fifo1(g, h);
 
//         new replicator2(i, j, b);
 
//         new fifo1(k, l);
 
//         new replicator2(m, n, c);
 
//     }
 
//     ";
 
//     let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
//     let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
//     // setup a session between (a) native, and (b) composite sequencer3, connected by 3 ports.
 
//     let [p0, g0] = c.new_port_pair();
 
//     let [p1, g1] = c.new_port_pair();
 
//     let [p2, g2] = c.new_port_pair();
 
//     c.add_component(b"seq3composite", &[p0, p1, p2]).unwrap();
 
//     c.connect(None).unwrap();
 

	
 
//     let mut which_of_three = move || {
 
//         // setup three sync batches. sync. return which succeeded
 
//         c.get(g0).unwrap();
 
//         c.next_batch().unwrap();
 
//         c.get(g1).unwrap();
 
//         c.next_batch().unwrap();
 
//         c.get(g2).unwrap();
 
//         c.sync(None).unwrap()
 
//     };
 

	
 
//     const TEST_ROUNDS: usize = 50;
 
//     // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
//     for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
//         assert_eq!(expected_batch_idx, which_of_three());
 
//     }
 
// }
0 comments (0 inline, 0 general)