Files @ 3b6c40dc10e1
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component.rs

3b6c40dc10e1 20.2 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
Initial TCP component implementation
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
use crate::protocol::*;
use crate::runtime2::*;
use crate::runtime2::communication::*;

use super::{CompCtx, CompPDL};
use super::component_context::*;
use super::component_random::*;
use super::control_layer::*;
use super::consensus::*;

pub enum CompScheduling {
    Immediate,
    Requeue,
    Sleep,
    Exit,
}

/// Generic representation of a component (as viewed by a scheduler).
pub(crate) trait Component {
    /// Called upon the creation of the component.
    fn on_creation(&mut self, sched_ctx: &SchedulerCtx);

    /// Called if the component is created by another component and the messages
    /// are being transferred between the two.
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);

    /// Called if the component receives a new message. The component is
    /// responsible for deciding where that messages goes.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);

    /// Called if the component's routine should be executed. The return value
    /// can be used to indicate when the routine should be run again.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
}

/// Representation of the generic operating mode of a component.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum CompMode {
    NonSync, // not in sync mode
    Sync, // in sync mode, can interact with other components
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
    BlockedGet, // blocked because we need to receive a message on a particular port
    BlockedPut, // component is blocked because the port is blocked
    BlockedSelect, // waiting on message to complete the select statement
    StartExit, // temporary state: if encountered then we start the shutdown process
    BusyExit, // temporary state: waiting for Acks for all the closed ports
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
}

impl CompMode {
    pub(crate) fn is_in_sync_block(&self) -> bool {
        use CompMode::*;

        match self {
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
            NonSync | StartExit | BusyExit | Exit => false,
        }
    }
}

/// Component execution state: the execution mode along with some descriptive
/// fields. Fields are public for ergonomic reasons, use member functions when
/// appropriate.
pub(crate) struct CompExecState {
    pub mode: CompMode,
    pub mode_port: PortId, // valid if blocked on a port (put/get)
    pub mode_value: ValueGroup, // valid if blocked on a put
}

impl CompExecState {
    pub(crate) fn new() -> Self {
        return Self{
            mode: CompMode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
        }
    }

    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
        self.mode = CompMode::BlockedGet;
        self.mode_port = port;
        debug_assert!(self.mode_value.values.is_empty());
    }

    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
        return
            self.mode == CompMode::BlockedGet &&
            self.mode_port == port;
    }

    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
        self.mode = CompMode::BlockedPut;
        self.mode_port = port;
        self.mode_value = value;
    }

    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
        return
            self.mode == CompMode::BlockedPut &&
            self.mode_port == port;
    }
}

/// Creates a new component based on its definition. Meaning that if it is a
/// user-defined component then we set up the PDL code state. Otherwise we
/// construct a custom component. This does NOT take care of port and message
/// management.
pub(crate) fn create_component(
    protocol: &ProtocolDescription,
    definition_id: ProcedureDefinitionId, type_id: TypeId,
    arguments: ValueGroup, num_ports: usize
) -> Box<dyn Component> {
    let definition = &protocol.heap[definition_id];
    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);

    if definition.source.is_builtin() {
        // Builtin component
        let component = match definition.source {
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
            _ => unreachable!(),
        };

        return component;
    } else {
        // User-defined component
        let prompt = Prompt::new(
            &protocol.types, &protocol.heap,
            definition_id, type_id, arguments
        );
        let component = CompPDL::new(prompt, num_ports);
        return Box::new(component);
    }
}

// -----------------------------------------------------------------------------
// Generic component messaging utilities (for sending and receiving)
// -----------------------------------------------------------------------------

/// Default handling of sending a data message. In case the port is blocked then
/// the `ExecState` will become blocked as well. Note that
/// `default_handle_control_message` will ensure that the port becomes
/// unblocked if so instructed by the receiving component. The returned
/// scheduling value must be used.
#[must_use]
pub(crate) fn default_send_data_message(
    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::Sync);

    // TODO: Handle closed ports
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
    let port_info = comp_ctx.get_port(port_handle);
    debug_assert_eq!(port_info.kind, PortKind::Putter);
    if port_info.state.is_blocked() {
        // Port is blocked, so we cannot send
        exec_state.set_as_blocked_put(transmitting_port_id, value);

        return CompScheduling::Sleep;
    } else {
        // Port is not blocked, so send to the peer
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
        let peer_info = comp_ctx.get_peer(peer_handle);
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);

        return CompScheduling::Immediate;
    }
}

pub(crate) enum IncomingData {
    PlacedInSlot,
    SlotFull(DataMessage),
}

/// Default handling of receiving a data message. In case there is no room for
/// the message it is returned from this function. Note that this function is
/// different from PDL code performing a `get` on a port; this is the case where
/// the message first arrives at the component.
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
//  nicest if the sending component can figure out it cannot send any more data.
#[must_use]
pub(crate) fn default_handle_incoming_data_message(
    exec_state: &mut CompExecState, port_value_slot: &mut Option<DataMessage>,
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
) -> IncomingData {
    let target_port_id = incoming_message.data_header.target_port;

    if port_value_slot.is_none() {
        // We can put the value in the slot
        *port_value_slot = Some(incoming_message);

        // Check if we're blocked on receiving this message.
        dbg_code!({
            // Our port cannot have been blocked itself, because we're able to
            // directly insert the message into its slot.
            let port_handle = comp_ctx.get_port_handle(target_port_id);
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
        });

        if exec_state.is_blocked_on_get(target_port_id) {
            // Return to normal operation
            exec_state.mode = CompMode::Sync;
            exec_state.mode_port = PortId::new_invalid();
            debug_assert!(exec_state.mode_value.values.is_empty());
        }

        return IncomingData::PlacedInSlot
    } else {
        // Slot is already full, so if the port was previously opened, it will
        // now become closed
        let port_handle = comp_ctx.get_port_handle(target_port_id);
        let port_info = comp_ctx.get_port_mut(port_handle);
        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future

        if port_info.state == PortState::Open {
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
            let (peer_handle, message) =
                control.initiate_port_blocking(comp_ctx, port_handle);
            let peer = comp_ctx.get_peer(peer_handle);
            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
        }

        return IncomingData::SlotFull(incoming_message)
    }
}

/// Handles control messages in the default way. Note that this function may
/// take a lot of actions in the name of the caller: pending messages may be
/// sent, ports may become blocked/unblocked, etc. So the execution
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
/// state may all change.
pub(crate) fn default_handle_control_message(
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) {
    match message.content {
        ControlMessageContent::Ack => {
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
        },
        ControlMessageContent::BlockPort(port_id) => {
            // One of our messages was accepted, but the port should be
            // blocked.
            let port_handle = comp_ctx.get_port_handle(port_id);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            if port_info.state == PortState::Open {
                // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
            }
        },
        ControlMessageContent::ClosePort(port_id) => {
            // Request to close the port. We immediately comply and remove
            // the component handle as well
            let port_handle = comp_ctx.get_port_handle(port_id);
            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

            // One exception to sending an `Ack` is if we just closed the
            // port ourselves, meaning that the `ClosePort` messages got
            // sent to one another.
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
            } else {
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
            }
        },
        ControlMessageContent::UnblockPort(port_id) => {
            // We were previously blocked (or already closed)
            let port_handle = comp_ctx.get_port_handle(port_id);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            if port_info.state == PortState::BlockedDueToFullBuffers {
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
            }
        },
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
            // The peer of our port has just changed. So we are asked to
            // temporarily block the port (while our original recipient is
            // potentially rerouting some of the in-flight messages) and
            // Ack. Then we wait for the `unblock` call.
            debug_assert_eq!(message.target_port_id, Some(port_id));
            let port_handle = comp_ctx.get_port_handle(port_id);
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);

            let port_info = comp_ctx.get_port(port_handle);
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
        },
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
            let old_peer_id = port_info.peer_comp_id;

            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);

            let port_info = comp_ctx.get_port_mut(port_handle);
            port_info.peer_comp_id = new_comp_id;
            port_info.peer_port_id = new_port_id;
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
        }
    }
}

/// Handles a component initiating the exiting procedure, and closing all of its
/// ports. Should only be called once per component (which is ensured by
/// checking and modifying the mode in the execution state).
#[must_use]
pub(crate) fn default_handle_start_exit(
    exec_state: &mut CompExecState, control: &mut ControlLayer,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
    sched_ctx.log("Component starting exit");
    exec_state.mode = CompMode::BusyExit;

    // Iterating by index to work around borrowing rules
    for port_index in 0..comp_ctx.num_ports() {
        let port = comp_ctx.get_port_by_index_mut(port_index);
        if port.state == PortState::Closed {
            // Already closed, or in the process of being closed
            continue;
        }

        // Mark as closed
        let port_id = port.self_id;
        port.state = PortState::Closed;

        // Notify peer of closing
        let port_handle = comp_ctx.get_port_handle(port_id);
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
        let peer_info = comp_ctx.get_peer(peer);
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
    }

    return CompScheduling::Immediate; // to check if we can shut down immediately
}

/// Handles a component waiting until all peers are notified that it is quitting
/// (i.e. after calling `default_handle_start_exit`).
#[must_use]
pub(crate) fn default_handle_busy_exit(
    exec_state: &mut CompExecState, control: &ControlLayer,
    sched_ctx: &SchedulerCtx
) -> CompScheduling {
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
    if control.has_acks_remaining() {
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
        return CompScheduling::Sleep;
    } else {
        sched_ctx.log("Component busy exiting, now shutting down");
        exec_state.mode = CompMode::Exit;
        return CompScheduling::Exit;
    }
}

/// Handles a potential synchronous round decision. If there was a decision then
/// the `Some(success)` value indicates whether the round succeeded or not.
/// Might also end up changing the `ExecState`.
pub(crate) fn default_handle_sync_decision(
    exec_state: &mut CompExecState, decision: SyncRoundDecision,
    consensus: &mut Consensus
) -> Option<bool> {
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
    let success = match decision {
        SyncRoundDecision::None => return None,
        SyncRoundDecision::Solution => true,
        SyncRoundDecision::Failure => false,
    };

    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
    if success {
        exec_state.mode = CompMode::NonSync;
        consensus.notify_sync_decision(decision);
        return Some(true);
    } else {
        exec_state.mode = CompMode::StartExit;
        return Some(false);
    }
}


#[inline]
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
    return CompScheduling::Exit;
}

// -----------------------------------------------------------------------------
// Internal messaging/state utilities
// -----------------------------------------------------------------------------

/// Handles an `Ack` for the control layer.
fn default_handle_ack(
    control: &mut ControlLayer, control_id: ControlId,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
) {
    // Since an `Ack` may cause another one, handle them in a loop
    let mut to_ack = control_id;
    loop {
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
        match action {
            AckAction::SendMessage(target_comp, message) => {
                // FIX @NoDirectHandle
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
                let _should_remove = handle.decrement_users();
                debug_assert!(_should_remove.is_none());
            },
            AckAction::ScheduleComponent(to_schedule) => {
                // FIX @NoDirectHandle
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);

                // Note that the component is intentionally not
                // sleeping, so we just wake it up
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
                let key = unsafe { to_schedule.upgrade() };
                sched_ctx.runtime.enqueue_work(key);
                let _should_remove = handle.decrement_users();
                debug_assert!(_should_remove.is_none());
            },
            AckAction::None => {}
        }

        match new_to_ack {
            Some(new_to_ack) => to_ack = new_to_ack,
            None => break,
        }
    }
}

/// Little helper for sending the most common kind of `Ack`
fn default_send_ack(
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
) {
    let peer_info = comp_ctx.get_peer(peer_handle);
    peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{
        id: causer_of_ack_id,
        sender_comp_id: comp_ctx.id,
        target_port_id: None,
        content: ControlMessageContent::Ack
    }), true);
}

/// Handles the unblocking of a putter port. In case there is a pending message
/// on that port then it will be sent.
fn default_handle_unblock_put(
    exec_state: &mut CompExecState, consensus: &mut Consensus,
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
) {
    let port_info = comp_ctx.get_port_mut(port_handle);
    let port_id = port_info.self_id;
    debug_assert!(port_info.state.is_blocked());
    port_info.state = PortState::Open;

    if exec_state.is_blocked_on_put(port_id) {
        // Annotate the message that we're going to send
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
        debug_assert_eq!(port_info.kind, PortKind::Putter);
        let to_send = exec_state.mode_value.take();
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);

        // Retrieve peer to send the message
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
        let peer_info = comp_ctx.get_peer(peer_handle);
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);

        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
        exec_state.mode_port = PortId::new_invalid();
    }
}

#[inline]
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
    return PortId(port_id.id);
}

#[inline]
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
    return EvalPortId{ id: port_id.0 };
}