Changeset - 89dcc879a83a
[Not reviewed]
0 3 0
MH - 3 years ago 2022-03-04 12:39:39
contact@maxhenger.nl
WIP: Fixing fundamental issue in consensus
3 files changed with 42 insertions and 6 deletions:
0 comments (0 inline, 0 general)
docs/runtime/sync.md
Show inline comments
 
@@ -117,13 +117,13 @@ We'll only shortly discuss the mechanisms that are present in the synchronizatio
 

	
 
And so at the end of the synchronous round a component will somehow broadcast its port mapping. Note from the discussion above that a transmitting port's annotation is only associated with that transmitting port, since a transmitting port can only truly ever know its own component/port ID. While the receiving port's annotation knows about the peer's component/port ID as well. And so a component can broadcast `(component ID, port ID, mapping)` for each of its transmitting ports, while it can broadcast `(own component ID, own port ID, peer component ID, peer port ID, mapping)` for each receiving port. Then a recipient of these mappings can match them up and make sure that the mappings agree.
 

	
 
Note that this broadcasting of synchronous messages is essentially a component-to-component operation. However these messages must still be sent over ports anyway (and any port that was used to transmit messages to a particular receiving component will do). There are two reasons:
 

	
 
1. The sync message may need to be rerouted (e.g. a sender quickly fires both a data message and a subsequent sync message while the receiving port is being transferred to a new component), but needs to arrive at the same target as the data message. This is essentially restating that a transmitter never knows about the component ID of the recipient.
 
2. The sync message must not be taken into account by the recipient if it has not accepted any messages from the sender yet. Ofcourse this can be achieved in various ways but a simple way to achieve this is to send the sync message over ports.
 

	
 
## Annotating Data Messages
 

	
 
These port mappings are also sent along when sending data messages. We will not go into details but here the mapping makes sure that messages arrive in the right order, and certain kinds of deadlock or inconsistent protocol behaviour may be detected. This port mapping is checked for consistency by the recipient and, when consistent, the target port is updated with its new mapping.
 

	
 
As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering.
 
\ No newline at end of file
 
As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering. Likewise, if the recipient of a port mapping has not yet received anything on its receiving port, then it cannot be sure about the identity of the sender.
 
\ No newline at end of file
src/runtime2/communication.rs
Show inline comments
 
@@ -8,24 +8,29 @@ use super::component::*;
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct PortId(pub u32);
 

	
 
impl PortId {
 
    /// This value is not significant, it is chosen to make debugging easier: a
 
    /// very large port number is more likely to shine a light on bugs.
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
}
 

	
 
pub struct CompPortIds {
 
    pub comp: CompId,
 
    pub port: PortId,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    BlockedDueToPeerChange,
 
    BlockedDueToFullBuffers,
 
    Closed,
 
@@ -47,27 +52,44 @@ pub struct Channel {
 

	
 
// -----------------------------------------------------------------------------
 
// Data messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct DataMessage {
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
pub enum PortAnnotationKind {
 
    Getter(PortAnnotationGetter),
 
    Putter(PortAnnotationPutter),
 
}
 

	
 
pub struct PortAnnotationGetter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
}
 

	
 
pub struct PortAnnotationPutter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortId, Option<u32>)>,
 
    pub expected_mapping: Vec<(PortAnnotationKind, Option<u32>)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Sync messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct SyncMessage {
 
    pub sync_header: MessageSyncHeader,
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -2,34 +2,36 @@ use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_context::*;
 

	
 
pub struct PortAnnotation {
 
    self_comp_id: CompId,
 
    self_port_id: PortId,
 
    peer_comp_id: CompId, // only valid for getter ports
 
    peer_port_id: PortId, // only valid for getter ports
 
    mapping: Option<u32>,
 
    kind: PortKind,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(comp_id: CompId, port_id: PortId) -> Self {
 
    fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self {
 
        return Self{
 
            self_comp_id: comp_id,
 
            self_port_id: port_id,
 
            peer_comp_id: CompId::new_invalid(),
 
            peer_port_id: PortId::new_invalid(),
 
            mapping: None
 
            mapping: None,
 
            kind,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
enum Mode {
 
    NonSync,
 
    SyncBusy,
 
    SyncAwaitingSolution,
 
    SelectBusy,
 
    SelectWait,
 
}
 
@@ -338,25 +340,25 @@ impl Consensus {
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        if needs_setting_ports {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind));
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
@@ -584,25 +586,37 @@ impl Consensus {
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // -------------------------------------------------------------------------
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index;
 
                port_index = index; // remember for later updating
 
            }
 

	
 
            let annotation_kind = match port.kind {
 
                PortKind::Putter => PortAnnotationKind::Putter(PortAnnotationPutter{
 
                    self_comp_id: port.self_comp_id,
 
                    self_port_id: port.self_port_id
 
                }),
 
                PortKind::Getter => PortAnnotationKind::Getter(PortAnnotationGetter{
 
                    self_comp_id: port.self_comp_id,
 
                    self_port_id: port.self_port_id,
 
                    peer_comp_id:
 
                })
 
            }
 
            expected_mapping.push((port.self_port_id, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
 
        self.ports[port_index].mapping = Some(new_mapping);
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        return MessageDataHeader{
 
            expected_mapping,
 
            new_mapping,
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_port_id,
0 comments (0 inline, 0 general)