Changeset - 175721d796d7
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-06-24 16:17:11
christopher.esterhuyse@gmail.com
Included more fields in session optimization
8 files changed with 99 insertions and 73 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -152,10 +152,7 @@ impl Payload {
 
    }
 
}
 
impl serde::Serialize for Payload {
 
    fn serialize<S>(
 
        &self,
 
        serializer: S,
 
    ) -> std::result::Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
@@ -164,9 +161,7 @@ impl serde::Serialize for Payload {
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for Payload {
 
    fn deserialize<D>(
 
        deserializer: D,
 
    ) -> std::result::Result<Self, <D as serde::Deserializer<'de>>::Error>
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
src/protocol/ast.rs
Show inline comments
 
@@ -405,7 +405,7 @@ impl ImportedDeclarationId {
 
    }
 
}
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
#[derive(Debug, serde::Serialize, serde::Deserialize)]
 
pub struct Heap {
 
    // Phase 0: allocation
 
    protocol_descriptions: Arena<Root>,
src/protocol/eval.rs
Show inline comments
 
@@ -6,8 +6,8 @@ use std::{i16, i32, i64, i8};
 
use crate::common::*;
 

	
 
use crate::protocol::ast::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
// use crate::protocol::inputsource::*;
 
// use crate::protocol::parser::*;
 
use crate::protocol::EvalContext;
 

	
 
const MAX_RECURSION: usize = 1024;
 
@@ -84,7 +84,7 @@ impl Value {
 
                    Value::Long(LongValue(val))
 
                }
 
            }
 
            Constant::Character(data) => unimplemented!(),
 
            Constant::Character(_data) => unimplemented!(),
 
        }
 
    }
 
    fn set(&mut self, index: &Value, value: &Value) -> Option<Value> {
 
@@ -1778,28 +1778,29 @@ impl Prompt {
 
            }
 
        }
 
    }
 
    fn compute_function(h: &Heap, fun: FunctionId, args: &Vec<Value>) -> Option<Value> {
 
        let mut prompt = Self::new(h, fun.upcast(), args);
 
        let mut context = EvalContext::None;
 
        loop {
 
            let result = prompt.step(h, &mut context);
 
            match result {
 
                Ok(val) => return Some(val),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return None,
 
                    // Functions never terminate without returning
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // Functions never encounter any blocking behavior
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(_, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    fn compute_function(_h: &Heap, _fun: FunctionId, _args: &Vec<Value>) -> Option<Value> {
 
        todo!()
 
        // let mut prompt = Self::new(h, fun.upcast(), args);
 
        // let mut context = EvalContext::None;
 
        // loop {
 
        //     let result = prompt.step(h, &mut context);
 
        //     match result {
 
        //         Ok(val) => return Some(val),
 
        //         Err(cont) => match cont {
 
        //             EvalContinuation::Stepping => continue,
 
        //             EvalContinuation::Inconsistent => return None,
 
        //             // Functions never terminate without returning
 
        //             EvalContinuation::Terminal => unreachable!(),
 
        //             // Functions never encounter any blocking behavior
 
        //             EvalContinuation::SyncBlockStart => unreachable!(),
 
        //             EvalContinuation::SyncBlockEnd => unreachable!(),
 
        //             EvalContinuation::NewComponent(_, _) => unreachable!(),
 
        //             EvalContinuation::BlockFires(val) => unreachable!(),
 
        //             EvalContinuation::BlockGet(val) => unreachable!(),
 
        //             EvalContinuation::Put(port, msg) => unreachable!(),
 
        //         },
 
        //     }
 
        // }
 
    }
 
}
 

	
src/protocol/inputsource.rs
Show inline comments
 
@@ -5,7 +5,7 @@ use std::path::Path;
 

	
 
use backtrace::Backtrace;
 

	
 
#[derive(Clone, serde::Serialize, serde::Deserialize)]
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct InputSource {
 
    filename: String,
 
    input: Vec<u8>,
src/protocol/mod.rs
Show inline comments
 
@@ -25,13 +25,13 @@ pub struct ComponentState {
 
pub enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
    // None,
 
}
 
//////////////////////////////////////////////
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "Protocol")
 
        write!(f, "(A big honkin' protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
@@ -217,14 +217,14 @@ impl ComponentState {
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
 
    //         EvalContext::None => unreachable!(),
 
    //         // EvalContext::None => unreachable!(),
 
    //         EvalContext::Nonsync(_context) => todo!(),
 
    //         EvalContext::Sync(_) => unreachable!(),
 
    //     }
 
    // }
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentState) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let mut moved_ports = HashSet::new();
 
                for arg in args.iter() {
 
@@ -245,7 +245,7 @@ impl EvalContext<'_> {
 
    }
 
    fn new_channel(&mut self) -> [Value; 2] {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(OutputValue(from));
 
@@ -257,7 +257,7 @@ impl EvalContext<'_> {
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from),
 
@@ -268,7 +268,7 @@ impl EvalContext<'_> {
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            // EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => {
src/runtime/communication.rs
Show inline comments
 
@@ -195,11 +195,9 @@ impl Connector {
 

	
 
        // create the solution storage
 
        let mut solution_storage = {
 
            let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
            let c = cu
 
                .proto_components
 
                .keys()
 
                .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
            let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
            let c =
 
                cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
            let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
            SolutionStorage::new(n.chain(c).chain(e))
 
        };
 
@@ -246,7 +244,7 @@ impl Connector {
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(LocalComponentId::Native),
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
@@ -298,14 +296,14 @@ impl Connector {
 
                        });
 
                        comm.endpoint_manager.send_to(*index, &msg).unwrap();
 
                    }
 
                    Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg(
 
                    Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        send_payload_msg,
 
                    ),
 
                    Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                    Route::LocalComponent(ComponentId::Proto(proto_component_id)) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
@@ -535,7 +533,7 @@ impl BranchingNative {
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
@@ -653,7 +651,7 @@ impl BranchingProtoComponent {
 
                        // submit solution for this component
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            &mut *cu.logger,
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)),
 
                            Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                            predicate.clone(),
 
                        );
 
                        // move to "blocked"
 
@@ -920,9 +918,7 @@ impl NonsyncProtoContext<'_> {
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
            self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
@@ -937,7 +933,7 @@ impl NonsyncProtoContext<'_> {
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id));
 
        let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id));
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(
src/runtime/mod.rs
Show inline comments
 
@@ -20,14 +20,14 @@ pub struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    LocalComponent(ComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
@@ -54,9 +54,14 @@ pub enum SetupMsg {
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SessionInfo {}
 
pub struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct CommMsg {
 
@@ -86,7 +91,7 @@ pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
@@ -138,7 +143,7 @@ pub struct EndpointManager {
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug, Default)]
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
@@ -261,7 +266,7 @@ impl Connector {
 
        cu.port_info.polarities.insert(i, Getter);
 
        cu.port_info.peers.insert(o, i);
 
        cu.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
@@ -291,9 +296,7 @@ impl Connector {
 
        // 3. remove ports from old component & update port->route
 
        let new_id = cu.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            cu.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
            cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        cu.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
@@ -434,3 +437,22 @@ impl Debug for Predicate {
 
            .finish()
 
    }
 
}
 

	
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &ProtocolDescription = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -36,7 +36,7 @@ impl Connector {
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
@@ -543,7 +543,11 @@ fn session_optimize(
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {};
 
    let my_session_info = SessionInfo {
 
        port_info: cu.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
    };
 
    unoptimized_map.insert(cu.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
@@ -586,7 +590,7 @@ fn session_optimize(
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(unoptimized_map)?
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
@@ -594,6 +598,7 @@ fn session_optimize(
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info =
 
        optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(SessionScatter { optimized_map });
 
@@ -601,18 +606,25 @@ fn session_optimize(
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimization complete");
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 
    _cu: &mut ConnectorUnphased,
 
    cu: &mut ConnectorUnphased,
 
    _comm: &mut ConnectorCommunication,
 
    _session_info: SessionInfo,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo { proto_components, port_info, serde_proto_description } = session_info;
 
    cu.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    Ok(())
 
}
0 comments (0 inline, 0 general)