diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs
index febb0a5125eee904e53feefb132373f9e78e0c21..993e4e64f949e0ff8efe028d9ccc4231b21c7d8d 100644
--- a/src/protocol/mod.rs
+++ b/src/protocol/mod.rs
@@ -133,7 +133,7 @@ impl ProtocolDescription {
     /// A somewhat temporary method. Can be used by components to lookup type
     /// definitions by their name (to have their implementation somewhat
     /// resistant to changes in the standard library)
-    pub(crate) fn find_type(&self, module_name: &[u8], type_name: &[u8]) -> Option<TypeInspector> {
+    pub(crate) fn find_type<'a>(&'a self, module_name: &[u8], type_name: &[u8]) -> Option<TypeInspector<'a>> {
         // Lookup type definition in module
         let root_id = self.lookup_module_root(module_name)?;
         let module = &self.heap[root_id];
@@ -291,8 +291,8 @@ pub struct TypeInspector<'a> {
     type_table: &'a MonoType,
 }
 
-impl TypeInspector {
-    pub fn as_union(&self) -> UnionTypeInspector {
+impl<'a> TypeInspector<'a> {
+    pub fn as_union(&'a self) -> UnionTypeInspector<'a> {
         let heap = self.heap.as_union();
         let type_table = self.type_table.variant.as_union();
         return UnionTypeInspector{ heap, type_table };
@@ -304,7 +304,7 @@ pub struct UnionTypeInspector<'a> {
     type_table: &'a UnionMonomorph,
 }
 
-impl UnionTypeInspector {
+impl UnionTypeInspector<'_> {
     /// Retrieves union variant tag value.
     pub fn get_variant_tag_value(&self, variant_name: &[u8]) -> Option<i64> {
         let variant_index = self.heap.variants.iter()
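
For reference, the call pattern these lifetime annotations enable looks as follows (a minimal sketch; `pd` is assumed to be a `&ProtocolDescription`, and the identifiers match the `ComponentTcpClient` usage later in this diff). The explicit `'a` ties the returned `TypeInspector` to the borrow of `pd`, which is what lets the chained `as_union()` call compile:

    // Resolve the `Cmd` union from the standard library, then read a
    // variant's tag value (error handling kept as in the diff).
    let cmd_type = pd.find_type(b"std.internet", b"Cmd")
        .expect("'Cmd' type in the 'std.internet' module");
    let cmd_union = cmd_type.as_union();
    let send_tag: i64 = cmd_union.get_variant_tag_value(b"Send").unwrap();
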
diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
index d50a9ef1a923719d29f2ab9e348e11c9e66117a9..a18a96f7a77b3b69cd72e287478626649fb3b423 100644
--- a/src/runtime2/component/component.rs
+++ b/src/runtime2/component/component.rs
@@ -137,6 +137,96 @@ pub(crate) fn create_component(
 // Generic component messaging utilities (for sending and receiving)
 // -----------------------------------------------------------------------------
 
+/// Default handling of sending a data message. In case the port is blocked,
+/// the `ExecState` will become blocked as well. Note that
+/// `default_handle_control_message` will ensure that the port becomes
+/// unblocked if so instructed by the receiving component. The returned
+/// scheduling value must be used.
+#[must_use]
+pub(crate) fn default_send_data_message(
+    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
+    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
+) -> CompScheduling {
+    debug_assert_eq!(exec_state.mode, CompMode::Sync);
+
+    // TODO: Handle closed ports
+    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
+    let port_info = comp_ctx.get_port(port_handle);
+    debug_assert_eq!(port_info.kind, PortKind::Putter);
+    if port_info.state.is_blocked() {
+        // Port is blocked, so we cannot send
+        exec_state.set_as_blocked_put(transmitting_port_id, value);
+
+        return CompScheduling::Sleep;
+    } else {
+        // Port is not blocked, so send to the peer
+        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
+        let peer_info = comp_ctx.get_peer(peer_handle);
+        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
+
+        return CompScheduling::Immediate;
+    }
+}
+
+pub(crate) enum IncomingData {
+    PlacedInSlot,
+    SlotFull(DataMessage),
+}
+
+/// Default handling of receiving a data message. In case there is no room for
+/// the message, it is returned from this function. Note that this function is
+/// different from PDL code performing a `get` on a port; this is the case where
+/// the message first arrives at the component.
+// NOTE: This is supposed to be a somewhat temporary implementation. Ideally the
+// sending component could figure out on its own that it cannot send more data.
+#[must_use]
+pub(crate) fn default_handle_incoming_data_message(
+    exec_state: &mut CompExecState, port_value_slot: &mut Option<DataMessage>,
+    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
+    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
+) -> IncomingData {
+    let target_port_id = incoming_message.data_header.target_port;
+
+    if port_value_slot.is_none() {
+        // We can put the value in the slot
+        *port_value_slot = Some(incoming_message);
+
+        // Check if we're blocked on receiving this message.
+        dbg_code!({
+            // Our port cannot have been blocked itself, because we're able to
+            // directly insert the message into its slot.
+            let port_handle = comp_ctx.get_port_handle(target_port_id);
+            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
+        });
+
+        if exec_state.is_blocked_on_get(target_port_id) {
+            // Return to normal operation
+            exec_state.mode = CompMode::Sync;
+            exec_state.mode_port = PortId::new_invalid();
+            debug_assert!(exec_state.mode_value.values.is_empty());
+        }
+
+        return IncomingData::PlacedInSlot;
+    } else {
+        // Slot is already full, so if the port was previously open, it will
+        // now become blocked
+        let port_handle = comp_ctx.get_port_handle(target_port_id);
+        let port_info = comp_ctx.get_port_mut(port_handle);
+        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future
+
+        if port_info.state == PortState::Open {
+            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
+            let (peer_handle, message) =
+                control.initiate_port_blocking(comp_ctx, port_handle);
+            let peer = comp_ctx.get_peer(peer_handle);
+            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
+        }
+
+        return IncomingData::SlotFull(incoming_message);
+    }
+}
+
 /// Handles control messages in the default way. Note that this function may
 /// take a lot of actions in the name of the caller: pending messages may be
 /// sent, ports may become blocked/unblocked, etc. So the execution
@@ -222,6 +312,7 @@ pub(crate) fn default_handle_control_message(
 /// Handles a component initiating the exiting procedure, and closing all of its
 /// ports. Should only be called once per component (which is ensured by
 /// checking and modifying the mode in the execution state).
+#[must_use]
 pub(crate) fn default_handle_start_exit(
     exec_state: &mut CompExecState, control: &mut ControlLayer,
     sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
@@ -254,6 +345,7 @@
 
 /// Handles a component waiting until all peers are notified that it is quitting
 /// (i.e. after calling `default_handle_start_exit`).
+#[must_use]
 pub(crate) fn default_handle_busy_exit(
     exec_state: &mut CompExecState, control: &ControlLayer,
     sched_ctx: &SchedulerCtx
@@ -271,6 +363,7 @@
 
 /// Handles a potential synchronous round decision. If there was a decision then
 /// the `Some(success)` value indicates whether the round succeeded or not.
+/// Might also end up changing the `ExecState`.
 pub(crate) fn default_handle_sync_decision(
     exec_state: &mut CompExecState, decision: SyncRoundDecision,
     consensus: &mut Consensus
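
Taken together, these helpers split the default messaging behaviour in two: `default_send_data_message` either forwards the consensus-annotated message or parks the value and puts the component to sleep, while `default_handle_incoming_data_message` either fills a port's single-message slot or starts the port-blocking handshake. A sketch of how a caller wires them up (the `self` type here is hypothetical; its `inbox_main`/`inbox_backup` fields mirror `ComponentTcpClient` below):

    // Receiving side: try the slot first, fall back to a backup queue.
    match component::default_handle_incoming_data_message(
        &mut self.exec_state, &mut self.inbox_main, comp_ctx, message,
        sched_ctx, &mut self.control,
    ) {
        // Slot was free; a pending `get` may have been unblocked.
        component::IncomingData::PlacedInSlot => {},
        // Slot was full; the port is now blocked, keep the message around.
        component::IncomingData::SlotFull(message) => self.inbox_backup.push(message),
    }

    // Sending side: the `#[must_use]` scheduling value decides whether the
    // component keeps running (`Immediate`) or sleeps until unblocked.
    let scheduling = component::default_send_data_message(
        &mut self.exec_state, port_id, value, sched_ctx, &mut self.consensus, comp_ctx,
    );
    return Ok(scheduling);
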
diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs
index 64cf8f35d58c40467a2a7d157570f43f64a5f2b4..2873ce68b0775c786311f1ab12314c9fcb512d95 100644
--- a/src/runtime2/component/component_internet.rs
+++ b/src/runtime2/component/component_internet.rs
@@ -26,17 +26,20 @@ impl SocketState {
 /// States from the point of view of the component that is connecting to this
 /// TCP component (i.e. from the point of view of attempting to interface with
 /// a socket).
+#[derive(PartialEq, Debug)]
 enum SyncState {
     AwaitingCmd,
     Getting,
-    Putting
+    Putting,
+    FinishSync,
 }
 
 pub struct ComponentTcpClient {
     // Properties for the tcp socket
     socket_state: SocketState,
     sync_state: SyncState,
-    pending_recv: Vec<DataMessage>, // on the input port
+    inbox_main: Option<DataMessage>,
+    inbox_backup: Vec<DataMessage>,
     pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
     pdl_output_port_id: PortId, // output towards PDL, so received over socket
     input_union_send_tag_value: i64,
@@ -54,7 +57,8 @@ impl Component for ComponentTcpClient {
     fn on_creation(&mut self, sched_ctx: &SchedulerCtx) {
         let pd = &sched_ctx.runtime.protocol;
         let cmd_type = pd.find_type(b"std.internet", b"Cmd")
-            .expect("'Cmd' type in the 'std.internet' module")
+            .expect("'Cmd' type in the 'std.internet' module");
+        let cmd_type = cmd_type
             .as_union();
 
         self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap();
@@ -63,13 +67,17 @@
     }
 
     fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
-        self.handle_incoming_data_message(message);
+        if self.inbox_main.is_none() {
+            self.inbox_main = Some(message);
+        } else {
+            self.inbox_backup.push(message);
+        }
     }
 
     fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
-        match mesage {
+        match message {
             Message::Data(message) => {
-                self.handle_incoming_data_message(message);
+                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
             },
             Message::Sync(message) => {
                 let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
@@ -80,7 +88,10 @@
                     &mut self.exec_state, &mut self.control, &mut self.consensus,
                     message, sched_ctx, comp_ctx
                 );
-            }
+            },
+            Message::Poll => {
+                sched_ctx.log("Received polling event");
+            },
         }
     }
 
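
The `SyncState` machine drives one socket operation per sync round: a round starts in `AwaitingCmd`, moves to `Putting` or `Getting` depending on the received command, and the new `FinishSync` state closes the round before returning to `AwaitingCmd`. The command dispatch itself is elided by the hunks below, so the following is only a plausible sketch, assuming the `Cmd` union's `Send`, `Receive` and `Finish` variants map onto the states this way:

    // Hypothetical dispatch in `SyncState::AwaitingCmd`, using the tag
    // values resolved in `on_creation` (exact mapping not shown in this diff).
    let (tag_value, _embedded_heap_pos) = message.content.values[0].as_union();
    if tag_value == self.input_union_send_tag_value {
        self.sync_state = SyncState::Putting;   // peer `put`s bytes for the socket
    } else if tag_value == self.input_union_receive_tag_value {
        self.sync_state = SyncState::Getting;   // peer `get`s bytes from the socket
    } else if tag_value == self.input_union_finish_tag_value {
        self.sync_state = SyncState::FinishSync;
    }
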
@@ -113,7 +124,7 @@ impl Component for ComponentTcpClient {
             // When in sync mode: wait for a command to come in
             match self.sync_state {
                 SyncState::AwaitingCmd => {
-                    if let Some(message) = self.pending_recv.pop() {
+                    if let Some(message) = self.inbox_backup.pop() {
                         if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
                             // Check which command we're supposed to execute.
                             let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
@@ -186,13 +197,11 @@
                     self.byte_buffer.resize(BUFFER_SIZE, 0);
                     match socket.receive(&mut self.byte_buffer) {
                         Ok(num_received) => {
-                            self.byte_buffer.resize(num_received);
+                            self.byte_buffer.resize(num_received, 0);
                             let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
-
-                            let port_handle = comp_ctx.get_port_handle(self.pdl_output_port_id);
-                            let port_info = comp_ctx.get_port(port_handle);
-                            let message = self.consensus.annotate_data_message(comp_ctx, port_info, message_content);
-
+                            let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx);
+                            self.sync_state = SyncState::FinishSync;
+                            return Ok(scheduling);
                         },
                         Err(err) => {
                             if err.kind() == IoErrorKind::WouldBlock {
@@ -203,12 +212,18 @@
                             }
                         }
                     }
                 },
+                SyncState::FinishSync => {
+                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+                    self.exec_state.mode = CompMode::SyncEnd;
+                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+                    return Ok(CompScheduling::Requeue);
+                }
             }
         },
         CompMode::BlockedGet => {
             // Entered when awaiting a new command
             debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
-            if self.
+            return Ok(CompScheduling::Sleep);
         },
         CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
@@ -252,6 +267,9 @@ impl ComponentTcpClient {
 
         return Self{
             socket_state: SocketState::Connected(socket.unwrap()),
+            sync_state: SyncState::AwaitingCmd,
+            inbox_main: None,
+            inbox_backup: Vec::new(),
             input_union_send_tag_value: -1,
             input_union_receive_tag_value: -1,
             input_union_finish_tag_value: -1,
@@ -260,13 +278,24 @@
             exec_state: CompExecState::new(),
             control: ControlLayer::default(),
             consensus: Consensus::new(),
+            byte_buffer: Vec::new(),
         }
     }
 
     // Handles incoming data from the PDL side (hence, going into the socket)
-    fn handle_incoming_data_message(&mut self, message: DataMessage) {
-        // Input message is an array of bytes (u8)
-        self.pending_recv.push(message);
+    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
+        if self.exec_state.mode.is_in_sync_block() {
+            self.consensus.handle_incoming_data_message(comp_ctx, &message);
+        }
+
+        match component::default_handle_incoming_data_message(
+            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
+        ) {
+            IncomingData::PlacedInSlot => {},
+            IncomingData::SlotFull(message) => {
+                self.inbox_backup.push(message);
+            }
+        }
     }
 
     fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
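
One subtlety worth calling out: `resize(num_received, 0)` fixes a line that did not compile (`Vec::resize` takes two arguments) and shrinks the buffer to the bytes actually read, while the `WouldBlock` arm is what ties the socket to the new `Message::Poll` event. The pattern in isolation, as a self-contained std-only sketch (names hypothetical):

    use std::io::{ErrorKind as IoErrorKind, Read};
    use std::net::TcpStream;

    const BUFFER_SIZE: usize = 1024; // stand-in for the constant used above

    // Attempt a non-blocking read: `Ok(Some(n))` if bytes arrived,
    // `Ok(None)` if the caller should sleep until the next poll event.
    fn try_receive(socket: &mut TcpStream, buffer: &mut Vec<u8>) -> std::io::Result<Option<usize>> {
        buffer.resize(BUFFER_SIZE, 0);
        match socket.read(buffer) {
            Ok(num_received) => {
                buffer.resize(num_received, 0); // keep only what was read
                Ok(Some(num_received))
            },
            Err(err) if err.kind() == IoErrorKind::WouldBlock => Ok(None),
            Err(err) => Err(err),
        }
    }
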
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index fcd9a8fe33a342cbaed4babdfef2d85ad7512f25..a18aaab3c7e49842356fd65651692255b7743501 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -222,6 +222,10 @@ pub(crate) struct CompPDL {
 }
 
 impl Component for CompPDL {
+    fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) {
+        // Intentionally empty
+    }
+
     fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
         let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
         let port_index = comp_ctx.get_port_index(port_handle);
@@ -325,18 +329,19 @@
             EC::Put(port_id, value) => {
                 debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 sched_ctx.log(&format!("Putting value {:?}", value));
-                let port_id = port_id_from_eval(port_id);
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                let port_info = comp_ctx.get_port(port_handle);
-                if port_info.state.is_blocked() {
-                    self.exec_state.set_as_blocked_put(port_id, value);
-                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
-                    return Ok(CompScheduling::Sleep);
-                } else {
-                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
-                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
-                    return Ok(CompScheduling::Immediate);
-                }
+
+                // Send the message
+                let target_port_id = port_id_from_eval(port_id);
+                let scheduling = component::default_send_data_message(
+                    &mut self.exec_state, target_port_id, value,
+                    sched_ctx, &mut self.consensus, comp_ctx
+                );
+
+                // When `run` is called again (potentially after becoming
+                // unblocked) we need to instruct the executor that we performed
+                // the `put`
+                self.exec_ctx.stmt = ExecStmt::PerformedPut;
+                return Ok(scheduling);
             },
             EC::SelectStart(num_cases, _num_ports) => {
                 debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
@@ -443,7 +448,7 @@
         self.consensus.notify_sync_start(comp_ctx);
         for message in self.inbox_main.iter() {
             if let Some(message) = message {
-                self.consensus.handle_new_data_message(comp_ctx, message);
+                self.consensus.handle_incoming_data_message(comp_ctx, message);
             }
         }
         debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
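
The comment in the `EC::Put` arm encodes a resume contract that is easy to miss: `ExecStmt::PerformedPut` is recorded before it is known whether the send went through, so a component that went to sleep on a blocked port continues past the `put` when it is re-run. A self-contained model of that contract (all names hypothetical; this is not the runtime's actual executor):

    #[derive(PartialEq)]
    enum Stmt { None, PerformedPut }

    struct Executor { last_stmt: Stmt }

    impl Executor {
        fn run_put(&mut self, port_blocked: bool) -> &'static str {
            if self.last_stmt == Stmt::PerformedPut {
                // Second call: the put already happened, continue past it.
                self.last_stmt = Stmt::None;
                return "resume after put";
            }
            self.last_stmt = Stmt::PerformedPut; // record progress first
            if port_blocked { "sleep" } else { "continue immediately" }
        }
    }

    fn main() {
        let mut exec = Executor { last_stmt: Stmt::None };
        assert_eq!(exec.run_put(true), "sleep");             // blocked: sleep
        assert_eq!(exec.run_put(false), "resume after put"); // re-run: no double put
    }
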
@@ -513,64 +518,36 @@ impl CompPDL {
     // Handling messages
     // -------------------------------------------------------------------------
 
-    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
-        let port_info = comp_ctx.get_port(source_port_handle);
-        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
-        let peer_info = comp_ctx.get_peer(peer_handle);
-        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
-        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
-    }
-
     /// Handles a message that came in through the public inbox. This function
     /// will handle putting it in the correct place, and potentially blocking
     /// the port in case too many messages are being received.
     fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
+        use component::IncomingData;
+
         // Whatever we do, glean information from headers in message
         if self.exec_state.mode.is_in_sync_block() {
-            self.consensus.handle_new_data_message(comp_ctx, &message);
+            self.consensus.handle_incoming_data_message(comp_ctx, &message);
         }
 
-        // Check if we can insert it directly into the storage associated with
-        // the port
-        let target_port_id = message.data_header.target_port;
-        let port_handle = comp_ctx.get_port_handle(target_port_id);
+        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
         let port_index = comp_ctx.get_port_index(port_handle);
-        if self.inbox_main[port_index].is_none() {
-            self.inbox_main[port_index] = Some(message);
-
-            // After direct insertion, check if this component's execution is
-            // blocked on receiving a message on that port
-            debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly
-            if self.exec_state.is_blocked_on_get(target_port_id) {
-                // We were indeed blocked
-                self.exec_state.mode = CompMode::Sync;
-                self.exec_state.mode_port = PortId::new_invalid();
-            } else if self.exec_state.mode == CompMode::BlockedSelect {
-                let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
-                if let SelectDecision::Case(case_index) = select_decision {
-                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
-                    self.exec_state.mode = CompMode::Sync;
+        match component::default_handle_incoming_data_message(
+            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
+            sched_ctx, &mut self.control
+        ) {
+            IncomingData::PlacedInSlot => {
+                if self.exec_state.mode == CompMode::BlockedSelect {
+                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
+                    if let SelectDecision::Case(case_index) = select_decision {
+                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
+                        self.exec_state.mode = CompMode::Sync;
+                    }
                 }
+            },
+            IncomingData::SlotFull(message) => {
+                self.inbox_backup.push(message);
             }
-
-            return;
         }
-
-        // The direct inbox is full, so the port will become (or was already) blocked
-        let port_info = comp_ctx.get_port_mut(port_handle);
-        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked());
-
-        if port_info.state == PortState::Open {
-            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
-            let (peer_handle, message) =
-                self.control.initiate_port_blocking(comp_ctx, port_handle);
-
-            let peer = comp_ctx.get_peer(peer_handle);
-            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
-        }
-
-        // But we still need to remember the message, so:
-        self.inbox_backup.push(message);
     }
 
     /// Handles when a message has been handed off from the inbox to the PDL
@@ -732,6 +709,7 @@
         let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
             reservation, component, created_ctx, false,
         );
+        component.component.on_creation(sched_ctx);
 
         // Now modify the creator's ports: remove every transferred port and
         // potentially remove the peer component.
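
The `on_creation` call added after `finish_create_pdl_component` completes the lifecycle this diff introduces: every `Component` now gets the hook exactly once, after its context is fully set up but before it is first scheduled. For a built-in component the implementation typically performs one-time lookups (a sketch; the component type is hypothetical, `ComponentTcpClient` above is the real motivating user):

    impl Component for MyBuiltInComponent { // hypothetical component type
        fn on_creation(&mut self, sched_ctx: &SchedulerCtx) {
            // One-time setup with full runtime access, e.g. resolving
            // protocol type tags the way `ComponentTcpClient` resolves `Cmd`.
            let _protocol = &sched_ctx.runtime.protocol;
        }

        // The remaining trait methods (adopt_message, handle_message, run)
        // are unaffected by the hook and elided here.
    }
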
diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs
index 56ffe495972a8f55b68e4927b38b07a945fa01cc..2bba8b26583bd3d39d59adfd39f1b83ebb2b3857 100644
--- a/src/runtime2/component/component_random.rs
+++ b/src/runtime2/component/component_random.rs
@@ -27,6 +27,9 @@ pub struct ComponentRandomU32 {
 }
 
 impl Component for ComponentRandomU32 {
+    fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) {
+    }
+
     fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
         // Impossible since this component does not have any input ports in its
         // signature.
@@ -90,24 +93,10 @@ impl Component for ComponentRandomU32 {
                     random += self.random_minimum;
                     let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
 
-                    let port_handle = comp_ctx.get_port_handle(self.output_port_id);
-                    let port_info = comp_ctx.get_port(port_handle);
-
-                    let scheduling = if port_info.state.is_blocked() {
-                        // Need to wait until we can send the message
-                        self.exec_state.set_as_blocked_put(self.output_port_id, value_group);
-
-                        CompScheduling::Sleep
-                    } else {
-                        let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group);
-                        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
-                        let peer_info = comp_ctx.get_peer(peer_handle);
-                        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(message), true);
-
-                        // Remain in sync mode, but after `did_perform_send` was
-                        // set to true.
-                        CompScheduling::Immediate
-                    };
+                    let scheduling = component::default_send_data_message(
+                        &mut self.exec_state, self.output_port_id, value_group,
+                        sched_ctx, &mut self.consensus, comp_ctx
+                    );
 
                     // Blocked or not, we set `did_perform_send` to true. If
                     // blocked then the moment we become unblocked (and are back
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index 72e7c475fc94b9f82e2f4de4da0d1cc4cc6cafb8..f9f3ab3f1df69fe514ced318d8eb388adec96ea9 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -378,7 +378,10 @@ impl Consensus {
     /// Handles the arrival of a new data message (needs to be called for every
     /// new data message, even though it might not end up being received). This
     /// is used to determine peers of `get`ter ports.
-    pub(crate) fn handle_new_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
+    // TODO: The use of this function is rather ugly. Find a more robust
+    //  scheme for owners of `get`ter ports not knowing about their peers
+    //  (also: figure out again why this was written; I forgot).
+    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
        let target_index = comp_ctx.get_port_index(target_handle);
        let annotation = &mut self.ports[target_index];
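
The rename from `handle_new_data_message` to `handle_incoming_data_message` lines the consensus API up with the component-side helpers above. The call-order contract, as reconstructed from the call sites in this diff (the wrapper function below is hypothetical; the individual calls are the ones that actually appear):

    fn round_lifecycle(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        // 1. Every arriving data message is shown to consensus, even if it is
        //    never `get`: this is how a `get`ter port learns who its peer is.
        self.consensus.handle_incoming_data_message(comp_ctx, &message);

        // 2. Only when the message is actually received by component code:
        if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
            // ...act on the message contents...
        }

        // 3. When the round is done, request a decision and apply it.
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
        self.exec_state.mode = CompMode::SyncEnd;
        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
    }
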