diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index e951d0d9c29ff1c2f208c3dccd1df388d9d9b54a..419df819b60b039f6930dbfea3d6269311a82723 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -59,6 +59,7 @@ pub(crate) struct Branch { parent_index: BranchId, // Code execution state code_state: ComponentState, + prepared_channel: Option<(Value, Value)>, sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state @@ -74,6 +75,7 @@ impl Branch { index: BranchId::new_invalid(), parent_index: BranchId::new_invalid(), code_state: component_state, + prepared_channel: None, sync_state: SpeculativeState::RunningNonSync, next_branch_in_queue: None, received: HashMap::new(), @@ -88,11 +90,13 @@ impl Branch { (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) ); + debug_assert!(parent_branch.prepared_channel.is_none()); Branch{ index: BranchId::new(new_index), parent_index: parent_branch.index, code_state: parent_branch.code_state.clone(), + prepared_channel: None, sync_state: SpeculativeState::RunningInSync, next_branch_in_queue: None, received: parent_branch.received.clone(), @@ -268,6 +272,7 @@ impl ConnectorPorts { let branch_idx = branch_idx as usize; let num_ports = self.owned_ports.len(); + println!("port_idx = {}, branch_idx = {}, num_ports = {}, port_mapping.len() = {}", port_idx, branch_idx, num_ports, self.port_mapping.len()); debug_assert!(port_idx < num_ports); debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len()); @@ -333,28 +338,31 @@ pub(crate) struct ConnectorPDL { pub ports: ConnectorPorts, } +// TODO: Remove this monstrosity struct ConnectorRunContext<'a> { - inbox: &'a PrivateInbox, + branch_index: u32, ports: &'a ConnectorPorts, - branch: &'a Branch, + ports_delta: &'a Vec, + received: &'a HashMap, scheduler: SchedulerCtx<'a>, + prepared_channel: Option<(Value, Value)>, } impl<'a> RunContext for ConnectorRunContext<'a> { fn did_put(&mut self, port: PortId) -> bool { - if self.branch.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { + if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { // Either acquired or released, must be silent return false; } let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap(); - let mapping = self.ports.get_port(self.branch.index.index, port_index); + let mapping = self.ports.get_port(self.branch_index, port_index); return mapping.is_assigned; } fn get(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.branch.received.get(&port_id) { + match self.received.get(&port_id) { Some(message) => Some(message.message.clone()), None => None, } @@ -362,12 +370,12 @@ impl<'a> RunContext for ConnectorRunContext<'a> { fn fires(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); - if self.branch.ports_delta.iter().any(|v| v.port_id == port_id) { + if self.ports_delta.iter().any(|v| v.port_id == port_id) { return None } let port_index = self.ports.get_port_index(port_id).unwrap(); - let mapping = self.ports.get_port(self.branch.index.index, port_index); + let mapping = self.ports.get_port(self.branch_index, port_index); if mapping.is_assigned { return Some(Value::Bool(mapping.num_times_fired != 0)); @@ -377,9 +385,7 @@ impl<'a> RunContext for ConnectorRunContext<'a> { } fn get_channel(&mut self) -> Option<(Value, Value)> { - let (getter, putter) = self.scheduler.runtime.create_channel(); - debug_assert_eq!(getter.kind, PortKind::Getter); - + return self.prepared_channel.take(); } } @@ -396,9 +402,9 @@ impl Connector for ConnectorPDL { } } - fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, conn_ctx, delta_state); + let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -447,7 +453,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, conn_ctx, delta_state); + let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state); return scheduling; } } @@ -719,15 +725,26 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); - debug_assert!(!self.sync_active.is_empty()); + + if self.sync_active.is_empty() { + return ConnectorScheduling::NotNow; + } let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point - let mut run_context = ConnectorRunContext {}; - let run_result = branch.code_state.run(&mut run_context, pd); + debug_assert!(branch.prepared_channel.is_none()); + let mut run_context = ConnectorRunContext { + branch_index: branch.index.index, + ports: &self.ports, + ports_delta: &branch.ports_delta, + scheduler: sched_ctx, + prepared_channel: None, + received: &branch.received, + }; + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); // Match statement contains `return` statements only if the particular // run result behind handled requires an immediate re-run of the @@ -908,6 +925,9 @@ impl ConnectorPDL { results.ports.clear(); results.outbox.push(MessageContents::Data(message)); + + let branch_index = branch.index; + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index); return ConnectorScheduling::Immediate } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -926,7 +946,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -935,11 +955,14 @@ impl ConnectorPDL { debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ - inbox: &self.inbox, + branch_index: branch.index.index, ports: &self.ports, - branch: &Branch {} + ports_delta: &branch.ports_delta, + scheduler: sched_ctx, + prepared_channel: branch.prepared_channel.take(), + received: &branch.received, }; - let run_result = branch.code_state.run(&mut run_context, pd); + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { RunResult::ComponentTerminated => { @@ -953,6 +976,7 @@ impl ConnectorPDL { self.in_sync = true; let first_sync_branch = Branch::new_sync_branching_from(1, branch); let first_sync_branch_id = first_sync_branch.index; + self.ports.prepare_sync_branch(0, 1); self.branches.push(first_sync_branch); Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id); @@ -973,7 +997,11 @@ impl ConnectorPDL { // Add connector for later execution let new_connector_state = ComponentState { - prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments) + prompt: Prompt::new( + &sched_ctx.runtime.protocol_description.types, + &sched_ctx.runtime.protocol_description.heap, + definition_id, monomorph_idx, arguments + ) }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); @@ -985,9 +1013,17 @@ impl ConnectorPDL { }, RunResult::NewChannel => { // Need to prepare a new channel - todo!("adding channels to some global context"); + let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id); + debug_assert_eq!(getter.kind, PortKind::Getter); + branch.prepared_channel = Some(( + Value::Input(PortId::new(putter.self_id.index)), + Value::Output(PortId::new(getter.self_id.index)) + )); - return ConnectorScheduling::Later; + results.new_ports.push(putter); + results.new_ports.push(getter); + + return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } @@ -1106,6 +1142,10 @@ impl ConnectorPDL { /// Releasing ownership of ports during a sync-session. Will provide an /// error if the port was already used during a sync block. fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + if port_ids.is_empty() { + return Ok(()) + } + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode @@ -1157,6 +1197,10 @@ impl ConnectorPDL { /// Acquiring ports during a sync-session. fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + if port_ids.is_empty() { + return Ok(()) + } + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode