Changeset - 07b6791e8eb0
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-01 09:15:46
christopher.esterhuyse@gmail.com
bumped up bincode dependency version. using varint decoding everywhere to shrink msg lengths
5 files changed with 57 insertions and 16 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -10,13 +10,13 @@ edition = "2018"
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.2.1"
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
# integer-encoding = "1.1.5"
 
# byteorder = "1.3.4"
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // let w = $logger.line_writer();
 
        //    let _ = write!(w, "[ENDPT]");
 
        //    let _ = writeln!(w, $($arg)*);
 
        let w = $logger.line_writer();
 
        let _ = write!(w, "[ENDPT]");
 
        let _ = writeln!(w, $($arg)*);
 
    }};
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        let _ = writeln!($logger.line_writer(), $($arg)*);
 
    }};
src/runtime/endpoints.rs
Show inline comments
 
@@ -10,15 +10,18 @@ enum TryRecyAnyError {
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 
impl Endpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        bincode::config::DefaultOptions::default()
 
    }
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        _logger: &mut dyn Logger,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
 
@@ -26,31 +29,34 @@ impl Endpoint {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
            }
 
        }
 
        endptlog!(logger, "Inbox bytes {:x?}", &self.inbox);
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint)
 
        use bincode::config::Options;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| EndpointError::BrokenEndpoint)
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_endpoints()
src/runtime/mod.rs
Show inline comments
 
@@ -482,6 +482,43 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 

	
 
#[test]
 
fn bincode_serde() {
 
    let mut b = Vec::with_capacity(64);
 
    use bincode::config::Options;
 
    let opt = bincode::config::DefaultOptions::default();
 
    opt.serialize_into(&mut b, &Decision::Failure).unwrap();
 
    println!("failure  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(&mut b, &CommMsgContents::Suggest { suggestion: Decision::Failure })
 
        .unwrap();
 
    println!("decision {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        },
 
    )
 
    .unwrap();
 
    println!("commmsg  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &Msg::CommMsg(CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        }),
 
    )
 
    .unwrap();
 
    println!("msg      {:x?}", b);
 
    b.clear();
 
}
src/runtime/tests.rs
Show inline comments
 
@@ -14,21 +14,19 @@ fn next_test_addr() -> SocketAddr {
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
@@ -482,37 +480,37 @@ fn chain_connect() {
 
    let test_log_path = Path::new("./logs/chain_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
0 comments (0 inline, 0 general)