diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 18443c8a379db1d69ca5027a5f4952ea775ec426..587dffae9711bbdf730e9beed9c7449164a4e940 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,7 +4,7 @@ use super::messages::{Message, Inbox}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::{PortId, ProtocolDescription}; -use crate::protocol::eval::{ValueGroup, Value}; +use crate::protocol::eval::{ValueGroup, Value, Prompt}; #[derive(Clone, Copy, PartialEq, Eq)] pub(crate) struct PortIdLocal { @@ -139,9 +139,9 @@ impl PortAssignment { } #[derive(Clone, Eq)] -enum PortOwnershipDelta { - TakeOwnership(PortIdLocal), - GiveAwayOwnership(PortIdLocal), +struct PortOwnershipDelta { + acquired: bool, // if false, then released ownership + port_id: PortIdLocal, } enum PortOwnershipError { @@ -312,6 +312,10 @@ impl Connector { } } + pub fn is_in_sync_mode(&self) -> bool { + return self.in_sync; + } + /// Runs the connector in synchronous mode. Potential changes to the global /// system's state are added to the `RunDeltaState` object by the connector, /// where it is the caller's responsibility to immediately take care of @@ -368,7 +372,11 @@ impl Connector { // Branch performed a `get` on a port that has not yet received // a value in its inbox. let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); - let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); + let local_port_index = self.ports.get_port_index(local_port_id); + if local_port_index.is_none() { + todo!("deal with the case where the port is acquired"); + } + let local_port_index = local_port_index.unwrap(); let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); // Check for port mapping assignment and, if present, if it is @@ -435,7 +443,11 @@ impl Connector { RunResult::BranchPut(port_id, value_group) => { // Branch performed a `put` on a particualar port. let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix }; - let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); + let local_port_index = self.ports.get_port_index(local_port_id); + if local_port_index.is_none() { + todo!("handle case where port was received before (i.e. in ports_delta)") + } + let local_port_index = local_port_index.unwrap(); // Check the port mapping for consistency // TODO: For now we can only put once, so that simplifies stuff @@ -486,7 +498,7 @@ impl Connector { } /// Runs the connector in non-synchronous mode. - fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -516,14 +528,36 @@ impl Connector { RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Construction of a new component. Find all references to ports // inside of the arguments - let first_port_idx = results.ports.len(); + debug_assert!(results.ports.is_empty()); find_ports_in_value_group(&arguments, &mut results.ports); - for port - } - } + if !results.ports.is_empty() { + // Ports changing ownership + if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) { + todo!("fatal error handling"); + } + } - ConnectorScheduling::NotNow // TODO: @Temp + // Add connector for later execution + let new_connector_state = ComponentState { + prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments) + }; + let new_connector_ports = results.ports.clone(); // TODO: Do something with this + let new_connector_branch = Branch::new_initial_branch(new_connector_state); + let new_connector = Connector::new(0, new_connector_branch, new_connector_ports); + + results.new_connectors.push(new_connector); + + return ConnectorScheduling::Later; + }, + RunResult::NewChannel => { + // Need to prepare a new channel + todo!("adding channels to some global context"); + + return ConnectorScheduling::Later; + }, + _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), + } } // ------------------------------------------------------------------------- @@ -572,79 +606,129 @@ impl Connector { // Helpers for local port management. Specifically for adopting/losing // ownership over ports - /// Marks the ports as being "given away" (e.g. by sending a message over a - /// channel, or by constructing a connector). Will return an error if the - /// connector doesn't own the port in the first place. - fn give_away_ports(ports: &mut ConnectorPorts, in_sync: bool, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - debug_assert!(in_sync == !branch.index.is_valid()); + /// Releasing ownership of ports while in non-sync mode. This only occurs + /// while instantiating new connectors + fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + debug_assert!(!branch.index.is_valid()); // branch in non-sync mode + + for port_id in port_ids { + // We must own the port, or something is wrong with our code + todo!("Set up some kind of message router"); + debug_assert!(ports.get_port_index(*port_id).is_some()); + ports.remove_port(*port_id); + } + + return Ok(()) + } + + /// Releasing ownership of ports during a sync-session. Will provide an + /// error if the port was already used during a sync block. + fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + debug_assert!(branch.index.is_valid()); // branch in sync mode for port_id in port_ids { match ports.get_port_index(*port_id) { Some(port_index) => { - // We (used to) own the port + // We (used to) own the port. Make sure it is not given away + // already and not used to put/get data. let port_mapping = ports.get_port(branch.index.index, port_index); - if port_mapping.is_assigned { - // Port is used in some kind of interaction. Cannot both - // give away the port and use it in an interaction - return Err(PortOwnershipError::UsedInInteraction(*port_id)) + if port_mapping.is_assigned && port_mapping.num_times_fired != 0 { + // Already used + return Err(PortOwnershipError::UsedInInteraction(*port_id)); } - // Make sure it is not already given away for delta in &branch.ports_delta { - match delta { - PortOwnershipDelta::TakeOwnership(_) => unreachable!(), // because we had a port mapping - PortOwnershipDelta::GiveAwayOwnership(given_away_port_id) => { - if port_id == given_away_port_id { - return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); - } - } + if delta.port_id == port_id { + // We cannot have acquired this port, because the + // call to `ports.get_port_index` returned an index. + debug_assert!(!delta.acquired); + return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); } } - // We're fine, the port will be given away. Note that if we - // are not in sync mode, then we can simply remove the - // ownership immediately. - if in_sync { - branch.ports_delta.push(PortOwnershipDelta::GiveAwayOwnership(*port_id)); - } else { - - } + branch.ports_delta.push(PortOwnershipDelta{ + acquired: false, + port_id: *port_id, + }); }, None => { - // We did not yet own the port, so we must have received it - // this round, and we're going to give it away again. - debug_assert!(branch.ports_delta.contains(&PortOwnershipDelta::TakeOwnership(*port_id))); - let delta_to_find = PortOwnershipDelta::TakeOwnership(*port_id); - for delta_idx in 0..branch.ports_delta.len() { - if branch.ports_delta[delta_idx] == delta_to_find { - branch.ports_delta.remove(delta_idx); + // Not in port mapping, so we must have acquired it before, + // remove the acquirement. + let mut to_delete_index: isize = -1; + for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { + if delta.port_id == *port_id { + debug_assert!(delta.acquired); + to_delete_index = delta_idx as isize; break; } } - // Note for programmers: the fact that the message that - // contains this port will end up at another connector will - // take care of its new ownership. + debug_assert!(to_delete_index != -1); + branch.ports_delta.remove(to_delete_index as usize); } } } - return Ok(()); + return Ok(()) } - /// Adopt ownership of the ports + /// Acquiring ports during a sync-session. + fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + debug_assert!(branch.index.is_valid()); // branch in sync mode + + 'port_loop: for port_id in port_ids { + for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { + if delta.port_id == *port_id { + if delta.acquired { + // Somehow already received this port. + // TODO: @security + todo!("take care of nefarious peers"); + } else { + // Sending ports to ourselves + debug_assert!(ports.get_port_index(*port_id).is_some()); + branch.ports_delta.remove(delta_idx); + continue 'port_loop; + } + } + } + + // If here then we can safely acquire the new port + branch.ports_delta.push(PortOwnershipDelta{ + acquired: true, + port_id: *port_id, + }); + } + + return Ok(()) + } } /// A data structure passed to a connector whose code is being executed that is /// used to queue up various state changes that have to be applied after /// running, e.g. the messages the have to be transferred to other connectors. // TODO: Come up with a better name -struct RunDeltaState { - outbox: Vec, - ports: Vec, +pub(crate) struct RunDeltaState { + // Variables that allow the thread running the connector to pick up global + // state changes and try to apply them. + pub outbox: Vec, + pub new_connectors: Vec, + // Workspaces + pub ports: Vec, +} + +impl RunDeltaState { + /// Constructs a new `RunDeltaState` object with the default amount of + /// reserved memory + pub fn new() -> Self { + RunDeltaState{ + outbox: Vec::with_capacity(64), + new_connectors: Vec::new(), + ports: Vec::with_capacity(64), + } + } } -enum ConnectorScheduling { +pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 9a8c1e0ac1360c8ae4c57f91db609de354e3b1cc..e9d8a99c89c6b4df70ce847c352ab97d8de02f2e 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,13 +1,72 @@ -use std::sync::Condvar; +use std::sync::Arc; +use std::time::Duration; +use std::thread; +use crate::ProtocolDescription; +use super::connector::{Connector, ConnectorScheduling, RunDeltaState}; use super::global_store::GlobalStore; -struct Scheduler<'g> { - global: &'g GlobalStore, +struct Scheduler { + global: Arc, + code: Arc, } -impl<'g> Scheduler<'g> { - pub fn new(store: &'g GlobalStore) { +impl Scheduler { + pub fn new(global: Arc, code: Arc) -> Self { + Self{ + global, + code, + } + } + + pub fn run(&mut self) { + // Setup global storage and workspaces that are reused for every + // connector that we run + // TODO: @Memory, scheme for reducing allocations if excessive. + let mut delta_state = RunDeltaState::new() + + loop { + // TODO: Check if we're supposed to exit + + // Retrieve a unit of work + let connector_key = self.global.pop_key(); + if connector_key.is_none() { + // TODO: @Performance, needs condition variable for waking up + thread::sleep(Duration::new(1, 0)); + continue + } + + // We have something to do + let connector_key = connector_key.unwrap(); + let connector = self.global.get_connector(&connector_key); + + let mut cur_schedule = ConnectorScheduling::Immediate; + + while cur_schedule == ConnectorScheduling::Immediate { + let new_schedule; + + if connector.is_in_sync_mode() { + // In synchronous mode, so we can expect messages being sent, + // but we never expect the creation of connectors + new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); + debug_assert!(delta_state.new_connectors.is_empty()); + + if !delta_state.outbox.is_empty() {} + } else { + // In regular running mode (not in a sync block) we cannot send + // messages but we can create new connectors + new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); + debug_assert!(delta_state.outbox.is_empty()); + + if !delta_state.new_connectors.is_empty() { + // Push all connectors into the global state and queue them + // for execution + + } + } + cur_schedule = new_schedule; + } + } } } \ No newline at end of file