Changeset - 07b6791e8eb0
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-01 09:15:46
christopher.esterhuyse@gmail.com
bumped up bincode dependency version. using varint decoding everywhere to shrink msg lengths
5 files changed with 57 insertions and 16 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -13,7 +13,7 @@ maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.2.1"
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // let w = $logger.line_writer();
 
        //    let _ = write!(w, "[ENDPT]");
 
        //    let _ = writeln!(w, $($arg)*);
 
        let w = $logger.line_writer();
 
        let _ = write!(w, "[ENDPT]");
 
        let _ = writeln!(w, $($arg)*);
 
    }};
 
}
 
macro_rules! log {
src/runtime/endpoints.rs
Show inline comments
 
@@ -13,9 +13,12 @@ enum TryRecyAnyError {
 

	
 
/////////////////////
 
impl Endpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        bincode::config::DefaultOptions::default()
 
    }
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        _logger: &mut dyn Logger,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
@@ -29,8 +32,10 @@ impl Endpoint {
 
                Err(_e) => return Err(BrokenEndpoint),
 
            }
 
        }
 
        endptlog!(logger, "Inbox bytes {:x?}", &self.inbox);
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
@@ -41,13 +46,14 @@ impl Endpoint {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint)
 
        use bincode::config::Options;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| EndpointError::BrokenEndpoint)
 
    }
 
}
 

	
src/runtime/mod.rs
Show inline comments
 
@@ -485,3 +485,40 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 

	
 
#[test]
 
fn bincode_serde() {
 
    let mut b = Vec::with_capacity(64);
 
    use bincode::config::Options;
 
    let opt = bincode::config::DefaultOptions::default();
 
    opt.serialize_into(&mut b, &Decision::Failure).unwrap();
 
    println!("failure  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(&mut b, &CommMsgContents::Suggest { suggestion: Decision::Failure })
 
        .unwrap();
 
    println!("decision {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        },
 
    )
 
    .unwrap();
 
    println!("commmsg  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &Msg::CommMsg(CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        }),
 
    )
 
    .unwrap();
 
    println!("msg      {:x?}", b);
 
    b.clear();
 
}
src/runtime/tests.rs
Show inline comments
 
@@ -17,7 +17,6 @@ fn next_test_addr() -> SocketAddr {
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
@@ -25,7 +24,6 @@ fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connecto
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
@@ -485,31 +483,31 @@ fn chain_connect() {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.connect(Some(Duration::from_secs(2))).unwrap();
 
        });
 
    })
 
    .unwrap();
0 comments (0 inline, 0 general)