Changeset - 9ff4792ea5d6
[Not reviewed]
0 2 0
MH - 4 years ago 2021-11-29 11:31:09
contact@maxhenger.nl
Remove message hack
2 files changed with 0 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -496,57 +496,48 @@ impl ConnectorPDL {
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Helper that moves the component's state back into non-sync mode, using
 
    /// the provided solution branch ID as the branch that should be comitted to
 
    /// memory. If this function returns false, then the component is supposed
 
    /// to exit.
 
    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        // Depending on local state decide what to do
 
        let final_branch_id = match conclusion {
 
            RoundConclusion::Success(branch_id) => Some(branch_id),
 
            RoundConclusion::Failure => None,
 
        };
 

	
 
        // TODO: Hack
 
        ctx.remove_messages(|m| match m {
 
            Message::Data(_) | Message::SyncPort(_) | Message::SyncControl(_) | Message::Control(_) => false,
 
            Message::SyncComp(m) => match m.content {
 
                SyncCompContent::Notification | SyncCompContent::Presence(_) => true,
 
                _ => false,
 
            }
 
        });
 

	
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
            debug_assert!(fake_vec.is_empty());
 

	
 
            ctx.notify_sync_end(&[]);
 
            self.last_finished_handled = None;
 
            self.eval_error = None; // in case we came from the SyncError mode
 
            self.mode = Mode::NonSync;
 

	
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            if let Some(eval_error) = self.eval_error.take() {
 
                ctx.push_error(eval_error);
 
            }
 

	
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
 

	
src/runtime2/scheduler.rs
Show inline comments
 
@@ -524,61 +524,48 @@ impl ComponentCtx {
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
 
        debug_assert!(self.is_in_sync);
 
        if let Some(port_id) = contents.source_port() {
 
            let port_info = self.get_port_by_id(port_id);
 
            let is_valid = match port_info {
 
                Some(port_info) => {
 
                    port_info.state == PortState::Open
 
                },
 
                None => false,
 
            };
 
            if !is_valid {
 
                // We don't own the port
 
                println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index);
 
                return Err(());
 
            }
 
        }
 

	
 
        self.outbox.push_back(contents);
 
        return Ok(());
 
    }
 

	
 
    /// Removes messages in the outbox using a match
 
    pub(crate) fn remove_messages<F: Fn(&Message) -> bool>(&mut self, remove_if_fn: F) {
 
        let mut idx = 0;
 
        while idx < self.outbox.len() {
 
            let should_remove = remove_if_fn(&self.outbox[idx]);
 
            if should_remove {
 
                self.outbox.remove(idx);
 
            } else {
 
                idx += 1;
 
            }
 
        }
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return self.inbox.get_read_data_messages(match_port_id);
 
    }
 

	
 
    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if !self.is_in_sync { return None; }
 
        return self.inbox.get_next_message_ticket();
0 comments (0 inline, 0 general)