Changeset - a5594f90afa6
[Not reviewed]
0 3 0
mh - 3 years ago 2022-03-23 14:29:24
contact@maxhenger.nl
Fix bug in select statement
3 files changed with 25 insertions and 2 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_ip.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use crate::runtime2::*;
 
use super::*;
 

	
 
/// TODO: Temporary component to figure out what to do with custom components.
 
///     This component sends random numbers between two u32 limits
 
struct ComponentRandSend {
 
}
 

	
 
impl Component for ComponentRandSend {
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        unreachable!("should not adopt messages");
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        todo!()
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        todo!()
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -192,196 +192,196 @@ impl SelectState {
 
        }
 
        self.next_case = new_case_index + 1;
 
    }
 

	
 
    /// Checks if a decision can be reached
 
    fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        self.candidates_workspace.clear();
 
        if self.cases.is_empty() {
 
            // If there are no cases then we can immediately reach a "bogus
 
            // decision".
 
            return SelectDecision::Case(0);
 
        }
 

	
 
        // Need to check for valid case
 
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
 
            for port_handle in case.involved_ports.iter().copied() {
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if inbox[port_index].is_none() {
 
                    // Condition not satisfied
 
                    continue 'case_loop;
 
                }
 
            }
 

	
 
            // If here then the case guard is satisfied
 
            self.candidates_workspace.push(case_index);
 
        }
 

	
 
        if self.candidates_workspace.is_empty() {
 
            return SelectDecision::None;
 
        } else {
 
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
 
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    select: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl Component for CompPDL {
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.mode {
 
            Mode::NonSync | Mode::Sync | Mode::BlockedSelect => {
 
            Mode::NonSync | Mode::Sync => {
 
                // continue and run PDL code
 
            },
 
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
 
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::BlockedSelect => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            Mode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            Mode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.mode = Mode::Exit;
 
                    return Ok(CompScheduling::Exit);
 
                }
 
            },
 
            Mode::Exit => {
 
                return Ok(CompScheduling::Exit);
 
            }
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 

	
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state.is_blocked() {
 
                    self.mode = Mode::BlockedPut;
 
                    self.mode_port = port_id;
 
                    self.mode_value = value;
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return Ok(CompScheduling::Immediate);
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.select.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.mode = Mode::Sync;
src/runtime2/component/mod.rs
Show inline comments
 
mod component_pdl;
 
mod component_context;
 
mod control_layer;
 
mod consensus;
 
mod component;
 
mod component_ip;
 

	
 
pub(crate) use component::{Component, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL};
 
pub(crate) use component_context::CompCtx;
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
 
use super::runtime::*;
 

	
 
/// If the component is sleeping, then that flag will be atomically set to
 
/// false. If we're the ones that made that happen then we add it to the work
 
/// queue.
 
pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
 
    use std::sync::atomic::Ordering;
 

	
 
    let should_wake_up = handle.sleeping
 
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
 
        .is_ok();
 

	
 
    if should_wake_up {
 
        let comp_key = unsafe{ comp_id.upgrade() };
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)