Changeset - 9ff4792ea5d6
[Not reviewed]
0 2 0
MH - 4 years ago 2021-11-29 11:31:09
contact@maxhenger.nl
Remove message hack
2 files changed with 0 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -472,95 +472,86 @@ impl ConnectorPDL {
 
            EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
 
                // we are in non-sync mode the scheduler will handle and check
 
                // port ownership transfer.
 
                debug_assert!(comp_ctx.workspace_ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
 

	
 
                let new_prompt = Prompt::new(
 
                    &sched_ctx.runtime.protocol_description.types,
 
                    &sched_ctx.runtime.protocol_description.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                let new_component = ConnectorPDL::new(new_prompt);
 
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
 
                comp_ctx.workspace_ports.clear();
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            EvalContinuation::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Helper that moves the component's state back into non-sync mode, using
 
    /// the provided solution branch ID as the branch that should be comitted to
 
    /// memory. If this function returns false, then the component is supposed
 
    /// to exit.
 
    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        // Depending on local state decide what to do
 
        let final_branch_id = match conclusion {
 
            RoundConclusion::Success(branch_id) => Some(branch_id),
 
            RoundConclusion::Failure => None,
 
        };
 

	
 
        // TODO: Hack
 
        ctx.remove_messages(|m| match m {
 
            Message::Data(_) | Message::SyncPort(_) | Message::SyncControl(_) | Message::Control(_) => false,
 
            Message::SyncComp(m) => match m.content {
 
                SyncCompContent::Notification | SyncCompContent::Presence(_) => true,
 
                _ => false,
 
            }
 
        });
 

	
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
            debug_assert!(fake_vec.is_empty());
 

	
 
            ctx.notify_sync_end(&[]);
 
            self.last_finished_handled = None;
 
            self.eval_error = None; // in case we came from the SyncError mode
 
            self.mode = Mode::NonSync;
 

	
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            if let Some(eval_error) = self.eval_error.take() {
 
                ctx.push_error(eval_error);
 
            }
 

	
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
 
    /// condition appears.
 
    #[inline]
 
    fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result<EvalContinuation, EvalError> {
 
        loop {
 
            let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            if let Ok(EvalContinuation::Stepping) = result {
 
                continue;
 
            }
 

	
 
            return result;
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -500,109 +500,96 @@ impl ComponentCtx {
 

	
 
    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.channel_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
 
        debug_assert!(self.is_in_sync);
 
        if let Some(port_id) = contents.source_port() {
 
            let port_info = self.get_port_by_id(port_id);
 
            let is_valid = match port_info {
 
                Some(port_info) => {
 
                    port_info.state == PortState::Open
 
                },
 
                None => false,
 
            };
 
            if !is_valid {
 
                // We don't own the port
 
                println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index);
 
                return Err(());
 
            }
 
        }
 

	
 
        self.outbox.push_back(contents);
 
        return Ok(());
 
    }
 

	
 
    /// Removes messages in the outbox using a match
 
    pub(crate) fn remove_messages<F: Fn(&Message) -> bool>(&mut self, remove_if_fn: F) {
 
        let mut idx = 0;
 
        while idx < self.outbox.len() {
 
            let should_remove = remove_if_fn(&self.outbox[idx]);
 
            if should_remove {
 
                self.outbox.remove(idx);
 
            } else {
 
                idx += 1;
 
            }
 
        }
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return self.inbox.get_read_data_messages(match_port_id);
 
    }
 

	
 
    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if !self.is_in_sync { return None; }
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option<MessageTicket> {
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        return self.inbox.read_message_using_ticket(ticket);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        return self.inbox.take_message_using_ticket(ticket)
 
    }
 

	
 
    /// Puts back a message back into the inbox. The reason being that the
 
    /// message is actually part of the next sync round. This will
 
    pub(crate) fn put_back_message(&mut self, message: Message) {
 
        self.inbox.put_back_message(message);
 
    }
 
}
 

	
0 comments (0 inline, 0 general)