diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index b772b87602a537819130076dfe9751d213c33a2f..12899dc5c6bb601ef88d144ca083df520971ada3 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -3,6 +3,7 @@ use std::time::Duration; use std::thread; use crate::ProtocolDescription; +use super::inbox::InboxMessage; use super::connector::{Connector, ConnectorScheduling, RunDeltaState}; use super::global_store::GlobalStore; @@ -23,13 +24,13 @@ impl Scheduler { // Setup global storage and workspaces that are reused for every // connector that we run // TODO: @Memory, scheme for reducing allocations if excessive. - let mut delta_state = RunDeltaState::new() + let mut delta_state = RunDeltaState::new(); loop { // TODO: Check if we're supposed to exit // Retrieve a unit of work - let connector_key = self.global.pop_key(); + let connector_key = self.global.connector_queue.pop_front(); if connector_key.is_none() { // TODO: @Performance, needs condition variable for waking up thread::sleep(Duration::new(1, 0)); @@ -38,13 +39,15 @@ impl Scheduler { // We have something to do let connector_key = connector_key.unwrap(); - let connector = self.global.get_connector(&connector_key); + let connector = self.global.connectors.get_mut(&connector_key); let mut cur_schedule = ConnectorScheduling::Immediate; while cur_schedule == ConnectorScheduling::Immediate { let new_schedule; + // TODO: Check inbox for new message + if connector.is_in_sync_mode() { // In synchronous mode, so we can expect messages being sent, // but we never expect the creation of connectors @@ -52,8 +55,27 @@ impl Scheduler { debug_assert!(delta_state.new_connectors.is_empty()); if !delta_state.outbox.is_empty() { + // There are message to send for message in delta_state.outbox.drain(..) { + let (inbox_message, target_connector_id) = { + // Note: retrieving a port incurs a read lock + let sending_port = self.global.ports.get(&connector_key, message.sending_port); + ( + InboxMessage { + sending_port: sending_port.self_id, + receiving_port: sending_port.peer_id, + sender_prev_branch_id: message.sender_prev_branch_id, + sender_cur_branch_id: message.sender_cur_branch_id, + message: message.message, + }, + sending_port.peer_connector, + ) + }; + + let target_connector = self.global.connectors.get_shared(target_connector_id); + target_connector.inbox.insert_message(inbox_message); + // TODO: Check silent state. Queue connector if it was silent } } } else { @@ -65,7 +87,13 @@ impl Scheduler { if !delta_state.new_connectors.is_empty() { // Push all connectors into the global state and queue them // for execution - + for connector in delta_state.new_connectors.drain(..) { + // Create connector, modify all of the ports that + // it now owns, then queue it for execution + let connector_key = self.global.connectors.create(connector); + + self.global.connector_queue.push_back(connector_key); + } } }