Changeset - a6f53f74e58c
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-06-26 09:54:12
christopher.esterhuyse@gmail.com
fixed bug: failed to restore !sync_batches.is_empty() invariant when the native is immediately inconsistent
8 files changed with 105 insertions and 63 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
target
 
/.idea
 
**/*.rs.bk
 
Cargo.lock
 
main
 
examples/*/*.exe
 
examples/*.dll
 
examples/reowolf*
 
logs
 
\ No newline at end of file
 
examples/*.txt
 
logs
examples/6_amy_log.txt
Show inline comments
 
CID(3382080479): Created with connector_id 3382080479
 
CID(3382080479): Added port pair (out->in) ptID(3382080479'0) -> ptID(3382080479'1)
 
CID(3382080479): ~~~ CONNECT called timeout None
 
CID(3382080479): Successfully connected 0 endpoints
 
CID(3382080479): beginning neighborhood construction
 
CID(3382080479): Edge case of no neighbors! No parent an no children!
 
CID(3382080479): Successfully created neighborhood Neighborhood { parent: None, children: {} }
 
CID(3382080479): Beginning session optimization
 
CID(3382080479): Gathered all children's maps. ConnectorId set is... []
 
CID(3382080479): Inserting my own info. Unoptimized subtree map is {3382080479: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(3382080479'1): Getter, ptID(3382080479'0): Putter}, peers: {ptID(3382080479'0): ptID(3382080479'1), ptID(3382080479'1): ptID(3382080479'0)}, routes: {ptID(3382080479'0): LocalComponent(Native), ptID(3382080479'1): LocalComponent(Native)} }, proto_components: {} }}
 
CID(3382080479): I am the leader! I will optimize this session
 
CID(3382080479): Session map optimize START
 
CID(3382080479): Session map optimize END
 
CID(3382080479): Optimized info map is {3382080479: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(3382080479'1): Getter, ptID(3382080479'0): Putter}, peers: {ptID(3382080479'0): ptID(3382080479'1), ptID(3382080479'1): ptID(3382080479'0)}, routes: {ptID(3382080479'0): LocalComponent(Native), ptID(3382080479'1): LocalComponent(Native)} }, proto_components: {} }}. Sending to children Iter([])
 
CID(3382080479): All session info dumped!: {
 
    3382080479: SessionInfo {
 
CID(1463643093): Created with connector_id 1463643093
 
CID(1463643093): Added port pair (out->in) ptID(1463643093'0) -> ptID(1463643093'1)
 
CID(1463643093): ~~~ CONNECT called timeout None
 
CID(1463643093): Successfully connected 0 endpoints
 
CID(1463643093): beginning neighborhood construction
 
CID(1463643093): Edge case of no neighbors! No parent an no children!
 
CID(1463643093): Successfully created neighborhood Neighborhood { parent: None, children: {} }
 
CID(1463643093): Beginning session optimization
 
CID(1463643093): Gathered all children's maps. ConnectorId set is... []
 
CID(1463643093): Inserting my own info. Unoptimized subtree map is {1463643093: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(1463643093'0): Putter, ptID(1463643093'1): Getter}, peers: {ptID(1463643093'1): ptID(1463643093'0), ptID(1463643093'0): ptID(1463643093'1)}, routes: {ptID(1463643093'1): LocalComponent(Native), ptID(1463643093'0): LocalComponent(Native)} }, proto_components: {} }}
 
CID(1463643093): I am the leader! I will optimize this session
 
CID(1463643093): Session map optimize START
 
CID(1463643093): Session map optimize END
 
CID(1463643093): Optimized info map is {1463643093: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(1463643093'0): Putter, ptID(1463643093'1): Getter}, peers: {ptID(1463643093'1): ptID(1463643093'0), ptID(1463643093'0): ptID(1463643093'1)}, routes: {ptID(1463643093'1): LocalComponent(Native), ptID(1463643093'0): LocalComponent(Native)} }, proto_components: {} }}. Sending to children Iter([])
 
CID(1463643093): All session info dumped!: {
 
    1463643093: SessionInfo {
 
        serde_proto_description: SerdeProtocolDescription(
 
            (A big honkin' protocol description),
 
        ),
 
        port_info: PortInfo {
 
            polarities: {
 
                ptID(3382080479'1): Getter,
 
                ptID(3382080479'0): Putter,
 
                ptID(1463643093'0): Putter,
 
                ptID(1463643093'1): Getter,
 
            },
 
            peers: {
 
                ptID(3382080479'0): ptID(3382080479'1),
 
                ptID(3382080479'1): ptID(3382080479'0),
 
                ptID(1463643093'1): ptID(1463643093'0),
 
                ptID(1463643093'0): ptID(1463643093'1),
 
            },
 
            routes: {
 
                ptID(3382080479'0): LocalComponent(
 
                ptID(1463643093'1): LocalComponent(
 
                    Native,
 
                ),
 
                ptID(3382080479'1): LocalComponent(
 
                ptID(1463643093'0): LocalComponent(
 
                    Native,
 
                ),
 
            },
 
        },
 
        proto_components: {},
 
    },
 
}
 
CID(3382080479): Session optimizations applied
 
CID(3382080479): connect() finished. setup phase complete
 
CID(3382080479): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(3382080479): Nonsync running 0 proto components...
 
CID(3382080479): All 0 proto components are now done with Nonsync phase
 
CID(3382080479): Solution storage initialized
 
CID(3382080479): Translating 1 native batches into branches...
 
CID(3382080479): Native branch index=0 contains internal inconsistency wrt. fvID(3382080479'1). Skipping
 
CID(3382080479): Native starts with no branches! Failure!
 
CID(3382080479): No parent. Deciding on failure
 
CID(3382080479): Committing to decision Failure!
 
CID(3382080479): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Failure } }) through child endpoints {}
 
CID(3382080479): Connector dropping. Goodbye!
 
CID(1463643093): Session optimizations applied
 
CID(1463643093): connect() finished. setup phase complete
 
CID(1463643093): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(1463643093): Nonsync running 0 proto components...
 
CID(1463643093): All 0 proto components are now done with Nonsync phase
 
CID(1463643093): Solution storage initialized
 
CID(1463643093): Translating 1 native batches into branches...
 
CID(1463643093): Native branch index=0 contains internal inconsistency wrt. fvID(1463643093'1). Skipping
 
CID(1463643093): Native starts with no branches! Failure!
 
CID(1463643093): No parent. Deciding on failure
 
CID(1463643093): Committing to decision Failure!
 
CID(1463643093): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Failure } }) through child endpoints {}
 
CID(1463643093): Sync round ending! Cleaning up
 
CID(1463643093): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(1463643093): Nonsync running 0 proto components...
 
CID(1463643093): All 0 proto components are now done with Nonsync phase
 
CID(1463643093): Solution storage initialized
 
CID(1463643093): Translating 1 native batches into branches...
 
CID(1463643093): Native branch index=0 has consistent Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): Native branch 0 sending msg SendPayloadMsg { predicate: Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }, payload: Payload([104, 101, 108, 108, 111]) }
 
CID(1463643093): Done translating native batches into branches
 
CID(1463643093): Running all 0 proto components to their sync blocker...
 
CID(1463643093): All proto components are blocked
 
CID(1463643093): Entering decision loop...
 
CID(1463643093): feeding native getter ptID(1463643093'1) SendPayloadMsg { predicate: Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }, payload: Payload([104, 101, 108, 108, 111]) }
 
CID(1463643093): visiting native branch NativeBranch { index: 0, gotten: {}, to_get: {ptID(1463643093'1)} } with Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): NEW COMPONENT SOLUTION LocalComponent(Native) Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): storing NEW LOCAL SOLUTION Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): branch pred covers it! Accept the msg
 
CID(1463643093): Check if we have any local decisions...
 
CID(1463643093): New local decision with solution Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }...
 
CID(1463643093): No parent. Deciding on solution Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): Committing to decision Success(Predicate { Trues: {fvID(1463643093'1)}, Falses: {} })!
 
CID(1463643093): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Success(Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }) } }) through child endpoints {}
 
CID(1463643093): End round with (updated) component states []
 
CID(1463643093): Sync round ending! Cleaning up
 
CID(1463643093): Connector dropping. Goodbye!
examples/6_atomic/amy.c
Show inline comments
 
@@ -14,28 +14,26 @@ int main(int argc, char** argv) {
 
	PortId putter, getter;
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	connector_print_debug(c);
 
	
 
	printf("Let's try to put without get\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	// connector_get(c, getter);
 
	
 
	int err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	
 
	/*
 
	printf("Let's try again, doing both\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	connector_get(c, getter);
 
	err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	*/
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl_ptr);
 
	return 0;
 
}
 
\ No newline at end of file
examples/reowolf_rs.dll
Show inline comments
 
binary diff not shown
src/lib.rs
Show inline comments
 
#[macro_use]
 
mod macros;
 

	
 
mod common;
 
mod protocol;
 
mod runtime;
 

	
 
// #[cfg(test)]
 
// mod test;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Polarity, PortId};
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger};
 

	
 
// #[cfg(feature = "ffi")]
 
// pub use runtime::ffi;
 
#[cfg(feature = "ffi")]
 
pub use runtime::ffi;
src/runtime/communication.rs
Show inline comments
 
@@ -260,28 +260,29 @@ impl Connector {
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        // restore the invariant
 
        comm.native_batches.push(Default::default());
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            solution_storage,
 
            payloads_to_get,
 
            deadline,
 
        )?;
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 

	
 
        // propagate the decision to children
 
@@ -344,25 +345,24 @@ impl Connector {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 
        comm.native_batches.push(Default::default());
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
mod endpoints;
 
pub mod error;
 
mod ffi;
 
mod logging;
 
mod setup;
 

	
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Debug)]
 
pub struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
pub struct VecSet<T: std::cmp::Ord> {
src/runtime/tests.rs
Show inline comments
 
@@ -9,36 +9,43 @@ use reowolf::{
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap())
 
    };
 
}
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(b"hello" as &[u8])
 
    };
 
}
 

	
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 
@@ -101,44 +108,44 @@ fn minimal_net_connect() {
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let test_log_path = Path::new("./logs/wrong_polarity_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let test_log_path = Path::new("./logs/dup_put_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap_err();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let test_log_path = Path::new("./logs/trivial_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
@@ -166,28 +173,28 @@ fn connected_gotten_err_ungotten() {
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let test_log_path = Path::new("./logs/native_polarity_checks");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let test_log_path = Path::new("./logs/native_multiple_gets");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
@@ -200,46 +207,46 @@ fn next_batch() {
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_self_msg() {
 
    let test_log_path = Path::new("./logs/native_self_msg");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn two_natives_msg() {
 
    let test_log_path = Path::new("./logs/two_natives_msg");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn trivial_nondet() {
 
    let test_log_path = Path::new("./logs/trivial_nondet");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
@@ -260,68 +267,81 @@ fn connector_pair_nondet() {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_immediately_inconsistent() {
 
    let test_log_path = Path::new("./logs/native_immediately_inconsistent");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
}
 

	
 
#[test]
 
fn native_recovers() {
 
    let test_log_path = Path::new("./logs/native_recovers");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, (b"hello" as &[_]).into()).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
    /*
 
    native p0|-->|g0 sync
 
           g1|<--|p1
 
    */
 
    let test_log_path = Path::new("./logs/sync_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"sync", &[g0, p1]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p0, (b"hello" as &[_]).into()).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
#[test]
 
fn double_net_connect() {
 
    let test_log_path = Path::new("./logs/double_net_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
@@ -368,25 +388,25 @@ fn distributed_msg_bounce() {
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [g, p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn local_timeout() {
 
    let test_log_path = Path::new("./logs/local_timeout");
 
    let mut c = file_logged_connector(0, test_log_path);
0 comments (0 inline, 0 general)