diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index f71dfb0556f1fb8627dc2c7a333ee96b53b84ddf..7e450523700cc9aebe4153b98da061478f4cc1c5 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -363,7 +363,7 @@ pub(crate) enum GetResult { /// instruction we're attempting on this port. pub(crate) fn default_attempt_get( exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus ) -> GetResult { let port_handle = comp_ctx.get_port_handle(target_port); @@ -382,13 +382,14 @@ pub(crate) fn default_attempt_get( if let Some(message) = &inbox_main[port_index] { if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // We're allowed to receive this message - let message = inbox_main[port_index].take().unwrap(); + let mut message = inbox_main[port_index].take().unwrap(); debug_assert_eq!(target_port, message.data_header.target_port); // Note: we can still run into an unrecoverable error when actually // receiving this message match default_handle_received_data_message( - target_port, target_port_instruction, inbox_main, inbox_backup, + target_port, target_port_instruction, + &mut message, inbox_main, inbox_backup, comp_ctx, sched_ctx, control, ) { Ok(()) => return GetResult::Received(message), @@ -415,8 +416,8 @@ pub(crate) fn default_attempt_get( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, port_instruction: PortInstruction, - inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, + targeted_port: PortId, port_instruction: PortInstruction, message: &mut DataMessage, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); @@ -424,6 +425,59 @@ pub(crate) fn default_handle_received_data_message( let slot = &mut inbox_main[port_index]; debug_assert!(slot.is_none()); // because we've just received from it + // If we received any ports, add them to the port tracking and inbox struct. + // Then notify the peers that they can continue sending to this port, but + // now at a new address. + for received_port in &mut message.ports { + // Transfer messages to main/backup inbox + let new_inbox_index = inbox_main.len(); + if !received_port.messages.is_empty() { + inbox_main.push(Some(received_port.messages.remove(0))); + } + inbox_backup.extend(received_port.messages.drain(..)); + + // Create a new port locally + let mut new_port_state = received_port.state; + new_port_state.set(PortStateFlag::Received); + let new_port_handle = comp_ctx.add_port( + received_port.peer_comp, received_port.peer_port, + received_port.kind, new_port_state + ); + let new_port = comp_ctx.get_port(new_port_handle); + comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(new_port.peer_comp_id)); + + // Replace all references to the port in the received message + for message_location in received_port.locations { + let value = match message_location { + ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize], + ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize], + }; + + match value { + Value::Input(_) => { + debug_assert_eq!(new_port.kind, PortKind::Getter); + *value = Value::Input(port_id_to_eval(new_port.self_id)); + }, + Value::Output(_) => { + debug_assert_eq!(new_port.kind, PortKind::Putter); + *value = Value::Output(port_id_to_eval(new_port.self_id)); + }, + _ => unreachable!(), + } + } + + // Let the peer know that the port can now be used + let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + + peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ + id: ControlId::new_invalid(), + sender_comp_id: comp_ctx.id, + target_port_id: Some(new_port.peer_port_id), + content: ControlMessageContent::PortPeerChangedUnblock(new_port.id, comp_ctx.id) + }), true); + } + // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port(port_handle); debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller @@ -463,11 +517,12 @@ pub(crate) fn default_handle_received_data_message( /// state may all change. pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, - message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { - default_handle_ack(control, message.id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, inbox_main, inbox_backup); }, ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be @@ -500,7 +555,7 @@ pub(crate) fn default_handle_control_message( // The two components (sender and this component) are closing // the channel at the same time. So we don't care about the // content of the `ClosePort` message. - default_handle_ack(control, control_id, sched_ctx, comp_ctx); + default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, inbox_main, inbox_backup); } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); @@ -777,14 +832,17 @@ fn send_message_without_ports( peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); } -fn send_message_with_ports( - sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction, - value: ValueGroup, +/// Prepares sending a message that contains ports. Only once a particular +/// protocol has completed (where we notify all the peers that the ports will +/// be transferred) will we actually send the message to the recipient. +fn prepare_send_message_with_ports( + sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, - control: &mut ControlLayer, consensus: &mut Consensus + control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send + let sending_port_handle = comp_ctx.get_port_handle(sending_port_id); let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); sending_port_info.last_instruction = sending_port_instruction; @@ -797,37 +855,98 @@ fn send_message_with_ports( for (_, port_id) in &transmit_ports { let transmit_port_handle = comp_ctx.get_port_handle(*port_id); let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); + let peer_comp_id = transmit_port_info.peer_comp_id; + let peer_port_id = transmit_port_info.peer_port_id; // Note: we checked earlier that we are currently in sync mode. Now we // will check if we've already used the port we're about to transmit. if !transmit_port_info.last_instruction.is_none() { return Err(( sending_port_instruction, - String::from("Cannot transmit one of the ports, as it is used in this sync round") + String::from("Cannot transmit one of the ports in this message, as it is used in this sync round") )); } if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { return Err(( sending_port_instruction, - String::from("Cannot transmit one of the ports, as that port is already transmitted") + String::from("Cannot transmit one of the ports in this message, as that port is already transmitted") )); } + // Set the flag for transmission transmit_port_info.state.set(PortStateFlag::Transmitted); - } + // Block the peer of the port + let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); - let port_info = comp_ctx.get_port(sending_port_handle); + peer_info.handle.send_message_logged(sched_ctx, message, true); + } + // We've set up the protocol, once all the PPC's are blocked we are supposed + // to transfer the message to the recipient. So store it temporarily + exec_state.mode = CompMode::BlockedPutPorts; + exec_state.mode_port = sending_port_id; + exec_state.mode_value = value; return Ok(()); } +/// Performs the transmission of a data message that contains ports. These were +/// all stored in the component's execution state by the +/// `prepare_send_message_with_ports` function. +fn perform_send_message_with_ports( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) { + debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPorts); + + // Find all ports again + let mut transmit_ports = Vec::new(); + find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports); + + let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); + let port_info = comp_ctx.get_port(port_handle); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + + // Annotate the data message + let message_value = exec_state.mode_value.take(); + let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value); + + // And further enhance the message by adding data about the ports that are + // being transferred + for (port_locations, port_id) in transmit_ports { + let transmit_port_handle = comp_ctx.get_port_handle(port_id); + let transmit_port_info = comp_ctx.get_port(transmit_port_handle); + + let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); + + let mut transmit_port_state = transmit_port_info.state; + debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted)); + transmit_port_state.clear(PortStateFlag::Transmitted); + + annotated_message.ports.push(TransmittedPort{ + locations: port_locations, + messages: transmit_messages, + peer_comp: transmit_port_info.peer_comp_id, + peer_port: transmit_port_info.peer_port_id, + kind: transmit_port_info.kind, + state: transmit_port_state + }) + } + + // And finally, send the message to the peer + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); +} + /// Handles an `Ack` for the control layer. fn default_handle_ack( - control: &mut ControlLayer, control_id: ControlId, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, + inbox_backup: &mut InboxBackup ) { // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; @@ -853,6 +972,12 @@ fn default_handle_ack( let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, + AckAction::UnblockPutWithPorts => { + // Send the message (containing ports) to the recipient + perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); + exec_state.mode = CompMode::Sync; + exec_state.mode_port = PortId::new_invalid(); + }, AckAction::None => {} } @@ -962,4 +1087,32 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut En for (value_index, value) in &value_group.values.iter().enumerate() { find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); } +} + +/// Goes through the inbox of a component and takes out all the messages that +/// are targeted at a specific port +pub(crate) fn take_port_messages( + comp_ctx: &CompCtx, port_id: PortId, + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup +) -> Vec { + let mut messages = Vec::new(); + let port_handle = comp_ctx.get_port_handle(port_id); + let port_index = comp_ctx.get_port_index(port_handle); + + if let Some(message) = inbox_main[port_index].take() { + messages.push(message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_id { + let message = inbox_backup.remove(message_index); + messages.push(message); + } else { + message_index += 1; + } + } + + return messages; } \ No newline at end of file