Changeset - d9774c9084d7
[Not reviewed]
4 9 2
Christopher Esterhuyse - 5 years ago 2020-06-30 10:45:50
christopher.esterhuyse@gmail.com
more logging, testing, examples and bugfixes: (1) components remember whether they have submitted a solution; only those are considered when selecting a branch at the end of a round, (2) retrying active connections during setup phase were using the wrong index for looking up their TODO structure, (3) recently failed connections are deregistered from mio and reregistered after the retry process restarts s.t. they don't produce a storm of mio events
15 files changed with 256 insertions and 1045 deletions:
0 comments (0 inline, 0 general)
examples/a_swap/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_amy_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
    
 
    protocol_description_destroy(pd);
 
    connector_destroy(c);
 
    return 0;
 
}
 
\ No newline at end of file
examples/a_swap/bob.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_bob_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
    
 
    protocol_description_destroy(pd);
 
    connector_destroy(c);
 
    return 0;
 
}
 
\ No newline at end of file
examples/eg_protocols.pdl
Show inline comments
 
primitive foo(){}
 
\ No newline at end of file
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 
\ No newline at end of file
examples/make.py
Show inline comments
 
import os, glob, subprocess
 
import os, glob, subprocess, time
 
script_path = os.path.dirname(os.path.realpath(__file__));
 
for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  print("compiling", c_file)
 
@@ -12,4 +12,5 @@ for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
    "-o",           # output flag
 
    c_file[:-2]     # output filename
 
  ];
 
  subprocess.run(args);
 
  subprocess.run(args)
 
input("Blocking until newline...");
examples/reowolf_rs.dll
Show inline comments
 
deleted file
 
binary diff not shown
src/runtime/communication.rs
Show inline comments
 
@@ -26,6 +26,7 @@ struct BranchingProtoComponent {
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    ended: bool,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 
@@ -319,7 +320,7 @@ impl Connector {
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
                Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.logger, "Sync round ending! Cleaning up");
 
@@ -364,6 +365,7 @@ impl Connector {
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            // initially, no components have .ended==true
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
@@ -651,6 +653,12 @@ impl BranchingNative {
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} (to_get.is_empty()) with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
@@ -715,10 +723,17 @@ impl BranchingNative {
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk {
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if solution_predicate.satisfies(&branch_predicate) {
 
            if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
@@ -771,6 +786,7 @@ impl BranchingProtoComponent {
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
@@ -832,6 +848,11 @@ impl BranchingProtoComponent {
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
@@ -885,7 +906,7 @@ impl BranchingProtoComponent {
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
            if branch.ended && branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
@@ -893,7 +914,7 @@ impl BranchingProtoComponent {
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
src/runtime/endpoints.rs
Show inline comments
 
@@ -15,7 +15,7 @@ enum TryRecyAnyError {
 
impl Endpoint {
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        _logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
src/runtime/ffi.rs
Show inline comments
 
@@ -241,7 +241,9 @@ pub unsafe extern "C" fn connector_add_net_port(
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            0
 
        }
 
        Err(err) => {
 
@@ -360,13 +362,15 @@ pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_milli
 
pub unsafe extern "C" fn connector_gotten_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    len: *mut usize,
 
    out_len: *mut usize,
 
) -> *const u8 {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => {
 
            let slice = payload_borrow.as_slice();
 
            len.write(slice.len());
 
            if !out_len.is_null() {
 
                out_len.write(slice.len());
 
            }
 
            slice.as_ptr()
 
        }
 
        Err(err) => {
src/runtime/logging.rs
Show inline comments
 
@@ -18,13 +18,13 @@ impl Logger for DummyLogger {
 
}
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        self
 
    }
 
}
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        &mut self.1
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -217,15 +217,15 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    // fn insert(&mut self, element: T) -> bool {
 
    //     match self.vec.binary_search(&element) {
 
    //         Ok(_) => false,
 
    //         Err(index) => {
 
    //             self.vec.insert(index, element);
 
    //             true
 
    //         }
 
    //     }
 
    // }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
@@ -265,9 +265,13 @@ impl Drop for Connector {
 
impl Connector {
 
    fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
src/runtime/setup.rs
Show inline comments
 
@@ -146,7 +146,7 @@ fn new_endpoint_manager(
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
@@ -169,7 +169,10 @@ fn new_endpoint_manager(
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 

	
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
@@ -181,11 +184,15 @@ fn new_endpoint_manager(
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if token == WAKER_TOKEN {
 
                log!(logger, "Notification from waker");
 
                log!(
 
                    logger,
 
                    "Notification from waker. connect_failed is {:?}",
 
                    connect_failed.iter()
 
                );
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    let todo: &mut Todo = &mut todos[index];
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
@@ -196,14 +203,16 @@ fn new_endpoint_manager(
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                            poll.registry()
 
                                .register(&mut endpoint.stream, Token(index), BOTH)
 
                                .unwrap();
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                let todo: &mut Todo = &mut todos[index];
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
@@ -234,7 +243,19 @@ fn new_endpoint_manager(
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        connect_failed.insert(index);
 
                        if connect_failed.insert(index) {
 
                            log!(
 
                                logger,
 
                                "Connection failed for {:?}. List is {:?}",
 
                                index,
 
                                connect_failed.iter()
 
                            );
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                        } else {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 

	
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
src/runtime/tests.rs
Show inline comments
 
@@ -26,9 +26,19 @@ fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connecto
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap())
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
lazy_static::lazy_static! {
 
@@ -517,3 +527,55 @@ fn net_self_loop() {
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_millis(500))).unwrap();
 
}
 

	
 
#[test]
 
fn nobody_connects_active() {
 
    let test_log_path = Path::new("./logs/nobody_connects_active");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 
#[test]
 
fn nobody_connects_passive() {
 
    let test_log_path = Path::new("./logs/nobody_connects_passive");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 

	
 
#[test]
 
fn together() {
 
    let test_log_path = Path::new("./logs/together");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
src/test/connector.rs
Show inline comments
 
deleted file
src/test/mod.rs
Show inline comments
 
deleted file
src/test/setup.rs
Show inline comments
 
deleted file
0 comments (0 inline, 0 general)