Changeset - 0161dd921e3a
[Not reviewed]
0 5 0
mh - 4 years ago 2021-11-29 09:53:58
contact@maxhenger.nl
WIP on more bugfixing
5 files changed with 68 insertions and 16 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -36,7 +36,7 @@ use crate::protocol::{RunContext, RunResult};
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement};
 
use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncCompContent, SyncPortMessage, SyncControlMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket};
 
@@ -517,6 +517,15 @@ impl ConnectorPDL {
 
            RoundConclusion::Failure => None,
 
        };
 

	
 
        // TODO: Hack
 
        ctx.remove_messages(|m| match m {
 
            Message::Data(_) | Message::SyncPort(_) | Message::SyncControl(_) | Message::Control(_) => false,
 
            Message::SyncComp(m) => match m.content {
 
                SyncCompContent::Notification | SyncCompContent::Presence(_) => true,
 
                _ => false,
 
            }
 
        });
 

	
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
@@ -536,6 +545,7 @@ impl ConnectorPDL {
 
            if let Some(eval_error) = self.eval_error.take() {
 
                ctx.push_error(eval_error);
 
            }
 

	
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
src/runtime2/inbox.rs
Show inline comments
 
@@ -184,6 +184,16 @@ impl Message {
 
        }
 
    }
 

	
 
    pub(crate) fn source_component(&self) -> Option<ConnectorId> {
 
        match self {
 
            Message::Data(message) => Some(message.sync_header.sending_component_id),
 
            Message::SyncPort(message) => Some(message.sync_header.sending_component_id),
 
            Message::SyncComp(message) => Some(message.sync_header.sending_component_id),
 
            Message::SyncControl(_) => None,
 
            Message::Control(message) => Some(message.sending_component_id)
 
        }
 
    }
 

	
 
    pub(crate) fn as_data(&self) -> &DataMessage {
 
        match self {
 
            Message::Data(v) => v,
src/runtime2/mod.rs
Show inline comments
 
@@ -418,9 +418,9 @@ impl ConnectorStore {
 
    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
 
        unsafe {
 
            let entry = self.entries.get(id.index as usize);
 
            let cur_generation = (**entry).generation.load(Ordering::Acquire);
 
            debug_assert_eq!(cur_generation, id.generation);
 
            debug_assert!(!entry.is_null());
 
            let cur_generation = (**entry).generation.load(Ordering::Acquire);
 
            assert_eq!(cur_generation, id.generation, "accessing {}", id.index);
 
            return &(**entry).connector.public;
 
        }
 
    }
 
@@ -431,6 +431,8 @@ impl ConnectorStore {
 
        unsafe {
 
            let entry = self.entries.get_mut(key.index as usize);
 
            debug_assert!(!entry.is_null());
 
            let cur_generation = (**entry).generation.load(Ordering::Acquire);
 
            assert_eq!(cur_generation, key.generation, "accessing {}", key.index);
 
            return &mut (**entry).connector;
 
        }
 
    }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -112,7 +112,7 @@ impl Scheduler {
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 
@@ -151,6 +151,20 @@ impl Scheduler {
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 

	
 
                match scheduled.ctx.get_port_by_id(target_port) {
 
                    Some(port_info) => {
 
                        if port_info.state == PortState::Closed {
 
                            // We're no longer supposed to receive messages
 
                            // (rerouted message arrived much later!)
 
                            continue
 
                        }
 
                    },
 
                    None => {
 
                        // Apparently we no longer have a handle to the port
 
                        continue;
 
                    }
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
@@ -174,7 +188,7 @@ impl Scheduler {
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::CloseChannel(port_id) => {
 
@@ -188,12 +202,12 @@ impl Scheduler {
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
 
                            self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ack ack] \n --- {:?}", target_component, new_control_message));
 
                            self.runtime.send_message(target_component, new_control_message);
 
                        };
 
                    },
 
@@ -228,15 +242,17 @@ impl Scheduler {
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            if port.state == PortState::Open {
 
                let message = SyncControlMessage {
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
 
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
                };
 
            self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
 
                self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message));
 
                self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
@@ -247,8 +263,6 @@ impl Scheduler {
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
@@ -282,6 +296,7 @@ impl Scheduler {
 
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
 
            };
 

	
 
            self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox] \n --- {:#?}", target_component_id, message));
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
@@ -316,7 +331,7 @@ impl Scheduler {
 
                                &mut new_connector.router
 
                            );
 

	
 
                            self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcon]\n --- {:?}", port.peer_connector, reroute_message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
 
                        }
 
                    }
 
@@ -521,6 +536,19 @@ impl ComponentCtx {
 
        return Ok(());
 
    }
 

	
 
    /// Removes messages in the outbox using a match
 
    pub(crate) fn remove_messages<F: Fn(&Message) -> bool>(&mut self, remove_if_fn: F) {
 
        let mut idx = 0;
 
        while idx < self.outbox.len() {
 
            let should_remove = remove_if_fn(&self.outbox[idx]);
 
            if should_remove {
 
                self.outbox.remove(idx);
 
            } else {
 
                idx += 1;
 
            }
 
        }
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
@@ -620,6 +648,7 @@ struct Inbox {
 
    next_delay_idx: u32,
 
    start_read_idx: u32,
 
    next_read_idx: u32,
 
    last_read_idx: u32,
 
    generation: u32,
 
}
 

	
 
@@ -637,6 +666,7 @@ impl Inbox {
 
            next_delay_idx: 0,
 
            start_read_idx: 0,
 
            next_read_idx: 0,
 
            last_read_idx: 0,
 
            generation: 0,
 
        }
 
    }
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -11,12 +11,12 @@ use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 
// pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 250;  // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 2;
 
pub(crate) const NUM_INSTANCES: u32 = 2;
 
pub(crate) const NUM_THREADS: u32 = 6;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 15;
 

	
 

	
0 comments (0 inline, 0 general)