Files @ 0de39654770f
Branch filter:

Location: CSY/reowolf/src/runtime2/component/consensus.rs - annotation

0de39654770f 2.9 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
WIP: Closing ports
use crate::protocol::eval::*;
use crate::runtime2::communication::*;

use super::component_pdl::*;

/// Per-port bookkeeping kept by [`Consensus`]: one entry per port id.
pub struct PortAnnotation {
    /// Identifier of the port this annotation belongs to.
    id: PortId,
    /// Mapping value currently associated with the port. It is `None` for a
    /// freshly created annotation; ports whose mapping is `Some` are listed in
    /// the `expected_mapping` of outgoing data headers.
    // NOTE(review): nothing in this file assigns `Some(..)` yet — presumably
    // added once message handling is implemented; confirm.
    mapping: Option<u32>,
}

impl PortAnnotation {
    /// Builds the annotation for port `id`, starting without any mapping.
    fn new(id: PortId) -> Self {
        Self { id, mapping: None }
    }
}

/// Tracking consensus state
///
/// Holds the current sync round number, the counter used to hand out mapping
/// values for outgoing data messages, and one [`PortAnnotation`] per port.
pub struct Consensus {
    /// Current sync round; advanced with wrapping arithmetic in
    /// `notify_sync_end` and copied into every sync header.
    round: u32,
    /// Next mapping value to hand out; reset to 0 in `notify_sync_start` and
    /// post-incremented by `take_mapping`.
    mapping_counter: u32,
    /// Annotations for the component's ports, kept in the same order as the
    /// component context's port list (see `transfer_ports`).
    ports: Vec<PortAnnotation>,
}

impl Consensus {
    /// Creates a consensus tracker at round 0, with the mapping counter at 0
    /// and no port annotations.
    pub(crate) fn new() -> Self {
        Self {
            round: 0,
            mapping_counter: 0,
            ports: Vec::new(),
        }
    }

    /// Prepares the tracker for a new sync round.
    ///
    /// Brings the port annotations in line with `comp_ctx` and restarts the
    /// mapping counter at 0.
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
        // Make sure we locally still have all of the same ports
        self.transfer_ports(comp_ctx);
        self.mapping_counter = 0;
    }

    /// Wraps `content` into a [`DataMessage`] to be sent over `port_info`.
    ///
    /// The data header is created first (which consumes one mapping value),
    /// followed by the sync header carrying the current round. In debug builds
    /// this asserts that `port_info` is one of the annotated ports.
    pub(crate) fn annotate_message_data(
        &mut self,
        port_info: &Port,
        content: ValueGroup,
    ) -> DataMessage {
        debug_assert!(self
            .ports
            .iter()
            .any(|annotation| annotation.id == port_info.self_id));

        let data_header = self.create_data_header(port_info);
        let sync_header = self.create_sync_header();

        DataMessage { data_header, sync_header, content }
    }

    /// Signals the end of a sync round by advancing the round number
    /// (wrapping on overflow).
    ///
    /// # Panics
    ///
    /// Always panics after advancing the round: the remainder of the
    /// sync-end handling is not implemented yet.
    pub(crate) fn notify_sync_end(&mut self) {
        self.round = self.round.wrapping_add(1);
        todo!("implement sync end")
    }

    /// Synchronizes the locally stored port annotations with the ports in
    /// `comp_ctx`.
    ///
    /// The annotations are rebuilt when the number of ports differs, or when
    /// any port id differs at the same position. Rebuilding creates fresh
    /// annotations, so previously stored mappings are dropped. When the ports
    /// are unchanged the existing annotations are left untouched.
    pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
        // A length mismatch alone already requires a rebuild; only when the
        // lengths agree do the ids have to be compared pairwise.
        let needs_setting_ports = comp_ctx.ports.len() != self.ports.len()
            || self
                .ports
                .iter()
                .zip(&comp_ctx.ports)
                .any(|(annotation, comp_port)| comp_port.self_id != annotation.id);

        if needs_setting_ports {
            self.ports.clear();
            self.ports.reserve(comp_ctx.ports.len());
            for port in &comp_ctx.ports {
                self.ports.push(PortAnnotation::new(port.self_id));
            }
        }
    }

    /// Builds the data header for a message sent through `port_info`.
    ///
    /// `expected_mapping` lists `(port id, mapping)` for every annotated port
    /// that currently has a mapping; `new_mapping` is a freshly taken mapping
    /// value. In debug builds this asserts that `port_info` is a putter port.
    fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader {
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
        expected_mapping.extend(
            self.ports
                .iter()
                .filter_map(|port| port.mapping.map(|mapping| (port.id, mapping))),
        );

        debug_assert_eq!(port_info.kind, PortKind::Putter);
        MessageDataHeader {
            expected_mapping,
            new_mapping: self.take_mapping(),
            source_port: port_info.self_id,
            target_port: port_info.peer_id,
        }
    }

    /// Builds the sync header carrying the current round number.
    fn create_sync_header(&self) -> MessageSyncHeader {
        MessageSyncHeader {
            sync_round: self.round,
        }
    }

    /// Returns the current mapping counter value and advances the counter by
    /// one.
    fn take_mapping(&mut self) -> u32 {
        let mapping = self.mapping_counter;
        self.mapping_counter += 1;
        mapping
    }
}