diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 2044a5d5017d9723d998127ae9e8f312a689cd0d..f71dfb0556f1fb8627dc2c7a333ee96b53b84ddf 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -762,6 +762,68 @@ pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling // Internal messaging/state utilities // ----------------------------------------------------------------------------- +/// Sends a message without any transmitted ports. Does not check if sending +/// is actually valid. +fn send_message_without_ports( + sending_port_handle: LocalPortHandle, value: ValueGroup, + comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, +) { + let port_info = comp_ctx.get_port(sending_port_handle); + debug_assert!(port_info.state.can_send()); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); +} + +fn send_message_with_ports( + sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction, + value: ValueGroup, + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer, consensus: &mut Consensus +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + + let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); + sending_port_info.last_instruction = sending_port_instruction; + + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&value, &mut transmit_ports); + debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function + + // Set up the final Ack that triggers us to send our final message + let unblock_put_control_id = control.add_unblock_put_with_ports_entry(); + for (_, port_id) in &transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(*port_id); + let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); + + // Note: we checked earlier that we are currently in sync mode. Now we + // will check if we've already used the port we're about to transmit. + if !transmit_port_info.last_instruction.is_none() { + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports, as it is used in this sync round") + )); + } + + if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports, as that port is already transmitted") + )); + } + + transmit_port_info.state.set(PortStateFlag::Transmitted); + + } + + let port_info = comp_ctx.get_port(sending_port_handle); + + + return Ok(()); +} + /// Handles an `Ack` for the control layer. fn default_handle_ack( control: &mut ControlLayer, control_id: ControlId, @@ -856,23 +918,27 @@ pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; } +// TODO: Optimize double vec +type EncounteredPorts = Vec<(Vec, PortId)>; + /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. -pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) { +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) { // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) { + fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port let cur_port = PortId(port_id.id); - for prev_port in ports.iter() { - if *prev_port == cur_port.1 { + for prev_port in ports.iter_mut() { + if prev_port.1 == cur_port { // Already added + prev_port.0.push(value_location); return; } } - ports.push((value_location, cur_port)); + ports.push((vec![value_location], cur_port)); }, Value::Array(heap_pos) | Value::Message(heap_pos) |