diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 7b70bab641d8ae67019d6bfeaf8abd8c1a65dec8..48e2428f48d7dd91f2efd6535ee7e4d07a5f0f63 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -97,7 +97,7 @@ impl ConnectorApplication { tree: FakeTree::new(), consensus: Consensus::new(), last_finished_handled: None, - branch_extra: Vec::new(), + branch_extra: vec![0], }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); @@ -107,9 +107,9 @@ impl ConnectorApplication { fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { while let Some(message) = comp_ctx.read_next_message() { match message { - Message::Data(_) => todo!("data message in API connector"), - Message::Sync(_) => todo!("sync message in API connector"), - Message::Control(_) => todo!("impossible control message"), + Message::Data(message) => self.handle_new_data_message(message, comp_ctx), + Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx), + Message::Control(_) => unreachable!("control message in native API component"), } } } @@ -132,6 +132,8 @@ impl ConnectorApplication { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; @@ -247,37 +249,49 @@ impl ConnectorApplication { // In non-sync mode the application component doesn't really do anything // except performing jobs submitted from the API. This is the only // case where we expect to be woken up. + // Note that we have to communicate to the scheduler when we've received + // ports or created components (hence: given away ports) *before* we + // enter a sync round. let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { comp_ctx.push_port(endpoint_a); comp_ctx.push_port(endpoint_b); + + return ConnectorScheduling::Immediate; } ApplicationJob::NewConnector(connector, initial_ports) => { comp_ctx.push_component(connector, initial_ports); + + return ConnectorScheduling::Later; }, ApplicationJob::SyncRound(mut description) => { // Entering sync mode + comp_ctx.notify_sync_start(); self.sync_desc = description; self.is_in_sync = true; debug_assert!(self.last_finished_handled.is_none()); - debug_assert!(self.branch_extra.is_empty()); + debug_assert!(self.branch_extra.len() == 1); let first_branch_id = self.tree.start_sync(); self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); + debug_assert!(first_branch_id.index == 1); self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id); self.branch_extra.push(0); // set first branch to first instruction return ConnectorScheduling::Immediate; }, ApplicationJob::Shutdown => { debug_assert!(queue.is_empty()); + return ConnectorScheduling::Exit; } } } + // Queue was empty return ConnectorScheduling::NotNow; } @@ -309,7 +323,7 @@ impl ConnectorApplication { // Notifying interface of ending sync self.is_in_sync = false; self.sync_desc.clear(); - self.branch_extra.clear(); + self.branch_extra.truncate(1); self.last_finished_handled = None; let (results, notification) = &*self.sync_done; @@ -479,7 +493,7 @@ impl ApplicationInterface { } /// Wait until the next sync-round is finished - pub fn wait(&self) -> Result, ApplicationEndSyncError> { + pub fn wait(&mut self) -> Result, ApplicationEndSyncError> { if !self.is_in_sync { return Err(ApplicationEndSyncError::NotInSync); } @@ -488,6 +502,7 @@ impl ApplicationInterface { let mut lock = is_done.lock().unwrap(); lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done + self.is_in_sync = false; return Ok(lock.take().unwrap().inbox); }