use crate::common::*; use crate::runtime::*; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{ErrorKind::InvalidData, Read, Write}; pub trait Ser: Write { fn ser(&mut self, t: &T) -> Result<(), std::io::Error>; } pub trait De: Read { fn de(&mut self) -> Result; } pub struct MonitoredReader { bytes: usize, r: R, } impl From for MonitoredReader { fn from(r: R) -> Self { Self { r, bytes: 0 } } } impl MonitoredReader { pub fn bytes_read(&self) -> usize { self.bytes } } impl Read for MonitoredReader { fn read(&mut self, buf: &mut [u8]) -> Result { let n = self.r.read(buf)?; self.bytes += n; Ok(n) } } ///////////////////////////////////////// struct VarLenInt(u64); macro_rules! ser_seq { ( $w:expr ) => {{ io::Result::Ok(()) }}; ( $w:expr, $first:expr ) => {{ $w.ser($first) }}; ( $w:expr, $first:expr, $( $x:expr ),+ ) => {{ $w.ser($first)?; ser_seq![$w, $( $x ),*] }}; } ///////////////////////////////////////// impl Ser for W { fn ser(&mut self, t: &PortId) -> Result<(), std::io::Error> { self.ser(&t.controller_id)?; self.ser(&VarLenInt(t.port_index as u64)) } } impl De for R { fn de(&mut self) -> Result { Ok(PortId { controller_id: self.de()?, port_index: De::::de(self)?.0 as u32 }) } } impl Ser for W { fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> { self.ser(&match t { true => b'T', false => b'F', }) } } impl De for R { fn de(&mut self) -> Result { let b: u8 = self.de()?; Ok(match b { b'T' => true, b'F' => false, _ => return Err(InvalidData.into()), }) } } impl Ser for W { fn ser(&mut self, t: &u8) -> Result<(), std::io::Error> { self.write_u8(*t) } } impl De for R { fn de(&mut self) -> Result { self.read_u8() } } impl Ser for W { fn ser(&mut self, t: &u16) -> Result<(), std::io::Error> { self.write_u16::(*t) } } impl De for R { fn de(&mut self) -> Result { self.read_u16::() } } impl Ser for W { fn ser(&mut self, t: &u32) -> Result<(), std::io::Error> { self.write_u32::(*t) } } impl De for R { fn de(&mut self) -> Result { self.read_u32::() } } impl Ser for W { fn ser(&mut self, t: &u64) -> Result<(), std::io::Error> { self.write_u64::(*t) } } impl De for R { fn de(&mut self) -> Result { self.read_u64::() } } impl Ser for W { fn ser(&mut self, t: &Payload) -> Result<(), std::io::Error> { self.ser(&VarLenInt(t.len() as u64))?; for byte in t.as_slice() { self.ser(byte)?; } Ok(()) } } impl De for R { fn de(&mut self) -> Result { let VarLenInt(len) = self.de()?; let mut x = Vec::with_capacity(len as usize); for _ in 0..len { x.push(self.de()?); } Ok(x.into()) } } impl Ser for W { fn ser(&mut self, t: &VarLenInt) -> Result<(), std::io::Error> { integer_encoding::VarIntWriter::write_varint(self, t.0).map(|_| ()) } } impl De for R { fn de(&mut self) -> Result { integer_encoding::VarIntReader::read_varint(self).map(VarLenInt) } } impl Ser for W { fn ser(&mut self, t: &Predicate) -> Result<(), std::io::Error> { self.ser(&VarLenInt(t.assigned.len() as u64))?; for (channel_id, boolean) in &t.assigned { ser_seq![self, channel_id, boolean]?; } Ok(()) } } impl De for R { fn de(&mut self) -> Result { let VarLenInt(len) = self.de()?; let mut assigned = BTreeMap::::default(); for _ in 0..len { assigned.insert(self.de()?, self.de()?); } Ok(Predicate { assigned }) } } impl Ser for W { fn ser(&mut self, t: &Decision) -> Result<(), std::io::Error> { match t { Decision::Failure => self.ser(&b'F'), Decision::Success(predicate) => { self.ser(&b'S')?; self.ser(predicate) } } } } impl De for R { fn de(&mut self) -> Result { let b: u8 = self.de()?; Ok(match b { b'F' => Decision::Failure, b'S' => Decision::Success(self.de()?), _ => return Err(InvalidData.into()), }) } } impl Ser for W { fn ser(&mut self, t: &Polarity) -> Result<(), std::io::Error> { self.ser(&match t { Polarity::Putter => b'P', Polarity::Getter => b'G', }) } } impl De for R { fn de(&mut self) -> Result { let b: u8 = self.de()?; Ok(match b { b'P' => Polarity::Putter, b'G' => Polarity::Getter, _ => return Err(InvalidData.into()), }) } } impl Ser for W { fn ser(&mut self, t: &Msg) -> Result<(), std::io::Error> { use {CommMsgContents::*, SetupMsg::*}; match t { Msg::SetupMsg(s) => match s { // [flag, data] MyPortInfo { polarity, port } => ser_seq![self, &0u8, polarity, port], LeaderEcho { maybe_leader } => ser_seq![self, &1u8, maybe_leader], LeaderAnnounce { leader } => ser_seq![self, &2u8, leader], YouAreMyParent => ser_seq![self, &3u8], }, Msg::CommMsg(CommMsg { round_index, contents }) => { // [flag, round_num, data] let varlenint = &VarLenInt(*round_index as u64); match contents { SendPayload { payload_predicate, payload } => { ser_seq![self, &4u8, varlenint, payload_predicate, payload] } Elaborate { partial_oracle } => ser_seq![self, &5u8, varlenint, partial_oracle], Announce { decision } => ser_seq![self, &6u8, varlenint, decision], Failure => ser_seq![self, &7u8, varlenint], } } } } } impl De for R { fn de(&mut self) -> Result { use {CommMsgContents::*, SetupMsg::*}; let b: u8 = self.de()?; Ok(match b { 0..=3 => Msg::SetupMsg(match b { // [flag, data] 0u8 => MyPortInfo { polarity: self.de()?, port: self.de()? }, 1u8 => LeaderEcho { maybe_leader: self.de()? }, 2u8 => LeaderAnnounce { leader: self.de()? }, 3u8 => YouAreMyParent, _ => unreachable!(), }), 4..=7 => { // [flag, round_num, data] let VarLenInt(varlenint) = self.de()?; let contents = match b { 4u8 => SendPayload { payload_predicate: self.de()?, payload: self.de()? }, 5u8 => Elaborate { partial_oracle: self.de()? }, 6u8 => Announce { decision: self.de()? }, 7u8 => Failure, _ => unreachable!(), }; Msg::CommMsg(CommMsg { round_index: varlenint as usize, contents }) } _ => return Err(InvalidData.into()), }) } }