Files @ ceaa946df1eb
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs

ceaa946df1eb 20.9 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
WIP on fixing reroute bug
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::runtime2::ScheduledConnector;

use super::{RuntimeInner, ConnectorId, ConnectorKey};
use super::port::{Port, PortState, PortIdLocal};
use super::native::Connector;
use super::connector::{ConnectorScheduling, RunDeltaState};
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};

/// Contains fields that are mostly managed by the scheduler, but may be
/// accessed by the connector while it is running.
pub(crate) struct ConnectorCtx {
    pub(crate) id: ConnectorId,   // globally unique ID of this connector; invalid until registered
    pub(crate) ports: Vec<Port>,  // ports currently owned by this connector
}

impl ConnectorCtx {
    /// Creates an empty context: an invalid connector ID and no owned ports.
    /// The scheduler/runtime is expected to assign the real ID upon
    /// registration.
    pub(crate) fn new() -> ConnectorCtx {
        Self{
            id: ConnectorId::new_invalid(),
            ports: Vec::new(),
        }
    }

    /// Registers `port` as owned by this connector. Debug-asserts that the
    /// port is not already present (ownership must be unique).
    pub(crate) fn add_port(&mut self, port: Port) {
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
        self.ports.push(port);
    }

    /// Removes and returns the port with the given ID. Panics if the port is
    /// not owned by this connector.
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
        let index = self.port_id_to_index(id);
        return self.ports.remove(index);
    }

    /// Returns a shared reference to the port with the given ID. Panics if
    /// the port is not owned by this connector.
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
        let index = self.port_id_to_index(id);
        return &self.ports[index];
    }

    /// Returns a mutable reference to the port with the given ID. Panics if
    /// the port is not owned by this connector.
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
        let index = self.port_id_to_index(id);
        return &mut self.ports[index];
    }

    /// Maps a port ID to its index in `self.ports`, panicking if absent.
    /// Linear scan: the per-connector port count is expected to be small.
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
        match self.ports.iter().position(|port| port.self_id == id) {
            Some(index) => index,
            None => panic!("port {:?}, not owned by connector", id),
        }
    }
}

// Context handed to a running connector by the scheduler. Because it contains
// only a shared reference it is cheap to copy, so we pass it by value.
#[derive(Clone, Copy)]
pub(crate) struct SchedulerCtx<'a> {
    pub(crate) runtime: &'a RuntimeInner  // shared runtime (work queue, component registry, messaging)
}

/// A single scheduler thread's state. Multiple schedulers share one
/// `RuntimeInner` and pull connectors off its work queue.
pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,  // shared runtime, also held by all other scheduler threads
    scheduler_id: u32,           // only used to tag debug output
}

impl Scheduler {
    /// Creates a scheduler operating on the shared runtime. `scheduler_id` is
    /// only used to label debug output.
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Self{ runtime, scheduler_id };
    }

    /// Main scheduler loop. Repeatedly: pull a connector from the runtime's
    /// work queue, drain its shared inbox (rerouting messages and answering
    /// control messages along the way), run the connector, then apply the
    /// produced side effects (outgoing messages, new ports, new connectors).
    /// Returns when `wait_for_work` yields `None`, i.e. on runtime shutdown.
    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        let mut delta_state = RunDeltaState::new();

        'thread_loop: loop {
            // Retrieve a unit of work
            self.debug("Waiting for work");
            let connector_key = self.runtime.wait_for_work();
            if connector_key.is_none() {
                // We should exit
                self.debug(" ... No more work, quitting");
                break 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let connector_id = connector_key.downcast();
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));

            // Holding the key implies exclusive access to the connector's
            // private state.
            let scheduled = self.runtime.get_component_private(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Check all the message that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    // Check for rerouting: if the target port's ownership was
                    // transferred to another connector (and the transfer has
                    // not been Ack'd yet), forward the message there instead
                    // of handling it here.
                    self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
                        self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
                        self.runtime.send_message(other_connector_id, message);
                        continue;
                    }

                    self.debug_conn(connector_id, " ... Handling message myself");
                    // Check for messages that requires special action from the
                    // scheduler.
                    if let MessageContents::Control(content) = message.contents {
                        match content.content {
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                // Need to change port target: the peer of this
                                // port has moved to another connector.
                                let port = scheduled.context.get_port_mut(port_id);
                                port.peer_connector = new_target_connector_id;

                                // Note: for simplicity we program the scheduler to always finish
                                // running a connector with an empty outbox. If this ever changes
                                // then accepting the "port peer changed" message implies we need
                                // to change the recipient of the message in the outbox.
                                debug_assert!(delta_state.outbox.is_empty());

                                // And respond with an Ack
                                self.runtime.send_message(
                                    message.sending_connector,
                                    Message{
                                        sending_connector: connector_id,
                                        receiving_port: PortIdLocal::new_invalid(),
                                        contents: MessageContents::Control(ControlMessage{
                                            id: content.id,
                                            content: ControlMessageVariant::Ack,
                                        }),
                                    }
                                );
                            },
                            ControlMessageVariant::CloseChannel(port_id) => {
                                // Mark the port as being closed
                                let port = scheduled.context.get_port_mut(port_id);
                                port.state = PortState::Closed;

                                // Send an Ack
                                self.runtime.send_message(
                                    message.sending_connector,
                                    Message{
                                        sending_connector: connector_id,
                                        receiving_port: PortIdLocal::new_invalid(),
                                        contents: MessageContents::Control(ControlMessage{
                                            id: content.id,
                                            content: ControlMessageVariant::Ack,
                                        }),
                                    }
                                );

                            },
                            ControlMessageVariant::Ack => {
                                // A peer acknowledged one of our own control
                                // messages; clear the matching pending entry.
                                scheduled.router.handle_ack(content.id);
                            }
                        }
                    } else {
                        // Let connector handle message
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
                    }
                }

                // Run the main behaviour of the connector, depending on its
                // current state.
                if scheduled.shutting_down {
                    // Nothing to do. But we're still waiting for all our pending
                    // control messages to be answered.
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
                    if scheduled.router.num_pending_acks() == 0 {
                        // We're actually done, we can safely destroy the
                        // currently running connector
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    } else {
                        cur_schedule = ConnectorScheduling::NotNow;
                    }
                } else {
                    self.debug_conn(connector_id, "Running ...");
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                    let new_schedule = scheduled.connector.run(
                        scheduler_ctx, &scheduled.context, &mut delta_state
                    );
                    self.debug_conn(connector_id, "Finished running");

                    // Handle all of the output from the current run: messages to
                    // send and connectors to instantiate.
                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);

                    cur_schedule = new_schedule;
                }
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.push_work(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep, note that we are the only ones which are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    self.try_go_to_sleep(connector_key, scheduled);
                },
                ConnectorScheduling::Exit => {
                    // Prepare for exit. Set the shutdown flag and broadcast
                    // messages to notify peers of closing channels
                    scheduled.shutting_down = true;
                    for port in &scheduled.context.ports {
                        if port.state != PortState::Closed {
                            let message = scheduled.router.prepare_closing_channel(
                                port.self_id, port.peer_id,
                                connector_id
                            );
                            self.runtime.send_message(port.peer_connector, message);
                        }
                    }

                    // If no Acks are outstanding we can destroy immediately;
                    // otherwise sleep until the Acks arrive (handled by the
                    // `shutting_down` branch above on the next wakeup).
                    if scheduled.router.num_pending_acks() == 0 {
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    }

                    self.try_go_to_sleep(connector_key, scheduled);
                }
            }
        }
    }

    /// Applies the side effects a connector produced during a run:
    /// - drains `delta_state.outbox`, resolving each message's destination
    ///   (by port for data messages, by the `to_visit` lists for sync/commit
    ///   messages) and handing it to the runtime;
    /// - adopts any newly created ports into the connector's context;
    /// - registers newly created connectors with the runtime, informs the
    ///   peers of any transferred ports of the ownership change, and queues
    ///   the new connectors for execution.
    /// All three delta vectors are empty afterwards.
    fn handle_delta_state(&mut self,
        cur_connector: &mut ScheduledConnector, connector_id: ConnectorId,
        delta_state: &mut RunDeltaState
    ) {
        // Handling any messages that were sent
        if !delta_state.outbox.is_empty() {
            for mut message in delta_state.outbox.drain(..) {
                // Based on the message contents, decide where the message
                // should be sent to. This might end up modifying the message.
                self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message));
                let (peer_connector, self_port, peer_port) = match &mut message {
                    MessageContents::Data(contents) => {
                        // Data travels over a port: look up its peer.
                        let port = cur_connector.context.get_port(contents.sending_port);
                        (port.peer_connector, contents.sending_port, port.peer_id)
                    },
                    MessageContents::Sync(contents) => {
                        // Sync messages hop along the `to_visit` list; pop the
                        // next connector to visit. Not addressed to a port.
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::RequestCommit(contents)=> {
                        // Same hop-along scheme as Sync messages.
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::ConfirmCommit(contents) => {
                        // Broadcast to every connector in `to_visit`; sending
                        // is done right here, so return an invalid peer so the
                        // generic send below is skipped.
                        for to_visit in &contents.to_visit {
                            let message = Message{
                                sending_connector: connector_id,
                                receiving_port: PortIdLocal::new_invalid(),
                                contents: MessageContents::ConfirmCommit(contents.clone()),
                            };
                            self.runtime.send_message(*to_visit, message);
                        }
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::Control(_) | MessageContents::Ping => {
                        // Never generated by the user's code
                        unreachable!();
                    }
                };

                // TODO: Maybe clean this up, perhaps special case for
                //  ConfirmCommit can be handled differently.
                if peer_connector.is_valid() {
                    if peer_port.is_valid() {
                        // Sending a message to a port, so the port may not be
                        // closed.
                        let port = cur_connector.context.get_port(self_port);
                        match port.state {
                            PortState::Open => {},
                            PortState::Closed => {
                                todo!("Handling sending over a closed port");
                            }
                        }
                    }
                    let message = Message {
                        sending_connector: connector_id,
                        receiving_port: peer_port,
                        contents: message,
                    };
                    self.runtime.send_message(peer_connector, message);
                }
            }
        }

        // Adopt ports created during the run (e.g. by creating a channel).
        if !delta_state.new_ports.is_empty() {
            for port in delta_state.new_ports.drain(..) {
                cur_connector.context.ports.push(port);
            }
        }

        // Handling any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to global registry to obtain key
                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
                let new_connector = self.runtime.get_component_private(&new_key);

                // Call above changed ownership of ports, but we still have to
                // let the other end of the channel know that the port has
                // changed location.
                for port in &new_connector.context.ports {
                    let reroute_message = cur_connector.router.prepare_reroute(
                        port.self_id, port.peer_id, cur_connector.context.id,
                        port.peer_connector, new_connector.context.id
                    );

                    self.runtime.send_message(port.peer_connector, reroute_message);
                }

                // Schedule new connector to run
                self.runtime.push_work(new_key);
            }
        }

        debug_assert!(delta_state.outbox.is_empty());
        debug_assert!(delta_state.new_ports.is_empty());
        debug_assert!(delta_state.new_connectors.is_empty());
    }

    /// Marks the connector as sleeping, then re-checks the inbox to close the
    /// race where a peer delivered a message after we drained the inbox but
    /// before we set the sleeping flag. If that happened, whichever party wins
    /// the compare-exchange (us or the sending peer) re-queues the connector.
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
        debug_assert_eq!(connector_key.index, connector.context.id.0);
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);

        // This is the running connector, and only the running connector may
        // decide it wants to sleep again.
        connector.public.sleeping.store(true, Ordering::Release);

        // But due to reordering we might have received messages from peers who
        // did not consider us sleeping. If so, then we wake ourselves again.
        if !connector.public.inbox.is_empty() {
            // Try to wake ourselves up (takes care of race condition with a
            // peer that tries the same CAS after seeing the sleeping flag)
            let should_wake_up_again = connector.public.sleeping
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

            if should_wake_up_again {
                self.runtime.push_work(connector_key)
            }
        }
    }

    // TODO: Remove, this is debugging stuff
    /// Prints a debug line tagged with this scheduler's ID only.
    fn debug(&self, message: &str) {
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
    }

    /// Prints a debug line tagged with this scheduler's ID and a connector ID.
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
    }
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

/// A control message we sent to a peer and for which we still await an `Ack`.
struct ControlEntry {
    id: u32,                  // identifier echoed back in the peer's Ack
    variant: ControlVariant,  // what kind of control action this entry tracks
}

/// The kind of pending control action tracked by a `ControlEntry`.
enum ControlVariant {
    ChangedPort(ControlChangedPort),      // a port was transferred to another connector
    ClosedChannel(ControlClosedChannel),  // a channel was closed from our side
}

/// Pending port transfer: while this entry is active, messages from
/// `source_connector` addressed to `target_port` must be rerouted to
/// `target_connector` (the port's new owner).
struct ControlChangedPort {
    target_port: PortIdLocal,       // if sent to this port, then reroute
    source_connector: ConnectorId,  // connector we expect messages from
    target_connector: ConnectorId,  // connector we need to reroute to
}

/// Pending channel close: records both endpoints of the channel we told the
/// peer to close, until the peer's Ack arrives.
struct ControlClosedChannel {
    source_port: PortIdLocal,  // our end of the closed channel
    target_port: PortIdLocal,  // the peer's end of the closed channel
}

/// Tracks control messages sent to peers that have not been Ack'd yet, and
/// answers reroute queries for in-flight port transfers.
pub(crate) struct ControlMessageHandler {
    id_counter: u32,            // next control-message ID to hand out (wraps)
    active: Vec<ControlEntry>,  // entries awaiting an Ack from a peer
}

impl ControlMessageHandler {
    /// Creates a handler with no pending control messages.
    pub fn new() -> Self {
        ControlMessageHandler {
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares a message indicating that a channel has closed, we keep a local
    /// entry to match against the (hopefully) returned `Ack` message.
    ///
    /// The returned message must be sent to the peer connector owning
    /// `peer_port_id`.
    pub fn prepare_closing_channel(
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId
    ) -> Message {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
                source_port: self_port_id,
                target_port: peer_port_id,
            }),
        });

        return Message{
            sending_connector: self_connector_id,
            receiving_port: peer_port_id,
            contents: MessageContents::Control(ControlMessage{
                id,
                content: ControlMessageVariant::CloseChannel(peer_port_id),
            }),
        };
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
    ///
    /// Until the peer Acks, `should_reroute` will redirect messages arriving
    /// from `peer_connector_id` at `port_id` to `new_owner_connector_id`.
    pub fn prepare_reroute(
        &mut self,
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ChangedPort(ControlChangedPort{
                target_port: port_id,
                source_connector: peer_connector_id,
                target_connector: new_owner_connector_id,
            }),
        });

        return Message{
            sending_connector: self_connector_id,
            receiving_port: peer_port_id,
            contents: MessageContents::Control(ControlMessage{
                id,
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
            })
        };
    }

    /// Returns true if the supplied message should be rerouted. If so then this
    /// function returns the connector that should retrieve this message.
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
        // Scan the pending `ChangedPort` entries for a match on both the
        // sender and the targeted (transferred) port.
        return self.active.iter().find_map(|entry| {
            if let ControlVariant::ChangedPort(changed) = &entry.variant {
                if changed.source_connector == sending_connector &&
                    changed.target_port == target_port {
                    // Need to reroute this message
                    return Some(changed.target_connector);
                }
            }
            None
        });
    }

    /// Handles an Ack as an answer to a previously sent control message
    pub fn handle_ack(&mut self, id: u32) {
        match self.active.iter().position(|v| v.id == id) {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }

    /// Retrieves the number of responses we still expect to receive from our
    /// peers
    #[inline]
    pub fn num_pending_acks(&self) -> usize {
        return self.active.len();
    }

    /// Hands out the next control-message ID; wraps around on `u32` overflow.
    fn take_id(&mut self) -> u32 {
        let generated_id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1);
        return generated_id;
    }
}