diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs new file mode 100644 index 0000000000000000000000000000000000000000..f4ffd3baaebf388e8895263ae7c009750667e422 --- /dev/null +++ b/src/runtime/connector.rs @@ -0,0 +1,175 @@ +use crate::common::*; +use crate::runtime::{errors::*, *}; + +pub fn random_controller_id() -> ControllerId { + type Bytes8 = [u8; std::mem::size_of::()]; + let mut bytes = Bytes8::default(); + getrandom::getrandom(&mut bytes).unwrap(); + unsafe { std::mem::transmute::(bytes) } +} + +impl Default for Unconfigured { + fn default() -> Self { + let controller_id = random_controller_id(); + Self { controller_id } + } +} +impl Default for Connector { + fn default() -> Self { + Self::Unconfigured(Unconfigured::default()) + } +} +impl Connector { + /// Configure the Connector with the given Pdl description. + pub fn configure(&mut self, pdl: &[u8]) -> Result<(), ConfigErr> { + use ConfigErr::*; + let controller_id = match self { + Connector::Configured(_) => return Err(AlreadyConfigured), + Connector::Connected(_) => return Err(AlreadyConnected), + Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id, + }; + let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?); + let proto_maybe_bindings = protocol_description + .main_interface_polarities() + .into_iter() + .zip(std::iter::repeat(None)) + .collect(); + let configured = Configured { controller_id, protocol_description, proto_maybe_bindings }; + *self = Connector::Configured(configured); + Ok(()) + } + + /// Bind the (configured) connector's port corresponding to the + pub fn bind_port( + &mut self, + proto_port_index: usize, + binding: PortBinding, + ) -> Result<(), PortBindErr> { + use PortBindErr::*; + match self { + Connector::Unconfigured { .. } => Err(NotConfigured), + Connector::Connected(_) => Err(AlreadyConnected), + Connector::Configured(configured) => { + match configured.proto_maybe_bindings.get_mut(proto_port_index) { + None => Err(IndexOutOfBounds), + Some((_polarity, Some(_))) => Err(PortAlreadyBound), + Some((_polarity, x @ None)) => { + *x = Some(binding); + Ok(()) + } + } + } + } + } + pub fn connect(&mut self, timeout: Duration) -> Result<(), ConnectErr> { + let deadline = Instant::now() + timeout; + use ConnectErr::*; + let configured = match self { + Connector::Unconfigured { .. } => return Err(NotConfigured), + Connector::Connected(_) => return Err(AlreadyConnected), + Connector::Configured(configured) => configured, + }; + // 1. Unwrap bindings or err + let bound_proto_interface: Vec<(_, _)> = configured + .proto_maybe_bindings + .iter() + .copied() + .enumerate() + .map(|(native_index, (polarity, maybe_binding))| { + Ok((maybe_binding.ok_or(PortNotBound { native_index })?, polarity)) + }) + .collect::, ConnectErr>>()?; + let (controller, native_interface) = Controller::connect( + configured.controller_id, + configured.protocol_description.clone(), + &bound_proto_interface[..], + deadline, + )?; + *self = Connector::Connected(Connected { + native_interface, + sync_batches: vec![Default::default()], + controller, + }); + Ok(()) + } + + pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> { + use PortOpErr::*; + let connected = match self { + Connector::Connected(connected) => connected, + _ => return Err(NotConnected), + }; + let (ekey, native_polarity) = + *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; + if native_polarity != Putter { + return Err(WrongPolarity); + } + let sync_batch = connected.sync_batches.iter_mut().last().unwrap(); + if sync_batch.puts.contains_key(&ekey) { + return Err(DuplicateOperation); + } + sync_batch.puts.insert(ekey, payload); + Ok(()) + } + + pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> { + use PortOpErr::*; + let connected = match self { + Connector::Connected(connected) => connected, + _ => return Err(NotConnected), + }; + let (ekey, native_polarity) = + *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; + if native_polarity != Getter { + return Err(WrongPolarity); + } + let sync_batch = connected.sync_batches.iter_mut().last().unwrap(); + if sync_batch.gets.contains(&ekey) { + return Err(DuplicateOperation); + } + sync_batch.gets.insert(ekey); + Ok(()) + } + pub fn next_batch(&mut self) -> Result { + let connected = match self { + Connector::Connected(connected) => connected, + _ => return Err(()), + }; + connected.sync_batches.push(SyncBatch::default()); + Ok(connected.sync_batches.len() - 1) + } + + pub fn sync(&mut self, timeout: Duration) -> Result { + let deadline = Instant::now() + timeout; + use SyncErr::*; + let connected = match self { + Connector::Connected(connected) => connected, + _ => return Err(NotConnected), + }; + + // do the synchronous round! + connected.controller.sync_round(deadline, Some(connected.sync_batches.drain(..)))?; + connected.sync_batches.push(SyncBatch::default()); + + let mono_n = connected.controller.inner.mono_n.as_mut().unwrap(); + let result = mono_n.result.as_mut().unwrap(); + Ok(result.0) + } + + pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> { + use ReadGottenErr::*; + let connected = match self { + Connector::Connected(connected) => connected, + _ => return Err(NotConnected), + }; + let &(key, polarity) = + connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; + if polarity != Getter { + return Err(WrongPolarity); + } + let mono_n = connected.controller.inner.mono_n.as_ref().expect("controller has no mono_n?"); + let result = mono_n.result.as_ref().ok_or(NoPreviousRound)?; + let payload = result.1.get(&key).ok_or(DidntGet)?; + Ok(payload) + } +}