diff --git a/src/common.rs b/src/common.rs index a712288779b10a1995443ee22c188860f279c50f..76c59b4852f60d051f2211e5a497da41b415ae8e 100644 --- a/src/common.rs +++ b/src/common.rs @@ -152,10 +152,7 @@ impl Payload { } } impl serde::Serialize for Payload { - fn serialize( - &self, - serializer: S, - ) -> std::result::Result<::Ok, ::Error> + fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { @@ -164,9 +161,7 @@ impl serde::Serialize for Payload { } } impl<'de> serde::Deserialize<'de> for Payload { - fn deserialize( - deserializer: D, - ) -> std::result::Result>::Error> + fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index 6b94de786baa4d6499717d8cb2949e119babfe62..560ffe71caf14fc10a9c50fbb723d3b565fb0f25 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -405,7 +405,7 @@ impl ImportedDeclarationId { } } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Heap { // Phase 0: allocation protocol_descriptions: Arena, diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 9fb2531c1176c94b65cc4ce1f89d27cf966d585f..06b0c137d25ca05383402156ffbd8f00b0543e45 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -6,8 +6,8 @@ use std::{i16, i32, i64, i8}; use crate::common::*; use crate::protocol::ast::*; -use crate::protocol::inputsource::*; -use crate::protocol::parser::*; +// use crate::protocol::inputsource::*; +// use crate::protocol::parser::*; use crate::protocol::EvalContext; const MAX_RECURSION: usize = 1024; @@ -84,7 +84,7 @@ impl Value { Value::Long(LongValue(val)) } } - Constant::Character(data) => unimplemented!(), + Constant::Character(_data) => unimplemented!(), } } fn set(&mut self, index: &Value, value: &Value) -> Option { @@ -1778,28 +1778,29 @@ impl Prompt { } } } - fn compute_function(h: &Heap, fun: FunctionId, args: &Vec) -> Option { - let mut prompt = Self::new(h, fun.upcast(), args); - let mut context = EvalContext::None; - loop { - let result = prompt.step(h, &mut context); - match result { - Ok(val) => return Some(val), - Err(cont) => match cont { - EvalContinuation::Stepping => continue, - EvalContinuation::Inconsistent => return None, - // Functions never terminate without returning - EvalContinuation::Terminal => unreachable!(), - // Functions never encounter any blocking behavior - EvalContinuation::SyncBlockStart => unreachable!(), - EvalContinuation::SyncBlockEnd => unreachable!(), - EvalContinuation::NewComponent(_, _) => unreachable!(), - EvalContinuation::BlockFires(val) => unreachable!(), - EvalContinuation::BlockGet(val) => unreachable!(), - EvalContinuation::Put(port, msg) => unreachable!(), - }, - } - } + fn compute_function(_h: &Heap, _fun: FunctionId, _args: &Vec) -> Option { + todo!() + // let mut prompt = Self::new(h, fun.upcast(), args); + // let mut context = EvalContext::None; + // loop { + // let result = prompt.step(h, &mut context); + // match result { + // Ok(val) => return Some(val), + // Err(cont) => match cont { + // EvalContinuation::Stepping => continue, + // EvalContinuation::Inconsistent => return None, + // // Functions never terminate without returning + // EvalContinuation::Terminal => unreachable!(), + // // Functions never encounter any blocking behavior + // EvalContinuation::SyncBlockStart => unreachable!(), + // EvalContinuation::SyncBlockEnd => unreachable!(), + // EvalContinuation::NewComponent(_, _) => unreachable!(), + // EvalContinuation::BlockFires(val) => unreachable!(), + // EvalContinuation::BlockGet(val) => unreachable!(), + // EvalContinuation::Put(port, msg) => unreachable!(), + // }, + // } + // } } } diff --git a/src/protocol/inputsource.rs b/src/protocol/inputsource.rs index 654371b0d3f8704d8763ccf41fa2289d7cf2c214..362a7bb49a7ff89cfa1c6ab7783f5d6716da0f47 100644 --- a/src/protocol/inputsource.rs +++ b/src/protocol/inputsource.rs @@ -5,7 +5,7 @@ use std::path::Path; use backtrace::Backtrace; -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct InputSource { filename: String, input: Vec, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 8f9465dcf03b75edb18c65fb83525bb3a61721cb..b87ab6815f6f15684065fb1c3b351579b5e13528 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -25,13 +25,13 @@ pub struct ComponentState { pub enum EvalContext<'a> { Nonsync(&'a mut NonsyncProtoContext<'a>), Sync(&'a mut SyncProtoContext<'a>), - None, + // None, } ////////////////////////////////////////////// impl std::fmt::Debug for ProtocolDescription { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "Protocol") + write!(f, "(A big honkin' protocol description)") } } impl ProtocolDescription { @@ -217,14 +217,14 @@ impl ComponentState { impl EvalContext<'_> { // fn random(&mut self) -> LongValue { // match self { - // EvalContext::None => unreachable!(), + // // EvalContext::None => unreachable!(), // EvalContext::Nonsync(_context) => todo!(), // EvalContext::Sync(_) => unreachable!(), // } // } fn new_component(&mut self, args: &[Value], init_state: ComponentState) -> () { match self { - EvalContext::None => unreachable!(), + // EvalContext::None => unreachable!(), EvalContext::Nonsync(context) => { let mut moved_ports = HashSet::new(); for arg in args.iter() { @@ -245,7 +245,7 @@ impl EvalContext<'_> { } fn new_channel(&mut self) -> [Value; 2] { match self { - EvalContext::None => unreachable!(), + // EvalContext::None => unreachable!(), EvalContext::Nonsync(context) => { let [from, to] = context.new_port_pair(); let from = Value::Output(OutputValue(from)); @@ -257,7 +257,7 @@ impl EvalContext<'_> { } fn fires(&mut self, port: Value) -> Option { match self { - EvalContext::None => unreachable!(), + // EvalContext::None => unreachable!(), EvalContext::Nonsync(_) => unreachable!(), EvalContext::Sync(context) => match port { Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from), @@ -268,7 +268,7 @@ impl EvalContext<'_> { } fn get(&mut self, port: Value) -> Option { match self { - EvalContext::None => unreachable!(), + // EvalContext::None => unreachable!(), EvalContext::Nonsync(_) => unreachable!(), EvalContext::Sync(context) => match port { Value::Output(OutputValue(port)) => { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index e7db616c1ac626a4471c7c0ba5a446b86ad02755..87fe7756d1a7f8d1ee6370396576792e7c77b974 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -195,11 +195,9 @@ impl Connector { // create the solution storage let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); - let c = cu - .proto_components - .keys() - .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); + let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); + let c = + cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id))); let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); SolutionStorage::new(n.chain(c).chain(e)) }; @@ -246,7 +244,7 @@ impl Connector { ); solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, - Route::LocalComponent(LocalComponentId::Native), + Route::LocalComponent(ComponentId::Native), predicate.clone(), ); } @@ -298,14 +296,14 @@ impl Connector { }); comm.endpoint_manager.send_to(*index, &msg).unwrap(); } - Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg( + Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg( cu, &mut solution_storage, // &mut Pay getter, send_payload_msg, ), - Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { + Route::LocalComponent(ComponentId::Proto(proto_component_id)) => { if let Some(branching_component) = branching_proto_components.get_mut(proto_component_id) { @@ -535,7 +533,7 @@ impl BranchingNative { assert!(was.is_none()); branch.to_get.remove(&getter); if branch.to_get.is_empty() { - let route = Route::LocalComponent(LocalComponentId::Native); + let route = Route::LocalComponent(ComponentId::Native); solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, route, @@ -653,7 +651,7 @@ impl BranchingProtoComponent { // submit solution for this component solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, - Route::LocalComponent(LocalComponentId::Proto(proto_component_id)), + Route::LocalComponent(ComponentId::Proto(proto_component_id)), predicate.clone(), ); // move to "blocked" @@ -920,9 +918,7 @@ impl NonsyncProtoContext<'_> { let new_id = self.id_manager.new_proto_component_id(); for port in moved_ports.iter() { self.proto_component_ports.remove(port); - self.port_info - .routes - .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); + self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); } // 3. create a new component self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); @@ -937,7 +933,7 @@ impl NonsyncProtoContext<'_> { self.port_info.polarities.insert(i, Getter); self.port_info.peers.insert(o, i); self.port_info.peers.insert(i, o); - let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id)); + let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id)); self.port_info.routes.insert(o, route); self.port_info.routes.insert(i, route); log!( diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index c1acf7cc7637f20dce218a1a06d9bf0ed1e60447..3a098dcca503e27a53d98cbb55a213603094be10 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -20,14 +20,14 @@ pub struct VecSet { // invariant: ordered, deduplicated vec: Vec, } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -pub enum LocalComponentId { +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +pub enum ComponentId { Native, Proto(ProtoComponentId), } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub enum Route { - LocalComponent(LocalComponentId), + LocalComponent(ComponentId), Endpoint { index: usize }, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -54,9 +54,14 @@ pub enum SetupMsg { SessionGather { unoptimized_map: HashMap }, SessionScatter { optimized_map: HashMap }, } - +#[derive(Debug, Clone)] +pub(crate) struct SerdeProtocolDescription(Arc); #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct SessionInfo {} +pub struct SessionInfo { + serde_proto_description: SerdeProtocolDescription, + port_info: PortInfo, + proto_components: HashMap, +} #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct CommMsg { @@ -86,7 +91,7 @@ pub struct Endpoint { inbox: Vec, stream: TcpStream, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ProtoComponent { state: ComponentState, ports: HashSet, @@ -138,7 +143,7 @@ pub struct EndpointManager { undelayed_messages: Vec<(usize, Msg)>, endpoint_exts: Vec, } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct PortInfo { polarities: HashMap, peers: HashMap, @@ -261,7 +266,7 @@ impl Connector { cu.port_info.polarities.insert(i, Getter); cu.port_info.peers.insert(o, i); cu.port_info.peers.insert(i, o); - let route = Route::LocalComponent(LocalComponentId::Native); + let route = Route::LocalComponent(ComponentId::Native); cu.port_info.routes.insert(o, route); cu.port_info.routes.insert(i, route); log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); @@ -291,9 +296,7 @@ impl Connector { // 3. remove ports from old component & update port->route let new_id = cu.id_manager.new_proto_component_id(); for port in ports.iter() { - cu.port_info - .routes - .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); + cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); } cu.native_ports.retain(|port| !ports.contains(port)); // 4. add new component @@ -434,3 +437,22 @@ impl Debug for Predicate { .finish() } } + +impl serde::Serialize for SerdeProtocolDescription { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let inner: &ProtocolDescription = &self.0; + inner.serialize(serializer) + } +} +impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?; + Ok(Self(Arc::new(inner))) + } +} diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 97f9673898fe7bb6cc82e6499ec79bf3b55817e4..d28d7576b823ad095337b0449bd89d14789a00ba 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -36,7 +36,7 @@ impl Connector { up.native_ports.insert(p); // {polarity, route} known. {peer} unknown. up.port_info.polarities.insert(p, polarity); - up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); + up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native)); log!( up.logger, "Added net port {:?} with polarity {:?} and endpoint setup {:?} ", @@ -543,7 +543,11 @@ fn session_optimize( "Gathered all children's maps. ConnectorId set is... {:?}", unoptimized_map.keys() ); - let my_session_info = SessionInfo {}; + let my_session_info = SessionInfo { + port_info: cu.port_info.clone(), + proto_components: cu.proto_components.clone(), + serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), + }; unoptimized_map.insert(cu.id_manager.connector_id, my_session_info); log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); @@ -586,7 +590,7 @@ fn session_optimize( } else { // by computing it myself log!(cu.logger, "I am the leader! I will optimize this session"); - leader_session_map_optimize(unoptimized_map)? + leader_session_map_optimize(&mut *cu.logger, unoptimized_map)? }; log!( cu.logger, @@ -594,6 +598,7 @@ fn session_optimize( &optimized_map, comm.neighborhood.children.iter() ); + log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); let msg = S(SessionScatter { optimized_map }); @@ -601,18 +606,25 @@ fn session_optimize( comm.endpoint_manager.send_to_setup(child, &msg)?; } apply_optimizations(cu, comm, optimized_info)?; - log!(cu.logger, "Session optimization complete"); + log!(cu.logger, "Session optimizations applied"); Ok(()) } fn leader_session_map_optimize( + logger: &mut dyn Logger, unoptimized_map: HashMap, ) -> Result, ConnectError> { + log!(logger, "Session map optimize START"); + log!(logger, "Session map optimize END"); Ok(unoptimized_map) } fn apply_optimizations( - _cu: &mut ConnectorUnphased, + cu: &mut ConnectorUnphased, _comm: &mut ConnectorCommunication, - _session_info: SessionInfo, + session_info: SessionInfo, ) -> Result<(), ConnectError> { + let SessionInfo { proto_components, port_info, serde_proto_description } = session_info; + cu.port_info = port_info; + cu.proto_components = proto_components; + cu.proto_description = serde_proto_description.0; Ok(()) }