diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index dd3081ee1d12512335b33c706a690cfa21fc7612..ed94ded96a5f9d0277ae3ffe542378e49cf3b5f3 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -13,7 +13,8 @@ use crate::runtime2::communication::*;
 
 use super::component::{
     self,
-    CompExecState, Component, CompScheduling, CompMode,
+    InboxMain, InboxBackup, GetResult,
+    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
     port_id_from_eval, port_id_to_eval
 };
 use super::component_context::*;
@@ -107,8 +108,6 @@ enum SelectDecision {
     Case(u32), // contains case index, should be passed along to PDL code
 }
 
-type InboxMain = Vec<Option<DataMessage>>;
-
 impl SelectState {
     fn new() -> Self {
         return Self{
@@ -245,7 +244,7 @@ impl Component for CompPDL {
         // sched_ctx.log(&format!("handling message: {:?}", message));
         if let Some(new_target) = self.control.should_reroute(&mut message) {
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
-            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
+            target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
            let _should_remove = target.decrement_users();
            debug_assert!(_should_remove.is_none());
            return;
@@ -256,10 +255,12 @@ impl Component for CompPDL {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
-                component::default_handle_control_message(
+                if let Err(location_and_message) = component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
-                );
+                ) {
+                    self.handle_generic_component_error(sched_ctx, location_and_message);
+                }
            },
            Message::Sync(message) => {
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
@@ -270,10 +271,10 @@ impl Component for CompPDL {
        }
    }
 
-    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
        use EvalContinuation as EC;
 
-        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
+        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));
 
        // Depending on the mode don't do anything at all, take some special
        // actions, or fall through and run the PDL code.
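Aside, not part of the patch: the signature change above is the heart of this commit. `run` no longer returns `Result<CompScheduling, EvalError>`; evaluation errors are absorbed by the component itself. A minimal sketch of that control flow, assuming the `CompError`/`handle_component_error` API introduced further down (names and structure simplified for illustration):

    // Hypothetical free-standing rendition of run()'s new error path; the
    // real logic lives inside CompPDL::run in the hunk below.
    fn run_error_flow_sketch(comp: &mut CompPDL, sched_ctx: &mut SchedulerCtx) -> CompScheduling {
        match comp.execute_prompt(&sched_ctx) {
            Ok(_continuation) => {
                // ... dispatch on the EvalContinuation cases, as in the hunk below
                CompScheduling::Immediate
            },
            Err(eval_error) => {
                // previously: propagated to the scheduler via the `?` operator
                comp.handle_component_error(sched_ctx, CompError::Executor(eval_error));
                CompScheduling::Immediate // reschedule so the StartExit mode is processed
            },
        }
    }
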
@@ -282,87 +283,115 @@ impl Component for CompPDL {
                // continue and run PDL code
            },
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
-                return Ok(CompScheduling::Sleep);
+                return CompScheduling::Sleep;
            }
-            CompMode::StartExit => return Ok(component::default_handle_start_exit(
-                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
-            )),
-            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
+            CompMode::StartExit => return component::default_handle_start_exit(
+                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
+            ),
+            CompMode::BusyExit => return component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
-            )),
-            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
+            ),
+            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
+        }
+
+        let run_result = self.execute_prompt(&sched_ctx);
+        if let Err(error) = run_result {
+            self.handle_component_error(sched_ctx, CompError::Executor(error));
+            return CompScheduling::Immediate;
        }
 
-        let run_result = self.execute_prompt(&sched_ctx)?;
+        let run_result = run_result.unwrap();
 
        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
-                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
-                self.handle_sync_end(sched_ctx, comp_ctx);
-                return Ok(CompScheduling::Immediate);
+                component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
+                return CompScheduling::Immediate;
            },
-            EC::BlockGet(port_id) => {
+            EC::BlockGet(expr_id, port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let port_id = port_id_from_eval(port_id);
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                let port_index = comp_ctx.get_port_index(port_handle);
-                if let Some(message) = &self.inbox_main[port_index] {
-                    // Check if we can actually receive the message
-                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
-                        // Message was received. Make sure any blocked peers and
-                        // pending messages are handled.
-                        let message = self.inbox_main[port_index].take().unwrap();
-                        component::default_handle_received_data_message(
-                            port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup,
-                            comp_ctx, sched_ctx, &mut self.control
-                        );
-
+                match component::default_attempt_get(
+                    &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id),
+                    &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
+                    &mut self.control, &mut self.consensus
+                ) {
+                    GetResult::Received(message) => {
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
-                        return Ok(CompScheduling::Immediate);
-                    } else {
-                        todo!("handle sync failure due to message deadlock");
-                        return Ok(CompScheduling::Sleep);
+                        return CompScheduling::Immediate;
+                    },
+                    GetResult::NoMessage => {
+                        return CompScheduling::Sleep;
+                    },
+                    GetResult::Error(location_and_message) => {
+                        self.handle_generic_component_error(sched_ctx, location_and_message);
+                        return CompScheduling::Immediate;
                    }
-                } else {
-                    // We need to wait
-                    self.exec_state.set_as_blocked_get(port_id);
-                    return Ok(CompScheduling::Sleep);
                }
            },
-            EC::Put(port_id, value) => {
+            EC::Put(expr_id, port_id, value) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
-                sched_ctx.log(&format!("Putting value {:?}", value));
+                sched_ctx.info(&format!("Putting value {:?}", value));
 
                // Send the message
                let target_port_id = port_id_from_eval(port_id);
-                let scheduling = component::default_send_data_message(
-                    &mut self.exec_state, target_port_id, value,
+                let send_result = component::default_send_data_message(
+                    &mut self.exec_state, target_port_id,
+                    PortInstruction::SourceLocation(expr_id), value,
                    sched_ctx, &mut self.consensus, comp_ctx
                );
-
-                // When `run` is called again (potentially after becoming
-                // unblocked) we need to instruct the executor that we performed
-                // the `put`
-                self.exec_ctx.stmt = ExecStmt::PerformedPut;
-                return Ok(scheduling);
+                if let Err(location_and_message) = send_result {
+                    self.handle_generic_component_error(sched_ctx, location_and_message);
+                    return CompScheduling::Immediate;
+                } else {
+                    // When `run` is called again (potentially after becoming
+                    // unblocked) we need to instruct the executor that we performed
+                    // the `put`
+                    let scheduling = send_result.unwrap();
+                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
+                    return scheduling;
+                }
            },
            EC::SelectStart(num_cases, _num_ports) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                self.select_state.handle_select_start(num_cases);
-                return Ok(CompScheduling::Requeue);
+                return CompScheduling::Requeue;
            },
-            EC::SelectRegisterPort(case_index, port_index, port_id) => {
+            EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                let port_id = port_id_from_eval(port_id);
+                let port_handle = comp_ctx.get_port_handle(port_id);
+
+                // Note: we register the "last_instruction" here already. This
+                // way if we get a `ClosePort` message, the condition to fail
+                // the synchronous round is satisfied.
+                let port_info = comp_ctx.get_port_mut(port_handle);
+                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
+                let port_is_closed = port_info.state.is_closed();
+
+                // Register port as part of select guard
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
-                    todo!("handle registering a port multiple times");
+                    // Failure occurs if a port is used twice in the same guard
+                    let protocol = &sched_ctx.runtime.protocol;
+                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
+                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
+                        String::from("Cannot have the same port appear twice in one guard")
+                    )));
+                } else if port_is_closed {
+                    // Port is closed
+                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
+                    let protocol = &sched_ctx.runtime.protocol;
+                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
+                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
+                        format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0)
+                    )));
                }
-                return Ok(CompScheduling::Immediate);
+
+                return CompScheduling::Immediate;
            },
            EC::SelectWait => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
@@ -371,22 +400,23 @@ impl Component for CompPDL {
                    // Reached a conclusion, so we can continue immediately
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
                    self.exec_state.mode = CompMode::Sync;
-                    return Ok(CompScheduling::Immediate);
+                    return CompScheduling::Immediate;
                } else {
                    // No decision yet
                    self.exec_state.mode = CompMode::BlockedSelect;
-                    return Ok(CompScheduling::Sleep);
+                    return CompScheduling::Sleep;
                }
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
-                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
-                return Ok(CompScheduling::Immediate);
+                self.exec_state.set_as_start_exit(ExitReason::Termination);
+                return CompScheduling::Immediate;
            },
            EC::SyncBlockStart => {
-                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
-                self.handle_sync_start(sched_ctx, comp_ctx);
-                return Ok(CompScheduling::Immediate);
+                component::default_handle_sync_start(
+                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
+                );
+                return CompScheduling::Immediate;
            },
            EC::NewComponent(definition_id, type_id, arguments) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
@@ -394,7 +424,7 @@
                    sched_ctx, comp_ctx, definition_id, type_id, arguments
                );
 
-                return Ok(CompScheduling::Requeue);
+                return CompScheduling::Requeue;
            },
            EC::NewChannel => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
@@ -406,7 +436,7 @@
                ));
                self.inbox_main.push(None);
                self.inbox_main.push(None);
-                return Ok(CompScheduling::Immediate);
+                return CompScheduling::Immediate;
            }
        }
    }
@@ -451,77 +481,6 @@ impl CompPDL {
        return Ok(step_result)
    }
 
-    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        sched_ctx.log("Component starting sync mode");
-        self.consensus.notify_sync_start(comp_ctx);
-        for message in self.inbox_main.iter() {
-            if let Some(message) = message {
-                self.consensus.handle_incoming_data_message(comp_ctx, message);
-            }
-        }
-        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
-        self.exec_state.mode = CompMode::Sync;
-    }
-
-    /// Handles end of sync. The conclusion to the sync round might arise
-    /// immediately (and be handled immediately), or might come later through
-    /// messaging. In any case the component should be scheduled again
-    /// immediately
-    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        sched_ctx.log("Component ending sync mode (now waiting for solution)");
-        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
-        self.exec_state.mode = CompMode::SyncEnd;
-        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
-    }
-
-    /// Handles decision from the consensus round. This will cause a change in
-    /// the internal `Mode`, such that the next call to `run` can take the
-    /// appropriate next steps.
-    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
-        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
-        match decision {
-            SyncRoundDecision::None => {
-                // No decision yet
-                return;
-            },
-            SyncRoundDecision::Solution => {
-                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
-                self.exec_state.mode = CompMode::NonSync;
-                self.consensus.notify_sync_decision(decision);
-            },
-            SyncRoundDecision::Failure => {
-                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
-                self.exec_state.mode = CompMode::StartExit;
-            },
-        }
-    }
-
-    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        sched_ctx.log("Component exiting");
-        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
-        self.exec_state.mode = CompMode::BusyExit;
-
-        // Doing this by index, then retrieving the handle is a bit rediculous,
-        // but Rust is being Rust with its borrowing rules.
-        for port_index in 0..comp_ctx.num_ports() {
-            let port = comp_ctx.get_port_by_index_mut(port_index);
-            if port.state == PortState::Closed {
-                // Already closed, or in the process of being closed
-                continue;
-            }
-
-            // Mark as closed
-            let port_id = port.self_id;
-            port.state = PortState::Closed;
-
-            // Notify peer of closing
-            let port_handle = comp_ctx.get_port_handle(port_id);
-            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
-            let peer_info = comp_ctx.get_peer(peer);
-            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
-        }
-    }
-
     // -------------------------------------------------------------------------
     // Handling messages
     // -------------------------------------------------------------------------
@@ -537,10 +496,8 @@ impl CompPDL {
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
        }
 
-        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
-        let port_index = comp_ctx.get_port_index(port_handle);
        match component::default_handle_incoming_data_message(
-            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
+            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message,
            sched_ctx, &mut self.control
        ) {
            IncomingData::PlacedInSlot => {
@@ -560,7 +517,40 @@ impl CompPDL {
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
-        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
+        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
+    }
+
+    /// Handles an error coming from the generic `component::handle_xxx`
+    /// functions. Hence it accepts its argument as a tuple.
+    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
+        // Retrieve location and message, display in terminal
+        let (location, message) = location_and_message;
+        let error = match location {
+            PortInstruction::None => CompError::Component(message),
+            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
+            PortInstruction::SourceLocation(expression_id) => {
+                let protocol = &sched_ctx.runtime.protocol;
+                CompError::Executor(EvalError::new_error_at_expr(
+                    &self.prompt, &protocol.modules, &protocol.heap,
+                    expression_id, message
+                ))
+            }
+        };
+
+        self.handle_component_error(sched_ctx, error);
+    }
+
+    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
+        sched_ctx.error(&format!("{}", error));
+
+        // Set state to handle subsequent error
+        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
+            ExitReason::ErrorInSync
+        } else {
+            ExitReason::ErrorNonSync
+        };
+
+        self.exec_state.set_as_start_exit(exit_reason);
    }
 
    // -------------------------------------------------------------------------
@@ -590,9 +580,8 @@ impl CompPDL {
        let reservation = sched_ctx.runtime.start_create_pdl_component();
        let mut created_ctx = CompCtx::new(&reservation);
 
-        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
-        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
-
+        // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
+        // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
        // dbg_code!({
        //     sched_ctx.log(&format!(
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
@@ -624,7 +613,7 @@
            created_id: created_port_id,
        };
 
-        if creator_port.state == PortState::Closed {
+        if creator_port.state.is_closed() {
            closed_port_id_pairs.push(port_id_pair)
        } else {
            opened_port_id_pairs.push(port_id_pair);
        }
@@ -653,23 +642,24 @@
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 
            if created_port_info.peer_comp_id == creator_ctx.id {
-                // Port peer is owned by the creator as well
+                // Peer of the transferred port is the component that is
+                // creating the new component.
                let created_peer_port_index = opened_port_id_pairs
                    .iter()
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
                match created_peer_port_index {
                    Some(created_peer_port_index) => {
-                        // Peer port moved to the new component as well. So
-                        // adjust IDs appropriately.
+                        // Addendum to the above comment: but that port is also
+                        // moving to the new component
                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
                        created_port_info.peer_port_id = peer_pair.created_id;
                        created_port_info.peer_comp_id = reservation.id();
-                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
+                        todo!("either add 'self peer', or remove that idea from Ctx altogether");
                    },
                    None => {
                        // Peer port remains with creator component.
                        created_port_info.peer_comp_id = creator_ctx.id;
-                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
+                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id));
                    }
                }
            } else {
@@ -677,7 +667,7 @@
                // appropriate messages later
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
                let peer_info = creator_ctx.get_peer(peer_handle);
-                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
+                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
                created_component_has_remote_peers = true;
            }
        }
@@ -697,10 +687,8 @@
        // potentially remove the peer component.
        for pair in opened_port_id_pairs.iter() {
            // Remove peer if appropriate
-            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
-            let creator_peer_comp_id = creator_port_info.peer_comp_id;
-            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id, false);
+            creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None);
            creator_ctx.remove_port(pair.creator_handle);
 
            // Transfer any messages
@@ -722,28 +710,42 @@
                }
            }
 
-            // Handle potential channel between creator and created component
            let created_port_info = component.ctx.get_port(pair.created_handle);
-            if created_port_info.peer_comp_id == creator_ctx.id {
+            // This handles the creation of a channel between the creator
+            // component and the newly created component. So if the creator
+            // had a `a -> b` channel, and `b` is moved to the new
+            // component, then `a` needs to set its peer component.
+            if created_port_info.peer_comp_id == creator_ctx.id {
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
                peer_port_info.peer_comp_id = component.ctx.id;
                peer_port_info.peer_port_id = created_port_info.self_id;
-                creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None);
+                creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id));
            }
        }
 
-        // Do the same for the closed ports
+        // Do the same for the closed ports. Note that we might still have to
+        // transfer messages that cause the new owner of the port to fail.
        for pair in closed_port_id_pairs.iter() {
            let port_index = creator_ctx.get_port_index(pair.creator_handle);
            creator_ctx.remove_port(pair.creator_handle);
 
-            let _removed_message = self.inbox_main.remove(port_index);
+            if let Some(mut message) = self.inbox_main.remove(port_index) {
+                message.data_header.target_port = pair.created_id;
+                component.component.adopt_message(&mut component.ctx, message);
+            }
 
-            // In debug mode: since we've closed the port we shouldn't have any
-            // messages for that port.
-            debug_assert!(_removed_message.is_none());
-            debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id));
+            let mut message_index = 0;
+            while message_index < self.inbox_backup.len() {
+                let message = &self.inbox_backup[message_index];
+                if message.data_header.target_port == pair.creator_id {
+                    // Transfer message
+                    let mut message = self.inbox_backup.remove(message_index);
+                    message.data_header.target_port = pair.created_id;
+                    component.component.adopt_message(&mut component.ctx, message);
+                } else {
+                    message_index += 1;
+                }
+            }
        }
 
        // By now all ports and messages have been transferred. If there are any
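Aside, not part of the patch: the backup-inbox loop above is the subtle part of moving a closed port. Messages still addressed to the creator's port id must be re-addressed to the new component's port id and handed over, which is also why the comparison uses `pair.creator_id` while the assignment uses `pair.created_id`. A standalone sketch of that handover (hypothetical helper; `DataMessage` and `PortId` are the runtime's communication types, and `adopt` stands in for `Component::adopt_message`):

    // Drain all backup messages aimed at the transferred port. Vec::remove
    // shifts later elements down, so the index only advances when nothing
    // was removed at the current position.
    fn transfer_backup_messages(
        backup: &mut Vec<DataMessage>, creator_port: PortId, created_port: PortId,
        mut adopt: impl FnMut(DataMessage),
    ) {
        let mut index = 0;
        while index < backup.len() {
            if backup[index].data_header.target_port == creator_port {
                let mut message = backup.remove(index);
                message.data_header.target_port = created_port; // re-address to the new owner
                adopt(message);
            } else {
                index += 1;
            }
        }
    }
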
@@ -762,7 +764,7 @@ impl CompPDL {
                );
                let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
                let peer_info = created_ctx.get_peer(peer_handle);
-                peer_info.handle.send_message(&sched_ctx.runtime, message, true);
+                peer_info.handle.send_message_logged(sched_ctx, message, true);
            }
        }
    } else {
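Aside, not part of the patch: throughout this commit, bare `send_message(&sched_ctx.runtime, ...)` calls are replaced with `send_message_logged(sched_ctx, ...)`. Only the call signature is visible in these hunks; a plausible shape for such a wrapper (the handle type name and the body are assumptions for illustration, not the actual implementation) would be:

    impl CompHandle { // hypothetical name for the public component handle
        pub fn send_message_logged(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
            // log through the scheduler context, then delegate to the original send path
            sched_ctx.info(&format!("sending message: {:?}", message));
            self.send_message(&sched_ctx.runtime, message, try_wake_up);
        }
    }
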