Changeset - d23010006486
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-07-03 13:09:20
christopher.esterhuyse@gmail.com
bugfix: increment round index on recovery to avoid mixing messages once we switch away from tcp. refactoring communication internals to simplify bookkeeping structures. more consts in tests to make them terser
8 files changed with 230 insertions and 132 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -81,6 +81,7 @@ pub(crate) enum SyncBlocker {
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
    NondetChoice { n: u16 },
 
}
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
src/protocol/library.rs
Show inline comments
 
use crate::protocol::ast::*;
 
use crate::protocol::inputsource::*;
 

	
 
// TABBED OUT FOR NOW
 

	
 
pub fn get_declarations(h: &mut Heap, i: ImportId) -> Result<Vec<DeclarationId>, ParseError> {
 
    if h[i].value == b"std.reo" {
 
        let mut vec = Vec::new();
src/protocol/mod.rs
Show inline comments
 
@@ -3,7 +3,7 @@ mod ast;
 
mod eval;
 
pub(crate) mod inputsource;
 
mod lexer;
 
mod library;
 
// mod library;
 
mod parser;
 

	
 
use crate::common::*;
 
@@ -32,7 +32,7 @@ pub(crate) enum EvalContext<'a> {
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(A big honkin' protocol description)")
 
        write!(f, "(An opaque protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
src/protocol/parser.rs
Show inline comments
 
use crate::protocol::ast::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::lexer::*;
 
use crate::protocol::library;
 
// use crate::protocol::library;
 

	
 
// The following indirection is needed due to a bug in the cbindgen tool.
 
type Unit = ();
 
@@ -797,12 +797,13 @@ impl Visitor for BuildSymbolDeclarations {
 
        Ok(())
 
    }
 
    fn visit_import(&mut self, h: &mut Heap, import: ImportId) -> VisitorResult {
 
        let vec = library::get_declarations(h, import)?;
 
        // Destructively iterate over the vector
 
        for decl in vec {
 
            self.checked_add(h, decl)?;
 
        }
 
        Ok(())
 
        todo!()
 
        // let vec = library::get_declarations(h, import)?;
 
        // // Destructively iterate over the vector
 
        // for decl in vec {
 
        //     self.checked_add(h, decl)?;
 
        // }
 
        // Ok(())
 
    }
 
    fn visit_symbol_definition(&mut self, h: &mut Heap, definition: DefinitionId) -> VisitorResult {
 
        let signature = Signature::from_definition(h, definition);
src/runtime/communication.rs
Show inline comments
 
@@ -2,6 +2,16 @@ use super::*;
 
use crate::common::*;
 

	
 
////////////////
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
@@ -28,6 +38,7 @@ struct BranchingProtoComponent {
 
struct ProtoComponentBranch {
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
@@ -38,14 +49,6 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError>;
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
@@ -132,6 +135,7 @@ impl Connector {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.round_result = Self::connected_sync(unphased, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
@@ -148,7 +152,7 @@ impl Connector {
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        use SyncError as Se;
 
        let deadline = timeout.map(|to| Instant::now() + to);
 
        // let deadline = timeout.map(|to| Instant::now() + to);
 
        log!(
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
@@ -156,8 +160,6 @@ impl Connector {
 
            comm.round_index
 
        );
 

	
 
        let mut spec_var_stream = cu.id_manager.new_spec_var_stream();
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
@@ -207,18 +209,21 @@ impl Connector {
 
            branching_proto_components.len(),
 
        );
 

	
 
        // NOTE: all msgs in outbox are of form (Getter, Payload)
 
        let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
        // create the solution storage
 
        let mut solution_storage = {
 
            let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
            let c =
 
                cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
            let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
            SolutionStorage::new(n.chain(c).chain(e))
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
                    .map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
                let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
                SolutionStorage::new(n.chain(c).chain(e))
 
            },
 
            spec_var_stream: cu.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.logger, "Solution storage initialized");
 
        log!(cu.logger, "Round context structure initialized");
 

	
 
        // 2. kick off the native
 
        log!(
 
@@ -226,7 +231,7 @@ impl Connector {
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = spec_var_stream.next();
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
@@ -259,7 +264,7 @@ impl Connector {
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                payloads_to_get.putter_send(cu, putter, msg)?;
 
                rctx.getter_buffer.putter_add(cu, putter, msg)?;
 
            }
 
            if to_get.is_empty() {
 
                log!(
 
@@ -268,16 +273,16 @@ impl Connector {
 
                    index,
 
                    &predicate
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(_existing) = branching_native.branches.insert(predicate, branch) {
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
                // return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        // restore the invariant
 
@@ -287,9 +292,7 @@ impl Connector {
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            solution_storage,
 
            payloads_to_get,
 
            deadline,
 
            &mut rctx,
 
        )?;
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 

	
 
@@ -340,9 +343,7 @@ impl Connector {
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        mut solution_storage: SolutionStorage,
 
        mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>,
 
        mut deadline: Option<Instant>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, SyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
@@ -379,8 +380,7 @@ impl Connector {
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                rctx,
 
                proto_component_id,
 
                ports,
 
            )?;
 
@@ -406,8 +406,8 @@ impl Connector {
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(cu.logger, "Decision loop! have {} messages to recv", payloads_to_get.len());
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
            log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                let route = cu.port_info.routes.get(&getter);
 
                log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route);
 
@@ -429,7 +429,7 @@ impl Connector {
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        &mut rctx.solution_storage,
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
@@ -440,9 +440,8 @@ impl Connector {
 
                            let proto_component_id = *proto_component_id;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                rctx,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                &send_payload_msg,
 
                            )?;
 
@@ -478,7 +477,7 @@ impl Connector {
 

	
 
            // check if we have a solution yet
 
            log!(cu.logger, "Check if we have any local decisions...");
 
            for solution in solution_storage.iter_new_local_make_old() {
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger, "New local decision with solution {:?}...", &solution);
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
@@ -502,7 +501,10 @@ impl Connector {
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (endpoint_index, msg) = loop {
 
                    match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? {
 
                    match comm
 
                        .endpoint_manager
 
                        .try_recv_any_comms(&mut *cu.logger, rctx.deadline)?
 
                    {
 
                        None => {
 
                            log!(cu.logger, "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
 
@@ -515,7 +517,7 @@ impl Connector {
 
                                log!(cu.logger, "As the leader, deciding on timeout");
 
                                return Ok(Decision::Failure);
 
                            }
 
                            deadline = None;
 
                            rctx.deadline = None;
 
                        }
 
                        Some((endpoint_index, msg)) => break (endpoint_index, msg),
 
                    }
 
@@ -562,7 +564,7 @@ impl Connector {
 
                            "Msg routed to getter port {:?}. Buffer for recv loop",
 
                            getter,
 
                        );
 
                        payloads_to_get.push((getter, send_payload_msg));
 
                        rctx.getter_buffer.getter_add(getter, send_payload_msg);
 
                    }
 
                    CommMsgContents::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
@@ -572,7 +574,7 @@ impl Connector {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
                                    let route = Route::Endpoint { index: endpoint_index };
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.logger,
 
                                        route,
 
                                        predicate,
 
@@ -753,13 +755,13 @@ impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), SyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                untaken_choice: &mut branch.untaken_choice,
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
@@ -775,6 +777,15 @@ impl BranchingProtoComponent {
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::NondetChoice { n } => {
 
                    let var = rctx.spec_var_stream.next();
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    drop((predicate, branch));
 
@@ -786,7 +797,7 @@ impl BranchingProtoComponent {
 
                        predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                    }
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
@@ -822,7 +833,7 @@ impl BranchingProtoComponent {
 
                        // keep in "unblocked"
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        payload_msg_sender.putter_send(cu, putter, msg)?;
 
                        rctx.getter_buffer.putter_add(cu, putter, msg)?;
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
@@ -833,9 +844,8 @@ impl BranchingProtoComponent {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
@@ -898,8 +908,7 @@ impl BranchingProtoComponent {
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            cu,
 
            solution_storage,
 
            payload_msg_sender,
 
            rctx,
 
            proto_component_id,
 
            ports,
 
        )?;
 
@@ -919,7 +928,12 @@ impl BranchingProtoComponent {
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false };
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
        };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
@@ -960,9 +974,6 @@ impl SolutionStorage {
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 
    // pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
    //     self.new_local.iter()
 
    // }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
@@ -1024,15 +1035,24 @@ impl SolutionStorage {
 
        }
 
    }
 
}
 
impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> {
 
    fn putter_send(
 
impl GetterBuffer {
 
    fn len(&self) -> usize {
 
        self.getters_and_sends.len()
 
    }
 
    fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.getters_and_sends.pop()
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getters_and_sends.push((getter, msg));
 
    }
 
    fn putter_add(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        if let Some(&getter) = cu.port_info.peers.get(&putter) {
 
            self.push((getter, msg));
 
            self.getter_add(getter, msg);
 
            Ok(())
 
        } else {
 
            Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter }))
 
@@ -1047,6 +1067,9 @@ impl SyncProtoContext<'_> {
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
src/runtime/error.rs
Show inline comments
 
@@ -27,7 +27,6 @@ pub enum AddComponentError {
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    RoundFailure,
 
    PollFailed,
 
    BrokenEndpoint(usize),
src/runtime/mod.rs
Show inline comments
 
@@ -41,6 +41,7 @@ pub(crate) struct NonsyncProtoContext<'a> {
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    untaken_choice: &'a mut Option<u16>,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
src/runtime/tests.rs
Show inline comments
 
@@ -8,6 +8,10 @@ use reowolf::{
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
const SEC1: Option<Duration> = Some(Duration::from_secs(1));
 
const SEC5: Option<Duration> = Some(Duration::from_secs(5));
 
const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
const MS300: Option<Duration> = Some(Duration::from_millis(300));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
@@ -87,7 +91,7 @@ fn new_net_port() {
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
@@ -97,7 +101,7 @@ fn single_node_connect() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
@@ -108,12 +112,12 @@ fn minimal_net_connect() {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
@@ -124,7 +128,7 @@ fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
@@ -133,7 +137,7 @@ fn wrong_polarity_bad() {
 
    let test_log_path = Path::new("./logs/wrong_polarity_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
@@ -142,7 +146,7 @@ fn dup_put_bad() {
 
    let test_log_path = Path::new("./logs/dup_put_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap_err();
 
}
 
@@ -151,8 +155,8 @@ fn dup_put_bad() {
 
fn trivial_sync() {
 
    let test_log_path = Path::new("./logs/trivial_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
@@ -168,7 +172,7 @@ fn connected_gotten_err_no_round() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_no_round");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
@@ -177,8 +181,8 @@ fn connected_gotten_err_ungotten() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_ungotten");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
@@ -187,7 +191,7 @@ fn native_polarity_checks() {
 
    let test_log_path = Path::new("./logs/native_polarity_checks");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
@@ -201,7 +205,7 @@ fn native_multiple_gets() {
 
    let test_log_path = Path::new("./logs/native_multiple_gets");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 
@@ -211,7 +215,7 @@ fn next_batch() {
 
    let test_log_path = Path::new("./logs/next_batch");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.next_batch().unwrap_err();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
@@ -222,10 +226,10 @@ fn native_self_msg() {
 
    let test_log_path = Path::new("./logs/native_self_msg");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
@@ -236,17 +240,17 @@ fn two_natives_msg() {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
@@ -257,12 +261,12 @@ fn trivial_nondet() {
 
    let test_log_path = Path::new("./logs/trivial_nondet");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    // getting 0 batch
 
    c.next_batch().unwrap();
 
    // silent 1 batch
 
    assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
    assert_eq!(1, c.sync(SEC1).unwrap());
 
    c.gotten(i).unwrap_err();
 
}
 

	
 
@@ -274,18 +278,18 @@ fn connector_pair_nondet() {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
            assert_eq!(1, c.sync(SEC1).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
@@ -296,9 +300,9 @@ fn native_immediately_inconsistent() {
 
    let test_log_path = Path::new("./logs/native_immediately_inconsistent");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
    c.sync(SEC15).unwrap_err();
 
}
 

	
 
#[test]
 
@@ -306,12 +310,12 @@ fn native_recovers() {
 
    let test_log_path = Path::new("./logs/native_recovers");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
    c.sync(SEC15).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap();
 
    c.sync(SEC15).unwrap();
 
}
 

	
 
#[test]
 
@@ -323,7 +327,7 @@ fn cannot_use_moved_ports() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 
@@ -339,10 +343,10 @@ fn sync_sync() {
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"sync", &[g0, p1]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(SEC1).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
@@ -357,7 +361,7 @@ fn double_net_connect() {
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
@@ -365,7 +369,7 @@ fn double_net_connect() {
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
@@ -391,8 +395,8 @@ fn distributed_msg_bounce() {
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"sync", &[g, p]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
@@ -404,10 +408,10 @@ fn distributed_msg_bounce() {
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
@@ -419,9 +423,9 @@ fn local_timeout() {
 
    let test_log_path = Path::new("./logs/local_timeout");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(Some(Duration::from_millis(200))) {
 
    match c.sync(MS300) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
@@ -436,14 +440,14 @@ fn parent_timeout() {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
@@ -460,14 +464,14 @@ fn child_timeout() {
 
            // child; times out
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
@@ -483,31 +487,31 @@ fn chain_connect() {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
@@ -520,10 +524,10 @@ fn net_self_loop() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_millis(500))).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
@@ -555,10 +559,10 @@ fn together() {
 
            let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
        s.spawn(|_| {
 
@@ -568,10 +572,10 @@ fn together() {
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
@@ -582,7 +586,74 @@ fn together() {
 
fn native_batch_distinguish() {
 
    let test_log_path = Path::new("./logs/native_batch_distinguish");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.sync(Some(Duration::from_secs(3))).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn multirounds() {
 
    let test_log_path = Path::new("./logs/multirounds");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn multi_recover() {
 
    let test_log_path = Path::new("./logs/multi_recover");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let success_iter = [true, false].iter().copied().cycle().take(10);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                if succeeds {
 
                    c.get(p1).unwrap();
 
                }
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)