Changeset - c62d6f0cc48a
[Not reviewed]
mh - 3 years ago 2022-04-07 13:09:23
contact@maxhenger.nl
WIP on implementing (figuring out) tcp component
12 files changed with 295 insertions and 118 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::{CompCtx, CompPDL};
 
use super::component_context::*;
 
use super::component_ip::*;
 
use super::component_random::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called if the component is created by another component and the messages
 
    /// are being transferred between the two.
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);
 

	
 
    /// Called if the component receives a new message. The component is
 
    /// responsible for deciding where that message goes.
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
 
}
 
@@ -245,48 +245,73 @@ pub(crate) fn default_handle_start_exit(
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.log("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
pub(crate) fn default_handle_sync_decision(
 
    exec_state: &mut CompExecState, decision: SyncRoundDecision,
 
    consensus: &mut Consensus
 
) -> Option<bool> {
 
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
    if success {
 
        exec_state.mode = CompMode::NonSync;
 
        consensus.notify_sync_decision(decision);
 
        return Some(true);
 
    } else {
 
        exec_state.mode = CompMode::StartExit;
 
        return Some(false);
 
    }
 
}
 

	
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
@@ -336,34 +361,33 @@ fn default_handle_unblock_put(
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(port_info.state.is_blocked());
 
    port_info.state = PortState::Open;
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    }
 
}
 

	
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
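
For illustration, a toy sketch (not the crate's actual scheduler) of how a work-queue loop could interpret the four `CompScheduling` values returned by `Component::run`:

use std::collections::VecDeque;

enum Scheduling { Immediate, Requeue, Sleep, Exit }

fn main() {
    // Pretend components: each entry is the sequence of decisions its `run`
    // routine will return when called.
    let mut components = vec![
        vec![Scheduling::Immediate, Scheduling::Requeue, Scheduling::Exit],
        vec![Scheduling::Sleep],
    ];
    let mut work_queue: VecDeque<usize> = (0..components.len()).collect();

    while let Some(comp) = work_queue.pop_front() {
        loop {
            match components[comp].remove(0) {
                Scheduling::Immediate => continue,    // keep running the same component
                Scheduling::Requeue => {              // give other components a turn first
                    work_queue.push_back(comp);
                    break;
                },
                Scheduling::Sleep => break,           // wait until a message wakes it up
                Scheduling::Exit => break,            // component is done
            }
        }
    }
}
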
src/runtime2/component/component_internet.rs
 
new file 100644
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::runtime2::*;
 
use crate::runtime2::component::CompCtx;
 
use crate::runtime2::stdlib::internet::*;
 

	
 
use super::component::{self, *};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
enum SocketState {
 
    Connected(SocketTcpClient),
 
    Error,
 
}
 

	
 
enum SyncState {
 
    Getting,
 
    Putting
 
}
 

	
 
pub struct ComponentTcpClient {
 
    // Properties for the tcp socket
 
    socket_state: SocketState,
 
    pending_recv: Vec<DataMessage>, // on the input port
 
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
 
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
 
    // Generic component state
 
    exec_state: CompExecState,
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentTcpClient {
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
 
        self.handle_incoming_data_message(message);
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
                // impossible for this component. We always accept the input
 
                // values, and we never perform an explicit select.
 
                unreachable!();
 
            },
 
            CompMode::NonSync => {
 
                // When in non-sync mode: check the socket state and decide
                // what to do next (still WIP)
                match &mut self.socket_state {
                    SocketState::Connected(_socket) => {
                        // TODO(WIP): move pending data between `pending_recv` and the socket
 
                    },
 
                    SocketState::Error => {
 
                        self.exec_state.mode = CompMode::StartExit;
 
                        return Ok(CompScheduling::Immediate);
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 

	
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut =>
 
                return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit =>
 
                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
 
            CompMode::BusyExit =>
 
                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
 
            CompMode::Exit =>
 
                return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        use std::net::{IpAddr, Ipv4Addr};
 

	
 
        debug_assert_eq!(arguments.values.len(), 4);
 

	
 
        // Parsing arguments
 
        let ip_heap_pos = arguments.values[0].as_array();
 
        let ip_elements = &arguments.regions[ip_heap_pos as usize];
 
        if ip_elements.len() != 4 {
 
            todo!("friendly error reporting: ip contains 4 octects");
 
        }
 
        let ip_address = IpAddr::V4(Ipv4Addr::new(
 
            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
 
            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
 
        ));
 

	
 
        let port = arguments.values[1].as_uint16();
 
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
        let socket = SocketTcpClient::new(ip_address, port);
 
        if let Err(error) = socket {
            todo!("friendly error reporting: failed to open socket {:?}", error);
 
        }
 

	
 
        return Self{
 
            socket_state: SocketState::Connected(socket.unwrap()),
 
            pending_recv: Vec::new(),
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 
    // Handles incoming data from the PDL side (hence, going into the socket)
 
    fn handle_incoming_data_message(&mut self, message: DataMessage) {
 
        // Input message is an array of bytes (u8)
 
        self.pending_recv.push(message);
 

	
 
    }
 

	
 
    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
 
        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
 
        debug_assert_eq!(message.content.values.len(), 1);
 

	
 
        if let Value::Array(array_pos) = message.content.values[0] {
 
            let region = &message.content.regions[array_pos as usize];
 
            bytes.reserve(region.len());
 
            for value in region {
 
                bytes.push(value.as_uint8());
 
            }
 
        } else {
 
            unreachable!();
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
 
@@ -214,49 +214,49 @@ pub(crate) struct CompPDL {
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl Component for CompPDL {
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
src/runtime2/component/component_random.rs
 
file renamed from src/runtime2/component/component_ip.rs to src/runtime2/component/component_random.rs
 
@@ -17,49 +17,49 @@ pub struct ComponentRandomU32 {
 
    random_minimum: u32,
 
    random_maximum: u32,
 
    num_sends: u32,
 
    max_num_sends: u32,
 
    generator: random::ThreadRng,
 
    // Generic state-tracking
 
    exec_state: CompExecState,
 
    did_perform_send: bool, // when in sync mode
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        // Impossible since this component does not have any input ports in its
 
        // signature.
 
        unreachable!();
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            },
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
            }
 
            CompMode::NonSync => {
 
                // If in non-sync mode then we check if the arguments make sense
 
                // (at some point in the future, this is just a testing
 
                // component).
 
@@ -99,79 +99,63 @@ impl Component for ComponentRandomU32 {
 

	
 
                        CompScheduling::Sleep
 
                    } else {
 
                        let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group);
 
                        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
                        let peer_info = comp_ctx.get_peer(peer_handle);
 
                        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(message), true);
 

	
 
                        // Remain in sync mode, but after `did_perform_send` was
 
                        // set to true.
 
                        CompScheduling::Immediate
 
                    };
 

	
 
                    // Blocked or not, we set `did_perform_send` to true. If
 
                    // blocked then the moment we become unblocked (and are back
 
                    // at the `Sync` mode) we have sent the message.
 
                    self.did_perform_send = true;
 
                    self.num_sends += 1;
 
                    return Ok(scheduling)
 
                } else {
 
                    // Message was sent, finish this sync round
 
                    sched_ctx.log("Waiting for consensus");
 
                    self.exec_state.mode = CompMode::SyncEnd;
 
                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                    self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                    return Ok(CompScheduling::Requeue);
 
                }
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 
    }
 
}
 

	
 
impl ComponentRandomU32 {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 4);
 
        debug_assert!(arguments.regions.is_empty());
 
        let port_id = component::port_id_from_eval(arguments.values[0].as_port_id());
 
        let minimum = arguments.values[1].as_uint32();
 
        let maximum = arguments.values[2].as_uint32();
 
        let num_sends = arguments.values[3].as_uint32();
 

	
 
        return Self{
 
            output_port_id: port_id,
 
            random_minimum: minimum,
 
            random_maximum: maximum,
 
            num_sends: 0,
 
            max_num_sends: num_sends,
 
            generator: random::thread_rng(),
 
            exec_state: CompExecState::new(),
 
            did_perform_send: false,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 
    fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        let success = match decision {
 
            SyncRoundDecision::None => return,
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
        if success {
 
            self.exec_state.mode = CompMode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.exec_state.mode = CompMode::StartExit;
 
        }
 
    }
 
}
 
\ No newline at end of file
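
For reference, a minimal sketch of the value generation this component performs, assuming the `random` import in this changeset is an alias for the `rand` crate (0.8-style API):

use rand::Rng;

fn main() {
    let mut generator = rand::thread_rng();
    let (random_minimum, random_maximum) = (10u32, 20u32);
    // A u32 in [random_minimum, random_maximum), mirroring the component's fields.
    let value: u32 = generator.gen_range(random_minimum..random_maximum);
    assert!(value >= random_minimum && value < random_maximum);
}
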
src/runtime2/component/mod.rs
 
mod component_pdl;
 
mod component_context;
 
mod control_layer;
 
mod consensus;
 
mod component;
 
mod component_ip;
 
mod component_random;
 
mod component_internet;
 

	
 
pub(crate) use component::{Component, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL};
 
pub(crate) use component_context::CompCtx;
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
 
use super::runtime::*;
 

	
 
/// If the component is sleeping, its sleeping flag is atomically set to false.
/// If we're the one that cleared the flag, we add the component to the work
/// queue.
 
pub(crate) fn wake_up_if_sleeping(runtime: &RuntimeInner, comp_id: CompId, handle: &CompHandle) {
 
    use std::sync::atomic::Ordering;
 

	
 
    let should_wake_up = handle.sleeping
 
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
 
        .is_ok();
 

	
 
    if should_wake_up {
 
        let comp_key = unsafe{ comp_id.upgrade() };
 
        runtime.enqueue_work(comp_key);
 
    }
 
}
 
\ No newline at end of file
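
A self-contained sketch of the wake-up handshake described above, with a dummy handle and a plain `Vec` standing in for the runtime's work queue: only the caller that flips `sleeping` from `true` to `false` enqueues the component, so it is never enqueued twice.

use std::sync::atomic::{AtomicBool, Ordering};

struct DummyHandle { sleeping: AtomicBool }

fn wake_up_if_sleeping(handle: &DummyHandle, work_queue: &mut Vec<u32>, comp_key: u32) {
    let should_wake_up = handle.sleeping
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if should_wake_up {
        work_queue.push(comp_key);
    }
}

fn main() {
    let handle = DummyHandle{ sleeping: AtomicBool::new(true) };
    let mut work_queue = Vec::new();
    wake_up_if_sleeping(&handle, &mut work_queue, 7); // wins the exchange, enqueues
    wake_up_if_sleeping(&handle, &mut work_queue, 7); // loses, does nothing
    assert_eq!(work_queue, vec![7]);
}
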
src/runtime2/error.rs
 
@@ -7,56 +7,56 @@ pub struct RtError {
 
    file: &'static str,
 
    line: u32,
 
    message: String,
 
    cause: Option<Box<RtError>>,
 
}
 

	
 
impl RtError {
 
    pub(crate) fn new(file: &'static str, line: u32, message: String) -> RtError {
 
        return RtError {
 
            file, line, message, cause: None,
 
        }
 
    }
 

	
 
    pub(crate) fn wrap(self, file: &'static str, line: u32, message: String) -> RtError {
 
        return RtError {
 
            file, line, message, cause: Some(Box::new(self))
 
        }
 
    }
 
}
 

	
 
impl Display for RtError {
 
    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {
 
        let mut error = self;
 
        loop {
 
            write!(f, "[{}:{}] {}", self.file, self.line, self.message).unwrap();
 
            write!(f, "[{}:{}] {}", error.file, error.line, error.message)?;
 
            match &error.cause {
 
                Some(cause) => {
 
                    writeln!(f, " ...");
 
                    writeln!(f, " ...")?;
 
                    error = cause.as_ref()
 
                },
 
                None => {
 
                    writeln!(f).unwrap();
 
                    writeln!(f)?;
                    return Ok(());
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
impl Debug for RtError {
 
    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {
 
        return (self as &dyn Display).fmt(f);
 
    }
 
}
 

	
 
macro_rules! rt_error {
 
    ($fmt:expr) => {
 
        $crate::runtime2::error::RtError::new(file!(), line!(), $fmt.to_string())
 
    };
 
    ($fmt:expr, $($args:expr),*) => {
 
        $crate::runtime2::error::RtError::new(file!(), line!(), format!($fmt, $($args),*))
 
    };
 
}
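
An in-crate usage sketch of `rt_error!` together with `RtError::wrap`; `open_socket` and `connect` are made-up functions used only for illustration.

fn open_socket() -> Result<(), RtError> {
    return Err(rt_error!("failed to open socket: {}", "address in use"));
}

fn connect() -> Result<(), RtError> {
    // Wrap the inner error so the `Display` impl above prints the whole chain.
    return open_socket().map_err(|e| e.wrap(file!(), line!(), "failed to connect".to_string()));
}
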
 

	
 
macro_rules! rt_error_try {
 
    ($prev:expr, $($fmt_and_args:expr),*) => {
 
        {
src/runtime2/poll/mod.rs
 
@@ -101,116 +101,97 @@ impl Poller {
 
        }
 

	
 
        return events as u32;
 
    }
 
}
 

	
 
#[cfg(unix)]
 
impl Drop for Poller {
 
    fn drop(&mut self) {
 
        unsafe{ libc::close(self.handle); }
 
    }
 
}
 

	
 
#[inline]
 
fn syscall_result(result: c_int) -> io::Result<c_int> {
 
    if result < 0 {
 
        return Err(io::Error::last_os_error());
 
    } else {
 
        return Ok(result);
 
    }
 
}
 

	
 
#[cfg(not(unix))]
 
struct Poller {
 

	
 
    // Not implemented for operating systems other than Unix
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Polling Thread
 
// -----------------------------------------------------------------------------
 

	
 
enum PollCmd {
 
    Register(CompHandle, UserData),
 
    Unregister(FileDescriptor, UserData),
 
    Shutdown,
 
}
 

	
 
/// Represents the data needed to build interfaces to the polling thread (which
 
/// should happen first) and to create the polling thread itself.
 
pub(crate) struct PollingThreadBuilder {
 
pub struct PollingThread {
 
    poller: Arc<Poller>,
 
    generation_counter: Arc<AtomicU32>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    runtime: Arc<RuntimeInner>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    logging_enabled: bool,
 
}
 

	
 
impl PollingThreadBuilder {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<PollingThreadBuilder, RtError> {
 
impl PollingThread {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
 
        let poller = Poller::new()
 
            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
 

	
 
        return Ok(PollingThreadBuilder {
 
            poller: Arc::new(poller),
 
            generation_counter: Arc::new(AtomicU32::new(0)),
 
            queue: QueueDynMpsc::new(64),
 
            runtime,
 
        let poller = Arc::new(poller);
 
        let queue = QueueDynMpsc::new(64);
 
        let queue_producers = queue.producer_factory();
 

	
 
        let mut thread_data = PollingThread{
 
            poller: poller.clone(),
 
            runtime: runtime.clone(),
 
            queue,
 
            logging_enabled,
 
        })
 
    }
 
        };
 
        let thread_handle = thread::spawn(move || { thread_data.run() });
 

	
 
    pub(crate) fn client(&self) -> PollingClient {
 
        return PollingClient{
 
            poller: self.poller.clone(),
 
            generation_counter: self.generation_counter.clone(),
 
            queue: self.queue.producer(),
 
        }
 
    }
 
        let thread_handle = PollingThreadHandle{
 
            queue: Some(queue_producers.producer()),
 
            handle: Some(thread_handle),
 
        };
 
        let client_factory = PollingClientFactory{
 
            poller,
 
            generation_counter: Arc::new(AtomicU32::new(0)),
 
            queue_factory: queue_producers,
 
        };
 

	
 
    pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) {
 
        let destroyer = self.queue.producer();
 

	
 
        return (
 
            PollingThread{
 
                poller: self.poller,
 
                runtime: self.runtime,
 
                queue: self.queue,
 
                logging_enabled: self.logging_enabled,
 
            },
 
            PollingThreadDestroyer::new(destroyer)
 
        );
 
        return Ok((thread_handle, client_factory));
 
    }
 
}
 

	
 
pub(crate) struct PollingThread {
 
    poller: Arc<Poller>,
 
    runtime: Arc<RuntimeInner>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    logging_enabled: bool,
 
}
 

	
 
impl PollingThread {
 
    pub(crate) fn run(&mut self) {
 
        use crate::runtime2::scheduler::SchedulerCtx;
 
        use crate::runtime2::communication::Message;
 

	
 
        const NUM_EVENTS: usize = 256;
 
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);
 

	
 
        // @performance: Lot of improvements possible here, a HashMap is likely
 
        // a horrible way to do this.
 
        let mut events = Vec::with_capacity(NUM_EVENTS);
 
        let mut lookup = HashMap::with_capacity(64);
 
        self.log("Starting polling thread");
 

	
 
        loop {
 
            // Retrieve events first (because the PollingClient will first
 
            // register at epoll, and then push a command into the queue).
 
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();
 

	
 
            // Then handle everything in the command queue.
 
            while let Some(command) = self.queue.pop() {
 
                match command {
 
                    PollCmd::Register(handle, user_data) => {
 
                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
 
                        let key = Self::user_data_as_key(user_data);
 
@@ -223,85 +204,103 @@ impl PollingThread {
 
                        let mut handle = lookup.remove(&key).unwrap();
 
                        self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0));
 
                        if let Some(key) = handle.decrement_users() {
 
                            self.runtime.destroy_component(key);
 
                        }
 
                    },
 
                    PollCmd::Shutdown => {
 
                        // The contract is that all scheduler threads shutdown
 
                        // before the polling thread. This happens when all
 
                        // components are removed.
 
                        self.log("Received shutdown signal");
 
                        debug_assert!(lookup.is_empty());
 
                        return;
 
                    }
 
                }
 
            }
 

	
 
            // Now process all of the events. Because we might have had a
 
            // `Register` command followed by an `Unregister` command (e.g. a
 
            // component has died), we might get events that are not associated
 
            // with an entry in the lookup.
 
            for event in events.drain(..) {
 
                let key = event.u64;
 
                if let Some(handle) = lookup.get(&key) {
 
                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events));
 
                    let events = event.events;
 
                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events));
 
                    handle.send_message(&self.runtime, Message::Poll, true);
 
                }
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn user_data_as_key(data: UserData) -> u64 {
 
        return data.0;
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.logging_enabled {
 
            println!("[polling] {}", message);
 
        }
 
    }
 
}
 

	
 
// bit convoluted, but it works
 
pub(crate) struct PollingThreadDestroyer {
 
    queue: Option<QueueDynProducer<PollCmd>>,
 
pub(crate) struct PollingThreadHandle {
 
    // requires Option, because:
 
    queue: Option<QueueDynProducer<PollCmd>>, // destructor needs to be called
 
    handle: Option<thread::JoinHandle<()>>, // we need to call `join`
 
}
 

	
 
impl PollingThreadDestroyer {
 
    fn new(queue: QueueDynProducer<PollCmd>) -> Self {
 
        return Self{ queue: Some(queue) };
 
    }
 

	
 
    pub(crate) fn initiate_destruction(&mut self) {
 
impl PollingThreadHandle {
 
    pub(crate) fn shutdown(&mut self) -> thread::Result<()> {
 
        debug_assert!(self.handle.is_some(), "polling thread already destroyed");
 
        self.queue.take().unwrap().push(PollCmd::Shutdown);
 
        return self.handle.take().unwrap().join();
 
    }
 
}
 

	
 
impl Drop for PollingThreadDestroyer {
 
impl Drop for PollingThreadHandle {
 
    fn drop(&mut self) {
 
        debug_assert!(self.queue.is_none());
 
        debug_assert!(self.queue.is_none() && self.handle.is_none());
 
    }
 
}
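
The `Option`-plus-`take` pattern used by `PollingThreadHandle` can be shown in isolation; a self-contained sketch with only the join handle (no command queue):

use std::thread;

struct ThreadHandle {
    handle: Option<thread::JoinHandle<()>>, // `Option` so `shutdown` can consume it exactly once
}

impl ThreadHandle {
    fn shutdown(&mut self) -> thread::Result<()> {
        debug_assert!(self.handle.is_some(), "thread already shut down");
        return self.handle.take().unwrap().join();
    }
}

impl Drop for ThreadHandle {
    fn drop(&mut self) {
        // In debug builds, forgetting to call `shutdown` trips this assert.
        debug_assert!(self.handle.is_none());
    }
}

fn main() {
    let mut handle = ThreadHandle{ handle: Some(thread::spawn(|| {})) };
    handle.shutdown().unwrap();
}
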
 

	
 
// oh my god, now I'm writing factory objects. I'm not feeling too well
 
pub(crate) struct PollingClientFactory {
 
    poller: Arc<Poller>,
 
    generation_counter: Arc<AtomicU32>,
 
    queue_factory: QueueDynProducerFactory<PollCmd>,
 
}
 

	
 
impl PollingClientFactory {
 
    pub(crate) fn client(&self) -> PollingClient {
 
        return PollingClient{
 
            poller: self.poller.clone(),
 
            generation_counter: self.generation_counter.clone(),
 
            queue: self.queue_factory.producer(),
 
        };
 
    }
 
}
 

	
 
pub(crate) struct PollTicket(FileDescriptor, u64);
 

	
 
/// A structure that allows the owner to register components at the polling
 
/// thread. Because of assumptions in the communication queue all of these
 
/// clients should be dropped before stopping the polling thread.
 
pub(crate) struct PollingClient {
 
    poller: Arc<Poller>,
 
    generation_counter: Arc<AtomicU32>,
 
    queue: QueueDynProducer<PollCmd>,
 
}
 

	
 
impl PollingClient {
 
    fn register<F: AsFileDescriptor>(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
 
        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
 
        let user_data = user_data_for_component(handle.id().0, generation);
 
        self.queue.push(PollCmd::Register(handle, user_data));
 

	
 
        let file_descriptor = entity.as_file_descriptor();
 
        self.poller.register(file_descriptor, user_data, read, write)
 
            .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?;
 

	
src/runtime2/runtime.rs
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::thread;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 
use crate::runtime2::poll::{PollingThreadBuilder, PollingThreadDestroyer};
 
use crate::runtime2::poll::{PollingThread, PollingThreadHandle};
 
use crate::runtime2::RtError;
 

	
 
use super::communication::Message;
 
use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 

	
 
/// Key to a component. Type system somewhat ensures that there can only be one
 
/// of these. Only with a key one may retrieve privately-accessible memory for
 
/// a component. Practically just a generational index, like `CompId` is.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct CompKey(pub u32);
 

	
 
impl CompKey {
 
    pub(crate) fn downgrade(&self) -> CompId {
 
        return CompId(self.0);
 
    }
 
}
 

	
 
/// Generational ID of a component.
 
@@ -141,122 +141,114 @@ impl Clone for CompHandle {
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        dbg_code!(assert!(!self.decremented)); // cannot access if control is relinquished
 
        return unsafe{ &*self.target };
 
    }
 
}
 

	
 
impl Drop for CompHandle {
 
    fn drop(&mut self) {
 
        dbg_code!(assert!(self.decremented, "need call to 'decrement_users' before dropping"));
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
pub struct Runtime {
 
    pub(crate) inner: Arc<RuntimeInner>,
 
    scheduler_threads: Vec<thread::JoinHandle<()>>,
 
    polling_destroyer: PollingThreadDestroyer,
 
    polling_thread: Option<thread::JoinHandle<()>>,
 
    polling_handle: PollingThreadHandle,
 
}
 

	
 
impl Runtime {
 
    // TODO: debug_logging should be removed at some point
 
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
 
        if num_threads == 0 {
 
            return Err(rt_error!("need at least one thread to create the runtime"));
 
        }
 
        let runtime_inner = Arc::new(RuntimeInner {
 
            protocol: protocol_description,
 
            components: ComponentStore::new(128),
 
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
 
            work_condvar: Condvar::new(),
 
            active_elements: AtomicU32::new(1),
 
        });
 
        let polling_builder = rt_error_try!(
 
            PollingThreadBuilder::new(runtime_inner.clone(), debug_logging),
 
        let (polling_handle, polling_clients) = rt_error_try!(
 
            PollingThread::new(runtime_inner.clone(), debug_logging),
 
            "failed to build polling thread"
 
        );
 

	
 
        let mut scheduler_threads = Vec::with_capacity(num_threads as usize);
 

	
 
        for thread_index in 0..num_threads {
 
            let mut scheduler = Scheduler::new(
 
                runtime_inner.clone(), polling_builder.client(),
 
                runtime_inner.clone(), polling_clients.client(),
 
                thread_index, debug_logging
 
            );
 
            let thread_handle = thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            scheduler_threads.push(thread_handle);
 
        }
 

	
 
        let (mut poller, polling_destroyer) = polling_builder.into_thread();
 
        let polling_thread = thread::spawn(move || {
 
            poller.run();
 
        });
 

	
 
        return Ok(Runtime{
 
            inner: runtime_inner,
 
            scheduler_threads,
 
            polling_destroyer,
 
            polling_thread: Some(polling_thread),
 
            polling_handle,
 
        });
 
    }
 

	
 
    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
 
        use crate::protocol::eval::ValueGroup;
 
        let prompt = self.inner.protocol.new_component(
 
            module_name, routine_name,
 
            ValueGroup::new_stack(Vec::new())
 
        )?;
 
        let reserved = self.inner.start_create_pdl_component();
 
        let ctx = CompCtx::new(&reserved);
 
        let component = Box::new(CompPDL::new(prompt, 0));
 
        let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
        self.inner.enqueue_work(key);
 

	
 
        return Ok(())
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_components();
 
        for handle in self.scheduler_threads.drain(..) {
 
            handle.join().expect("join scheduler thread");
 
        }
 

	
 
        self.polling_destroyer.initiate_destruction();
 
        self.polling_thread.take().unwrap().join().expect("join polling thread");
 
        self.polling_handle.shutdown().expect("shutdown polling thread");
 
    }
 
}
 

	
 
/// Memory that is maintained by "the runtime". In practice it is maintained by
 
/// multiple schedulers, and this serves as the common interface to that memory.
 
pub(crate) struct RuntimeInner {
 
    pub protocol: ProtocolDescription,
 
    components: ComponentStore<RuntimeComp>,
 
    work_queue: Mutex<VecDeque<CompKey>>,
 
    work_condvar: Condvar,
 
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
 
}
 

	
 
impl RuntimeInner {
 
    // Scheduling and retrieving work
 

	
 
    pub(crate) fn take_work(&self) -> Option<CompKey> {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
 
            lock = self.work_condvar.wait(lock).unwrap();
 
        }
 

	
 
        // We have work, or the schedulers should exit.
 
        return lock.pop_front();
src/runtime2/stdlib/internet.rs
 
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 
use std::mem::size_of;
 
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 

	
 
use libc::{
 
    c_int,
 
    sockaddr_in, sockaddr_in6, in_addr, in6_addr,
 
    socket, bind, listen, accept, connect, close,
 
};
 
use mio::{event, Interest, Registry, Token};
 

	
 
#[derive(Debug)]
 
pub enum SocketError {
 
    Opening,
 
    Modifying,
 
    Binding,
 
    Listening,
 
    Connecting,
 
    Accepted,
 
    Accepting,
 
}
 

	
 
enum SocketState {
 
    Opened,
 
    Listening,
 
}
 

	
 
@@ -41,85 +42,85 @@ impl SocketTcpClient {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: BLOCKING,
 
        })
 
    }
 

	
 
    pub fn send(&self, message: &[u8]) -> Result<usize, ()> {
 
        let result = unsafe{
 
            let message_pointer = message.as_ptr().cast();
 
            libc::send(self.socket_handle, message_pointer, message.len() as libc::size_t, 0)
 
        };
 
        if result < 0 {
 
            return Err(())
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 

	
 
    /// Receives data from the TCP socket. Returns the number of bytes received.
 
    /// More bytes may be present even though `used < buffer.len()`.
 
    pub fn receive(&self, buffer: &mut [u8]) -> Result<usize, ()> {
 
    pub fn receive(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
 
        if self.is_blocking {
 
            return self.receive_blocking(buffer);
 
        } else {
 
            return self.receive_nonblocking(buffer);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn receive_blocking(&self, buffer: &mut [u8]) -> Result<usize, ()> {
 
    fn receive_blocking(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
 
        let result = unsafe {
 
            let message_pointer = buffer.as_mut_ptr().cast();
 
            libc::recv(self.socket_handle, message_pointer, buffer.len(), 0)
 
        };
 
        if result < 0 {
 
            return Err(());
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 

	
 
    #[inline]
 
    fn receive_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, ()> {
 
    fn receive_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
 
        unsafe {
 
            let mut message_pointer = buffer.as_mut_ptr().cast();
 
            let mut remaining = buffer.len();
 

	
 
            loop {
 
                // Receive more data
 
                let result = libc::recv(self.socket_handle, message_pointer, remaining, 0);
 
                if result < 0 {
 
                    // Check reason
 
                    let errno = std::io::Error::last_os_error().raw_os_error().expect("os error after failed recv");
 
                    if errno == libc::EWOULDBLOCK || errno == libc::EAGAIN {
 
                    let os_error = IoError::last_os_error();
 
                    if os_error.kind() == IoErrorKind::WouldBlock {
 
                        return Ok(buffer.len() - remaining);
 
                    } else {
 
                        return Err(());
 
                        return Err(os_error);
 
                    }
 
                }
 

	
 
                // Modify pointer and remaining bytes
 
                let received = result as usize;
 
                message_pointer = message_pointer.add(received);
 
                remaining -= received;
 

	
 
                if remaining == 0 {
 
                    return Ok(buffer.len());
 
                }
 
            }
 
        }
 
    }
 
}
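
A quick check of the errno translation relied on by `receive_nonblocking` above: on Unix, the standard library maps both `EWOULDBLOCK` and `EAGAIN` onto `ErrorKind::WouldBlock`, so the single `kind()` comparison replaces the raw errno checks.

use std::io::{Error as IoError, ErrorKind as IoErrorKind};

fn main() {
    let os_error = IoError::from_raw_os_error(libc::EAGAIN);
    assert_eq!(os_error.kind(), IoErrorKind::WouldBlock);
}
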
 

	
 
impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
/// Raw socket receiver. Essentially a listener that accepts a single connection
src/runtime2/stdlib/mod.rs
 
#[cfg(feature="internet")] mod internet;
 
\ No newline at end of file
 
#[cfg(feature="internet")] pub(crate) mod internet;
 
\ No newline at end of file
src/runtime2/store/queue_mpsc.rs
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
use crate::collections::RawArray;
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
/// consumer. To retrieve access to the producer-side: call `producer()`.
 
/// consumer. To retrieve access to the producer-side: call `producer()`. In
 
/// case the queue is moved before one can call `producer()`, call
 
/// `producer_factory()`. This incurs a bit more overhead.
 
///
 
/// This is a queue that will resize (indefinitely) if it becomes full, and will
 
/// not shrink. So probably a temporary thing.
 
///
 
/// In debug mode we'll make sure that there are no producers when the queue is
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
// NOTE: Addendum to the above remark, not true if the thread owning the
 
// consumer side crashes, unwinds, and drops the `Box` with it. Question is: do
 
// I want to take that into account?
 
pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
}
 

	
 
// One may move around the queue between threads, as long as there is only one
 
// instance of it.
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
 

	
 
/// Shared data between queue consumer and the queue producers
 
struct Shared<T> {
 
    data: UnfairSeLock<Inner<T>>,
 
@@ -54,49 +56,54 @@ impl<T> QueueDynMpsc<T> {
 
        assert_correct_capacity(initial_capacity);
 

	
 
        let mut data = RawArray::new();
 
        data.resize(initial_capacity);
 

	
 
        let initial_capacity = initial_capacity as u32;
 

	
 
        return Self{
 
            inner: Box::new(Shared {
 
                data: UnfairSeLock::new(Inner{
 
                    data,
 
                    compare_mask: (2 * initial_capacity) - 1,
 
                    read_mask: initial_capacity - 1,
 
                }),
 
                read_head: AtomicU32::new(0),
 
                write_head: AtomicU32::new(initial_capacity),
 
                limit_head: AtomicU32::new(initial_capacity),
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
 
    }
 

	
 
    #[inline]
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(self);
 
        return QueueDynProducer::new(self.inner.as_ref());
 
    }
 

	
 
    #[inline]
 
    pub fn producer_factory(&self) -> QueueDynProducerFactory<T> {
 
        return QueueDynProducerFactory::new(self.inner.as_ref());
 
    }
 

	
 
    /// Return `true` if a subsequent call to `pop` will return a value. Note
 
    /// that if it returns `false`, there *might* also be a value returned by
 
    /// `pop`.
 
    pub fn can_pop(&mut self) -> bool {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap() as u32;
 
        return (cur_read + buf_size) & data_lock.compare_mask != cur_limit;
 
    }
 

	
 
    /// Perform an attempted read from the queue. It might be that some producer
 
    /// is putting something in the queue while this function is executing, and
 
    /// we don't get to consume it.
 
    pub fn pop(&mut self) -> Option<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap() as u32;
 

	
 
        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
@@ -123,60 +130,53 @@ impl<T> Drop for QueueDynMpsc<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
 
        // have its destructor called. We immediately apply the
 
        // increment-by-size trick and wait until we've hit the write head.
 
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
 
        read_index += data_lock.data.cap() as u32;
 
        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
 
                std::ptr::drop_in_place(target);
 
            }
 
            read_index += 1;
 
        }
 
    }
 
}
 

	
 
pub struct QueueDynProducer<T> {
 
    queue: *const Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
 
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue = consumer.inner.as_ref() as *const _;
 
            return Self{ queue };
 
        }
 
    fn new(queue: &Shared<T>) -> Self {
 
        dbg_code!(queue.dbg.fetch_add(1, Ordering::AcqRel));
 
        return Self{ queue: queue as *const _ };
 
    }
 

	
 

	
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let mut data_lock = queue.data.lock_shared();
 
        let mut write_index = queue.write_head.load(Ordering::Acquire);
 

	
 
        'attempt_write: loop {
 
            let read_index = queue.read_head.load(Ordering::Acquire);
 

	
 
            if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
                // Need to resize, try loading read/write index afterwards
 
                let expected_capacity = data_lock.data.cap();
 
                data_lock = self.resize(data_lock, expected_capacity);
 
                write_index = queue.write_head.load(Ordering::Acquire);
 
                continue 'attempt_write;
 
            }
 

	
 
            // If here try to advance write index
 
            let new_write_index = (write_index + 1) & data_lock.compare_mask;
 
            if let Err(actual_write_index) = queue.write_head.compare_exchange(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Acquire
 
            ) {
 
                write_index = actual_write_index;
 
                continue 'attempt_write;
 
@@ -249,58 +249,80 @@ impl<T> QueueDynProducer<T> {
 

	
 
                queue.read_head.store(read_index, Ordering::Release);
 
                queue.limit_head.store(write_index, Ordering::Release);
 
                queue.write_head.store(write_index, Ordering::Release);
 

	
 
                // Update the masks
 
                exclusive_lock.read_mask = new_capacity - 1;
 
                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
 
            }
 
        }
 

	
 
        // Reacquire shared lock
 
        return queue.data.lock_shared();
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
 
    }
 
}
 

	
 
// producer end is `Send`, because in debug mode we make sure that there are no
 
// more producers when the queue is destroyed. But is not sync, because that
 
// would circumvent our atomic counter shenanigans. Although, now that I think
 
// about it, we're rather likely to just drop a single "producer" into the
 
// public part of a component.
 
// would circumvent our atomic counter shenanigans.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 

	
 
#[inline]
 
fn assert_correct_capacity(capacity: usize) {
 
    assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
}
 

	
 
pub struct QueueDynProducerFactory<T> {
 
    queue: *const Shared<T>
 
}
 

	
 
impl<T> QueueDynProducerFactory<T> {
 
    fn new(queue: &Shared<T>) -> Self {
 
        dbg_code!(queue.dbg.fetch_add(1, Ordering::AcqRel));
 
        return Self{ queue: queue as *const _ };
 
    }
 

	
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(unsafe{ &*self.queue });
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynProducerFactory<T> {
 
    fn drop(&mut self) {
 
        dbg_code!({
 
            let queue = unsafe{ &*self.queue };
 
            queue.dbg.fetch_sub(1, Ordering::AcqRel);
 
        });
 
    }
 
}
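
A hedged usage sketch of the factory (written as a test that could sit in the `tests` module below, assuming the API introduced in this changeset): grab the factory while the consumer is still in hand, move the consumer elsewhere, and create producers later through the factory.

#[test]
fn producer_factory_usage_sketch() {
    let queue = QueueDynMpsc::<u32>::new(16);
    let factory = queue.producer_factory();

    // `queue` may now be moved (e.g., into the struct owning the consumer side);
    // the factory stays valid because the shared state is boxed.
    let mut consumer = queue;

    let producer = factory.producer();
    producer.push(1);
    producer.push(2);
    assert_eq!(consumer.pop(), Some(1));
    assert_eq!(consumer.pop(), Some(2));

    // In debug builds all producers (and the factory) are dropped before the consumer.
    drop(producer);
    drop(factory);
}
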
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use super::super::tests::*;
 

	
 
    fn queue_size<T>(queue: &QueueDynMpsc<T>) -> usize {
 
        let lock = queue.inner.data.lock_exclusive();
 
        return lock.data.cap();
 
    }
 

	
 
    #[test]
 
    fn single_threaded_fixed_size_push_pop() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_ROUNDS: usize = 3;
 
        let mut cons = QueueDynMpsc::new(INIT_SIZE);
 
        let prod = cons.producer();
 

	
 
        let counters = Counters::new();
 

	
 
        for _round in 0..NUM_ROUNDS {
 
            // Fill up with indices
 
            for idx in 0..INIT_SIZE {
 
                prod.push(Resource::new(&counters, idx as u64));
 
            }
std/std.internet.pdl
 
#module std.internet
 

	
 
primitive tcp_client(u8[] ip, u16 port, in<u8[]> tx, out<u8[]> rx) {
 
    #builtin
 
}
 
\ No newline at end of file