Changeset - 175721d796d7
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-06-24 16:17:11
christopher.esterhuyse@gmail.com
Included more fields in session optimization
8 files changed with 99 insertions and 73 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -149,27 +149,22 @@ impl Payload {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        me.extend(bytes);
 
    }
 
}
 
impl serde::Serialize for Payload {
 
    fn serialize<S>(
 
        &self,
 
        serializer: S,
 
    ) -> std::result::Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &Vec<u8> = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for Payload {
 
    fn deserialize<D>(
 
        deserializer: D,
 
    ) -> std::result::Result<Self, <D as serde::Deserializer<'de>>::Error>
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: Vec<u8> = Vec::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
src/protocol/ast.rs
Show inline comments
 
@@ -402,13 +402,13 @@ pub struct ImportedDeclarationId(DeclarationId);
 
impl ImportedDeclarationId {
 
    pub fn upcast(self) -> DeclarationId {
 
        self.0
 
    }
 
}
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
#[derive(Debug, serde::Serialize, serde::Deserialize)]
 
pub struct Heap {
 
    // Phase 0: allocation
 
    protocol_descriptions: Arena<Root>,
 
    pragmas: Arena<Pragma>,
 
    imports: Arena<Import>,
 
    identifiers: Arena<Identifier>,
src/protocol/eval.rs
Show inline comments
 
@@ -3,14 +3,14 @@ use std::fmt;
 
use std::fmt::{Debug, Display, Formatter};
 
use std::{i16, i32, i64, i8};
 

	
 
use crate::common::*;
 

	
 
use crate::protocol::ast::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
// use crate::protocol::inputsource::*;
 
// use crate::protocol::parser::*;
 
use crate::protocol::EvalContext;
 

	
 
const MAX_RECURSION: usize = 1024;
 

	
 
const BYTE_MIN: i64 = i8::MIN as i64;
 
const BYTE_MAX: i64 = i8::MAX as i64;
 
@@ -81,13 +81,13 @@ impl Value {
 
                } else if val >= INT_MIN && val <= INT_MAX {
 
                    Value::Int(IntValue(val as i32))
 
                } else {
 
                    Value::Long(LongValue(val))
 
                }
 
            }
 
            Constant::Character(data) => unimplemented!(),
 
            Constant::Character(_data) => unimplemented!(),
 
        }
 
    }
 
    fn set(&mut self, index: &Value, value: &Value) -> Option<Value> {
 
        // The index must be of integer type, and non-negative
 
        let the_index: usize;
 
        match index {
 
@@ -1775,34 +1775,35 @@ impl Prompt {
 
                // Continue to next statement
 
                self.position = stmt.next;
 
                Err(EvalContinuation::Stepping)
 
            }
 
        }
 
    }
 
    fn compute_function(h: &Heap, fun: FunctionId, args: &Vec<Value>) -> Option<Value> {
 
        let mut prompt = Self::new(h, fun.upcast(), args);
 
        let mut context = EvalContext::None;
 
        loop {
 
            let result = prompt.step(h, &mut context);
 
            match result {
 
                Ok(val) => return Some(val),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return None,
 
                    // Functions never terminate without returning
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // Functions never encounter any blocking behavior
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(_, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    fn compute_function(_h: &Heap, _fun: FunctionId, _args: &Vec<Value>) -> Option<Value> {
 
        todo!()
 
        // let mut prompt = Self::new(h, fun.upcast(), args);
 
        // let mut context = EvalContext::None;
 
        // loop {
 
        //     let result = prompt.step(h, &mut context);
 
        //     match result {
 
        //         Ok(val) => return Some(val),
 
        //         Err(cont) => match cont {
 
        //             EvalContinuation::Stepping => continue,
 
        //             EvalContinuation::Inconsistent => return None,
 
        //             // Functions never terminate without returning
 
        //             EvalContinuation::Terminal => unreachable!(),
 
        //             // Functions never encounter any blocking behavior
 
        //             EvalContinuation::SyncBlockStart => unreachable!(),
 
        //             EvalContinuation::SyncBlockEnd => unreachable!(),
 
        //             EvalContinuation::NewComponent(_, _) => unreachable!(),
 
        //             EvalContinuation::BlockFires(val) => unreachable!(),
 
        //             EvalContinuation::BlockGet(val) => unreachable!(),
 
        //             EvalContinuation::Put(port, msg) => unreachable!(),
 
        //         },
 
        //     }
 
        // }
 
    }
 
}
 

	
 
// #[cfg(test)]
 
// mod tests {
 
//     extern crate test_generator;
src/protocol/inputsource.rs
Show inline comments
 
@@ -2,13 +2,13 @@ use std::fmt;
 
use std::fs::File;
 
use std::io;
 
use std::path::Path;
 

	
 
use backtrace::Backtrace;
 

	
 
#[derive(Clone, serde::Serialize, serde::Deserialize)]
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct InputSource {
 
    filename: String,
 
    input: Vec<u8>,
 
    line: usize,
 
    column: usize,
 
    offset: usize,
src/protocol/mod.rs
Show inline comments
 
@@ -22,19 +22,19 @@ pub struct ProtocolDescription {
 
pub struct ComponentState {
 
    prompt: Prompt,
 
}
 
pub enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
    // None,
 
}
 
//////////////////////////////////////////////
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "Protocol")
 
        write!(f, "(A big honkin' protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_buffer(buffer).unwrap();
 
@@ -214,20 +214,20 @@ impl ComponentState {
 
        }
 
    }
 
}
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
 
    //         EvalContext::None => unreachable!(),
 
    //         // EvalContext::None => unreachable!(),
 
    //         EvalContext::Nonsync(_context) => todo!(),
 
    //         EvalContext::Sync(_) => unreachable!(),
 
    //     }
 
    // }
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentState) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let mut moved_ports = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
                        Value::Output(OutputValue(port)) => {
 
                            moved_ports.insert(*port);
 
@@ -242,36 +242,36 @@ impl EvalContext<'_> {
 
            }
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Value; 2] {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(OutputValue(from));
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
 
            }
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from),
 
                Value::Input(InputValue(port)) => context.is_firing(port).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => {
 
                    context.read_msg(port).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(port)) => {
src/runtime/communication.rs
Show inline comments
 
@@ -192,17 +192,15 @@ impl Connector {
 

	
 
        // NOTE: all msgs in outbox are of form (Getter, Payload)
 
        let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
        // create the solution storage
 
        let mut solution_storage = {
 
            let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
            let c = cu
 
                .proto_components
 
                .keys()
 
                .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
            let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
            let c =
 
                cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
            let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
            SolutionStorage::new(n.chain(c).chain(e))
 
        };
 
        log!(cu.logger, "Solution storage initialized");
 

	
 
        // 2. kick off the native
 
@@ -243,13 +241,13 @@ impl Connector {
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(LocalComponentId::Native),
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
@@ -295,20 +293,20 @@ impl Connector {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to(*index, &msg).unwrap();
 
                    }
 
                    Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg(
 
                    Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        send_payload_msg,
 
                    ),
 
                    Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                    Route::LocalComponent(ComponentId::Proto(proto_component_id)) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
@@ -532,13 +530,13 @@ impl BranchingNative {
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
@@ -650,13 +648,13 @@ impl BranchingProtoComponent {
 
                            let var = cu.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        // submit solution for this component
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            &mut *cu.logger,
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)),
 
                            Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                            predicate.clone(),
 
                        );
 
                        // move to "blocked"
 
                        drainer.add_output(predicate, branch);
 
                    }
 
                    B::CouldntReadMsg(port) => {
 
@@ -917,15 +915,13 @@ impl NonsyncProtoContext<'_> {
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
            self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
@@ -934,13 +930,13 @@ impl NonsyncProtoContext<'_> {
 
        self.proto_component_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id));
 
        let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id));
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
src/runtime/mod.rs
Show inline comments
 
@@ -17,20 +17,20 @@ pub struct RoundOk {
 
}
 
#[derive(Debug)]
 
pub struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    LocalComponent(ComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
@@ -51,15 +51,20 @@ pub enum SetupMsg {
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SessionInfo {}
 
pub struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
@@ -83,13 +88,13 @@ pub enum CommonSatResult {
 
    Nonexistant,
 
}
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write;
 
@@ -135,13 +140,13 @@ pub struct EndpointManager {
 
    events: Events,
 
    polled_undrained: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug, Default)]
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
@@ -258,13 +263,13 @@ impl Connector {
 
        cu.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.port_info.polarities.insert(o, Putter);
 
        cu.port_info.polarities.insert(i, Getter);
 
        cu.port_info.peers.insert(o, i);
 
        cu.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
@@ -288,15 +293,13 @@ impl Connector {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = cu.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            cu.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
            cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        cu.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
        cu.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
 
@@ -431,6 +434,25 @@ impl Debug for Predicate {
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
    }
 
}
 

	
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &ProtocolDescription = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -33,13 +33,13 @@ impl Connector {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
@@ -540,13 +540,17 @@ fn session_optimize(
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {};
 
    let my_session_info = SessionInfo {
 
        port_info: cu.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
    };
 
    unoptimized_map.insert(cu.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
@@ -583,36 +587,44 @@ fn session_optimize(
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(unoptimized_map)?
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info =
 
        optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimization complete");
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 
    _cu: &mut ConnectorUnphased,
 
    cu: &mut ConnectorUnphased,
 
    _comm: &mut ConnectorCommunication,
 
    _session_info: SessionInfo,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo { proto_components, port_info, serde_proto_description } = session_info;
 
    cu.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    Ok(())
 
}
0 comments (0 inline, 0 general)