Changeset - c8c589cbcc4a
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-03-06 10:44:22
christopher.esterhuyse@gmail.com
fixed serialization mistake
4 files changed with 167 insertions and 91 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        if let Decision::Success(predicate) = &decision {
 
            // overwrite MonoN/P
 
            self.inner.mono_n =
 
                self.ephemeral.poly_n.take().unwrap().choose_mono(predicate).unwrap();
 
            self.inner.mono_ps.clear();
 
            self.inner.mono_ps.extend(
 
                self.ephemeral
 
                    .poly_ps
 
                    .drain(..)
 
                    .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
            )
 
        }
 
        let ret = match &decision {
 
            Decision::Success(predicate) => {
 
                // overwrite MonoN/P
 
                self.inner.mono_n =
 
                    self.ephemeral.poly_n.take().unwrap().choose_mono(predicate).unwrap();
 
                self.inner.mono_ps.clear();
 
                self.inner.mono_ps.extend(
 
                    self.ephemeral
 
                        .poly_ps
 
                        .drain(..)
 
                        .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
                );
 
                Ok(())
 
            }
 
            Decision::Failure => Err(SyncErr::Timeout),
 
        };
 
        let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        Ok(())
 
        ret
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(predicate) = maybe_predicate {
 
                let decision = Decision::Success(predicate);
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
@@ -108,49 +112,49 @@ impl Controller {
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Instant,
 
        mut deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 
        assert!(self.ephemeral.is_clear());
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                mono_ps: &mut self.ephemeral.mono_ps,
 
                inner: &mut self.inner,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
@@ -243,57 +247,85 @@ impl Controller {
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made
 
        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            log!(&mut self.inner.logger, "`POLLING`...");
 
            let received = self.recv(deadline)?.ok_or_else(|| {
 
                log!(&mut self.inner.logger, ":( timing out");
 
                SyncErr::Timeout
 
            })?;
 
            log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
            let received = match deadline {
 
                Some(d) => match self.recv(d)? {
 
                    Some(received) => received,
 
                    None => {
 
                        deadline = None;
 
                        match self.inner.family.parent_ekey {
 
                            Some(parent_ekey) => {
 
                                let announcement = Msg::CommMsg(CommMsg {
 
                                    round_index: self.inner.round_index,
 
                                    contents: CommMsgContents::Failure,
 
                                });
 
                                log!(
 
                                    &mut self.inner.logger,
 
                                    "Forwarding {:?} to parent with ekey {:?}",
 
                                    &announcement,
 
                                    parent_ekey
 
                                );
 
                                self.inner
 
                                    .endpoint_exts
 
                                    .get_mut(parent_ekey)
 
                                    .expect("ss")
 
                                    .endpoint
 
                                    .send(announcement.clone())?;
 
                            }
 
                            None => return self.end_round_with_decision(Decision::Failure),
 
                        }
 
                        continue;
 
                    }
 
                },
 
                None => self.recv(Instant::now() + Duration::from_secs(2))?.expect("DRIED UP"),
 
            };
 
            log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(_) => {
 
                Msg::SetupMsg(s) => {
 
                    // This occurs in the event the connector was malformed during connect()
 
                    println!("WASNT EXPECTING {:?}", s);
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "...and its OLD! :(");
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(&mut self.inner.logger, "... DELAY! :(");
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "... its a round-appropriate CommMsg with key {:?}",
 
                        received.recipient
 
                    );
 
@@ -430,67 +462,48 @@ impl Controller {
 
                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
                                    {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                }
 
            }
 
        }
 
        // 'timeout_loop: loop {
 
        //     log!(&mut self.inner.logger, "`POLLING (already timed out)`...");
 
        //     let received = self.recv_blocking()?;
 
        //     log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
        //     let current_content = match received.msg {
 
        //         Msg::SetupMsg(_) => {
 
        //             // This occurs in the event the connector was malformed during connect()
 
        //             return Err(SyncErr::UnexpectedSetupMsg);
 
        //         }
 
        //         Msg::CommMsg(CommMsg { round_index, contents }) => {
 
        //             if round_index > self.inner.round_index {
 
        //                 self.delay(received);
 
        //                 continue 'timeout_loop;
 
        //             } else {
 
        //                 contents
 
        //             }
 
        //         }
 
        //     };
 
        // }
 
    }
 
}
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.mono_ps.is_empty()
 
            && self.ekey_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        self.ekey_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
src/runtime/connector.rs
Show inline comments
 
@@ -94,89 +94,91 @@ impl Connector {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Putter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().unwrap();
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.puts.contains_key(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.puts.insert(ekey, payload);
 
        Ok(())
 
    }
 

	
 
    pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().unwrap();
 
        let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!");
 
        if sync_batch.gets.contains(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.gets.insert(ekey);
 
        Ok(())
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, ()> {
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(()),
 
        };
 
        connected.sync_batches.push(SyncBatch::default());
 
        Ok(connected.sync_batches.len() - 1)
 
    }
 

	
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncErr> {
 
        let deadline = Instant::now() + timeout;
 
        use SyncErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 

	
 
        // do the synchronous round!
 
        connected.controller.sync_round(deadline, Some(connected.sync_batches.drain(..)))?;
 
        let res =
 
            connected.controller.sync_round(Some(deadline), Some(connected.sync_batches.drain(..)));
 
        connected.sync_batches.push(SyncBatch::default());
 
        Ok(connected.controller.inner.mono_n.result.as_mut().unwrap().0)
 
        res?;
 
        Ok(connected.controller.inner.mono_n.result.as_mut().expect("qqqs").0)
 
    }
 

	
 
    pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> {
 
        use ReadGottenErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let &(key, polarity) =
 
            connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
    }
 
}
src/runtime/serde.rs
Show inline comments
 
@@ -16,184 +16,185 @@ pub trait De<T>: Read {
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 

	
 
/////////////////////////////////////////
 

	
 
struct VarLenInt(u64);
 

	
 
macro_rules! ser_seq {
 
    ( $w:expr ) => {{
 
        io::Result::Ok(())
 
    }};
 
    ( $w:expr, $first:expr ) => {{
 
        $w.ser($first)
 
    }};
 
    ( $w:expr, $first:expr, $( $x:expr ),+ ) => {{
 
        $w.ser($first)?;
 
        ser_seq![$w, $( $x ),*]
 
    }};
 
}
 
/////////////////////////////////////////
 

	
 
impl<W: Write> Ser<bool> for W {
 
    fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> {
 
        self.ser(&match t {
 
            true => b'T',
 
            false => b'F',
 
        })
 
    }
 
}
 
impl<R: Read> De<bool> for R {
 
    fn de(&mut self) -> Result<bool, std::io::Error> {
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            b'T' => true,
 
            b'F' => false,
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<u8> for W {
 
    fn ser(&mut self, t: &u8) -> Result<(), std::io::Error> {
 
        self.write_u8(*t)
 
    }
 
}
 
impl<R: Read> De<u8> for R {
 
    fn de(&mut self) -> Result<u8, std::io::Error> {
 
        self.read_u8()
 
    }
 
}
 

	
 
impl<W: Write> Ser<u16> for W {
 
    fn ser(&mut self, t: &u16) -> Result<(), std::io::Error> {
 
        self.write_u16::<BigEndian>(*t)
 
    }
 
}
 
impl<R: Read> De<u16> for R {
 
    fn de(&mut self) -> Result<u16, std::io::Error> {
 
        self.read_u16::<BigEndian>()
 
    }
 
}
 

	
 
impl<W: Write> Ser<u32> for W {
 
    fn ser(&mut self, t: &u32) -> Result<(), std::io::Error> {
 
        self.write_u32::<BigEndian>(*t)
 
    }
 
}
 
impl<R: Read> De<u32> for R {
 
    fn de(&mut self) -> Result<u32, std::io::Error> {
 
        self.read_u32::<BigEndian>()
 
    }
 
}
 

	
 
impl<W: Write> Ser<u64> for W {
 
    fn ser(&mut self, t: &u64) -> Result<(), std::io::Error> {
 
        self.write_u64::<BigEndian>(*t)
 
    }
 
}
 
impl<R: Read> De<u64> for R {
 
    fn de(&mut self) -> Result<u64, std::io::Error> {
 
        self.read_u64::<BigEndian>()
 
    }
 
}
 

	
 
impl<W: Write> Ser<Payload> for W {
 
    fn ser(&mut self, t: &Payload) -> Result<(), std::io::Error> {
 
        self.ser(&ZigZag(t.len() as u64))?;
 
        self.ser(&VarLenInt(t.len() as u64))?;
 
        for byte in t {
 
            self.ser(byte)?;
 
        }
 
        Ok(())
 
    }
 
}
 
impl<R: Read> De<Payload> for R {
 
    fn de(&mut self) -> Result<Payload, std::io::Error> {
 
        let ZigZag(len) = self.de()?;
 
        let VarLenInt(len) = self.de()?;
 
        let mut x = Vec::with_capacity(len as usize);
 
        for _ in 0..len {
 
            x.push(self.de()?);
 
        }
 
        Ok(x)
 
    }
 
}
 

	
 
struct ZigZag(u64);
 
impl<W: Write> Ser<ZigZag> for W {
 
    fn ser(&mut self, t: &ZigZag) -> Result<(), std::io::Error> {
 
impl<W: Write> Ser<VarLenInt> for W {
 
    fn ser(&mut self, t: &VarLenInt) -> Result<(), std::io::Error> {
 
        integer_encoding::VarIntWriter::write_varint(self, t.0).map(|_| ())
 
    }
 
}
 
impl<R: Read> De<ZigZag> for R {
 
    fn de(&mut self) -> Result<ZigZag, std::io::Error> {
 
        integer_encoding::VarIntReader::read_varint(self).map(ZigZag)
 
impl<R: Read> De<VarLenInt> for R {
 
    fn de(&mut self) -> Result<VarLenInt, std::io::Error> {
 
        integer_encoding::VarIntReader::read_varint(self).map(VarLenInt)
 
    }
 
}
 

	
 
impl<W: Write> Ser<ChannelId> for W {
 
    fn ser(&mut self, t: &ChannelId) -> Result<(), std::io::Error> {
 
        self.ser(&t.controller_id)?;
 
        self.ser(&ZigZag(t.channel_index as u64))
 
        self.ser(&VarLenInt(t.channel_index as u64))
 
    }
 
}
 
impl<R: Read> De<ChannelId> for R {
 
    fn de(&mut self) -> Result<ChannelId, std::io::Error> {
 
        Ok(ChannelId {
 
            controller_id: self.de()?,
 
            channel_index: De::<ZigZag>::de(self)?.0 as ChannelIndex,
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<bool> for W {
 
    fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> {
 
        self.ser(&match t {
 
            true => b'T',
 
            false => b'F',
 
        })
 
    }
 
}
 
impl<R: Read> De<bool> for R {
 
    fn de(&mut self) -> Result<bool, std::io::Error> {
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            b'T' => true,
 
            b'F' => false,
 
            _ => return Err(InvalidData.into()),
 
            channel_index: De::<VarLenInt>::de(self)?.0 as ChannelIndex,
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Predicate> for W {
 
    fn ser(&mut self, t: &Predicate) -> Result<(), std::io::Error> {
 
        self.ser(&ZigZag(t.assigned.len() as u64))?;
 
        self.ser(&VarLenInt(t.assigned.len() as u64))?;
 
        for (channel_id, boolean) in &t.assigned {
 
            ser_seq![self, channel_id, boolean]?;
 
        }
 
        Ok(())
 
    }
 
}
 
impl<R: Read> De<Predicate> for R {
 
    fn de(&mut self) -> Result<Predicate, std::io::Error> {
 
        let ZigZag(len) = self.de()?;
 
        let VarLenInt(len) = self.de()?;
 
        let mut assigned = BTreeMap::<ChannelId, bool>::default();
 
        for _ in 0..len {
 
            assigned.insert(self.de()?, self.de()?);
 
        }
 
        Ok(Predicate { assigned })
 
    }
 
}
 
impl<W: Write> Ser<Decision> for W {
 
    fn ser(&mut self, t: &Decision) -> Result<(), std::io::Error> {
 
        match t {
 
            Decision::Failure => self.ser(&b'F'),
 
            Decision::Success(predicate) => {
 
                self.ser(&b'S')?;
 
                self.ser(predicate)
 
            }
 
        }
 
    }
 
}
 
impl<R: Read> De<Decision> for R {
 
    fn de(&mut self) -> Result<Decision, std::io::Error> {
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            b'F' => Decision::Failure,
 
            b'S' => Decision::Success(self.de()?),
 
@@ -217,69 +218,74 @@ impl<R: Read> De<Polarity> for R {
 
            b'P' => Polarity::Putter,
 
            b'G' => Polarity::Getter,
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<EndpointInfo> for W {
 
    fn ser(&mut self, t: &EndpointInfo) -> Result<(), std::io::Error> {
 
        let EndpointInfo { channel_id, polarity } = t;
 
        ser_seq![self, channel_id, polarity]
 
    }
 
}
 
impl<R: Read> De<EndpointInfo> for R {
 
    fn de(&mut self) -> Result<EndpointInfo, std::io::Error> {
 
        Ok(EndpointInfo { channel_id: self.de()?, polarity: self.de()? })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Msg> for W {
 
    fn ser(&mut self, t: &Msg) -> Result<(), std::io::Error> {
 
        use {CommMsgContents::*, SetupMsg::*};
 
        match t {
 
            Msg::SetupMsg(s) => match s {
 
                // [flag, data]
 
                ChannelSetup { info } => ser_seq![self, &0u8, info],
 
                LeaderEcho { maybe_leader } => ser_seq![self, &1u8, maybe_leader],
 
                LeaderAnnounce { leader } => ser_seq![self, &2u8, leader],
 
                YouAreMyParent => ser_seq![self, &3u8],
 
            },
 
            Msg::CommMsg(CommMsg { round_index, contents }) => {
 
                let zig = &ZigZag(*round_index as u64);
 
                // [flag, round_num, data]
 
                let varlenint = &VarLenInt(*round_index as u64);
 
                match contents {
 
                    SendPayload { payload_predicate, payload } => {
 
                        ser_seq![self, &4u8, zig, payload_predicate, payload]
 
                        ser_seq![self, &4u8, varlenint, payload_predicate, payload]
 
                    }
 
                    Elaborate { partial_oracle } => ser_seq![self, &5u8, zig, partial_oracle],
 
                    Announce { decision } => ser_seq![self, &6u8, zig, decision],
 
                    Failure => ser_seq![self, &7u8],
 
                    Elaborate { partial_oracle } => ser_seq![self, &5u8, varlenint, partial_oracle],
 
                    Announce { decision } => ser_seq![self, &6u8, varlenint, decision],
 
                    Failure => ser_seq![self, &7u8, varlenint],
 
                }
 
            }
 
        }
 
    }
 
}
 
impl<R: Read> De<Msg> for R {
 
    fn de(&mut self) -> Result<Msg, std::io::Error> {
 
        use {CommMsgContents::*, SetupMsg::*};
 
        let b: u8 = self.de()?;
 
        Ok(match b {
 
            0..=3 => Msg::SetupMsg(match b {
 
                0 => ChannelSetup { info: self.de()? },
 
                1 => LeaderEcho { maybe_leader: self.de()? },
 
                2 => LeaderAnnounce { leader: self.de()? },
 
                3 => YouAreMyParent,
 
                // [flag, data]
 
                0u8 => ChannelSetup { info: self.de()? },
 
                1u8 => LeaderEcho { maybe_leader: self.de()? },
 
                2u8 => LeaderAnnounce { leader: self.de()? },
 
                3u8 => YouAreMyParent,
 
                _ => unreachable!(),
 
            }),
 
            _ => {
 
                let ZigZag(zig) = self.de()?;
 
            4..=7 => {
 
                // [flag, round_num, data]
 
                let VarLenInt(varlenint) = self.de()?;
 
                let contents = match b {
 
                    4 => SendPayload { payload_predicate: self.de()?, payload: self.de()? },
 
                    5 => Elaborate { partial_oracle: self.de()? },
 
                    6 => Announce { decision: self.de()? },
 
                    7 => Failure,
 
                    _ => return Err(InvalidData.into()),
 
                    4u8 => SendPayload { payload_predicate: self.de()?, payload: self.de()? },
 
                    5u8 => Elaborate { partial_oracle: self.de()? },
 
                    6u8 => Announce { decision: self.de()? },
 
                    7u8 => Failure,
 
                    _ => unreachable!(),
 
                };
 
                Msg::CommMsg(CommMsg { round_index: zig as usize, contents })
 
                Msg::CommMsg(CommMsg { round_index: varlenint as usize, contents })
 
            }
 
            _ => return Err(InvalidData.into()),
 
        })
 
    }
 
}
src/test/connector.rs
Show inline comments
 
@@ -754,24 +754,79 @@ fn connector_causal_loop2() {
 
    /*
 
        /-->\     /<---\
 
    Alice   samelen-->repl
 
        \<-------------/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    // let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"samelen_repl").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_recover() {
 
    let connect_timeout = Duration::from_millis(1500);
 
    let comm_timeout = Duration::from_millis(300);
 
    let addrs = [next_addr()];
 
    fn putter_does(i: usize) -> bool {
 
        i % 3 == 0
 
    }
 
    fn getter_does(i: usize) -> bool {
 
        i % 2 == 0
 
    }
 
    fn expect_res(i: usize) -> Result<usize, SyncErr> {
 
        if putter_does(i) && getter_does(i) {
 
            Ok(0)
 
        } else {
 
            Err(SyncErr::Timeout)
 
        }
 
    }
 
    const N: usize = 11;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(connect_timeout).unwrap();
 

	
 
            for i in 0..N {
 
                if putter_does(i) {
 
                    assert_eq!(Ok(()), x.put(0, b"msg".to_vec()));
 
                }
 
                assert_eq!(expect_res(i), x.sync(comm_timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(connect_timeout).unwrap();
 

	
 
            for i in 0..N {
 
                if getter_does(i) {
 
                    assert_eq!(Ok(()), x.get(0));
 
                }
 
                assert_eq!(expect_res(i), x.sync(comm_timeout));
 
                if expect_res(i).is_ok() {
 
                    assert_eq!(Ok(b"msg" as &[u8]), x.read_gotten(0));
 
                }
 
            }
 
        },
 
    ]));
 
}
0 comments (0 inline, 0 general)