diff --git a/docs/runtime/sync.md b/docs/runtime/sync.md index 53c0cf191436a9ea30471918728bf2a2654a4b0a..dacbb9acb75486d71e8adf1e95bc7d36f9b7b698 100644 --- a/docs/runtime/sync.md +++ b/docs/runtime/sync.md @@ -126,4 +126,4 @@ Note that this broadcasting of synchronous messages is essentially a component-t These port mappings are also sent along when sending data messages. We will not go into details but here the mapping makes sure that messages arrive in the right order, and certain kinds of deadlock or inconsistent protocol behaviour may be detected. This port mapping is checked for consistency by the recipient and, when consistent, the target port is updated with its new mapping. -As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering. \ No newline at end of file +Since this mapping is sent along, we would ideally only include the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component, and so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering. Likewise, if the recipient of a port mapping has not yet received anything on its receiving port, then it cannot be sure about the identity of the sender. 
\ No newline at end of file diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index c615f06d5d5d08a99f8dbae16ff36c34639be553..4a50333075fba025712e1fc6790f61b93c6035de 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -17,6 +17,11 @@ impl PortId { } } +pub struct CompPortIds { + pub comp: CompId, + pub port: PortId, +} + #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PortKind { Putter, @@ -56,9 +61,26 @@ pub struct DataMessage { pub content: ValueGroup, } +pub enum PortAnnotationKind { + Getter(PortAnnotationGetter), + Putter(PortAnnotationPutter), +} + +pub struct PortAnnotationGetter { + pub self_comp_id: CompId, + pub self_port_id: PortId, + pub peer_comp_id: CompId, + pub peer_port_id: PortId, +} + +pub struct PortAnnotationPutter { + pub self_comp_id: CompId, + pub self_port_id: PortId, +} + #[derive(Debug)] pub struct MessageDataHeader { - pub expected_mapping: Vec<(PortId, Option)>, + pub expected_mapping: Vec<(PortAnnotationKind, Option)>, pub new_mapping: u32, pub source_port: PortId, pub target_port: PortId, diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 666ecf3c637103d1c7646b01252373c244637536..e0241691fdf1da8af32096d323bfbe981f1032a7 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -11,16 +11,18 @@ pub struct PortAnnotation { peer_comp_id: CompId, // only valid for getter ports peer_port_id: PortId, // only valid for getter ports mapping: Option, + kind: PortKind, } impl PortAnnotation { - fn new(comp_id: CompId, port_id: PortId) -> Self { + fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self { return Self{ self_comp_id: comp_id, self_port_id: port_id, peer_comp_id: CompId::new_invalid(), peer_port_id: PortId::new_invalid(), - mapping: None + mapping: None, + kind, } } } @@ -347,7 +349,7 @@ impl Consensus { self.ports.clear(); self.ports.reserve(comp_ctx.num_ports()); for port in 
comp_ctx.iter_ports() { - self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id)) + self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind)); } } } @@ -593,7 +595,19 @@ impl Consensus { let mut port_index = usize::MAX; for (index, port) in self.ports.iter().enumerate() { if port.self_port_id == port_info.self_id { - port_index = index; + port_index = index; // remember for later updating + } + + let annotation_kind = match port.kind { + PortKind::Putter => PortAnnotationKind::Putter(PortAnnotationPutter{ + self_comp_id: port.self_comp_id, + self_port_id: port.self_port_id + }), + PortKind::Getter => PortAnnotationKind::Getter(PortAnnotationGetter{ + self_comp_id: port.self_comp_id, + self_port_id: port.self_port_id, + peer_comp_id: + }) } expected_mapping.push((port.self_port_id, port.mapping)); }