Changeset - 83d0cd8db5fb
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-25 20:24:24
contact@maxhenger.nl
WIP on more bugfixing
4 files changed with 88 insertions and 46 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -218,7 +218,7 @@ impl ConnectorPDL {
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if self.consensus.handle_new_data_message(ticket, ctx) {
 
        if !self.consensus.handle_new_data_message(ticket, ctx) {
 
            // Message should not be handled now
 
            return;
 
        }
 
@@ -248,6 +248,7 @@ impl ConnectorPDL {
 
    }
 

	
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        println!("DEBUG: Actually really handling {:?}", message);
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
src/runtime2/consensus.rs
Show inline comments
 
@@ -88,7 +88,7 @@ pub(crate) enum Consistency {
 
    Inconsistent,
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum MessageOrigin {
 
    Past,
 
    Present,
 
@@ -307,6 +307,7 @@ impl Consensus {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        println!("DEBUG: ***** Incrementing sync round stuff");
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
@@ -593,6 +594,7 @@ impl Consensus {
 
    fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        let origin = self.handle_peer(&sync_header);
 
        println!(" ********************** GOT {:?}", origin);
 
        if origin != MessageOrigin::Present {
 
            // We do not have to handle it now
 
            return origin;
src/runtime2/scheduler.rs
Show inline comments
 
@@ -65,7 +65,6 @@ impl Scheduler {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    self.handle_inbox_while_shutting_down(scheduled);
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
@@ -118,6 +117,13 @@ impl Scheduler {
 
                        }
 
                    }
 

	
 
                    // Any messages still in the public inbox should be handled
 
                    scheduled.ctx.inbox.clear_read_messages();
 
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
 
                        self.handle_message_while_shutting_down(message, scheduled);
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
@@ -149,9 +155,7 @@ impl Scheduler {
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                Message::Control(message) => {
 
            if let Message::Control(message) = &message {
 
                match message.content {
 
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                        // Need to change port target
 
@@ -195,27 +199,21 @@ impl Scheduler {
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles inbox messages while shutting down. This intends to handle the
 
    /// case where a component cleanly exited outside of a sync region, but a
 
    /// peer, before receiving the `CloseChannel` message, sent a message inside
 
    /// a sync region. This peer should be notified that its message is not
 
    /// received by a component in a sync region.
 
    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
 
        // Note: we're not handling the public inbox, we're dealing with the
 
        // private one!
 
        debug_assert!(scheduled.shutting_down);
 

	
 
        while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
            let message = scheduled.ctx.read_message_using_ticket(ticket);
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
@@ -239,7 +237,6 @@ impl Scheduler {
 
            self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
        }
 
    }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
@@ -351,7 +348,7 @@ impl Scheduler {
 
            } else {
 
                // Just left sync region. So prepare inbox for the next sync
 
                // round
 
                scheduled.ctx.inbox.prepare_for_next_round();
 
                scheduled.ctx.inbox.clear_read_messages();
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
@@ -617,6 +614,8 @@ impl<'a> Iterator for MessagesIter<'a> {
 
/// for later reading. Later reading in this case implies that they are put back
 
/// for reading in the next sync round.
 
struct Inbox {
 
    temp_m: Vec<Message>,
 
    temp_d: Vec<Message>,
 
    messages: RawVec<Message>,
 
    next_delay_idx: u32,
 
    start_read_idx: u32,
 
@@ -633,6 +632,7 @@ pub(crate) struct MessageTicket {
 
impl Inbox {
 
    fn new() -> Self {
 
        return Inbox {
 
            temp_m: Vec::new(), temp_d: Vec::new(),
 
            messages: RawVec::new(),
 
            next_delay_idx: 0,
 
            start_read_idx: 0,
 
@@ -643,10 +643,17 @@ impl Inbox {
 

	
 
    fn insert_new(&mut self, message: Message) {
 
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
 
        self.temp_m.push(message);
 
        return;
 
        self.messages.push(message);
 
    }
 

	
 
    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if self.next_read_idx as usize >= self.temp_m.len() { return None };
 
        let idx = self.next_read_idx;
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{ index: idx, generation: self.generation });
 
        let cur_read_idx = self.next_read_idx as usize;
 
        if cur_read_idx >= self.messages.len() {
 
            return None;
 
@@ -662,11 +669,15 @@ impl Inbox {
 

	
 
    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        return &self.temp_m[ticket.index as usize];
 
        return unsafe{ &*self.messages.get(ticket.index as usize) }
 
    }
 

	
 
    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        debug_assert!(ticket.index < self.next_read_idx);
 
        self.next_read_idx -= 1;
 
        return self.temp_m.remove(ticket.index as usize);
 
        unsafe {
 
            let take_idx = ticket.index as usize;
 
            let val = std::ptr::read(self.messages.get(take_idx));
 
@@ -689,6 +700,8 @@ impl Inbox {
 
    fn put_back_message(&mut self, message: Message) {
 
        // We have space in front of the array because we've taken out a message
 
        // before.
 
        self.temp_d.push(message);
 
        return;
 
        debug_assert!(self.next_delay_idx < self.start_read_idx);
 
        unsafe {
 
            // Write to front of the array
 
@@ -698,6 +711,12 @@ impl Inbox {
 
    }
 

	
 
    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter{
 
            messages: self.temp_m.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
        return MessagesIter{
 
            messages: self.messages.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
@@ -706,7 +725,13 @@ impl Inbox {
 
        };
 
    }
 

	
 
    fn prepare_for_next_round(&mut self) {
 
    fn clear_read_messages(&mut self) {
 
        self.temp_m.drain(0..self.next_read_idx as usize);
 
        for (idx, v) in self.temp_d.drain(..).enumerate() {
 
            self.temp_m.insert(idx, v);
 
        }
 
        self.next_read_idx = 0;
 
        return;
 
        // Deallocate everything that was read
 
        self.destroy_range(self.start_read_idx, self.next_read_idx);
 
        self.generation += 1;
 
@@ -728,8 +753,21 @@ impl Inbox {
 
    }
 

	
 
    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
 
        // Convoluted assert to make sure we're in non-sync mode, as that is
 
        // when this is called, and that makes our lives easier
 
        debug_assert!(self.temp_d.is_empty());
 
        let mut idx = 0;
 
        while idx < self.temp_m.len() {
 
            let msg = &self.temp_m[idx];
 
            if let Some(target) = msg.target_port() {
 
                if target == port {
 
                    new_inbox.temp_m.push(self.temp_m.remove(idx));
 
                    continue;
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
        return;
 

	
 
        let mut idx = 0;
 
        while idx < self.messages.len() {
 
            let message = unsafe{ &*self.messages.get(idx) };
 
@@ -738,18 +776,19 @@ impl Inbox {
 
                    // Transfer port
 
                    unsafe {
 
                        let message = std::ptr::read(message as *const _);
 
                        let remaining = self.messages.len() - idx;
 
                        if remaining > 1 {
 
                            self.messages.move_range(idx + 1, idx, remaining - 1);
 
                        let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition
 
                        if remaining > 0 {
 
                            self.messages.move_range(idx + 1, idx, remaining);
 
                        }
 
                        self.messages.len -= 1;
 
                        new_inbox.insert_new(message);
 
                    }
 
                } else {
 
                    // Do not transfer port
 
                    idx += 1;
 

	
 
                    continue; // do not increment index
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
    }
 

	
 
@@ -763,17 +802,17 @@ impl Inbox {
 
        }
 
    }
 
}
 

	
 
impl Drop for Inbox {
 
    fn drop(&mut self) {
 
        // Whether in sync or not in sync. We have two ranges of allocated
 
        // messages:
 
        // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
        // - readable messages: from `start_read_idx` to `messages.len`
 
        self.destroy_range(0, self.next_delay_idx);
 
        self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
    }
 
}
 
//
 
// impl Drop for Inbox {
 
//     fn drop(&mut self) {
 
//         // Whether in sync or not in sync. We have two ranges of allocated
 
//         // messages:
 
//         // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
//         // - readable messages: from `start_read_idx` to `messages.len`
 
//         self.destroy_range(0, self.next_delay_idx);
 
//         self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
//     }
 
// }
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -15,9 +15,9 @@ use crate::runtime2::native::{ApplicationSyncAction};
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 4;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 3;
 
pub(crate) const NUM_THREADS: u32 = 2;
 
pub(crate) const NUM_INSTANCES: u32 = 2;
 
pub(crate) const NUM_LOOPS: u32 = 15;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
0 comments (0 inline, 0 general)