Changeset - a3c92705eeee
[Not reviewed]
0 10 4
Christopher Esterhuyse - 5 years ago 2020-07-14 11:39:53
christopher.esterhuyse@gmail.com
bugfix: native component branch forks that clash are MERGED rather than overwritten. Avoids race condition where (1) branch x ends and submits a solution, (2) branch y is created, has same predicate as x and overwrites it, but has a subset of its messages, (3) round ends in success but branch x is gone, so no suitable native branch is found
14 files changed with 407 insertions and 58 deletions:
0 comments (0 inline, 0 general)
examples/eg_protocols.pdl
Show inline comments
 
primitive pres_2(in i, out o) {
 
  synchronous {
 
    //put(o, get(i));
 
  }
 
}
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
  while(true) synchronous {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 
\ No newline at end of file
 
}
examples/pres_1/amy.c
Show inline comments
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector...
 
	char msgbuf[64];
 
	// ask user what message to send
 
	size_t msglen = get_user_msg(msgbuf, sizeof(msgbuf));
 

	
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_1_amy.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	rw_err_peek(c);
 
	
 
	// ... with 1 outgoing network connection
 
	PortId p0;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &p0, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Active);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_add_net_port(c, &p0, addr_str, sizeof(addr_str)-1,
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	
 
	// Connect! Begin communication. 5000ms timeout
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	rw_err_peek(c);
 
	
 
	// Ask to receive a message...
 
	connector_get(c, p0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	// Prepare a message to send
 
	connector_put_bytes(c, p0, msgbuf, msglen);
 
	rw_err_peek(c);
 
	
 
	// ... or timeout within 1000ms.
 
	// ... reach new consistent state within 1000ms deadline.
 
	connector_sync(c, 1000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// Print the message we received
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, p0, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	rw_err_peek(c);
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
examples/pres_1/bob.c
Show inline comments
 
@@ -2,31 +2,37 @@
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector...
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_1_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	rw_err_peek(c);
 
	
 
	// ... with 1 outgoing network connection
 
	PortId p0;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &p0, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Passive);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_add_net_port(c, &p0, addr_str, sizeof(addr_str)-1,
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	
 
	// Connect (5000ms timeout). Begin communication. 
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	rw_err_peek(c);
 
	
 
	// Send a single 2-byte message...
 
	connector_put_bytes(c, p0, "hi", 2);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	// Prepare to receive a message.
 
	connector_get(c, p0);
 
	rw_err_peek(c);
 
	
 
	// ... and acknowledge receipt within 1000ms.
 
	// ... reach new consistent state within 1000ms deadline.
 
	connector_sync(c, 1000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	rw_err_peek(c);
 

	
 
	// Read our received message
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, p0, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
examples/pres_2/bob.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with a protocol defined in a file
 
	char * pdl = buffer_pdl("./eg_protocols.pdl");
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl));
 
	char logpath[] = "./pres_2_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	// ... with 1 outgoing network connection
 
	PortId ports[3];
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr_str, sizeof(addr_str)-1,
 
			Polarity_Getter, EndpointPolarity_Active);
 
	connector_add_port_pair(c, &ports[1], &ports[2]);
 
	connector_add_component(c, "pres_2", 6, ports, 2);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 
	
 
	// Prepare to receive a message.
 
	connector_get(c, ports[2]);
 
	rw_err_peek(c);
 
	
 
	// ... reach new consistent state within 1000ms deadline.
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	// Read our received message
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[2], &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/pres_3/amy.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_3_amy.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	// ... with 2 outgoing network connections
 
	PortId ports[2];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 
	
 
	printf("\nputting {A}...\n");
 
	connector_put_bytes(c, ports[0], "A", 1);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("\nputting {B}...\n");
 
	connector_put_bytes(c, ports[1], "B", 1);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("\nputting {A, B}...\n");
 
	connector_put_bytes(c, ports[0], "A", 1);
 
	connector_put_bytes(c, ports[1], "B", 1);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 
	
 
	printf("\nExiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/pres_3/bob.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_3_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 

	
 
	// ... with 2 outgoing network connections
 
	PortId ports[2];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	for(int i=0; i<3; i++) {
 
		printf("\nGetting from both...\n");
 
		connector_get(c, ports[0]);
 
		rw_err_peek(c);
 
		connector_get(c, ports[1]);
 
		rw_err_peek(c);
 
		connector_sync(c, 1000);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/pres_4/bob.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_3_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 

	
 
	// ... with 2 outgoing network connections
 
	PortId ports[2];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	for(int i=0; i<3; i++) {
 
		printf("\nNext round...\n");
 
		printf("\nOption 0: Get {A}\n");
 
		connector_get(c, ports[0]);
 
		connector_next_batch(c);
 
		rw_err_peek(c);
 

	
 
		printf("\nOption 1: Get {B}\n");
 
		connector_get(c, ports[1]);
 
		connector_next_batch(c);
 
		rw_err_peek(c);
 

	
 
		printf("\nOption 2: Get {A, B}\n");
 
		connector_get(c, ports[0]);
 
		connector_get(c, ports[1]);
 
		int code = connector_sync(c, 1000);
 
		printf("Outcome: %d\n", code);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/utility.c
Show inline comments
 
@@ -3,6 +3,20 @@
 
#include <errno.h>
 
#include <string.h>
 
#include <unistd.h>
 
#include "../reowolf.h"
 

	
 
size_t get_user_msg(char * buf, size_t cap) {
 
	memset(buf, 0, cap);
 
	printf("Insert a msg of max len %d: ", cap);
 
	fgets(buf, cap, stdin);
 
	for(size_t len = 0; len<cap; len++)
 
		if(buf[len]==0 || buf[len]=='\n')
 
			return len;
 
	return cap;
 
}
 
void rw_err_peek(Connector * c) {
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
}
 

	
 
// allocates a buffer!
 
char * buffer_pdl(char * filename) {
 
@@ -19,4 +33,4 @@ char * buffer_pdl(char * filename) {
 
	fclose(f);
 
	pdl[fsize] = 0;
 
	return pdl;
 
}
 
\ No newline at end of file
 
}
src/common.rs
Show inline comments
 
@@ -26,8 +26,11 @@ pub(crate) use std::{
 
};
 
pub(crate) use Polarity::*;
 

	
 
pub(crate) trait IdParts {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix);
 
}
 
pub type ConnectorId = u32;
 
pub type PortSuffix = u32;
 
pub type U32Suffix = u32;
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
@@ -39,7 +42,7 @@ pub struct ProtoComponentId(Id);
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
    pub(crate) u32_suffix: U32Suffix,
 
}
 
#[derive(Clone, Debug, Default)]
 
pub struct U32Stream {
 
@@ -86,6 +89,21 @@ pub(crate) enum SyncBlocker {
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
 
    }
 
}
 
impl IdParts for PortId {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl IdParts for ProtoComponentId {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl U32Stream {
 
    pub(crate) fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
@@ -158,12 +176,14 @@ impl From<Vec<u8>> for Payload {
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
        let (a, b) = self.id_parts();
 
        write!(f, "pid{}_{}", a, b)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
        let (a, b) = self.id_parts();
 
        write!(f, "cid{}_{}", a, b)
 
    }
 
}
 
impl Debug for Payload {
src/ffi/mod.rs
Show inline comments
 
@@ -377,6 +377,7 @@ pub unsafe extern "C" fn connector_put_bytes(
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.get(port) {
 
        Ok(()) => ERR_OK,
 
        Err(err) => {
 
@@ -388,6 +389,7 @@ pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId)
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize {
 
    StoredError::tl_clear();
 
    match connector.next_batch() {
 
        Ok(n) => n as isize,
 
        Err(err) => {
src/runtime/communication.rs
Show inline comments
 
@@ -19,7 +19,7 @@ struct BranchingNative {
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>, // native branch is ended iff to_get.is_empty()
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
@@ -248,6 +248,11 @@ impl Connector {
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.id_manager.new_spec_var_stream(),
 
@@ -296,8 +301,8 @@ impl Connector {
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            if to_get.is_empty() {
 
                // this branch is immediately ready to be part of a solution
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
@@ -310,7 +315,6 @@ impl Connector {
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
@@ -649,6 +653,11 @@ impl Connector {
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl NativeBranch {
 
    fn is_ended(&self) -> bool {
 
        self.to_get.is_empty()
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
@@ -667,13 +676,13 @@ impl BranchingNative {
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                if branch.is_ended() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} (to_get.is_empty()) with gotten {:?}",
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
@@ -683,6 +692,13 @@ impl BranchingNative {
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    log!(
 
                        cu.logger,
 
                        "Fed native {:?} still has to_get {:?}",
 
                        &predicate,
 
                        &branch.to_get
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
@@ -692,7 +708,7 @@ impl BranchingNative {
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                Self::fold_into(finished, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
@@ -704,13 +720,13 @@ impl BranchingNative {
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                    Self::fold_into(finished, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                    Self::fold_into(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
@@ -723,8 +739,8 @@ impl BranchingNative {
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::fold_into(finished, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
@@ -735,11 +751,33 @@ impl BranchingNative {
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::fold_into(finished, predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn fold_into(
 
        branches: &mut HashMap<Predicate, NativeBranch>,
 
        predicate: Predicate,
 
        mut branch: NativeBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                let b = eo.get_mut();
 
                for (k, v) in branch.gotten.drain() {
 
                    if b.gotten.insert(k, v).is_none() {
 
                        b.to_get.remove(&k);
 
                    }
 
                }
 
            }
 
        }
 
        // if let Some(prev) = branches.insert(predicate, branch)
 
    }
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
@@ -749,7 +787,14 @@ impl BranchingNative {
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if branch.to_get.is_empty() && branch_predicate.assigns_subset(solution_predicate) {
 
            log!(
 
                logger,
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
src/runtime/logging.rs
Show inline comments
 
use super::*;
 

	
 
fn secs_since_unix_epoch() -> f64 {
 
    std::time::SystemTime::now()
 
        .duration_since(std::time::UNIX_EPOCH)
 
        .map(|dur| dur.as_secs_f64())
 
        .unwrap_or(0.)
 
}
 
impl FileLogger {
 
    pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self {
 
        Self(connector_id, file)
 
@@ -16,15 +22,16 @@ impl Logger for DummyLogger {
 
        None
 
    }
 
}
 

	
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> {
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        let _ = write!(&mut self.1, "CID({}) at {:.6} ", self.0, secs_since_unix_epoch());
 
        Some(self)
 
    }
 
}
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> {
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        let _ = write!(&mut self.1, "CID({}) at {:.6} ", self.0, secs_since_unix_epoch());
 
        Some(&mut self.1)
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -351,10 +351,12 @@ impl IdManager {
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        SpecVarStream {
 
            connector_id: self.connector_id,
 
            port_suffix_stream: self.port_suffix_stream.clone(),
 
        let mut port_suffix_stream = self.port_suffix_stream.clone();
 
        const JUMP_OVER: usize = 100; // Jumping is entirely unnecessary. It's only used to make spec vars easier to spot in logs
 
        for _ in 0..JUMP_OVER {
 
            port_suffix_stream.next(); // throw away an ID
 
        }
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
@@ -557,7 +559,13 @@ impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_tuple("Predicate").field(&self.assigned).finish()
 
        struct Assignment<'a>((&'a SpecVar, &'a SpecVal));
 
        impl Debug for Assignment<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                write!(f, "{:?}={:?}", (self.0).0, (self.0).1)
 
            }
 
        }
 
        f.debug_set().entries(self.assigned.iter().map(Assignment)).finish()
 
    }
 
}
 
impl serde::Serialize for SerdeProtocolDescription {
 
@@ -578,9 +586,15 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl IdParts for SpecVar {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl Debug for SpecVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_tuple("vrID").field(&self.0).finish()
 
        let (a, b) = self.id_parts();
 
        write!(f, "v{}_{}", a, b)
 
    }
 
}
 
impl SpecVal {
src/runtime/tests.rs
Show inline comments
 
@@ -791,3 +791,46 @@ fn udp_reowolf_swap() {
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn pres_3() {
 
    let test_log_path = Path::new("./logs/pres_3");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            // put {A} and FAIL
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {B} and FAIL
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {A, B} and SUCCEED
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..2 {
 
                // get {A, B} and FAIL
 
                c.get(p0).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap_err();
 
            }
 
            // get {A, B} and SUCCEED
 
            c.get(p0).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)