From 07b6791e8eb0f5fd2b22860aba0f0a4a173e6a80 2020-07-01 09:15:46 From: Christopher Esterhuyse Date: 2020-07-01 09:15:46 Subject: [PATCH] bumped up bincode dependency version. using varint decoding everywhere to shrink msg lengths --- diff --git a/Cargo.toml b/Cargo.toml index f5fcc33d9d5fdd984de14a02734d8a3e029b9f7a..e629105de36dcf87b4bca2d7812891f476d239a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ maplit = "1.0.2" derive_more = "0.99.2" # runtime -bincode = "1.2.1" +bincode = "1.3.1" serde = { version = "1.0.114", features = ["derive"] } getrandom = "0.1.14" # tiny crate. used to guess controller-id diff --git a/src/macros.rs b/src/macros.rs index 1180dc6c40848074665aed5d726b4547eb6ca067..5ef220b075dfc0d5e665dd6ebb2097a6bda75141 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,8 +1,8 @@ macro_rules! endptlog { ($logger:expr, $($arg:tt)*) => {{ - // let w = $logger.line_writer(); - // let _ = write!(w, "[ENDPT]"); - // let _ = writeln!(w, $($arg)*); + let w = $logger.line_writer(); + let _ = write!(w, "[ENDPT]"); + let _ = writeln!(w, $($arg)*); }}; } macro_rules! log { diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index d4157a0f9b0341311b4d170945725c0b09a355f8..9ecab8f8f7dccca0cc1263d248e2113af58aff08 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -13,9 +13,12 @@ enum TryRecyAnyError { ///////////////////// impl Endpoint { + fn bincode_opts() -> impl bincode::config::Options { + bincode::config::DefaultOptions::default() + } pub(super) fn try_recv( &mut self, - _logger: &mut dyn Logger, + logger: &mut dyn Logger, ) -> Result, EndpointError> { use EndpointError::*; // populate inbox as much as possible @@ -29,8 +32,10 @@ impl Endpoint { Err(_e) => return Err(BrokenEndpoint), } } + endptlog!(logger, "Inbox bytes {:x?}", &self.inbox); let mut monitored = MonitoredReader::from(&self.inbox[..]); - match bincode::deserialize_from(&mut monitored) { + use bincode::config::Options; + match Self::bincode_opts().deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); self.inbox.drain(0..(msg_size.try_into().unwrap())); @@ -41,13 +46,14 @@ impl Endpoint { Ok(None) } _ => Err(MalformedMessage), - // println!("SERDE ERRKIND {:?}", e); - // Err(MalformedMessage) }, } } pub(super) fn send(&mut self, msg: &T) -> Result<(), EndpointError> { - bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint) + use bincode::config::Options; + Self::bincode_opts() + .serialize_into(&mut self.stream, msg) + .map_err(|_| EndpointError::BrokenEndpoint) } } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index bdf623dd3a781958a81d5b155db871e15205197c..35dae0109db4b4137b07db5a3737d1fa1bfae2b6 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -485,3 +485,40 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription { Ok(Self(Arc::new(inner))) } } + +#[test] +fn bincode_serde() { + let mut b = Vec::with_capacity(64); + use bincode::config::Options; + let opt = bincode::config::DefaultOptions::default(); + opt.serialize_into(&mut b, &Decision::Failure).unwrap(); + println!("failure {:x?}", b); + b.clear(); + + opt.serialize_into(&mut b, &CommMsgContents::Suggest { suggestion: Decision::Failure }) + .unwrap(); + println!("decision {:x?}", b); + b.clear(); + + opt.serialize_into( + &mut b, + &CommMsg { + round_index: 4, + contents: CommMsgContents::Suggest { suggestion: Decision::Failure }, + }, + ) + .unwrap(); + println!("commmsg {:x?}", b); + b.clear(); + + opt.serialize_into( + &mut b, + &Msg::CommMsg(CommMsg { + round_index: 4, + contents: CommMsgContents::Suggest { suggestion: Decision::Failure }, + }), + ) + .unwrap(); + println!("msg {:x?}", b); + b.clear(); +} diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 7310a07d7ae3169a1b2514f182b82229335661f2..6f8ff25182f6097b3016a470abf682bcb706f97f 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -17,7 +17,6 @@ fn next_test_addr() -> SocketAddr { let port = TEST_PORT.fetch_add(1, SeqCst); SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into() } - fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector { let _ = std::fs::create_dir(dir_path); // we will check failure soon let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); @@ -25,7 +24,6 @@ fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connecto let file_logger = Box::new(FileLogger::new(connector_id, file)); Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8) } - static MINIMAL_PDL: &'static [u8] = b" primitive together(in ia, in ib, out oa, out ob){ while(true) synchronous() { @@ -485,31 +483,31 @@ fn chain_connect() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(Some(Duration::from_secs(2))).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(10, test_log_path); c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(Some(Duration::from_secs(2))).unwrap(); }); s.spawn(|_| { // LEADER let mut c = file_logged_connector(7, test_log_path); c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(Some(Duration::from_secs(2))).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(4, test_log_path); c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(Some(Duration::from_secs(2))).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(Some(Duration::from_secs(2))).unwrap(); }); }) .unwrap();