Changeset - 175721d796d7
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-06-24 16:17:11
christopher.esterhuyse@gmail.com
Included more fields in session optimization
8 files changed with 99 insertions and 73 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -131,63 +131,58 @@ impl From<&[u8]> for Payload {
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(Arc::new(v))
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        Arc::make_mut(&mut self.0) as _
 
    }
 
    pub fn concat_with(&mut self, other: &Self) {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        me.extend(bytes);
 
    }
 
}
 
impl serde::Serialize for Payload {
 
    fn serialize<S>(
 
        &self,
 
        serializer: S,
 
    ) -> std::result::Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &Vec<u8> = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for Payload {
 
    fn deserialize<D>(
 
        deserializer: D,
 
    ) -> std::result::Result<Self, <D as serde::Deserializer<'de>>::Error>
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: Vec<u8> = Vec::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(Arc::new(it.into_iter().collect()))
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "PID<{},{}>", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
src/protocol/ast.rs
Show inline comments
 
@@ -384,49 +384,49 @@ impl VariableExpressionId {
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize, serde::Deserialize)]
 
pub struct DeclarationId(Id<Declaration>);
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize, serde::Deserialize)]
 
pub struct DefinedDeclarationId(DeclarationId);
 

	
 
impl DefinedDeclarationId {
 
    pub fn upcast(self) -> DeclarationId {
 
        self.0
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize, serde::Deserialize)]
 
pub struct ImportedDeclarationId(DeclarationId);
 

	
 
impl ImportedDeclarationId {
 
    pub fn upcast(self) -> DeclarationId {
 
        self.0
 
    }
 
}
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
#[derive(Debug, serde::Serialize, serde::Deserialize)]
 
pub struct Heap {
 
    // Phase 0: allocation
 
    protocol_descriptions: Arena<Root>,
 
    pragmas: Arena<Pragma>,
 
    imports: Arena<Import>,
 
    identifiers: Arena<Identifier>,
 
    type_annotations: Arena<TypeAnnotation>,
 
    variables: Arena<Variable>,
 
    definitions: Arena<Definition>,
 
    statements: Arena<Statement>,
 
    expressions: Arena<Expression>,
 
    declarations: Arena<Declaration>,
 
}
 

	
 
impl Heap {
 
    pub fn new() -> Heap {
 
        Heap {
 
            protocol_descriptions: Arena::new(),
 
            pragmas: Arena::new(),
 
            imports: Arena::new(),
 
            identifiers: Arena::new(),
 
            type_annotations: Arena::new(),
 
            variables: Arena::new(),
 
            definitions: Arena::new(),
src/protocol/eval.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::fmt;
 
use std::fmt::{Debug, Display, Formatter};
 
use std::{i16, i32, i64, i8};
 

	
 
use crate::common::*;
 

	
 
use crate::protocol::ast::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
// use crate::protocol::inputsource::*;
 
// use crate::protocol::parser::*;
 
use crate::protocol::EvalContext;
 

	
 
const MAX_RECURSION: usize = 1024;
 

	
 
const BYTE_MIN: i64 = i8::MIN as i64;
 
const BYTE_MAX: i64 = i8::MAX as i64;
 
const SHORT_MIN: i64 = i16::MIN as i64;
 
const SHORT_MAX: i64 = i16::MAX as i64;
 
const INT_MIN: i64 = i32::MIN as i64;
 
const INT_MAX: i64 = i32::MAX as i64;
 

	
 
const MESSAGE_MAX_LENGTH: i64 = SHORT_MAX;
 

	
 
const ONE: Value = Value::Byte(ByteValue(1));
 

	
 
trait ValueImpl {
 
    fn exact_type(&self) -> Type;
 
    fn is_type_compatible(&self, t: &Type) -> bool;
 
}
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub enum Value {
 
    Input(InputValue),
 
    Output(OutputValue),
 
@@ -63,49 +63,49 @@ impl Value {
 
                }
 
            }
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn from_constant(constant: &Constant) -> Value {
 
        match constant {
 
            Constant::Null => Value::Message(MessageValue(None)),
 
            Constant::True => Value::Boolean(BooleanValue(true)),
 
            Constant::False => Value::Boolean(BooleanValue(false)),
 
            Constant::Integer(data) => {
 
                // Convert raw ASCII data to UTF-8 string
 
                let raw = String::from_utf8_lossy(data);
 
                let val = raw.parse::<i64>().unwrap();
 
                if val >= BYTE_MIN && val <= BYTE_MAX {
 
                    Value::Byte(ByteValue(val as i8))
 
                } else if val >= SHORT_MIN && val <= SHORT_MAX {
 
                    Value::Short(ShortValue(val as i16))
 
                } else if val >= INT_MIN && val <= INT_MAX {
 
                    Value::Int(IntValue(val as i32))
 
                } else {
 
                    Value::Long(LongValue(val))
 
                }
 
            }
 
            Constant::Character(data) => unimplemented!(),
 
            Constant::Character(_data) => unimplemented!(),
 
        }
 
    }
 
    fn set(&mut self, index: &Value, value: &Value) -> Option<Value> {
 
        // The index must be of integer type, and non-negative
 
        let the_index: usize;
 
        match index {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let index = i64::from(index);
 
                if index < 0 || index >= MESSAGE_MAX_LENGTH {
 
                    // It is inconsistent to update out of bounds
 
                    return None;
 
                }
 
                the_index = index.try_into().unwrap();
 
            }
 
            _ => unreachable!(),
 
        }
 
        // The subject must be either a message or an array
 
        // And the value and the subject must be compatible
 
        match (self, value) {
 
            (Value::Message(MessageValue(None)), _) => {
 
                // It is inconsistent to update the null message
 
                None
 
            }
 
            (Value::Message(MessageValue(Some(payload))), Value::Byte(ByteValue(b))) => {
 
@@ -1757,70 +1757,71 @@ impl Prompt {
 
                    let value = self.store.eval(h, ctx, arg)?;
 
                    args.push(value);
 
                }
 
                self.position = stmt.next;
 
                Err(EvalContinuation::NewComponent(expr.declaration.unwrap(), args))
 
            }
 
            Statement::Put(stmt) => {
 
                // Evaluate port and message
 
                let port = self.store.eval(h, ctx, stmt.port)?;
 
                let message = self.store.eval(h, ctx, stmt.message)?;
 
                // Continue to next statement
 
                self.position = stmt.next;
 
                // Signal the put upwards
 
                Err(EvalContinuation::Put(port, message))
 
            }
 
            Statement::Expression(stmt) => {
 
                // Evaluate expression
 
                let value = self.store.eval(h, ctx, stmt.expression)?;
 
                // Continue to next statement
 
                self.position = stmt.next;
 
                Err(EvalContinuation::Stepping)
 
            }
 
        }
 
    }
 
    fn compute_function(h: &Heap, fun: FunctionId, args: &Vec<Value>) -> Option<Value> {
 
        let mut prompt = Self::new(h, fun.upcast(), args);
 
        let mut context = EvalContext::None;
 
        loop {
 
            let result = prompt.step(h, &mut context);
 
            match result {
 
                Ok(val) => return Some(val),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return None,
 
                    // Functions never terminate without returning
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // Functions never encounter any blocking behavior
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(_, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    fn compute_function(_h: &Heap, _fun: FunctionId, _args: &Vec<Value>) -> Option<Value> {
 
        todo!()
 
        // let mut prompt = Self::new(h, fun.upcast(), args);
 
        // let mut context = EvalContext::None;
 
        // loop {
 
        //     let result = prompt.step(h, &mut context);
 
        //     match result {
 
        //         Ok(val) => return Some(val),
 
        //         Err(cont) => match cont {
 
        //             EvalContinuation::Stepping => continue,
 
        //             EvalContinuation::Inconsistent => return None,
 
        //             // Functions never terminate without returning
 
        //             EvalContinuation::Terminal => unreachable!(),
 
        //             // Functions never encounter any blocking behavior
 
        //             EvalContinuation::SyncBlockStart => unreachable!(),
 
        //             EvalContinuation::SyncBlockEnd => unreachable!(),
 
        //             EvalContinuation::NewComponent(_, _) => unreachable!(),
 
        //             EvalContinuation::BlockFires(val) => unreachable!(),
 
        //             EvalContinuation::BlockGet(val) => unreachable!(),
 
        //             EvalContinuation::Put(port, msg) => unreachable!(),
 
        //         },
 
        //     }
 
        // }
 
    }
 
}
 

	
 
// #[cfg(test)]
 
// mod tests {
 
//     extern crate test_generator;
 

	
 
//     use std::fs::File;
 
//     use std::io::Read;
 
//     use std::path::Path;
 
//     use test_generator::test_resources;
 

	
 
//     use super::*;
 

	
 
//     #[test_resources("testdata/eval/positive/*.pdl")]
 
//     fn batch1(resource: &str) {
 
//         let path = Path::new(resource);
 
//         let expect = path.with_extension("txt");
 
//         let mut heap = Heap::new();
 
//         let mut source = InputSource::from_file(&path).unwrap();
 
//         let mut parser = Parser::new(&mut source);
 
//         let pd = parser.parse(&mut heap).unwrap();
 
//         let def = heap[pd].get_definition_ident(&heap, b"test").unwrap();
 
//         let fun = heap[def].as_function().this;
src/protocol/inputsource.rs
Show inline comments
 
use std::fmt;
 
use std::fs::File;
 
use std::io;
 
use std::path::Path;
 

	
 
use backtrace::Backtrace;
 

	
 
#[derive(Clone, serde::Serialize, serde::Deserialize)]
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct InputSource {
 
    filename: String,
 
    input: Vec<u8>,
 
    line: usize,
 
    column: usize,
 
    offset: usize,
 
}
 

	
 
static STD_LIB_PDL: &'static [u8] = b"
 
primitive forward(in i, out o) {
 
    while(true) synchronous() put(o, get(i));
 
}
 
primitive sync(in i, out o) {
 
    while(true) synchronous() if(fires(i)) put(o, get(i));
 
}
 
primitive alternator_2(in i, out l, out r) {
 
    while(true) {
 
        synchronous() put(l, get(i));
 
        synchronous() put(r, get(i));
 
    }
 
}
 
primitive replicator_2(in i, out l, out r) {
 
    while(true) synchronous() if(fires(i)) {
 
        msg m = get(i);
src/protocol/mod.rs
Show inline comments
 
@@ -4,55 +4,55 @@ mod eval;
 
pub mod inputsource;
 
mod lexer;
 
mod library;
 
mod parser;
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
pub struct ProtocolDescription {
 
    heap: Heap,
 
    source: InputSource,
 
    root: RootId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ComponentState {
 
    prompt: Prompt,
 
}
 
pub enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
    // None,
 
}
 
//////////////////////////////////////////////
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "Protocol")
 
        write!(f, "(A big honkin' protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_buffer(buffer).unwrap();
 
        let mut parser = Parser::new(&mut source);
 
        match parser.parse(&mut heap) {
 
            Ok(root) => {
 
                return Ok(ProtocolDescription { heap, source, root });
 
            }
 
            Err(err) => {
 
                let mut vec: Vec<u8> = Vec::new();
 
                err.write(&source, &mut vec).unwrap();
 
                Err(String::from_utf8_lossy(&vec).to_string())
 
            }
 
        }
 
    }
 
    pub fn component_polarities(
 
        &self,
 
        identifier: &[u8],
 
    ) -> Result<Vec<Polarity>, AddComponentError> {
 
        use AddComponentError::*;
 
        let h = &self.heap;
 
@@ -196,89 +196,89 @@ impl ComponentState {
 
                            _ => unreachable!(),
 
                        }
 
                        let payload;
 
                        match message {
 
                            Value::Message(MessageValue(None)) => {
 
                                // Putting a null message is inconsistent
 
                                return SyncBlocker::Inconsistent;
 
                            }
 
                            Value::Message(MessageValue(Some(buffer))) => {
 
                                // Create a copy of the payload
 
                                payload = buffer;
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return SyncBlocker::PutMsg(value, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
 
    //         EvalContext::None => unreachable!(),
 
    //         // EvalContext::None => unreachable!(),
 
    //         EvalContext::Nonsync(_context) => todo!(),
 
    //         EvalContext::Sync(_) => unreachable!(),
 
    //     }
 
    // }
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentState) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let mut moved_ports = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
                        Value::Output(OutputValue(port)) => {
 
                            moved_ports.insert(*port);
 
                        }
 
                        Value::Input(InputValue(port)) => {
 
                            moved_ports.insert(*port);
 
                        }
 
                        _ => {}
 
                    }
 
                }
 
                context.new_component(moved_ports, init_state)
 
            }
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Value; 2] {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(OutputValue(from));
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
 
            }
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from),
 
                Value::Input(InputValue(port)) => context.is_firing(port).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => {
 
                    context.read_msg(port).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(port)) => {
 
                    context.read_msg(port).map(Value::receive_message)
 
                }
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -174,100 +174,98 @@ impl Connector {
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // NOTE: all msgs in outbox are of form (Getter, Payload)
 
        let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
        // create the solution storage
 
        let mut solution_storage = {
 
            let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
            let c = cu
 
                .proto_components
 
                .keys()
 
                .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
            let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
            let c =
 
                cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
            let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
            SolutionStorage::new(n.chain(c).chain(e))
 
        };
 
        log!(cu.logger, "Solution storage initialized");
 

	
 
        // 2. kick off the native
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        for (index, NativeBatch { to_get, to_put }) in comm.native_batches.drain(..).enumerate() {
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.insert(var, true);
 
                }
 
                // assign falses
 
                for &port in cu.native_ports.iter() {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.entry(var).or_insert(false);
 
                }
 
                predicate
 
            };
 
            log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
            // put all messages
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                // rely on invariant: sync batches respect port polarity
 
                let getter = *cu.port_info.peers.get(&putter).unwrap();
 
                payloads_to_get.push((getter, msg));
 
            }
 
            if to_get.is_empty() {
 
                log!(
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(LocalComponentId::Native),
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 
        comm.native_batches.push(Default::default());
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
@@ -277,56 +275,56 @@ impl Connector {
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            );
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        let decision = 'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                match cu.port_info.routes.get(&getter).unwrap() {
 
                    Route::Endpoint { index } => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to(*index, &msg).unwrap();
 
                    }
 
                    Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg(
 
                    Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        send_payload_msg,
 
                    ),
 
                    Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                    Route::LocalComponent(ComponentId::Proto(proto_component_id)) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                send_payload_msg,
 
                            )
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.logger, "Check if we have any local decisions...");
 
            for solution in solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger, "New local decision with solution {:?}...", &solution);
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
@@ -514,49 +512,49 @@ impl Connector {
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
@@ -632,49 +630,49 @@ impl BranchingProtoComponent {
 
                };
 
                let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
                log!(
 
                    cu.logger,
 
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                    proto_component_id,
 
                    &predicate,
 
                    &blocker,
 
                );
 
                use SyncBlocker as B;
 
                match blocker {
 
                    B::Inconsistent => {
 
                        // branch is inconsistent. throw it away
 
                        drop((predicate, branch));
 
                    }
 
                    B::SyncBlockEnd => {
 
                        // make concrete all variables
 
                        for &port in ports.iter() {
 
                            let var = cu.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        // submit solution for this component
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            &mut *cu.logger,
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)),
 
                            Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                            predicate.clone(),
 
                        );
 
                        // move to "blocked"
 
                        drainer.add_output(predicate, branch);
 
                    }
 
                    B::CouldntReadMsg(port) => {
 
                        // move to "blocked"
 
                        assert!(!branch.inbox.contains_key(&port));
 
                        drainer.add_output(predicate, branch);
 
                    }
 
                    B::CouldntCheckFiring(port) => {
 
                        // sanity check
 
                        let var = cu.port_info.firing_var_for(port);
 
                        assert!(predicate.query(var).is_none());
 
                        // keep forks in "unblocked"
 
                        drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                        drainer.add_input(predicate.inserted(var, true), branch);
 
                    }
 
                    B::PutMsg(putter, payload) => {
 
                        // sanity check
 
                        assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                        // overwrite assignment
 
                        let var = cu.port_info.firing_var_for(putter);
 
                        let was = predicate.assigned.insert(var, true);
 
@@ -899,66 +897,64 @@ impl SyncProtoContext<'_> {
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
            self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.proto_component_ports.insert(o);
 
        self.proto_component_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id));
 
        let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id));
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl ProtoComponentBranch {
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let was = self.inbox.insert(getter, payload);
 
        assert!(was.is_none())
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
mod endpoints;
 
pub mod error;
 
mod logging;
 
mod setup;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Debug)]
 
pub struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Debug)]
 
pub struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    LocalComponent(ComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SessionInfo {}
 
pub struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub sock_addr: SocketAddr,
 
    pub endpoint_polarity: EndpointPolarity,
 
}
 
#[derive(Debug)]
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
@@ -117,49 +122,49 @@ pub struct Neighborhood {
 
}
 
#[derive(Debug)]
 
pub struct MemInMsg {
 
    inp: PortId,
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    polled_undrained: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug, Default)]
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    mem_inbox: Vec<MemInMsg>,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
@@ -240,81 +245,79 @@ impl IdManager {
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
        cu.native_ports.insert(o);
 
        cu.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.port_info.polarities.insert(o, Putter);
 
        cu.port_info.polarities.insert(i, Getter);
 
        cu.port_info.peers.insert(o, i);
 
        cu.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        // 1. check if this is OK
 
        let cu = &mut self.unphased;
 
        let polarities = cu.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !cu.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
            }
 
            if expected_polarity != *cu.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = cu.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            cu.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
            cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        cu.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
        cu.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
 
                state: cu.proto_description.new_main_component(identifier, ports),
 
                ports: ports.iter().copied().collect(),
 
            },
 
        );
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
@@ -413,24 +416,43 @@ impl Predicate {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let iter = self.0.assigned.iter().filter_map(|(port, &firing)| {
 
                    if firing == self.1 {
 
                        Some(port)
 
                    } else {
 
                        None
 
                    }
 
                });
 
                f.debug_set().entries(iter).finish()
 
            }
 
        }
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
    }
 
}
 

	
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &ProtocolDescription = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -15,49 +15,49 @@ impl Connector {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                id_manager: IdManager::new(connector_id),
 
                native_ports: Default::default(),
 
                port_info: Default::default(),
 
            },
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, ()> {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
@@ -522,97 +522,109 @@ fn session_optimize(
 
            | msg @ S(LeaderAnnounce { .. })
 
            | msg @ S(LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {};
 
    let my_session_info = SessionInfo {
 
        port_info: cu.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
    };
 
    unoptimized_map.insert(cu.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(SessionGather { .. })
 
                | msg @ S(YouAreMyParent)
 
                | msg @ S(MyPortInfo(..))
 
                | msg @ S(LeaderAnnounce { .. })
 
                | msg @ S(LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(unoptimized_map)?
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info =
 
        optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimization complete");
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 
    _cu: &mut ConnectorUnphased,
 
    cu: &mut ConnectorUnphased,
 
    _comm: &mut ConnectorCommunication,
 
    _session_info: SessionInfo,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo { proto_components, port_info, serde_proto_description } = session_info;
 
    cu.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    Ok(())
 
}
0 comments (0 inline, 0 general)