Changeset - a6f53f74e58c
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-06-26 09:54:12
christopher.esterhuyse@gmail.com
fixed bug: failed to restore !sync_batches.is_empty() invariant when the native is immediately inconsistent
8 files changed with 104 insertions and 62 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
@@ -6,4 +6,5 @@ main
 
examples/*/*.exe
 
examples/*.dll
 
examples/reowolf*
 
examples/*.txt
 
logs
examples/6_amy_log.txt
Show inline comments
 
CID(3382080479): Created with connector_id 3382080479
 
CID(3382080479): Added port pair (out->in) ptID(3382080479'0) -> ptID(3382080479'1)
 
CID(3382080479): ~~~ CONNECT called timeout None
 
CID(3382080479): Successfully connected 0 endpoints
 
CID(3382080479): beginning neighborhood construction
 
CID(3382080479): Edge case of no neighbors! No parent an no children!
 
CID(3382080479): Successfully created neighborhood Neighborhood { parent: None, children: {} }
 
CID(3382080479): Beginning session optimization
 
CID(3382080479): Gathered all children's maps. ConnectorId set is... []
 
CID(3382080479): Inserting my own info. Unoptimized subtree map is {3382080479: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(3382080479'1): Getter, ptID(3382080479'0): Putter}, peers: {ptID(3382080479'0): ptID(3382080479'1), ptID(3382080479'1): ptID(3382080479'0)}, routes: {ptID(3382080479'0): LocalComponent(Native), ptID(3382080479'1): LocalComponent(Native)} }, proto_components: {} }}
 
CID(3382080479): I am the leader! I will optimize this session
 
CID(3382080479): Session map optimize START
 
CID(3382080479): Session map optimize END
 
CID(3382080479): Optimized info map is {3382080479: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(3382080479'1): Getter, ptID(3382080479'0): Putter}, peers: {ptID(3382080479'0): ptID(3382080479'1), ptID(3382080479'1): ptID(3382080479'0)}, routes: {ptID(3382080479'0): LocalComponent(Native), ptID(3382080479'1): LocalComponent(Native)} }, proto_components: {} }}. Sending to children Iter([])
 
CID(3382080479): All session info dumped!: {
 
    3382080479: SessionInfo {
 
CID(1463643093): Created with connector_id 1463643093
 
CID(1463643093): Added port pair (out->in) ptID(1463643093'0) -> ptID(1463643093'1)
 
CID(1463643093): ~~~ CONNECT called timeout None
 
CID(1463643093): Successfully connected 0 endpoints
 
CID(1463643093): beginning neighborhood construction
 
CID(1463643093): Edge case of no neighbors! No parent an no children!
 
CID(1463643093): Successfully created neighborhood Neighborhood { parent: None, children: {} }
 
CID(1463643093): Beginning session optimization
 
CID(1463643093): Gathered all children's maps. ConnectorId set is... []
 
CID(1463643093): Inserting my own info. Unoptimized subtree map is {1463643093: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(1463643093'0): Putter, ptID(1463643093'1): Getter}, peers: {ptID(1463643093'1): ptID(1463643093'0), ptID(1463643093'0): ptID(1463643093'1)}, routes: {ptID(1463643093'1): LocalComponent(Native), ptID(1463643093'0): LocalComponent(Native)} }, proto_components: {} }}
 
CID(1463643093): I am the leader! I will optimize this session
 
CID(1463643093): Session map optimize START
 
CID(1463643093): Session map optimize END
 
CID(1463643093): Optimized info map is {1463643093: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(1463643093'0): Putter, ptID(1463643093'1): Getter}, peers: {ptID(1463643093'1): ptID(1463643093'0), ptID(1463643093'0): ptID(1463643093'1)}, routes: {ptID(1463643093'1): LocalComponent(Native), ptID(1463643093'0): LocalComponent(Native)} }, proto_components: {} }}. Sending to children Iter([])
 
CID(1463643093): All session info dumped!: {
 
    1463643093: SessionInfo {
 
        serde_proto_description: SerdeProtocolDescription(
 
            (A big honkin' protocol description),
 
        ),
 
        port_info: PortInfo {
 
            polarities: {
 
                ptID(3382080479'1): Getter,
 
                ptID(3382080479'0): Putter,
 
                ptID(1463643093'0): Putter,
 
                ptID(1463643093'1): Getter,
 
            },
 
            peers: {
 
                ptID(3382080479'0): ptID(3382080479'1),
 
                ptID(3382080479'1): ptID(3382080479'0),
 
                ptID(1463643093'1): ptID(1463643093'0),
 
                ptID(1463643093'0): ptID(1463643093'1),
 
            },
 
            routes: {
 
                ptID(3382080479'0): LocalComponent(
 
                ptID(1463643093'1): LocalComponent(
 
                    Native,
 
                ),
 
                ptID(3382080479'1): LocalComponent(
 
                ptID(1463643093'0): LocalComponent(
 
                    Native,
 
                ),
 
            },
 
@@ -38,16 +38,40 @@ CID(3382080479): All session info dumped!: {
 
        proto_components: {},
 
    },
 
}
 
CID(3382080479): Session optimizations applied
 
CID(3382080479): connect() finished. setup phase complete
 
CID(3382080479): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(3382080479): Nonsync running 0 proto components...
 
CID(3382080479): All 0 proto components are now done with Nonsync phase
 
CID(3382080479): Solution storage initialized
 
CID(3382080479): Translating 1 native batches into branches...
 
CID(3382080479): Native branch index=0 contains internal inconsistency wrt. fvID(3382080479'1). Skipping
 
CID(3382080479): Native starts with no branches! Failure!
 
CID(3382080479): No parent. Deciding on failure
 
CID(3382080479): Committing to decision Failure!
 
CID(3382080479): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Failure } }) through child endpoints {}
 
CID(3382080479): Connector dropping. Goodbye!
 
CID(1463643093): Session optimizations applied
 
CID(1463643093): connect() finished. setup phase complete
 
CID(1463643093): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(1463643093): Nonsync running 0 proto components...
 
CID(1463643093): All 0 proto components are now done with Nonsync phase
 
CID(1463643093): Solution storage initialized
 
CID(1463643093): Translating 1 native batches into branches...
 
CID(1463643093): Native branch index=0 contains internal inconsistency wrt. fvID(1463643093'1). Skipping
 
CID(1463643093): Native starts with no branches! Failure!
 
CID(1463643093): No parent. Deciding on failure
 
CID(1463643093): Committing to decision Failure!
 
CID(1463643093): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Failure } }) through child endpoints {}
 
CID(1463643093): Sync round ending! Cleaning up
 
CID(1463643093): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(1463643093): Nonsync running 0 proto components...
 
CID(1463643093): All 0 proto components are now done with Nonsync phase
 
CID(1463643093): Solution storage initialized
 
CID(1463643093): Translating 1 native batches into branches...
 
CID(1463643093): Native branch index=0 has consistent Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): Native branch 0 sending msg SendPayloadMsg { predicate: Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }, payload: Payload([104, 101, 108, 108, 111]) }
 
CID(1463643093): Done translating native batches into branches
 
CID(1463643093): Running all 0 proto components to their sync blocker...
 
CID(1463643093): All proto components are blocked
 
CID(1463643093): Entering decision loop...
 
CID(1463643093): feeding native getter ptID(1463643093'1) SendPayloadMsg { predicate: Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }, payload: Payload([104, 101, 108, 108, 111]) }
 
CID(1463643093): visiting native branch NativeBranch { index: 0, gotten: {}, to_get: {ptID(1463643093'1)} } with Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): NEW COMPONENT SOLUTION LocalComponent(Native) Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): storing NEW LOCAL SOLUTION Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): branch pred covers it! Accept the msg
 
CID(1463643093): Check if we have any local decisions...
 
CID(1463643093): New local decision with solution Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }...
 
CID(1463643093): No parent. Deciding on solution Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }
 
CID(1463643093): Committing to decision Success(Predicate { Trues: {fvID(1463643093'1)}, Falses: {} })!
 
CID(1463643093): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Success(Predicate { Trues: {fvID(1463643093'1)}, Falses: {} }) } }) through child endpoints {}
 
CID(1463643093): End round with (updated) component states []
 
CID(1463643093): Sync round ending! Cleaning up
 
CID(1463643093): Connector dropping. Goodbye!
examples/6_atomic/amy.c
Show inline comments
 
@@ -23,7 +23,6 @@ int main(int argc, char** argv) {
 
	int err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	
 
	/*
 
	printf("Let's try again, doing both\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	connector_get(c, getter);
 
@@ -32,7 +31,6 @@ int main(int argc, char** argv) {
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	*/
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
examples/reowolf_rs.dll
Show inline comments
 
binary diff not shown
src/lib.rs
Show inline comments
 
@@ -5,12 +5,9 @@ mod common;
 
mod protocol;
 
mod runtime;
 

	
 
// #[cfg(test)]
 
// mod test;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Polarity, PortId};
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger};
 

	
 
// #[cfg(feature = "ffi")]
 
// pub use runtime::ffi;
 
#[cfg(feature = "ffi")]
 
pub use runtime::ffi;
src/runtime/communication.rs
Show inline comments
 
@@ -269,10 +269,11 @@ impl Connector {
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        // restore the invariant
 
        comm.native_batches.push(Default::default());
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
@@ -353,7 +354,6 @@ impl Connector {
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 
        comm.native_batches.push(Default::default());
 

	
 
        // run all proto components to their sync blocker
 
        log!(
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
mod endpoints;
 
pub mod error;
 
mod ffi;
 
mod logging;
 
mod setup;
 

	
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
src/runtime/tests.rs
Show inline comments
 
@@ -18,10 +18,6 @@ fn next_test_addr() -> SocketAddr {
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
@@ -30,6 +26,17 @@ fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connecto
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap())
 
    };
 
}
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(b"hello" as &[u8])
 
    };
 
}
 

	
 
//////////////////////////////////////////
 

	
 
#[test]
 
@@ -110,7 +117,7 @@ fn put_no_sync() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
@@ -119,7 +126,7 @@ fn wrong_polarity_bad() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
@@ -128,8 +135,8 @@ fn dup_put_bad() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap_err();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
@@ -175,10 +182,10 @@ fn native_polarity_checks() {
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
@@ -209,7 +216,7 @@ fn native_self_msg() {
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
@@ -230,7 +237,7 @@ fn two_natives_msg() {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
@@ -269,7 +276,7 @@ fn connector_pair_nondet() {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
@@ -286,6 +293,19 @@ fn native_immediately_inconsistent() {
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
}
 

	
 
#[test]
 
fn native_recovers() {
 
    let test_log_path = Path::new("./logs/native_recovers");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    /*
 
@@ -296,7 +316,7 @@ fn cannot_use_moved_ports() {
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, (b"hello" as &[_]).into()).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
@@ -312,7 +332,7 @@ fn sync_sync() {
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"sync", &[g0, p1]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p0, (b"hello" as &[_]).into()).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.gotten(g1).unwrap();
 
@@ -377,7 +397,7 @@ fn distributed_msg_bounce() {
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
0 comments (0 inline, 0 general)