diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 9bbaebbf75274df3ddf976599aee226d2be1d5cc..0b03aa14729445e04e7c8faf891b13aae1d111e8 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -83,8 +83,9 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement - BlockedPutPortsAwaitingAcks,// blocked because we're waiting to send a data message containing ports, but first need to receive Acks for the PortPeerChanged messages - BlockedPutPortsReady, // blocked because we're waitingto send a data message containing ports + PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked + PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks + PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component StartExit, // temporary state: if encountered then we start the shutdown process. BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish @@ -97,7 +98,9 @@ impl CompMode { match self { Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | - BlockedPutPortsAwaitingAcks | BlockedPutPortsReady => true, + PutPortsBlockedTransferredPorts | + PutPortsBlockedAwaitingAcks | + PutPortsBlockedSendingPort => true, NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false, } } @@ -107,7 +110,9 @@ impl CompMode { match self { NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | - BlockedPutPortsAwaitingAcks | BlockedPutPortsReady | + PutPortsBlockedTransferredPorts | + PutPortsBlockedAwaitingAcks | + PutPortsBlockedSendingPort | NewComponentBlocked => false, StartExit | BusyExit => true, Exit => false, @@ -197,7 +202,7 @@ impl CompExecState { } pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) { - self.mode = CompMode::BlockedPutPortsAwaitingAcks; + self.mode = CompMode::PutPortsBlockedTransferredPorts; self.mode_port = port; self.mode_value = value; } @@ -208,12 +213,6 @@ impl CompExecState { self.mode_port == port; } - pub(crate) fn is_blocked_on_put_with_ports(&self, port: PortId) -> bool { - return - self.mode == CompMode::BlockedPutPortsReady && - self.mode_port == port; - } - pub(crate) fn is_blocked_on_create_component(&self) -> bool { return self.mode == CompMode::NewComponentBlocked; } @@ -293,7 +292,7 @@ pub(crate) fn default_send_data_message( format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) )) } else if !ports.is_empty() { - prepare_send_message_with_ports( + start_send_message_with_ports( transmitting_port_id, port_instruction, value, exec_state, comp_ctx, sched_ctx, control )?; @@ -544,7 +543,7 @@ pub(crate) fn default_handle_control_message( ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?; }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -577,7 +576,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. - default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?; } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -625,7 +624,7 @@ pub(crate) fn default_handle_control_message( default_handle_recently_unblocked_port( exec_state, control, consensus, port_handle, sched_ctx, comp_ctx, inbox_main, inbox_backup - ); + )?; }, ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to @@ -656,7 +655,7 @@ pub(crate) fn default_handle_control_message( default_handle_recently_unblocked_port( exec_state, control, consensus, port_handle, sched_ctx, comp_ctx, inbox_main, inbox_backup - ); + )?; } } @@ -1105,20 +1104,43 @@ fn send_message_without_ports( /// Prepares sending a message that contains ports. Only once a particular /// protocol has completed (where we notify all the peers that the ports will /// be transferred) will we actually send the message to the recipient. -fn prepare_send_message_with_ports( +fn start_send_message_with_ports( sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + // Retrieve ports we're going to transfer let sending_port_handle = comp_ctx.get_port_handle(sending_port_id); let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); sending_port_info.last_instruction = sending_port_instruction; let mut transmit_ports = Vec::new(); find_ports_in_value_group(&value, &mut transmit_ports); - debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function + debug_assert!(!transmit_ports.is_empty()); // required from caller + + // Enter the state where we'll wait until all transferred ports are not + // blocked. + exec_state.set_as_blocked_put_with_ports(sending_port_id, value); + + if ports_not_blocked(comp_ctx, &transmit_ports) { + // Ports are not blocked, so we can send them right away. + perform_send_message_with_ports_notify_peers( + exec_state, comp_ctx, sched_ctx, control, transmit_ports + )?; + } // else: wait until they become unblocked + + return Ok(()) +} + +fn perform_send_message_with_ports_notify_peers( + exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, + control: &mut ControlLayer, transmit_ports: EncounteredPorts +) -> Result<(), (PortInstruction, String)> { + // Check we're in the correct state in debug mode + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts); + debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports)); // Set up the final Ack that triggers us to send our final message let unblock_put_control_id = control.add_unblock_put_with_ports_entry(); @@ -1128,9 +1150,12 @@ fn prepare_send_message_with_ports( let peer_comp_id = transmit_port_info.peer_comp_id; let peer_port_id = transmit_port_info.peer_port_id; + // Note: we checked earlier that we are currently in sync mode. Now we // will check if we've already used the port we're about to transmit. if !transmit_port_info.last_instruction.is_none() { + let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; return Err(( sending_port_instruction, String::from("Cannot transmit one of the ports in this message, as it is used in this sync round") @@ -1138,6 +1163,8 @@ fn prepare_send_message_with_ports( } if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { + let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; return Err(( sending_port_instruction, String::from("Cannot transmit one of the ports in this message, as that port is already transmitted") @@ -1158,7 +1185,7 @@ fn prepare_send_message_with_ports( // We've set up the protocol, once all the PPC's are blocked we are supposed // to transfer the message to the recipient. So store it temporarily - exec_state.set_as_blocked_put_with_ports(sending_port_id, value); + exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks; return Ok(()); } @@ -1166,23 +1193,36 @@ fn prepare_send_message_with_ports( /// Performs the transmission of a data message that contains ports. These were /// all stored in the component's execution state by the /// `prepare_send_message_with_ports` function. Port must be ready to send! -fn perform_send_message_with_ports( +fn perform_send_message_with_ports_to_receiver( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup -) { - debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady); +) -> Result<(), (PortInstruction, String)> { + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedSendingPort); // Find all ports again let mut transmit_ports = Vec::new(); find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports); + // Retrieve the port over which we're going to send the message let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state.can_send() && !port_info.state.is_blocked()); + + if !port_info.state.is_open() { + return Err(( + port_info.last_instruction, + String::from("cannot send over this port, as it is closed") + )); + } + + debug_assert!(!port_info.state.is_blocked_due_to_port_change()); // caller should have checked this let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - // Annotate the data message + // Change state back to its default + exec_state.mode = CompMode::Sync; let message_value = exec_state.mode_value.take(); + exec_state.mode_port = PortId::new_invalid(); + + // Annotate the data message let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value); // And further enhance the message by adding data about the ports that are @@ -1212,6 +1252,8 @@ fn perform_send_message_with_ports( // And finally, send the message to the peer let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); + + return Ok(()); } /// Handles an `Ack` for the control layer. @@ -1219,7 +1261,7 @@ fn default_handle_ack( exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup -) { +) -> Result<(), (PortInstruction, String)>{ // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; @@ -1249,13 +1291,17 @@ fn default_handle_ack( // Send the message (containing ports) stored in the component // execution state to the recipient println!("DEBUG: Unblocking put with ports"); - exec_state.mode = CompMode::BlockedPutPortsReady; + debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); + exec_state.mode = CompMode::PutPortsBlockedSendingPort; let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + // Little bit of a hack, we didn't really unblock the sending + // port, but this will mesh nicely with waiting for the sending + // port to become unblocked. default_handle_recently_unblocked_port( exec_state, control, consensus, port_handle, sched_ctx, comp_ctx, inbox_main, inbox_backup - ); + )?; }, AckAction::None => {} } @@ -1265,6 +1311,8 @@ fn default_handle_ack( None => break, } } + + return Ok(()); } /// Little helper for sending the most common kind of `Ack` @@ -1291,21 +1339,17 @@ fn default_handle_recently_unblocked_port( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup -) { +) -> Result<(), (PortInstruction, String)> { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; if port_info.state.is_blocked() { // Port is still blocked. We wait until the next control message where // we unblock the port. - return; + return Ok(()); } if exec_state.is_blocked_on_put_without_ports(port_id) { - // Return to the regular execution mode - exec_state.mode = CompMode::Sync; - exec_state.mode_port = PortId::new_invalid(); - // Annotate the message that we're going to send let port_info = comp_ctx.get_port(port_handle); // for immutable access debug_assert_eq!(port_info.kind, PortKind::Putter); @@ -1317,18 +1361,25 @@ fn default_handle_recently_unblocked_port( let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true); - exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. - exec_state.mode_port = PortId::new_invalid(); - } else if exec_state.is_blocked_on_put_with_ports(port_id) { - // Port is not blocked, and we've completed our part of the - // port-transfer protocol. So send the message - perform_send_message_with_ports( - exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup - ); - + // Return to the regular execution mode exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); - debug_assert!(exec_state.mode_value.values.is_empty()); + } else if exec_state.mode == CompMode::PutPortsBlockedTransferredPorts { + // We are waiting until all of the transferred ports become unblocked, + // check so here. + let mut transfer_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transfer_ports); + if ports_not_blocked(comp_ctx, &transfer_ports) { + perform_send_message_with_ports_notify_peers( + exec_state, comp_ctx, sched_ctx, control, transfer_ports + )?; + } + } else if exec_state.mode == CompMode::PutPortsBlockedSendingPort && exec_state.mode_port == port_id { + // We checked above that the port became unblocked, so we can send the + // message + perform_send_message_with_ports_to_receiver( + exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup + )?; } else if exec_state.is_blocked_on_create_component() { let mut ports = Vec::new(); find_ports_in_value_group(&exec_state.mode_value, &mut ports); @@ -1338,6 +1389,8 @@ fn default_handle_recently_unblocked_port( ); } } + + return Ok(()); } #[inline] diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index b99866bd3bdf0ced33a409c11e79177ef3e32e37..4d508f2545e458c5747fd9fdd2eed7fe9d269b92 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -126,8 +126,9 @@ impl Component for ComponentTcpClient { match self.exec_state.mode { CompMode::BlockedSelect | - CompMode::BlockedPutPortsAwaitingAcks | - CompMode::BlockedPutPortsReady | + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | CompMode::NewComponentBlocked => { // Not possible: we never enter this state unreachable!(); diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index bb07b2eebc6ae0795010dcbafac7bff60d5c7f92..36f5c267d82c7242ee2da12950b07ab58a05f5b1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -285,8 +285,8 @@ impl Component for CompPDL { // continue and run PDL code }, CompMode::SyncEnd | CompMode::BlockedGet | - CompMode::BlockedPut | CompMode::BlockedSelect | - CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady | + CompMode::BlockedPut | CompMode::BlockedSelect | CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | CompMode::PutPortsBlockedSendingPort | CompMode::NewComponentBlocked => { return CompScheduling::Sleep; } diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index 538448c89e06a2e7e4d17383a2fed66c921d9d02..27ec3cd96ae08c6d4aa4bac672cd76742eb63f70 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -62,8 +62,11 @@ impl Component for ComponentRandomU32 { sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { - CompMode::BlockedGet | CompMode::BlockedSelect | - CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady | + CompMode::BlockedGet | + CompMode::BlockedSelect | + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | CompMode::NewComponentBlocked => { // impossible for this component, no input ports and no select // blocks