Changeset - 869d51fc1127
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-07-02 14:37:05
christopher.esterhuyse@gmail.com
refactored predicate methods to be more intuitive
6 files changed with 128 insertions and 132 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -35,6 +35,7 @@ lazy_static = "1.4.0"
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # no feature dependencies
 
endpoint_logging = [] # see src/macros where a conditional check include endpoint logging
 
\ No newline at end of file
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
src/common.rs
Show inline comments
 
@@ -41,7 +41,7 @@ pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 
#[derive(Debug, Default)]
 
#[derive(Clone, Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 
@@ -68,11 +68,6 @@ pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub(crate) struct FiringVar(pub(crate) PortId);
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum NonsyncBlocker {
 
    Inconsistent,
 
@@ -165,11 +160,6 @@ impl Debug for PortId {
 
        write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "fvID({}'{})", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix)
src/runtime/communication.rs
Show inline comments
 
@@ -156,6 +156,8 @@ impl Connector {
 
            comm.round_index
 
        );
 

	
 
        let mut spec_var_stream = cu.id_manager.new_spec_var_stream();
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
@@ -224,23 +226,27 @@ impl Connector {
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = spec_var_stream.next();
 
        log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for (index, NativeBatch { to_get, to_put }) in
 
            comm.native_batches.drain(..).enumerate()
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                let mut predicate =
 
                    Predicate::default().inserted(native_branch_spec_var, branch_spec_val);
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.insert(var, true);
 
                    let var = cu.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for silent ports
 
                for &port in cu.native_ports.difference(&firing_ports) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    if let Some(true) = predicate.assigned.insert(var, false) {
 
                    let var = cu.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
@@ -269,8 +275,9 @@ impl Connector {
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            if let Some(_existing) = branching_native.branches.insert(predicate, branch) {
 
                unreachable!()
 
                // return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        // restore the invariant
 
@@ -645,7 +652,7 @@ impl BranchingNative {
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.firing_var_for(getter);
 
            let var = cu.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
@@ -665,7 +672,7 @@ impl BranchingNative {
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
@@ -675,9 +682,9 @@ impl BranchingNative {
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
            use AllMapperResult as Amr;
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
@@ -686,13 +693,13 @@ impl BranchingNative {
 
                    );
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                Amr::Equivalent | Amr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                Amr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
@@ -706,7 +713,7 @@ impl BranchingNative {
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                Amr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
@@ -729,7 +736,7 @@ impl BranchingNative {
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) {
 
            if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
@@ -775,8 +782,8 @@ impl BranchingProtoComponent {
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.firing_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(false);
 
                        let var = cu.port_info.spec_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                    }
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
@@ -795,19 +802,19 @@ impl BranchingProtoComponent {
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.firing_var_for(port);
 
                    let var = cu.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, true), branch);
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.firing_var_for(putter);
 
                    let was = predicate.assigned.insert(var, true);
 
                    if was == Some(false) {
 
                    let var = cu.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
@@ -851,21 +858,21 @@ impl BranchingProtoComponent {
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            use AllMapperResult as Amr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                Amr::Equivalent | Amr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                Amr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
@@ -874,7 +881,7 @@ impl BranchingProtoComponent {
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                Amr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
@@ -904,7 +911,7 @@ impl BranchingProtoComponent {
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.satisfies(solution_predicate) {
 
            if branch.ended && solution_predicate.consistent_with(&branch_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
@@ -1034,8 +1041,8 @@ impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> {
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
        let var = self.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
src/runtime/mod.rs
Show inline comments
 
@@ -45,6 +45,15 @@ pub(crate) struct SyncProtoContext<'a> {
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVar(PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVal(u16);
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
@@ -115,7 +124,7 @@ struct SendPayloadMsg {
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
enum CommonSatResult {
 
enum AllMapperResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
@@ -153,6 +162,11 @@ struct IdManager {
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
@@ -194,7 +208,7 @@ enum ConnectorPhased {
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<FiringVar, bool>,
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
@@ -232,13 +246,21 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        SpecVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
@@ -247,6 +269,12 @@ impl IdManager {
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        SpecVarStream {
 
            connector_id: self.connector_id,
 
            port_suffix_stream: self.port_suffix_stream.clone(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
@@ -338,28 +366,19 @@ impl Connector {
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
    pub fn consistent_with(&self, other: &Self) -> bool {
 
        let [larger, smaller] =
 
            if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 

	
 
        for (var, val) in smaller.assigned.iter() {
 
            match larger.assigned.get(var) {
 
                Some(val2) if val2 != val => return false,
 
                _ => {}
 
            }
 
        }
 
        true
 
@@ -371,8 +390,8 @@ impl Predicate {
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult as Csr;
 
    fn all_mapper(&self, other: &Self) -> AllMapperResult {
 
        use AllMapperResult as Amr;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
@@ -403,7 +422,7 @@ impl Predicate {
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Csr::Nonexistant;
 
                        return Amr::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
@@ -414,9 +433,9 @@ impl Predicate {
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Csr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Csr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Csr::LatterNotFormer, // ... equivalent to other.
 
            [true, true] => Amr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Amr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Amr::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
@@ -424,7 +443,7 @@ impl Predicate {
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Csr::New(new)
 
                Amr::New(new)
 
            }
 
        }
 
    }
 
@@ -438,7 +457,7 @@ impl Predicate {
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, var: FiringVar) -> Option<bool> {
 
    pub fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
@@ -449,23 +468,7 @@ impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let iter = self.0.assigned.iter().filter_map(|(port, &firing)| {
 
                    if firing == self.1 {
 
                        Some(port)
 
                    } else {
 
                        None
 
                    }
 
                });
 
                f.debug_set().entries(iter).finish()
 
            }
 
        }
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
        f.debug_tuple("Predicate").field(&self.assigned).finish()
 
    }
 
}
 
impl serde::Serialize for SerdeProtocolDescription {
 
@@ -486,40 +489,24 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 

	
 
#[test]
 
fn bincode_serde() {
 
    let mut b = Vec::with_capacity(64);
 
    use bincode::config::Options;
 
    let opt = bincode::config::DefaultOptions::default();
 
    opt.serialize_into(&mut b, &Decision::Failure).unwrap();
 
    println!("failure  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(&mut b, &CommMsgContents::Suggest { suggestion: Decision::Failure })
 
        .unwrap();
 
    println!("decision {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        },
 
    )
 
    .unwrap();
 
    println!("commmsg  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &Msg::CommMsg(CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        }),
 
    )
 
    .unwrap();
 
    println!("msg      {:x?}", b);
 
    b.clear();
 
impl Debug for SpecVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_tuple("vrID").field(&self.0).finish()
 
    }
 
}
 
impl SpecVal {
 
    const FIRING: Self = SpecVal(1);
 
    const SILENT: Self = SpecVal(0);
 
    fn is_firing(self) -> bool {
 
        self == Self::FIRING
 
        // all else treated as SILENT
 
    }
 
    fn iter_domain() -> impl Iterator<Item = Self> {
 
        (0..).map(SpecVal)
 
    }
 
}
 
impl Debug for SpecVal {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        self.0.fmt(f)
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -87,7 +87,9 @@ impl Connector {
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                session_optimize(cu, &mut comm, deadline)?;
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
src/runtime/tests.rs
Show inline comments
 
@@ -577,3 +577,12 @@ fn together() {
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_batch_distinguish() {
 
    let test_log_path = Path::new("./logs/native_batch_distinguish");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.next_batch().unwrap();
 
    c.sync(Some(Duration::from_secs(3))).unwrap();
 
}
0 comments (0 inline, 0 general)