diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 299c64f158dd53f70f55f17456bcb4a1d7570d8c..d5712961e4599ebea7ed0b73bbdf3aa03a2d2c81 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,6 +1,11 @@ +/* + * Default toolkit for creating components. Contains handlers for initiating and + * responding to various events. + */ + use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; -use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; +use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; @@ -78,6 +83,10 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement + PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked + PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks + PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port + NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 @@ -88,8 +97,11 @@ impl CompMode { use CompMode::*; match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, - NonSync | StartExit | BusyExit | Exit => false, + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | + PutPortsBlockedTransferredPorts | + PutPortsBlockedAwaitingAcks | + PutPortsBlockedSendingPort => true, + NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false, } } @@ -97,7 +109,11 @@ impl CompMode { use CompMode::*; match self { - NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false, + NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | + PutPortsBlockedTransferredPorts | + PutPortsBlockedAwaitingAcks | + PutPortsBlockedSendingPort | + NewComponentBlocked => false, StartExit | BusyExit => true, Exit => false, } @@ -138,6 +154,7 @@ pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put + pub mode_component: (ProcedureDefinitionId, TypeId), pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } @@ -147,6 +164,7 @@ impl CompExecState { mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), exit_reason: ExitReason::Termination, } } @@ -162,23 +180,42 @@ impl CompExecState { debug_assert!(self.mode_value.values.is_empty()); } + pub(crate) fn set_as_create_component_blocked( + &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId, + arguments: ValueGroup + ) { + self.mode = CompMode::NewComponentBlocked; + self.mode_value = arguments; + self.mode_component = (proc_id, type_id); + } + pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { 
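+        // Only a blocked `get` on this exact port counts; the new
+        // PutPortsBlocked* modes introduced in this patch are tracked separately.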
return self.mode == CompMode::BlockedGet && self.mode_port == port; } - pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) { + pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) { self.mode = CompMode::BlockedPut; self.mode_port = port; self.mode_value = value; } - pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool { + pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) { + self.mode = CompMode::PutPortsBlockedTransferredPorts; + self.mode_port = port; + self.mode_value = value; + } + + pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedPut && self.mode_port == port; } + + pub(crate) fn is_blocked_on_create_component(&self) -> bool { + return self.mode == CompMode::NewComponentBlocked; + } } // TODO: Replace when implementing port sending. Should probably be incorporated @@ -232,7 +269,8 @@ pub(crate) fn create_component( pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, port_instruction: PortInstruction, value: ValueGroup, - sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx + sched_ctx: &SchedulerCtx, consensus: &mut Consensus, + control: &mut ControlLayer, comp_ctx: &mut CompCtx ) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); @@ -243,6 +281,9 @@ pub(crate) fn default_send_data_message( let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); + let mut ports = Vec::new(); + find_ports_in_value_group(&value, &mut ports); + if port_info.state.is_closed() { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) @@ -250,13 +291,20 @@ pub(crate) fn default_send_data_message( port_info.last_instruction, format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) )) + } else if !ports.is_empty() { + start_send_message_with_ports( + transmitting_port_id, port_instruction, value, exec_state, + comp_ctx, sched_ctx, control + )?; + + return Ok(CompScheduling::Sleep); } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send - exec_state.set_as_blocked_put(transmitting_port_id, value); + exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value); return Ok(CompScheduling::Sleep); } else { - // Port is not blocked, so send to the peer + // Port is not blocked and no ports to transfer: send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); @@ -337,7 +385,7 @@ pub(crate) enum GetResult { /// instruction we're attempting on this port. 
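+/// Note: this now takes the full `InboxMain` rather than `InboxMainRef`,
+/// because receiving a message that carries ports appends fresh inbox slots
+/// for those ports.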
pub(crate) fn default_attempt_get( exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus ) -> GetResult { let port_handle = comp_ctx.get_port_handle(target_port); @@ -356,14 +404,15 @@ pub(crate) fn default_attempt_get( if let Some(message) = &inbox_main[port_index] { if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // We're allowed to receive this message - let message = inbox_main[port_index].take().unwrap(); + let mut message = inbox_main[port_index].take().unwrap(); debug_assert_eq!(target_port, message.data_header.target_port); // Note: we can still run into an unrecoverable error when actually // receiving this message match default_handle_received_data_message( - target_port, target_port_instruction, inbox_main, inbox_backup, - comp_ctx, sched_ctx, control, + target_port, target_port_instruction, + &mut message, inbox_main, inbox_backup, + comp_ctx, sched_ctx, control, consensus ) { Ok(()) => return GetResult::Received(message), Err(location_and_message) => return GetResult::Error(location_and_message) @@ -389,14 +438,70 @@ pub(crate) fn default_attempt_get( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, - comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer + targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer, + consensus: &mut Consensus ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); let port_index = comp_ctx.get_port_index(port_handle); - let slot = &mut inbox_main[port_index]; - debug_assert!(slot.is_none()); // because we've just received from it + debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it + + // If we received any ports, add them to the port tracking and inbox struct. + // Then notify the peers that they can continue sending to this port, but + // now at a new address. 
+ for received_port in &mut message.ports { + // Transfer messages to main/backup inbox + let _new_inbox_index = inbox_main.len(); + if !received_port.messages.is_empty() { + inbox_main.push(Some(received_port.messages.remove(0))); + inbox_backup.extend(received_port.messages.drain(..)); + } else { + inbox_main.push(None); + } + + // Create a new port locally + let mut new_port_state = received_port.state; + new_port_state.set(PortStateFlag::Received); + let new_port_handle = comp_ctx.add_port( + received_port.peer_comp, received_port.peer_port, + received_port.kind, new_port_state + ); + debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle)); + comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); + let new_port = comp_ctx.get_port(new_port_handle); + + // Add the port tho the consensus + consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + + // Replace all references to the port in the received message + for message_location in received_port.locations.iter().copied() { + let value = message.content.get_value_mut(message_location); + + match value { + Value::Input(_) => { + debug_assert_eq!(new_port.kind, PortKind::Getter); + *value = Value::Input(port_id_to_eval(new_port.self_id)); + }, + Value::Output(_) => { + debug_assert_eq!(new_port.kind, PortKind::Putter); + *value = Value::Output(port_id_to_eval(new_port.self_id)); + }, + _ => unreachable!(), + } + } + + // Let the peer know that the port can now be used + let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ + id: ControlId::new_invalid(), + sender_comp_id: comp_ctx.id, + target_port_id: Some(new_port.peer_port_id), + content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id) + }), true); + } // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port(port_handle); @@ -410,7 +515,7 @@ pub(crate) fn default_handle_received_data_message( // One more message, place it in the slot let message = inbox_backup.remove(message_index); debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup - *slot = Some(message); + inbox_main[port_index] = Some(message); return Ok(()); } @@ -437,11 +542,12 @@ pub(crate) fn default_handle_received_data_message( /// state may all change. pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, - message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(control, message.id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?; }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -474,7 +580,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. 
- default_handle_ack(control, control_id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?; } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -495,7 +601,7 @@ pub(crate) fn default_handle_control_message( if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used; if closed_during_sync_round || closed_before_sync_round { return Err(( @@ -519,7 +625,10 @@ pub(crate) fn default_handle_control_message( debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); - default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port( + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup + )?; }, ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to @@ -541,14 +650,16 @@ pub(crate) fn default_handle_control_message( let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); - let old_peer_id = port_info.peer_comp_id; let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_port_id = new_port_id; port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); - default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + default_handle_recently_unblocked_port( + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup + )?; } } @@ -603,6 +714,13 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.state.is_blocked() { + return CompScheduling::Sleep; + } + } + sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); @@ -617,7 +735,8 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state.is_closed() || port.close_at_sync_end { + println!("DEBUG: Considering port:\n{:?}", port); + if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } @@ -691,6 +810,7 @@ pub(crate) fn default_handle_sync_decision( if port_info.close_at_sync_end { port_info.state.set(PortStateFlag::Closed); } + port_info.state.clear(PortStateFlag::Received); } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; @@ -706,6 +826,240 @@ pub(crate) fn default_handle_sync_decision( } } + +pub(crate) fn default_start_create_component( + exec_state: 
&mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + + let mut transferred_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut transferred_ports); + + // Set execution state as waiting until we can create the component. If we + // can do so right away, then we will. + exec_state.set_as_create_component_blocked(definition_id, type_id, arguments); + if ports_not_blocked(comp_ctx, &transferred_ports) { + perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup); + } +} + +/// Actually creates a component (and assumes that the caller made sure that +/// none of the ports are involved in a blocking operation). +pub(crate) fn perform_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) { + // Small internal utilities + struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, + } + + // Retrieve ports from the arguments + debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); + + let (procedure_id, procedure_type_id) = exec_state.mode_component; + let mut arguments = exec_state.mode_value.take(); + let mut ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut ports); + debug_assert!(ports_not_blocked(instantiator_ctx, &ports)); + + // Reserve a location for the new component + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + + let mut port_pairs = Vec::with_capacity(ports.len()); + + // Go over all the ports that will be transferred. Since the ports will get + // a new ID in the new component, we will take care of that here. + for (port_location, instantiator_port_id) in &ports { + // Retrieve port information from instantiator + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Create port at created component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Modify port ID in the arguments to the new component and store them + // for later access + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + + for location in port_location.iter().copied() { + let value = arguments.get_value_mut(location); + match value { + Value::Input(id) => *id = port_id_to_eval(created_port_id), + Value::Output(id) => *id = port_id_to_eval(created_port_id), + _ => unreachable!(), + } + } + } + + // For each of the ports in the newly created component we set the peer to + // the correct value. 
We will not yet change the peer on the instantiator's + // ports (as we haven't yet stored the new component in the runtime's + // component storage) + let mut created_component_has_remote_peers = false; + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + if created_port_info.peer_comp_id == instantiator_ctx.id { + // The peer of the created component's port seems to be the + // instantiator. + let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // However, the peer port is also moved to the new + // component, so the complete channel is owned by the new + // component. + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + // Peer port remains with instantiator. However, we cannot + // set the peer on the instantiator yet, because the new + // component has not yet been stored in the runtime's + // component storage. So we do this later + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } else { + // Peer is a different component + if pair.is_open { + // And the port is still open, so we need to notify the peer + let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id); + let peer_info = instantiator_ctx.get_peer(peer_handle); + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); + created_component_has_remote_peers = true; + } + } + } + + // Now we store the new component into the runtime's component storage using + // the reservation. + let component = create_component( + &sched_ctx.runtime.protocol, procedure_id, procedure_type_id, + arguments, port_pairs.len() + ); + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component, created_ctx, false + ); + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // We now pass along the messages that the instantiator component still has + // that belong to the new component. 
At the same time we'll take care of + // setting the correct peer of the instantiator component + for pair in port_pairs.iter() { + // Transferring the messages and removing the port from the + // instantiator component + let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + instantiator_ctx.remove_port(pair.instantiator_handle); + + if let Some(mut message) = inbox_main[instantiator_port_index].take() { + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } + + // Here we take care of the case where the instantiator previously owned + // both ends of the channel, but has transferred one port to the new + // component (hence creating a channel between the instantiator + // component and the new component). + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Note: the port we're receiving here belongs to the instantiator + // and is NOT in the "port_pairs" array. + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id)); + } + } + + // Finally: if we did move ports around whose peers are different + // components, then we'll initiate the appropriate protocol to notify them. + if created_component_has_remote_peers { + let schedule_entry_id = control.add_schedule_entry(created_ctx.id); + for pair in &port_pairs { + let port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id { + // Peer component is not the instantiator, and it is not the + // new component itself + let message = control.add_reroute_entry( + instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id, + pair.instantiator_id, pair.created_id, created_ctx.id, + schedule_entry_id + ); + let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = created_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + } + } else { + // We can schedule the component immediately, we do not have to wait + // for any peers: there are none. 
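+        // (With remote peers, scheduling is instead deferred until they have
+        // Ack'd the reroute entries registered above.)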
+ sched_ctx.runtime.enqueue_work(created_key); + } + + exec_state.mode = CompMode::NonSync; + exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); +} + +pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { + for (_port_locations, port_id) in ports { + let port_handle = comp_ctx.get_port_handle(*port_id); + let port_info = comp_ctx.get_port(port_handle); + + if port_info.state.is_blocked_due_to_port_change() { + return false; + } + } + + return true; +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -736,13 +1090,185 @@ pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling // Internal messaging/state utilities // ----------------------------------------------------------------------------- +/// Sends a message without any transmitted ports. Does not check if sending +/// is actually valid. +fn send_message_without_ports( + sending_port_handle: LocalPortHandle, value: ValueGroup, + comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, +) { + let port_info = comp_ctx.get_port(sending_port_handle); + debug_assert!(port_info.state.can_send()); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); +} + +/// Prepares sending a message that contains ports. Only once a particular +/// protocol has completed (where we notify all the peers that the ports will +/// be transferred) will we actually send the message to the recipient. +fn start_send_message_with_ports( + sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup, + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + + // Retrieve ports we're going to transfer + let sending_port_handle = comp_ctx.get_port_handle(sending_port_id); + let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); + sending_port_info.last_instruction = sending_port_instruction; + + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&value, &mut transmit_ports); + debug_assert!(!transmit_ports.is_empty()); // required from caller + + // Enter the state where we'll wait until all transferred ports are not + // blocked. + exec_state.set_as_blocked_put_with_ports(sending_port_id, value); + + if ports_not_blocked(comp_ctx, &transmit_ports) { + // Ports are not blocked, so we can send them right away. 
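+        // This starts phase one of the transfer protocol: the peers of the
+        // transferred ports are told to block, and only once their Acks have
+        // arrived is the message itself forwarded to the receiver.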
+ perform_send_message_with_ports_notify_peers( + exec_state, comp_ctx, sched_ctx, control, transmit_ports + )?; + } // else: wait until they become unblocked + + return Ok(()) +} + +fn perform_send_message_with_ports_notify_peers( + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer, transmit_ports: EncounteredPorts +) -> Result<(), (PortInstruction, String)> { + // Check we're in the correct state in debug mode + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts); + debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports)); + + // Set up the final Ack that triggers us to send our final message + let unblock_put_control_id = control.add_unblock_put_with_ports_entry(); + for (_, port_id) in &transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(*port_id); + let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); + let peer_comp_id = transmit_port_info.peer_comp_id; + let peer_port_id = transmit_port_info.peer_port_id; + + + // Note: we checked earlier that we are currently in sync mode. Now we + // will check if we've already used the port we're about to transmit. + if !transmit_port_info.last_instruction.is_none() { + let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports in this message, as it is used in this sync round") + )); + } + + if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { + let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; + return Err(( + sending_port_instruction, + String::from("Cannot transmit one of the ports in this message, as that port is already transmitted") + )); + } + + // Set the flag for transmission + transmit_port_info.state.set(PortStateFlag::Transmitted); + + // Block the peer of the port + let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); + println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + + // We've set up the protocol, once all the PPC's are blocked we are supposed + // to transfer the message to the recipient. So store it temporarily + exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks; + + return Ok(()); +} + +/// Performs the transmission of a data message that contains ports. These were +/// all stored in the component's execution state by the +/// `prepare_send_message_with_ports` function. Port must be ready to send! 
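+/// (The state in question is populated by `start_send_message_with_ports` and
+/// the Ack handling that follows it; the sending port and payload live in
+/// `exec_state.mode_port` and `exec_state.mode_value`.)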
+fn perform_send_message_with_ports_to_receiver( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedSendingPort); + + // Find all ports again + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports); + + // Retrieve the port over which we're going to send the message + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let port_info = comp_ctx.get_port(port_handle); + + if !port_info.state.is_open() { + return Err(( + port_info.last_instruction, + String::from("cannot send over this port, as it is closed") + )); + } + + debug_assert!(!port_info.state.is_blocked_due_to_port_change()); // caller should have checked this + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + + // Change state back to its default + exec_state.mode = CompMode::Sync; + let message_value = exec_state.mode_value.take(); + exec_state.mode_port = PortId::new_invalid(); + + // Annotate the data message + let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value); + + // And further enhance the message by adding data about the ports that are + // being transferred + for (port_locations, transmit_port_id) in transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id); + let transmit_port_info = comp_ctx.get_port(transmit_port_handle); + + let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); + + let mut transmit_port_state = transmit_port_info.state; + debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted)); + transmit_port_state.clear(PortStateFlag::Transmitted); + + annotated_message.ports.push(TransmittedPort{ + locations: port_locations, + messages: transmit_messages, + peer_comp: transmit_port_info.peer_comp_id, + peer_port: transmit_port_info.peer_port_id, + kind: transmit_port_info.kind, + state: transmit_port_state + }); + + comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None); + } + + // And finally, send the message to the peer + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); + + return Ok(()); +} + /// Handles an `Ack` for the control layer. 
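+/// Besides the existing control-entry bookkeeping, an `Ack` can now also
+/// complete the port-transfer protocol, in which case the blocked message that
+/// carries the ports is finally sent to its receiver.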
fn default_handle_ack( - control: &mut ControlLayer, control_id: ControlId, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx -) { + exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Result<(), (PortInstruction, String)>{ // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; + loop { let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { @@ -765,6 +1291,22 @@ fn default_handle_ack( let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, + AckAction::UnblockPutWithPorts => { + // Send the message (containing ports) stored in the component + // execution state to the recipient + println!("DEBUG: Unblocking put with ports"); + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); + exec_state.mode = CompMode::PutPortsBlockedSendingPort; + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + + // Little bit of a hack, we didn't really unblock the sending + // port, but this will mesh nicely with waiting for the sending + // port to become unblocked. + default_handle_recently_unblocked_port( + exec_state, control, consensus, port_handle, sched_ctx, + comp_ctx, inbox_main, inbox_backup + )?; + }, AckAction::None => {} } @@ -773,6 +1315,8 @@ fn default_handle_ack( None => break, } } + + return Ok(()); } /// Little helper for sending the most common kind of `Ack` @@ -790,16 +1334,26 @@ fn default_send_ack( } /// Handles the unblocking of a putter port. In case there is a pending message -/// on that port then it will be sent. +/// on that port then it will be sent. There are two reasons for calling this +/// function: either a port was blocked (i.e. the Blocked state flag was +/// cleared), or the component is ready to send a message containing ports +/// (stored in the execution state). In this latter case we might still have +/// a blocked port. fn default_handle_recently_unblocked_port( - exec_state: &mut CompExecState, consensus: &mut Consensus, + exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, -) { + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Result<(), (PortInstruction, String)> { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller - if exec_state.is_blocked_on_put(port_id) { + if port_info.state.is_blocked() { + // Port is still blocked. We wait until the next control message where + // we unblock the port. + return Ok(()); + } + + if exec_state.is_blocked_on_put_without_ports(port_id) { // Annotate the message that we're going to send let port_info = comp_ctx.get_port(port_handle); // for immutable access debug_assert_eq!(port_info.kind, PortKind::Putter); @@ -811,9 +1365,36 @@ fn default_handle_recently_unblocked_port( let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true); - exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. 
+ // Return to the regular execution mode + exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); + } else if exec_state.mode == CompMode::PutPortsBlockedTransferredPorts { + // We are waiting until all of the transferred ports become unblocked, + // check so here. + let mut transfer_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transfer_ports); + if ports_not_blocked(comp_ctx, &transfer_ports) { + perform_send_message_with_ports_notify_peers( + exec_state, comp_ctx, sched_ctx, control, transfer_ports + )?; + } + } else if exec_state.mode == CompMode::PutPortsBlockedSendingPort && exec_state.mode_port == port_id { + // We checked above that the port became unblocked, so we can send the + // message + perform_send_message_with_ports_to_receiver( + exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup + )?; + } else if exec_state.is_blocked_on_create_component() { + let mut ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut ports); + if ports_not_blocked(comp_ctx, &ports) { + perform_create_component( + exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup + ); + } } + + return Ok(()); } #[inline] @@ -825,3 +1406,77 @@ pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; } + +// TODO: Optimize double vec +type EncounteredPorts = Vec<(Vec, PortId)>; + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) { + // Helper to check a value for a port and recurse if needed. + fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortId(port_id.id); + for prev_port in ports.iter_mut() { + if prev_port.1 == cur_port { + // Already added + prev_port.0.push(value_location); + return; + } + } + + ports.push((vec![value_location], cur_port)); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for (value_index, embedded_value) in heap_region.iter().enumerate() { + let value_location = ValueId::Heap(*heap_pos, value_index as u32); + find_port_in_value(group, embedded_value, value_location, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for (value_index, value) in value_group.values.iter().enumerate() { + find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); + } +} + +/// Goes through the inbox of a component and takes out all the messages that +/// are targeted at a specific port +pub(crate) fn take_port_messages( + comp_ctx: &CompCtx, port_id: PortId, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Vec { + let mut messages = Vec::new(); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_index = comp_ctx.get_port_index(port_handle); + + if let Some(message) = inbox_main[port_index].take() { + messages.push(message); + } + + let mut message_index = 0; + while message_index < 
inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_id { + let message = inbox_backup.remove(message_index); + messages.push(message); + } else { + message_index += 1; + } + } + + return messages; +} \ No newline at end of file