diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 2044a5d5017d9723d998127ae9e8f312a689cd0d..f71dfb0556f1fb8627dc2c7a333ee96b53b84ddf 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -762,6 +762,68 @@ pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling // Internal messaging/state utilities // ----------------------------------------------------------------------------- +/// Sends a message without any transmitted ports. Does not check if sending +/// is actually valid. +fn send_message_without_ports( + sending_port_handle: LocalPortHandle, value: ValueGroup, + comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, +) { + let port_info = comp_ctx.get_port(sending_port_handle); + debug_assert!(port_info.state.can_send()); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); +} + +fn send_message_with_ports( + sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction, + value: ValueGroup, + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer, consensus: &mut Consensus +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + + let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); + sending_port_info.last_instruction = sending_port_instruction; + + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&value, &mut transmit_ports); + debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function + + // Set up the final Ack that triggers us to send our final message + let unblock_put_control_id = control.add_unblock_put_with_ports_entry(); + for (_, port_id) in &transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(*port_id); + let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); + + // Note: we checked earlier that we are currently in sync mode. Now we + // will check if we've already used the port we're about to transmit. + if !transmit_port_info.last_instruction.is_none() { + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports, as it is used in this sync round") + )); + } + + if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports, as that port is already transmitted") + )); + } + + transmit_port_info.state.set(PortStateFlag::Transmitted); + + } + + let port_info = comp_ctx.get_port(sending_port_handle); + + + return Ok(()); +} + /// Handles an `Ack` for the control layer. fn default_handle_ack( control: &mut ControlLayer, control_id: ControlId, @@ -856,23 +918,27 @@ pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; } +// TODO: Optimize double vec +type EncounteredPorts = Vec<(Vec, PortId)>; + /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. -pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) { +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) { // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) { + fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port let cur_port = PortId(port_id.id); - for prev_port in ports.iter() { - if *prev_port == cur_port.1 { + for prev_port in ports.iter_mut() { + if prev_port.1 == cur_port { // Already added + prev_port.0.push(value_location); return; } } - ports.push((value_location, cur_port)); + ports.push((vec![value_location], cur_port)); }, Value::Array(heap_pos) | Value::Message(heap_pos) | diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 3a72bdffe37e35420b89c72d12824a8a517d5ea1..9aab35cfd66f36787b4f538ce99d455de7475cee 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -14,6 +14,15 @@ pub enum PortInstruction { SourceLocation(ExpressionId), } +impl PortInstruction { + pub fn is_none(&self) -> bool { + match self { + PortInstruction::None => return true, + _ => return false, + } + } +} + /// Directionality of a port #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PortKind { @@ -29,7 +38,7 @@ pub enum PortStateFlag { Closed = 0x01, // If not closed, then the port is open BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked BlockedDueToFullBuffers = 0x04, - Transmitted, // Transmitted, so cannot be used anymore + Transmitted = 0x08, // Transmitted, so cannot be used anymore } #[derive(Copy, Clone)] @@ -49,6 +58,13 @@ impl PortState { return !self.is_closed(); } + #[inline] + pub fn can_send(&self) -> bool { + return + !self.is_set(PortStateFlag::Closed) && + !self.is_set(PortStateFlag::Transmitted); + } + #[inline] pub fn is_closed(&self) -> bool { return self.is_set(PortStateFlag::Closed); diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index b7ad2c40b907deac5b614da1e0cba99181993434..da6a34d2932f0cdf9a8f072b5966d0fdc47481a3 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -25,6 +25,7 @@ enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), ClosedPort(PortId), + UnblockPutWithPorts } struct ContentPeerChange { @@ -45,6 +46,7 @@ pub(crate) enum AckAction { None, SendMessage(CompId, ControlMessage), ScheduleComponent(CompId), + UnblockPutWithPorts, } /// Handling/sending control messages. @@ -128,6 +130,9 @@ impl ControlLayer { comp_ctx.change_port_peer(sched_ctx, port_handle, None); return (AckAction::None, None); + }, + ControlContent::UnblockPutWithPorts => { + return (AckAction::UnblockPutWithPorts, None); } } } @@ -152,6 +157,18 @@ impl ControlLayer { return entry_id; } + /// Adds an entry that returns the similarly named Ack action + pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId { + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 1, // incremented by calls to `add_reroute_entry` + content: ControlContent::UnblockPutWithPorts, + }); + + return entry_id; + } + /// Removes a schedule entry. Only used if the caller preemptively called /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, /// hence the `ack_countdown` in the scheduling entry is at 0.