diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index b75d686ad2cea3fa8f9f1fe36eb812aca2da7080..e276852bdc0eea566fe69b86c7ebec6d1b25d692 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -9,23 +9,24 @@ use crate::protocol::eval::*; use super::messages::*; -enum AddComponentError { +#[derive(Debug)] +pub enum AddComponentError { ModuleDoesNotExist, ConnectorDoesNotExist, InvalidArgumentType(usize), // value is index of (first) invalid argument } -struct PortDesc { +pub(crate) struct PortDesc { id: u32, peer_id: u32, owning_connector_id: Option, is_getter: bool, // otherwise one can only call `put` } -struct ConnectorDesc { +pub(crate) struct ConnectorDesc { id: u32, in_sync: bool, - branches: Vec, // first one is always non-speculative one + pub(crate) branches: Vec, // first one is always non-speculative one spec_branches_active: VecDeque, // branches that can be run immediately spec_branches_pending_receive: HashMap>, // from port_id to branch index spec_branches_done: Vec, @@ -38,14 +39,11 @@ impl ConnectorDesc { /// Creates a new connector description. Implicit assumption is that there /// is one (non-sync) branch that can be immediately executed. fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { - let mut branches_active = VecDeque::new(); - branches_active.push_back(0); - Self{ id, in_sync: false, branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], - spec_branches_active: branches_active, + spec_branches_active: VecDeque::new(), spec_branches_pending_receive: HashMap::new(), spec_branches_done: Vec::new(), last_checked_done: 0, @@ -56,15 +54,16 @@ impl ConnectorDesc { } #[derive(Debug, PartialEq, Eq)] -enum BranchState { +pub(crate) enum BranchState { RunningNonSync, // regular running non-speculative branch RunningSync, // regular running speculative branch BranchPoint, // branch which ended up being a branching point ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution Failed, // branch that became inconsistent + Finished, // branch (necessarily non-sync) that reached end of code } -#[derive(Clone)] +#[derive(Debug, Clone)] struct BranchPortDesc { last_registered_index: Option, // if putter, then last sent branch ID, if getter, then last received branch ID num_times_fired: u32, // number of puts/gets on this port @@ -75,11 +74,11 @@ struct BranchContext { pending_channel: Option<(Value, Value)>, } -struct BranchDesc { +pub(crate) struct BranchDesc { index: u32, parent_index: Option, - code_state: ComponentState, - branch_state: BranchState, + pub(crate) code_state: ComponentState, + pub(crate) branch_state: BranchState, owned_ports: Vec, message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value port_mapping: HashMap, @@ -159,11 +158,11 @@ struct ProposedSolution { } // TODO: @performance, use freelists+ids instead of HashMaps -struct Runtime { +pub struct Runtime { protocol: Arc, - ports: HashMap, + pub(crate) ports: HashMap, port_counter: u32, - connectors: HashMap, + pub(crate) connectors: HashMap, connector_counter: u32, connectors_active: VecDeque, } @@ -210,7 +209,10 @@ impl Runtime { } }; - // Make sure supplied values (and types) are correct + // Make sure supplied values (and types) are correct. At the same time + // modify the port IDs such that they contain the ID of the connector + // we're about the create. + let component_id = self.generate_connector_id(); let mut ports = Vec::with_capacity(values.values.len()); for (value_idx, value) in values.values.iter().enumerate() { @@ -222,24 +224,35 @@ impl Runtime { return Err(ACE::InvalidArgumentType(value_idx)) } - ports.push(*port_id); + ports.push(PortId(Id{ + connector_id: component_id, + u32_suffix: port_id.0.u32_suffix, + })); }, Value::Output(port_id) => { if *polarity != Polarity::Putter { return Err(ACE::InvalidArgumentType(value_idx)) } - ports.push(*port_id); + ports.push(PortId(Id{ + connector_id: component_id, + u32_suffix: port_id.0.u32_suffix + })); }, _ => return Err(ACE::InvalidArgumentType(value_idx)) } } - // Instantiate the component - let component_id = self.generate_connector_id(); + // Instantiate the component, and mark the ports as being owned by the + // newly instantiated component let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); + for port in &ports { + let desc = self.ports.get_mut(port).unwrap(); + desc.owning_connector_id = Some(component_id); + } + self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); self.connectors_active.push_back(component_id); @@ -274,7 +287,7 @@ impl Runtime { /// connector should be run again in the future, and return `false` if the /// connector has terminated. Note that a terminated connector still /// requires cleanup. - pub fn run_connector(&mut self, connector_id: u32) -> Scheduling { + fn run_connector(&mut self, connector_id: u32) -> Scheduling { let desc = self.connectors.get_mut(&connector_id).unwrap(); if desc.in_sync { @@ -293,9 +306,9 @@ impl Runtime { let branch_index = desc.spec_branches_active.pop_front().unwrap(); let branch = &mut desc.branches[branch_index as usize]; + debug_assert_eq!(branch_index, branch.index); // Run this particular branch to a next blocking point - // TODO: PERSISTENT RUN CTX let mut run_context = Context{ inbox: &branch.message_inbox, port_mapping: &branch.port_mapping, @@ -346,6 +359,8 @@ impl Runtime { debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); let mut insert_in_pending_receive = false; + println!("DEBUG: Connector {} performing get on {:#?}", connector_id, port_id); + match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet, so force to firing @@ -372,6 +387,8 @@ impl Runtime { } } + println!("DEBUG: insert = {}, port mapping is now {:#?}", insert_in_pending_receive, &branch.port_mapping); + if insert_in_pending_receive { // Perform the insert match desc.spec_branches_pending_receive.entry(port_id) { @@ -409,8 +426,8 @@ impl Runtime { RunResult::BranchAtSyncEnd => { // Check the branch for any ports that were not used and // insert them in the port mapping as not having fired. - for port_index in branch.owned_ports.iter().copied() { - let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index }); + for port_id in branch.owned_ports.iter().copied() { + let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_id }); if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { entry.insert(BranchPortDesc { last_registered_index: None, @@ -430,6 +447,7 @@ impl Runtime { // Branch just performed a `put()`. Check if we have // assigned the port value and if so, if it is // consistent. + println!("DEBUG: Connector {} performing put on {:#?}", connector_id, port_id); let mut can_put = true; branch.branch_context.just_called_did_put = true; match branch.port_mapping.entry(port_id) { @@ -459,10 +477,12 @@ impl Runtime { } } } + println!("DEBUG: can_put = {}, port mapping is now {:#?}", can_put, &branch.port_mapping); if can_put { // Actually put the message in the outbox let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); + debug_assert_eq!(port_desc.owning_connector_id.unwrap(), connector_id); let peer_id = port_desc.peer_id; let peer_desc = self.ports.get(&peer_id).unwrap(); debug_assert!(peer_desc.owning_connector_id.is_some()); @@ -479,7 +499,7 @@ impl Runtime { sending_port: port_id, receiving_port: peer_id, peer_prev_branch_id: None, - peer_cur_branch_id: 0, + peer_cur_branch_id: branch_index, message: value_group, }); @@ -506,9 +526,9 @@ impl Runtime { fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling { // Retrieve the connector and the branch (which is always the first one, // since we assume we're not running in sync-mode). - // TODO: CONTINUE HERE, PERSEISTENT BRANCH CONTEXT let desc = self.connectors.get_mut(&connector_id).unwrap(); debug_assert!(!desc.in_sync); + debug_assert!(desc.spec_branches_active.is_empty()); debug_assert_eq!(desc.branches.len(), 1); let branch = &mut desc.branches[0]; @@ -522,7 +542,10 @@ impl Runtime { let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { - RunResult::ComponentTerminated => return Scheduling::NotNow, + RunResult::ComponentTerminated => { + branch.branch_state = BranchState::Finished; + return Scheduling::NotNow + }, RunResult::ComponentAtSyncStart => { // Prepare for sync execution Self::prepare_branch_for_sync(desc); @@ -927,7 +950,6 @@ impl Runtime { // And reset the connector's state for further execution connector.in_sync = false; connector.spec_branches_active.clear(); - connector.spec_branches_active.push_back(0); connector.spec_branches_pending_receive.clear(); connector.spec_branches_done.clear(); connector.last_checked_done = 0;