Files @ 33da6b69e9a2
Branch filter:

Location: CSY/reowolf/src/runtime/actors.rs

33da6b69e9a2 19.3 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
Christopher Esterhuyse
bugfixing + ffi fleshing out + example refining
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
use crate::common::*;
use crate::runtime::{endpoint::*, *};

/// Single-branch form of the native component.
///
/// `PolyN::choose_mono` builds one of these from the branch selected by a
/// decision predicate.
#[derive(Debug, Clone)]
pub(crate) struct MonoN {
    /// Endpoint keys of this component (`PolyN::choose_mono` copies them from `PolyN::ekeys`).
    pub ekeys: HashSet<Key>,
    /// `(sync_batch_index, gotten)` of the chosen branch when produced by
    /// `PolyN::choose_mono`. NOTE(review): the meaning of `None` is not visible
    /// in this file -- presumably "no result available yet"; confirm at the use sites.
    pub result: Option<(usize, HashMap<Key, Payload>)>,
}
/// Speculative (multi-branch) form of the native component: one `BranchN`
/// per predicate under which the native might complete the round.
#[derive(Debug)]
pub(crate) struct PolyN {
    /// Endpoint keys of this component; copied into `MonoN` by `choose_mono`.
    pub ekeys: HashSet<Key>,
    /// Live branches, keyed by the predicate each branch assumes.
    pub branches: HashMap<Predicate, BranchN>,
}
/// One speculative branch of the native component.
#[derive(Debug, Clone)]
pub(crate) struct BranchN {
    /// Keys this branch still expects a message on. `PolyN::sync_recv` removes
    /// a key when its message arrives; an empty set marks the branch as a
    /// candidate solution.
    pub to_get: HashSet<Key>,
    /// Messages received so far, by the key they arrived on.
    pub gotten: HashMap<Key, Payload>,
    /// Index reported back in `MonoN::result`.
    /// NOTE(review): presumably identifies the sync batch this branch was created for -- confirm.
    pub sync_batch_index: usize,
}

/// Single-branch form of a protocol component.
///
/// `PolyP::choose_mono` builds one of these from a completed branch.
#[derive(Debug, Clone)]
pub struct MonoP {
    /// Protocol state carried over from the chosen branch.
    pub state: ProtocolS,
    /// Endpoint keys of this component (`PolyP::choose_mono` copies them from `PolyP::ekeys`).
    pub ekeys: HashSet<Key>,
}
/// Speculative (multi-branch) form of a protocol component.
#[derive(Debug)]
pub(crate) struct PolyP {
    /// Branches that are parked waiting for a message (each has `blocking_on` set
    /// by `poly_run_these_branches`), keyed by their predicate.
    pub incomplete: HashMap<Predicate, BranchP>,
    /// Branches that ran to the end of their sync block, keyed by their predicate.
    pub complete: HashMap<Predicate, BranchP>,
    /// Endpoint keys of this component; every key reported by a blocker is
    /// asserted to be a member of this set.
    pub ekeys: HashSet<Key>,
}
/// One speculative branch of a protocol component.
#[derive(Debug, Clone)]
pub(crate) struct BranchP {
    /// Key whose message this branch is waiting for, set when the branch is
    /// parked in `PolyP::incomplete`; cleared again right before it is re-run.
    pub blocking_on: Option<Key>,
    /// Payloads this branch has sent during the round, by key.
    pub outbox: HashMap<Key, Payload>,
    /// Payloads delivered to this branch during the round, by key.
    pub inbox: HashMap<Key, Payload>,
    /// Protocol state that `sync_run` advances.
    pub state: ProtocolS,
}

//////////////////////////////////////////////////////////////////

impl PolyP {
    /// Empties `self.incomplete` and runs every branch that was parked there.
    ///
    /// Returns whatever `poly_run_these_branches` returns for that set of
    /// branches, including any `EndpointErr` it propagates.
    pub(crate) fn poly_run(
        &mut self,
        m_ctx: PolyPContext,
        protocol_description: &ProtocolD,
    ) -> Result<SyncRunResult, EndpointErr> {
        // Move the parked branches out first: the callee needs `&mut self`
        // and will re-park whichever branches block again.
        let mut pending = Vec::with_capacity(self.incomplete.len());
        pending.extend(self.incomplete.drain());
        self.poly_run_these_branches(m_ctx, protocol_description, pending)
    }

    /// Runs the given branches until each one is dropped, parked or completed.
    ///
    /// Branches are popped from the back of `to_run`; each is advanced with
    /// `sync_run` and then handled according to the blocker it reports:
    ///
    /// * `Inconsistent` -- the branch is dropped.
    /// * `CouldntReadMsg(ekey)` -- the branch is parked in `self.incomplete`
    ///   with `blocking_on = Some(ekey)`, unless its predicate already had the
    ///   channel assigned `false`, in which case it is dropped.
    /// * `CouldntCheckFiring(ekey)` -- the branch is split in two, one copy per
    ///   assignment of the channel, and both are queued to run.
    /// * `SyncBlockEnd` -- the predicate is checked against what actually
    ///   fired; a consistent branch is submitted as a subtree solution and
    ///   stored in `self.complete`.
    /// * `PutMsg(ekey, payload)` -- the payload is recorded in the outbox, sent
    ///   over the endpoint, and the branch is queued to continue; dropped if
    ///   the predicate already had the channel assigned `false`.
    ///
    /// # Errors
    /// Returns the `EndpointErr` of a failed `endpoint.send`. The return is
    /// immediate: the branch being processed and anything still in `to_run`
    /// are dropped.
    ///
    /// # Panics
    /// Panics if a blocker names a key outside `self.ekeys`, if a key has no
    /// entry in `endpoint_exts`, if `CouldntCheckFiring` is reported for a
    /// channel the predicate already assigns, or if a channel fired at
    /// `SyncBlockEnd` although the predicate says `false` or nothing about it.
    pub(crate) fn poly_run_these_branches(
        &mut self,
        mut m_ctx: PolyPContext,
        protocol_description: &ProtocolD,
        mut to_run: Vec<(Predicate, BranchP)>,
    ) -> Result<SyncRunResult, EndpointErr> {
        use SyncRunResult as Srr;
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
            // Per-branch view handed to the protocol state while it runs.
            let mut r_ctx = BranchPContext {
                m_ctx: m_ctx.reborrow(),
                ekeys: &self.ekeys,
                predicate: &predicate,
                inbox: &branch.inbox,
            };
            use PolyBlocker as Sb;
            let blocker = branch.state.sync_run(&mut r_ctx, protocol_description);
            log!(
                &mut r_ctx.m_ctx.inner.logger,
                "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
                r_ctx.m_ctx.my_subtree_id,
                &predicate,
                &blocker
            );
            match blocker {
                Sb::Inconsistent => {} // DROP
                Sb::CouldntReadMsg(ekey) => {
                    assert!(self.ekeys.contains(&ekey));
                    let channel_id =
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
                    log!(
                        &mut r_ctx.m_ctx.inner.logger,
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
                        r_ctx.m_ctx.my_subtree_id,
                        channel_id,
                        &branch.inbox,
                    );
                    // Waiting for a message means assuming the channel fires.
                    // NOTE(review): `replace_assignment` presumably returns the previous
                    // assignment; `Some(false)` would then contradict that assumption.
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
                        // don't rerun now. Rerun at next `sync_run`

                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
                        branch.blocking_on = Some(ekey);
                        self.incomplete.insert(predicate, branch);
                    } else {
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
                    }
                    // ELSE DROP
                }
                Sb::CouldntCheckFiring(ekey) => {
                    assert!(self.ekeys.contains(&ekey));
                    let channel_id =
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
                    // split the branch!
                    // One copy assumes the channel does not fire, the other that it does.
                    let branch_f = branch.clone();
                    let mut predicate_f = predicate.clone();
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
                        // The protocol asked about a channel whose firing was already decided.
                        panic!("OI HANS QUERY FIRST!");
                    }
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
                    to_run.push((predicate, branch));
                    // Pushed last, so the `false` copy is popped (and run) first.
                    to_run.push((predicate_f, branch_f));
                }
                Sb::SyncBlockEnd => {
                    let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner;
                    log!(
                        logger,
                        "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...",
                        m_ctx.my_subtree_id,
                        &predicate,
                    );
                    // come up with the predicate for this local solution

                    for ekey in self.ekeys.iter() {
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
                        // A key counts as fired if this branch sent or received on it.
                        let fired =
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
                        match predicate.query(channel_id) {
                            Some(true) => {
                                if !fired {
                                    // This branch should have fired but didn't!
                                    log!(
                                        logger,
                                        "~ ... ... should have fired {:?} and didn't! pruning!",
                                        channel_id,
                                    );
                                    // Prune: the branch is neither parked nor completed.
                                    continue 'to_run_loop;
                                }
                            }
                            Some(false) => {
                                if fired {
                                    println!(
                                        "pred {:#?} in {:#?} out {:#?}",
                                        &predicate,
                                        branch.inbox.get(ekey),
                                        branch.outbox.get(ekey)
                                    );
                                    panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id)
                                }
                            }
                            None => {
                                // Unassigned channels are pinned to `false` in the solution.
                                // The diagnostic print below therefore shows the predicate
                                // after this assignment.
                                predicate.replace_assignment(channel_id, false);
                                if fired {
                                    println!(
                                        "pred {:#?} in {:#?} out {:#?}",
                                        &predicate,
                                        branch.inbox.get(ekey),
                                        branch.outbox.get(ekey)
                                    );
                                    panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id)
                                }
                            }
                        }
                    }
                    log!(logger, "~ ... ... and finished just fine!",);
                    m_ctx.solution_storage.submit_and_digest_subtree_solution(
                        &mut m_ctx.inner.logger,
                        m_ctx.my_subtree_id,
                        predicate.clone(),
                    );
                    self.complete.insert(predicate, branch);
                }
                Sb::PutMsg(ekey, payload) => {
                    assert!(self.ekeys.contains(&ekey));
                    let EndpointExt { info, endpoint } =
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
                    // Sending assumes the channel fires; see the note in `CouldntReadMsg`.
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
                        branch.outbox.insert(ekey, payload.clone());
                        // The message carries the predicate under which it was sent.
                        let msg = CommMsgContents::SendPayload {
                            payload_predicate: predicate.clone(),
                            payload,
                        }
                        .into_msg(m_ctx.inner.round_index);
                        log!(
                            &mut m_ctx.inner.logger,
                            "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!",
                            &msg,
                            ekey,
                            (info.channel_id.controller_id, info.channel_id.channel_index),
                        );
                        endpoint.send(msg)?;
                        // Keep running this branch past the put.
                        to_run.push((predicate, branch));
                    }
                    // ELSE DROP
                }
            }
        }
        // all in self.incomplete most recently returned Blocker::CouldntReadMsg
        Ok(if self.incomplete.is_empty() {
            if self.complete.is_empty() {
                Srr::NoBranches
            } else {
                Srr::AllBranchesComplete
            }
        } else {
            Srr::BlockingForRecv
        })
    }

    /// Delivers a received `payload` (sent under `payload_predicate`) on `ekey`
    /// to the branches it is compatible with, then runs the branches that were
    /// waiting for exactly this message.
    ///
    /// Three cases, tried in order:
    ///
    /// 1. `payload_predicate` equals the predicate of a completed branch:
    ///    nothing to do.
    /// 2. `payload_predicate` equals the predicate of a parked branch: the
    ///    payload goes into that branch's inbox. The branch is run if it was
    ///    blocked on `ekey`, and otherwise stays parked.
    /// 3. Otherwise every parked branch is compared with `payload_predicate`
    ///    via `common_satisfier`:
    ///    * branch predicate already covers the payload's assumptions
    ///      (`FormerNotLatter` / `Equivalent`): deliver in place;
    ///    * a new, stronger predicate satisfies both (`New`): keep the
    ///      original untouched and fork a copy under the new predicate;
    ///    * the payload predicate is the stronger one (`LatterNotFormer`):
    ///      keep the original untouched and fork a copy under
    ///      `payload_predicate`;
    ///    * contradiction (`Nonexistant`): leave the branch alone.
    ///
    ///    A branch or fork that received the message is run only if it was
    ///    blocked on `ekey`; otherwise it is parked again.
    ///
    /// # Errors
    /// Propagates any `EndpointErr` from `poly_run_these_branches`.
    ///
    /// # Panics
    /// In case 3, panics if a branch already holds a different payload for `ekey`.
    pub(crate) fn poly_recv_run(
        &mut self,
        m_ctx: PolyPContext,
        protocol_description: &ProtocolD,
        ekey: Key,
        payload_predicate: Predicate,
        payload: Payload,
    ) -> Result<SyncRunResult, EndpointErr> {
        // try exact match

        let to_run = if self.complete.contains_key(&payload_predicate) {
            // exact match with stopped machine

            log!(
                &mut m_ctx.inner.logger,
                "... poly_recv_run matched stopped machine exactly! nothing to do here",
            );
            vec![]
        } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) {
            // exact match with running machine

            log!(
                &mut m_ctx.inner.logger,
                "... poly_recv_run matched running machine exactly! pred is {:?}",
                &payload_predicate
            );
            branch.inbox.insert(ekey, payload);
            if branch.blocking_on == Some(ekey) {
                branch.blocking_on = None;
                vec![(payload_predicate, branch)]
            } else {
                // The branch is waiting on some other key. It was taken out of
                // `self.incomplete` above, so it must be parked again (now with
                // the payload in its inbox) or it would be lost for good.
                self.incomplete.insert(payload_predicate, branch);
                vec![]
            }
        } else {
            log!(
                &mut m_ctx.inner.logger,
                "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
            );
            // Branches that stay parked are collected here and swapped back in below.
            let mut incomplete2 = HashMap::<_, _>::default();
            let to_run = self
                .incomplete
                .drain()
                .filter_map(|(old_predicate, mut branch)| {
                    use CommonSatResult as Csr;
                    match old_predicate.common_satisfier(&payload_predicate) {
                        Csr::FormerNotLatter | Csr::Equivalent => {
                            log!(
                                &mut m_ctx.inner.logger,
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
                                &old_predicate
                            );
                            // old_predicate COVERS the assumptions of payload_predicate

                            if let Some(prev_payload) = branch.inbox.get(&ekey) {
                                // Incorrect to receive two distinct messages in same branch!
                                assert_eq!(prev_payload, &payload);
                            }
                            branch.inbox.insert(ekey, payload.clone());
                            if branch.blocking_on == Some(ekey) {
                                // run.
                                branch.blocking_on = None;
                                Some((old_predicate, branch))
                            } else {
                                // don't bother running. its awaiting something else
                                incomplete2.insert(old_predicate, branch);
                                None
                            }
                        }
                        Csr::New(new) => {
                            log!(
                                &mut m_ctx.inner.logger,
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
                                &payload_predicate,
                                &old_predicate,
                                &new,
                            );
                            // payload_predicate has new assumptions. FORK!
                            let mut payload_branch = branch.clone();
                            if let Some(prev_payload) = payload_branch.inbox.get(&ekey) {
                                // Incorrect to receive two distinct messages in same branch!
                                assert_eq!(prev_payload, &payload);
                            }
                            payload_branch.inbox.insert(ekey, payload.clone());

                            // put the original back untouched
                            incomplete2.insert(old_predicate, branch);
                            if payload_branch.blocking_on == Some(ekey) {
                                // run the fork
                                payload_branch.blocking_on = None;
                                Some((new, payload_branch))
                            } else {
                                // don't bother running. its awaiting something else
                                incomplete2.insert(new, payload_branch);
                                None
                            }
                        }
                        Csr::LatterNotFormer => {
                            log!(
                                &mut m_ctx.inner.logger,
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
                                &payload_predicate,
                                &old_predicate,
                            );
                            // payload_predicate has new assumptions. FORK!
                            let mut payload_branch = branch.clone();
                            if let Some(prev_payload) = payload_branch.inbox.get(&ekey) {
                                // Incorrect to receive two distinct messages in same branch!
                                assert_eq!(prev_payload, &payload);
                            }
                            payload_branch.inbox.insert(ekey, payload.clone());

                            // put the original back untouched
                            incomplete2.insert(old_predicate, branch);
                            // The fork took on the payload's assumptions, so it lives
                            // under `payload_predicate`. Keying it by `old_predicate`
                            // would overwrite the original that was just put back.
                            if payload_branch.blocking_on == Some(ekey) {
                                // run the fork
                                payload_branch.blocking_on = None;
                                Some((payload_predicate.clone(), payload_branch))
                            } else {
                                // don't bother running. its awaiting something else
                                incomplete2.insert(payload_predicate.clone(), payload_branch);
                                None
                            }
                        }
                        Csr::Nonexistant => {
                            log!(
                                &mut m_ctx.inner.logger,
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
                                &old_predicate,
                                &payload_predicate,
                            );
                            // predicates contradict
                            incomplete2.insert(old_predicate, branch);
                            None
                        }
                    }
                })
                .collect();
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
            to_run
        };
        log!(
            &mut m_ctx.inner.logger,
            "... DONE FEEDING BRANCHES. {} branches to run!",
            to_run.len(),
        );
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
    }

    /// Picks a completed branch whose predicate is satisfied by `decision`
    /// and turns it into a `MonoP`.
    ///
    /// Returns `None` when no completed branch qualifies. If several qualify,
    /// the first one in the map's iteration order is used.
    pub(crate) fn choose_mono(&self, decision: &Predicate) -> Option<MonoP> {
        for (branch_pred, branch) in self.complete.iter() {
            if decision.satisfies(branch_pred) {
                let chosen = MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() };
                return Some(chosen);
            }
        }
        None
    }
}

impl PolyN {
    /// Feeds a received `payload` (sent under `payload_predicate`) on `ekey`
    /// to every native branch it is compatible with.
    ///
    /// Each existing branch is always kept. Depending on how its predicate
    /// relates to `payload_predicate`, the message is either delivered in
    /// place or delivered to a forked copy stored under the stronger
    /// predicate. Delivery only happens if the branch still expects a message
    /// on `ekey`. Any branch that has nothing left to receive after delivery
    /// is submitted to `solution_storage` as a `SubtreeId::PolyN` solution.
    pub fn sync_recv(
        &mut self,
        ekey: Key,
        logger: &mut String,
        payload: Payload,
        payload_predicate: Predicate,
        solution_storage: &mut SolutionStorage,
    ) {
        use CommonSatResult as Csr;

        // A branch with an empty `to_get` is a local solution under `pred`.
        fn submit_if_done(
            branch: &BranchN,
            pred: &Predicate,
            logger: &mut String,
            storage: &mut SolutionStorage,
        ) {
            if branch.to_get.is_empty() {
                storage.submit_and_digest_subtree_solution(logger, SubtreeId::PolyN, pred.clone());
            }
        }

        let mut next_branches: HashMap<_, _> = Default::default();
        for (branch_pred, mut branch) in self.branches.drain() {
            let relation = branch_pred.common_satisfier(&payload_predicate);
            log!(
                logger,
                "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}",
                &payload_predicate,
                &payload,
                &branch_pred,
                &relation
            );
            match relation {
                // Contradicting predicates: this branch never sees the message.
                Csr::Nonexistant => {}
                // The branch already assumes everything the payload does: deliver in place.
                Csr::FormerNotLatter | Csr::Equivalent => {
                    if branch.to_get.remove(&ekey) {
                        branch.gotten.insert(ekey, payload.clone());
                        submit_if_done(&branch, &branch_pred, logger, solution_storage);
                    }
                }
                // The payload's predicate is the stronger one: fork under it.
                Csr::LatterNotFormer => {
                    let mut fork = branch.clone();
                    if fork.to_get.remove(&ekey) {
                        fork.gotten.insert(ekey, payload.clone());
                        submit_if_done(&fork, &payload_predicate, logger, solution_storage);
                        next_branches.insert(payload_predicate.clone(), fork);
                    }
                }
                // Neither covers the other: fork under the combined predicate.
                Csr::New(combined) => {
                    let mut fork = branch.clone();
                    if fork.to_get.remove(&ekey) {
                        fork.gotten.insert(ekey, payload.clone());
                        submit_if_done(&fork, &combined, logger, solution_storage);
                        next_branches.insert(combined, fork);
                    }
                }
            }
            // unlike PolyP machines, Native branches do not become inconsistent
            next_branches.insert(branch_pred, branch);
        }
        log!(
            logger,
            "Native now has {} branches with predicates: {:?}",
            next_branches.len(),
            next_branches.keys().collect::<Vec<_>>()
        );
        self.branches = next_branches;
    }

    /// Picks a branch that has received everything it was waiting for and
    /// whose predicate is satisfied by `decision`, and turns it into a `MonoN`.
    ///
    /// Returns `None` when no branch qualifies. If several qualify, the first
    /// one in the map's iteration order is used.
    pub fn choose_mono(&self, decision: &Predicate) -> Option<MonoN> {
        for (branch_pred, branch) in self.branches.iter() {
            // Skip branches still expecting messages before consulting the predicate.
            if !branch.to_get.is_empty() || !decision.satisfies(branch_pred) {
                continue;
            }
            let outcome = (branch.sync_batch_index, branch.gotten.clone());
            return Some(MonoN { ekeys: self.ekeys.clone(), result: Some(outcome) });
        }
        None
    }
}