Changeset - 89dcc879a83a
[Not reviewed]
0 3 0
MH - 3 years ago 2022-03-04 12:39:39
contact@maxhenger.nl
WIP: Fixing fundamental issue in consensus
3 files changed with 42 insertions and 6 deletions:
0 comments (0 inline, 0 general)
docs/runtime/sync.md
Show inline comments
 
@@ -126,4 +126,4 @@ Note that this broadcasting of synchronous messages is essentially a component-t
 

	
 
These port mappings are also sent along when sending data messages. We will not go into details but here the mapping makes sure that messages arrive in the right order, and certain kinds of deadlock or inconsistent protocol behaviour may be detected. This port mapping is checked for consistency by the recipient and, when consistent, the target port is updated with its new mapping.
 

	
 
As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering.
 
\ No newline at end of file
 
As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering. Likewise, if the recipient of a port mapping has not yet received anything on its receiving port, then it cannot be sure about the identity of the sender.
 
\ No newline at end of file
src/runtime2/communication.rs
Show inline comments
 
@@ -17,6 +17,11 @@ impl PortId {
 
    }
 
}
 

	
 
pub struct CompPortIds {
 
    pub comp: CompId,
 
    pub port: PortId,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
@@ -56,9 +61,26 @@ pub struct DataMessage {
 
    pub content: ValueGroup,
 
}
 

	
 
pub enum PortAnnotationKind {
 
    Getter(PortAnnotationGetter),
 
    Putter(PortAnnotationPutter),
 
}
 

	
 
pub struct PortAnnotationGetter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
}
 

	
 
pub struct PortAnnotationPutter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortId, Option<u32>)>,
 
    pub expected_mapping: Vec<(PortAnnotationKind, Option<u32>)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -11,16 +11,18 @@ pub struct PortAnnotation {
 
    peer_comp_id: CompId, // only valid for getter ports
 
    peer_port_id: PortId, // only valid for getter ports
 
    mapping: Option<u32>,
 
    kind: PortKind,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(comp_id: CompId, port_id: PortId) -> Self {
 
    fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self {
 
        return Self{
 
            self_comp_id: comp_id,
 
            self_port_id: port_id,
 
            peer_comp_id: CompId::new_invalid(),
 
            peer_port_id: PortId::new_invalid(),
 
            mapping: None
 
            mapping: None,
 
            kind,
 
        }
 
    }
 
}
 
@@ -347,7 +349,7 @@ impl Consensus {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind));
 
            }
 
        }
 
    }
 
@@ -593,7 +595,19 @@ impl Consensus {
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index;
 
                port_index = index; // remember for later updating
 
            }
 

	
 
            let annotation_kind = match port.kind {
 
                PortKind::Putter => PortAnnotationKind::Putter(PortAnnotationPutter{
 
                    self_comp_id: port.self_comp_id,
 
                    self_port_id: port.self_port_id
 
                }),
 
                PortKind::Getter => PortAnnotationKind::Getter(PortAnnotationGetter{
 
                    self_comp_id: port.self_comp_id,
 
                    self_port_id: port.self_port_id,
 
                    peer_comp_id:
 
                })
 
            }
 
            expected_mapping.push((port.self_port_id, port.mapping));
 
        }
0 comments (0 inline, 0 general)