Changeset - 0d46914f7c2e
[Not reviewed]
0 6 0
MH - 4 years ago 2021-11-22 22:20:53
contact@maxhenger.nl
WIP on fixing bug where only partial sync region is available after error
6 files changed with 54 insertions and 32 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -69,97 +69,97 @@ pub(crate) enum ConnectorScheduling {
 
    Later,              // Schedule for running, at some later point in time
 
    NotNow,             // Do not reschedule for running
 
    Exit,               // Connector has exited
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    mode: Mode,
 
    eval_error: Option<EvalError>,
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
// TODO: Remove remaining fields once 'fires()' is removed from language.
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::PerformedGet(value) => Some(value),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
 
        };
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!("Remove fires() now");
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken),
 
        };
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
 
            return scheduling;
 
        }
 

	
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 

	
 
                // Handle any new finished branches
 
                let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
                while let Some(branch_id) = iter_id {
 
                    iter_id = self.tree.get_queue_next(branch_id);
 
                    self.last_finished_handled = Some(branch_id);
 

	
 
                    if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                        // Actually found a solution
 
                        return self.enter_non_sync_mode(round_conclusion, comp_ctx);
 
                    }
 

	
 
                    self.last_finished_handled = Some(branch_id);
 
                }
 

	
 
                return scheduling;
 
            },
 
            Mode::NonSync => {
 
                let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::SyncError => {
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::Error => {
 
@@ -341,108 +341,114 @@ impl ConnectorPDL {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            EvalContinuation::SyncBlockEnd => {
 
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
                if consistency == Consistency::Valid {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            EvalContinuation::NewFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            EvalContinuation::Put(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content,
 
                })) {
 
                    // We don't own the port
 
                    let pd = &sched_ctx.runtime.protocol_description;
 
                    let eval_error = branch.code_state.new_error_at_expr(
 
                        &pd.modules, &pd.heap,
 
                        String::from("attempted to 'put' on port that is no longer owned")
 
                    );
 
                    self.eval_error = Some(eval_error);
 
                    self.mode = Mode::SyncError;
 
                let message = DataMessage{ sync_header, data_header, content };
 
                match comp_ctx.submit_message(Message::Data(message)) {
 
                    Ok(_) => {
 
                        // Message is underway
 
                        branch.prepared = PreparedStatement::PerformedPut;
 
                        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                        return ConnectorScheduling::Immediate;
 
                    },
 
                    Err(_) => {
 
                        // We don't own the port
 
                        let pd = &sched_ctx.runtime.protocol_description;
 
                        let eval_error = branch.code_state.new_error_at_expr(
 
                            &pd.modules, &pd.heap,
 
                            String::from("attempted to 'put' on port that is no longer owned")
 
                        );
 
                        self.eval_error = Some(eval_error);
 
                        self.mode = Mode::SyncError;
 
                    }
 
                }
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 

	
 
        // If here then the run result did not require a particular action. We
 
        // return whether we have more active branches to run or not.
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            comp_ctx.push_error(eval_error);
 
            return ConnectorScheduling::Exit
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EvalContinuation::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::Exit;
 
            },
 
            EvalContinuation::SyncBlockStart => {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                debug_assert!(self.last_finished_handled.is_none());
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
src/runtime2/consensus.rs
Show inline comments
 
@@ -141,186 +141,200 @@ impl Consensus {
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
    /// called for each forked branch in the execution tree.
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            channel_mapping: parent_branch_annotations.channel_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has
 
    /// encountered an unrecoverable error.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Check for trivial case, where branch has not yet communicated within
 
        // the consensus algorithm
 
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
 
        if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            return Some(RoundConclusion::Failure);
 
        }
 

	
 
        // We need to go through the hassle of notifying all participants in the
 
        // sync round that we've encountered an error.
 
        // --- notify leader
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
 

	
 
        // --- initiate discovery wave (to let leader know about all components)
 
        self.handled_wave = true;
 
        for mapping in &self.branch_annotations[0].channel_mapping {
 
            let channel_id = mapping.channel_id;
 
            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
 
            let message = SyncPortMessage{
 
                sync_header: self.create_sync_header(ctx),
 
                source_port: port_info.self_id,
 
                target_port: port_info.peer_id,
 
                content: SyncPortContent::NotificationWave,
 
            };
 
            ctx.submit_message(Message::SyncPort(message));
 

	
 
            // Note: submitting the message might fail. But we're attempting to
 
            // handle the error anyway.
 
            // TODO: Think about this a second time: how do we make sure the
 
            //  entire network will fail if we reach this condition
 
            let _unused = ctx.submit_message(Message::SyncPort(message));
 
        }
 

	
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle. Note that
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for mapping in &branch.channel_mapping {
 
            match mapping.expected_firing {
 
                Some(expected) => {
 
                    if expected != mapping.registered_id.is_some() {
 
                        // Inconsistent speculative state and actual state
 
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
 
                        return Consistency::Inconsistent;
 
                    }
 
                },
 
                None => {},
 
            }
 
        }
 

	
 
        return Consistency::Valid;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has assumed
 
    /// a speculative value for its port mapping.
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
 
        let channel_id = port_desc.channel_id;
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == channel_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
                        mapping.expected_firing = Some(does_fire);
 
                        return Consistency::Valid;
 
                    },
 
                    Some(current) => {
 
                        // Already mapped
 
                        if current == does_fire {
 
                            return Consistency::Valid;
 
                        } else {
 
                            return Consistency::Inconsistent;
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                ctx.submit_message(Message::SyncPort(SyncPortMessage {
 
                let message = SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(self_port_id);
 
                };
 
                match ctx.submit_message(Message::SyncPort(message)) {
 
                    Ok(_) => {
 
                        self.encountered_ports.push(self_port_id);
 
                    },
 
                    Err(_) => {
 
                        // Seems like we were done with this branch, but one of
 
                        // the silent ports (in scope) is actually closed
 
                        return self.notify_of_fatal_branch(branch_id, ctx);
 
                    }
 
                }
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
@@ -424,376 +438,378 @@ impl Consensus {
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
                // We were just interested in the sync header we handled above
 
                return None;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return;
 
        }
 

	
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::SilentPortNotification => {
 
                // The point here is to let us become part of the sync round and
 
                // take note of the leader in case all of our ports are silent.
 
                self.encountered_ports.push(message.target_port);
 
            }
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
                // sure we propagate the wave
 
                if self.handled_wave {
 
                    return;
 
                }
 

	
 
                self.handled_wave = true;
 

	
 
                // Propagate wave to all peers except the one that has sent us
 
                // the wave.
 
                for mapping in &self.branch_annotations[0].channel_mapping {
 
                    let channel_id = mapping.channel_id;
 
                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
 
                    if port_desc.self_id == message.target_port {
 
                        // Wave came from this port, no need to send one back
 
                        continue;
 
                    }
 

	
 
                    let message = SyncPortMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        source_port: port_desc.self_id,
 
                        target_port: port_desc.peer_id,
 
                        content: SyncPortContent::NotificationWave,
 
                    };
 
                    ctx.submit_message(Message::SyncPort(message)).unwrap();
 
                    // As with the other SyncPort where we throw away the
 
                    // result: we're dealing with an error here anyway
 
                    let _unused = ctx.submit_message(Message::SyncPort(message));
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if message.in_response_to_sync_round < self.sync_round {
 
            // Old message
 
            return None
 
        }
 

	
 
        match message.content {
 
            SyncControlContent::ChannelIsClosed(_) => {
 
                return Some(RoundConclusion::Failure);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == target_port.channel_id {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(&message.content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
 
            if message.sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.channel_mapping {
 
                if expected.channel_id == current.channel_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        if !self.handle_peer(sync_header) {
 
            // We can drop this package
 
            return false;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncCompMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: peer.id,
 
                    content: SyncCompContent::Notification,
 
                };
 
                ctx.submit_message(Message::SyncComp(message));
 
                ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_data_to_new_leader(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncCompContent::Notification
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        } // else: exactly equal, so do nothing
 

	
 
        return true;
 
    }
 

	
 
    /// Handles a (potentially new) peer. Returns `false` if the provided sync
 
    /// number is different then the expected one.
 
    fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool {
 
        let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id);
 
        match position {
 
            Some(index) => {
 
                let entry = &mut self.peers[index];
 
                entry.encountered_this_round = true;
 
                // TODO: Proper handling of potential overflow
 
                if sync_header.sync_round >= entry.expected_sync_round {
 
                    entry.expected_sync_round = sync_header.sync_round;
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            None => {
 
                self.peers.push(Peer{
 
                    id: sync_header.sending_component_id,
 
                    encountered_this_round: true,
 
                    expected_sync_round: sync_header.sync_round,
 
                });
 
                return true;
 
            }
 
        }
 
    }
 

	
 
    /// Sends a message towards the leader, if already the leader then the
 
    /// message will be handled immediately.
 
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            match content {
 
                SyncCompContent::LocalFailure => {
 
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::LocalSolution(local_solution) => {
 
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
 
                        return self.handle_global_solution_as_leader(global_solution, ctx);
 
                    }
 
                },
 
                SyncCompContent::PartialSolution(partial_solution) => {
 
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
 
                        match conclusion {
 
                            LeaderConclusion::Solution(global_solution) => {
 
                                return self.handle_global_solution_as_leader(global_solution, ctx);
 
                            },
 
                            LeaderConclusion::Failure => {
 
                                return self.handle_global_failure_as_leader(ctx);
 
                            }
 
                        }
 
                    }
 
                },
 
                SyncCompContent::Presence(component_id, presence) => {
 
                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::AckFailure => {
 
                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
 
                    debug_assert!(self.ack_remaining > 0);
 
                    self.ack_remaining -= 1;
 
                    if self.ack_remaining == 0 {
 
                        return Some(RoundConclusion::Failure);
 
                    }
 
                }
 
                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
 
                SyncCompContent::GlobalFailure => {
 
                    unreachable!("unexpected message content for leader");
 
                },
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content,
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // Handle the global solution
 
        let mut my_final_branch_id = BranchId::new_invalid();
 
        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
            if connector_id == ctx.id {
 
                // This is our solution branch
 
                my_final_branch_id = branch_id;
 
                continue;
 
            }
 

	
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
 
        return Some(RoundConclusion::Success(my_final_branch_id));
 
    }
 

	
 
    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // TODO: Performance
 
        let mut encountered = VecSet::new();
 
        for presence in &self.solution_combiner.presence {
 
            if presence.added_by != ctx.id {
 
                // Did not add it ourselves
 
                if encountered.push(presence.added_by) {
 
                    // Not yet sent a message
 
                    let message = SyncCompMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: presence.added_by,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
 
                    ctx.submit_message(Message::SyncComp(message));
 
                    ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
                }
 
            }
 
        }
 

	
 
        self.conclusion = Some(RoundConclusion::Failure);
 
        if encountered.is_empty() {
 
            // We don't have to wait on Acks
 
            return Some(RoundConclusion::Failure);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
            sync_round: self.sync_round,
 
        }
 
    }
 

	
 
    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        if let Some(partial_solution) = self.solution_combiner.drain() {
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncCompContent::PartialSolution(partial_solution),
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug, Clone)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ChannelPresence {
 
    added_by: ConnectorId,
 
    channel: ChannelId,
 
    both_sides_present: bool,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
#[derive(Debug)]
src/runtime2/mod.rs
Show inline comments
 
@@ -386,93 +386,93 @@ impl ConnectorStore {
 
        }
 
    }
 

	
 
    /// Retrieves private part of connector - accessible by one thread at a
 
    /// time.
 
    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(key.index as usize)));
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    /// Creates a new connector. Caller should ensure ports are set up correctly
 
    /// and the connector is queued for execution if needed.
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            ctx: ComponentCtx::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
        };
 

	
 
        let index;
 
        let key;
 

	
 
        if self.free.is_empty() {
 
            // No free entries, allocate new entry
 
            index = self.connectors.len();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(connector));
 
            self.connectors.push(connector);
 
        } else {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx.id = key.downcast();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
 
                std::ptr::write(*target, connector);
 
            }
 
        }
 

	
 
        // println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        return key;
 
    }
 

	
 
    /// Destroys a connector. Caller should make sure it is not scheduled for
 
    /// execution. Otherwise one experiences "bad stuff" (tm).
 
    fn destroy(&mut self, key: ConnectorKey) {
 
        unsafe {
 
            let target = self.connectors.get_mut(key.index as usize);
 
            std::ptr::drop_in_place(*target);
 
            // Note: but not deallocating!
 
        }
 

	
 
        // println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        self.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        // Everything in the freelist already had its destructor called, so only
 
        // has to be deallocated
 
        for free_idx in self.free.iter().copied() {
 
            unsafe {
 
                let memory = self.connectors.get_mut(free_idx);
 
                let layout = std::alloc::Layout::for_value(&**memory);
 
                std::alloc::dealloc(*memory as *mut u8, layout);
 

	
 
                // mark as null for the remainder
 
                *memory = std::ptr::null_mut();
 
            }
 
        }
 

	
 
        // With the deallocated stuff marked as null, clear the remainder that
 
        // is not null
 
        for idx in 0..self.connectors.len() {
 
            unsafe {
 
                let memory = *self.connectors.get_mut(idx);
 
                if !memory.is_null() {
 
                    let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh
 
                }
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -360,101 +360,101 @@ impl Scheduler {
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn get_message_target_port(message: &Message) -> Option<PortIdLocal> {
 
        match message {
 
            Message::Data(data) => return Some(data.data_header.target_port),
 
            Message::SyncComp(_) => {},
 
            Message::SyncPort(content) => return Some(content.target_port),
 
            Message::SyncControl(_) => return None,
 
            Message::Control(control) => {
 
                match &control.content {
 
                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
 
                    ControlContent::CloseChannel(port_id) => return Some(*port_id),
 
                    ControlContent::Ping | ControlContent::Ack => {},
 
                }
 
            },
 
        }
 

	
 
        return None
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<Message>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 

	
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation;
 
mod data_transmission;
 
mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 5;
 
pub(crate) const NUM_LOOPS: u32 = 5;
 
pub(crate) const NUM_THREADS: u32 = 2;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 
}
 

	
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
src/runtime2/tests/sync_failure.rs
Show inline comments
 
@@ -20,56 +20,56 @@ fn test_local_sync_failure() {
 
        u32[] only_allows_index_0 = { 1 };
 
        auto never_gonna_get = only_allows_index_0[1];
 
        while (true) sync {}
 
    }
 
    ";
 

	
 
    // let thing = TestTimer::new("local_sync_failure");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "immediate_failure_outside_sync", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 

	
 
        api.create_connector("", "immediate_failure_inside_sync", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 
    })
 
}
 

	
 
#[test]
 
fn test_shared_sync_failure() {
 
    // Same as above. One of the components should fail, the other should follow
 
    // suit because it cannot complete a sync round. We intentionally have an
 
    // infinite loop in the while condition because we need at least two loops
 
    // for the last error to get picked up.
 
    const CODE: &'static str = "
 
    enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never }
 
    primitive failing_at_location(in<bool> input, out<bool> output, Location loc) {
 
        u32[] failure_array = {};
 
        while (true) {
 
            if (loc == Location::BeforeSync) failure_array[0];
 
            sync {
 
                put(output, true);
 
                if (loc == Location::AfterPut) failure_array[0];
 
                auto received = get(input);
 
                assert(received);
 
                if (loc == Location::AfterGet) failure_array[0];
 
            }
 
            if (loc == Location::AfterSync) failure_array[0];
 
        }
 
    }
 

	
 
    composite constructor(Location loc) {
 
        channel output_a -> input_a;
 
        channel output_b -> input_b;
 
        new failing_at_location(input_a, output_b, Location::Never);
 
        new failing_at_location(input_b, output_a, loc);
 
    }
 
    ";
 

	
 
    run_test_in_runtime(CODE, |api| {
 
        for variant in 0..4 { // all `Location` enum variants, except `Never`.
 
        for variant in 3..4 { // all `Location` enum variants, except `Never`.
 
            // Create the channels
 
            api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)