Changeset - aa7efaf3fd9b
[Not reviewed]
4 8 0
Christopher Esterhuyse - 5 years ago 2020-04-29 13:16:54
christopher.esterhuyse@gmail.com
finished example 6
12 files changed with 110 insertions and 67 deletions:
0 comments (0 inline, 0 general)
examples/1_socketlike/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // AMY
 
int main() {
 
	
 
	// amy hard-codes her protocol.
 
	char* pdl =
 
	"primitive forward(in i, out o) {"
 
	"	while(true) synchronous {"
 
	"		put(o, get(i));"
 
	"	}"
 
	"}"
 
	;
 
	
 
	// fill a buffer with the user's message to send.
 
	char msg_buf[128];
 
	memset(msg_buf, 0, 128);
 
	
 
	printf("input a message to send:");
 

	
 
	check("fgets", fgets(msg_buf, 128-1, stdin) == NULL);
 
	int msg_len = strlen(msg_buf);
 
	msg_buf[msg_len-1] = 0;
 
	printf("will send msg `%s`\n", msg_buf);
 
	
 
	// create a connector with one outgoing network channel.
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_passive(c, 1, "127.0.0.1:7000"));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	// send the user-provided message three times
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("put ", connector_put(c, 0, msg_buf, msg_len));
 
		check("sync", connector_sync(c, 1000));
 
		printf("Sent one message!\n");
 
	}
 
	
 
	// clean up
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	return 0;
 
}
 
\ No newline at end of file
examples/1_socketlike/bob.c
Show inline comments
 
#include <stdio.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // BOB!
 
	
 
int main() {
 

	
 
	// bob hard-codes his protocol.
 
	char* pdl =
 
	"primitive forward(in i, out o) {"
 
	"	while(true) synchronous {"
 
	"		put(o, get(i));"
 
	"	}"
 
	"}"
 
	;
 
	
 
	// BOB
 
	// setup a connector with one incoming network channel.
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_native(c, 1));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	// receive a message and print it to stdout three times
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("get ", connector_get(c, 0));
 
		check("sync", connector_sync(c, 1000));
 

	
 
		int msg_len;
 
		const unsigned char * msg;
 
		check("read", connector_gotten(c, 0, &msg, &msg_len));
 

	
 
		printf("Received one message `%s`!\n", msg);
 
	}
 
	
 

	
 
	// cleanup
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	return 0;
 
}
 
\ No newline at end of file
examples/2_dynamic_pdl/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // AMY
 
int main() {
 

	
 
	// amy's protocol is loaded at runtime from a file.
 
	char * pdl = buffer_pdl("eg_protocols.pdl");
 
	
 
	char msg_buf[128];
 
	memset(msg_buf, 0, 128);
 
	
 
	printf("input a message to send:");
 

	
 
	check("fgets", fgets(msg_buf, 128-1, stdin) == NULL);
 
	int msg_len = strlen(msg_buf);
 
	msg_buf[msg_len-1] = 0;
 
	printf("will send msg `%s`\n", msg_buf);
 
	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_passive(c, 1, "127.0.0.1:7000"));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("put ", connector_put(c, 0, msg_buf, msg_len));
 
		check("sync", connector_sync(c, 1000));
 
		printf("Sent one message!\n");
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/2_dynamic_pdl/bob.c
Show inline comments
 
#include <stdio.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // BOB!
 
int main() {
 

	
 
	// bob's behavior is loaded from a file at runtime
 
	char * pdl = buffer_pdl("eg_protocols.pdl");
 
	
 
	// BOB
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_native(c, 1));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("get ", connector_get(c, 0));
 
		check("sync", connector_sync(c, 1000));
 

	
 
		int msg_len;
 
		const unsigned char * msg;
 
		check("read", connector_gotten(c, 0, &msg, &msg_len));
 

	
 
		printf("Received one message `%s`!\n", msg);
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/3_nondeterminism/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // AMY
 
int main() {
 
	char * pdl = buffer_pdl("eg_protocols.pdl");
 
	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 

	
 
	check("config ", connector_configure(c, pdl, "sync"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_passive(c, 1, "127.0.0.1:7000"));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 

	
 
	// amy offers a message to her peer.
 
	// the message is the number of messages the peer previously received.
 

	
 
	int send_next = 0;
 
	char msg_buf[32];
 
	int code;
 
	int i;
 
	for (i = 0; 1; i++) {
 
		itoa(send_next, msg_buf, 10);
 
		
 
		printf("\nround %d. Will send msg `%s` next", i, msg_buf);
 
		
 
		
 
		// batch 0: no messages sent
 
		// option (a): no messages sent
 
		check("next_batch ", connector_next_batch(c));
 
		
 
		// batch 1: put 0
 
		// option (b): one message sent
 
		check("put ", connector_put(c, 0, msg_buf, strlen(msg_buf) + 1));
 
		code = connector_sync(c, 3000);
 
		
 
		// reflect on the outcome of the exchange
 
		if (code == 0) printf("Sent no message!");
 
		else if (code == 1) {
 
			printf("Sent message `%s`!", msg_buf);
 
			send_next++;
 
		} else {
 
			printf(
 
				"Connector error! %d (%s)\nBreaking loop!\n",
 
				code, connector_error_peek()
 
			);
 
			break;
 
		}
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/3_nondeterminism/bob.c
Show inline comments
 
#include <stdio.h>
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // BOB!
 
int main() {
 
	char * pdl = buffer_pdl("eg_protocols.pdl");
 
	Connector* c = connector_new();
 

	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "sync"));
 

	
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_native(c, 1));
 

	
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 

	
 
	int msg_len;
 
	int msg_len, i;
 
	const unsigned char * msg;
 

	
 
	int i;
 
	srand(time(NULL));
 
	for (i = 0; i < 10; i++) {
 
		printf("\nround %d...\n", i);
 
		int random = rand() % 2;
 
		if (random == 0) {
 
			printf("I don't want a message!\n");
 
			check("sync", connector_sync(c, 1000));
 
		} else {
 
			printf("I want a message!\n");
 
			check("get ", connector_get(c, 0));
 
			check("sync", connector_sync(c, 1000));
 
			check("read msg", connector_gotten(c, 0, &msg, &msg_len));
 
			printf("Got message: `%.*s`\n", msg_len, msg);
 
		}
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/5_recovery/amy.exe
Show inline comments
 
deleted file
 
binary diff not shown
examples/5_recovery/bob.exe
Show inline comments
 
deleted file
 
binary diff not shown
examples/6_constraint_solve/THIS IS TODO.txt
Show inline comments
 
deleted file
examples/6_constraint_solve/main.c
Show inline comments
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include <time.h>
 
#include <assert.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
#define N 4
 

	
 
typedef struct PeerInfo {
 
	int id;
 
	bool puts; // true iff the channel to this peer is INCOMING.
 
} PeerInfo;
 

	
 
// return the index of (i,j) in the lexicographic ordering of set {(i,j) : i<j, j<N}
 
// for convenience, swaps (i,j) if i>j 
 
int combination_index(int i, int j) {
 
	if (i > j) {
 
		// swap!
 
		i ^= j;
 
		j ^= i;
 
		i ^= j;
 
	}
 
	assert(0 <= i);
 
	assert(i < j);
 
	assert(j < N);
 
		
 
	int idx_in_square = i*N + j;
 
	int skipped = ((i+1) * (i+2)) / 2;
 
	return idx_in_square - skipped;
 
}
 

	
 
// initializes the given 3-element array,
 
// breaking symmetry with put-get direction.
 
void init_peer_infos(PeerInfo * peer_infos, int my_id) {
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		PeerInfo * pi = &peer_infos[i];
 
		pi->puts = i < my_id;
 
		pi->id = i < my_id ? i : i+1;
 
		printf("info %d puts=%d id=%d\n", i, pi->puts, pi->id);
 
	}
 
}
 

	
 
const char* addrs[] = {
 
	"127.0.0.1:7000",
 
	"127.0.0.1:7001",
 
	"127.0.0.1:7002",
 
	"127.0.0.1:7003",
 
	"127.0.0.1:7004",
 
	"127.0.0.1:7005",
 
};
 
int main(int arg_c, char * argv[]) {
 
	int index;
 
	int my_id, peer_id, i;
 
	if (arg_c != 2) {
 
		printf("Expected one arg: which peer I am in 0..4");
 
		return 1;
 
	}
 
	index = atoi(argv[1]);
 
	printf("I am peer %d\n", index);
 
	my_id = atoi(argv[1]);
 
	assert(0 <= my_id && my_id < N);
 
	printf("I have id %d\n", my_id);
 
	
 
	const char* addrs[] = {
 
		"127.0.0.1:7000",
 
		"127.0.0.1:7001",
 
		"127.0.0.1:7002",
 
		"127.0.0.1:7003",
 
		"127.0.0.1:7004",
 
		"127.0.0.1:7005",
 
	};
 

	
 
	char * pdl = buffer_pdl("eg_protocols.pdl");
 
	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 

	
 
	check("config ", connector_configure(c, pdl, "xor_three"));
 
	int i, j;
 
	int addr_index = 0;
 
	int port = 0;
 
	for (i = 0; i < 4; i++) {
 
		for (j = i+1; j < 4; j++) {
 
			if (i==index) {
 
				printf("ports %d and %d are for a passive channel to peer %d over addr %s\n", port, port+1, j, addrs[addr_index]);
 
				check("bind an ", connector_bind_native(c, port));
 
				port++;
 
				check("bind a  ", connector_bind_active(c, port, addrs[addr_index]));
 
				port++;
 
			} else if (j==index) {
 
				printf("ports %d and %d are for an active channel to peer %d over addr %s\n", port, port+1, i, addrs[addr_index]);
 
				check("bind p  ", connector_bind_passive(c, port, addrs[addr_index]));
 
				port++;
 
				check("bind pn ", connector_bind_native(c, port));
 
				port++;
 
			}
 
			addr_index++;
 
	
 
	PeerInfo peer_infos[3];
 
	init_peer_infos(peer_infos, my_id);
 
	
 
	// for every native port, bind native and protocol port.
 
	for (i = 0; i < 3; i++) {
 
		PeerInfo * pi = &peer_infos[i];
 
		int addr_idx = combination_index(my_id, pi->id);
 
		if (pi->puts) {
 
			check("bind to putter ",
 
				connector_bind_passive(c, i*2, addrs[addr_idx]));
 
			check("bind native ", connector_bind_native(c, i*2 + 1));
 
		} else {
 
			check("bind native ", connector_bind_native(c, i*2));
 
			check("bind to putter ",
 
				connector_bind_active(c, i*2 + 1, addrs[addr_idx]));
 
		}
 
	}
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	for (i = 0; i < 4; i++) {
 
		if (i == index) continue;
 
		// another batch
 
		for (j = 0; j < 4; j++) {
 
			
 
		}
 
	// for every native port, create a singleton batch
 
	for (i = 0; i < 3; i++) {
 
		if (i > 0) check("next ", connector_next_batch(c));
 
		PeerInfo * pi = &peer_infos[i];
 
		check("op ", pi->puts?
 
			connector_get(c, i):
 
			connector_put(c, i, NULL, 0));
 
	}
 
	connector_sync();
 
	// solve!
 
	int batch_idx = connector_sync(c, 3000);
 
	if (batch_idx < 0) printf("Error code on sync! %d\n", batch_idx);
 
	else printf("I was paired with peer %d\n", peer_infos[batch_idx].id);
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/6_constraint_solve/main.exe
Show inline comments
 
deleted file
 
binary diff not shown
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        let ret = match &decision {
 
            Decision::Success(predicate) => {
 
                // overwrite MonoN/P
 
                self.inner.mono_n = {
 
                    let poly_n = self.ephemeral.poly_n.take().unwrap();
 
                    poly_n.choose_mono(predicate).unwrap_or_else(|| {
 
                        panic!(
 
                            "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
 
                            &predicate, &poly_n.branches, &self.inner.logger
 
                        );
 
                    })
 
                };
 
                self.inner.mono_ps.clear();
 
                self.inner.mono_ps.extend(
 
                    self.ephemeral
 
                        .poly_ps
 
                        .drain(..)
 
                        .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
                );
 
                Ok(())
 
            }
 
            Decision::Failure => Err(SyncErr::Timeout),
 
        };
 
        let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        ret
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(predicate) = maybe_predicate {
 
                let decision = Decision::Success(predicate);
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    }
 

	
 
    fn kick_off_native(
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ekeys, .. } = self.inner.mono_n.clone();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            let all_ekeys = ekeys.iter().copied();
 
            let all_channel_ids = all_ekeys.map(ekey_to_channel_id);
 

	
 
            let mut predicate = Predicate::new_trivial();
 

	
 
            // assign TRUE for puts and gets
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                    "Duplicate predicate {:#?}!\nHaving multiple batches with the same
 
                    predicate requires the support of oracle boolean variables",
 
                    &predicate,
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        if let Some(e) = self.unrecoverable_error {
 
            return Err(e.clone());
 
        }
 
        self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
 
            SyncErr::Timeout => e, // this isn't unrecoverable
 
            _ => {
 
                // Must set unrecoverable error! and tear down our net channels
 
                self.unrecoverable_error = Some(e);
 
                self.ephemeral.clear();
 
                self.inner.endpoint_exts = Default::default();
 
                e
 
            }
 
        })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    fn sync_round_inner(
 
        &mut self,
 
        mut deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 
        assert!(self.ephemeral.is_clear());
 
        assert!(self.unrecoverable_error.is_none());
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                mono_ps: &mut self.ephemeral.mono_ps,
 
                inner: &mut self.inner,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let ekey_to_holder: HashMap<Key, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.ekeys.iter().map(move |&e| (e, N));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            &ekey_to_holder
 
        );
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
            let n = std::iter::once(SubtreeId::PolyN);
 
            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
            let c = self
 
                .inner
 
                .family
 
                .children_ekeys
 
                .iter()
 
                .map(|&ekey| SubtreeId::ChildController { ekey });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 

	
 
        // 5. kick off the synchronous round of the native actor if it exists
 

	
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            log!(
 
                &mut self.inner.logger,
 
                "PolyN kicked off, and has branches with predicates... {:?}",
 
                poly_n.branches.keys().collect::<Vec<_>>()
 
            );
 
            Some(poly_n)
 
        } else {
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 

	
 
        // 6. Kick off the synchronous round of each protocol actor
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error
 
        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
            let received = match deadline {
 
                None => {
 
                    // we have personally timed out. perform a "long" poll.
 
                    self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP")
 
                }
 
                Some(d) => match self.recv(d)? {
 
                    // we have not yet timed out. performed a time-limited poll
 
                    Some(received) => received,
 
                    None => {
 
                        // timed out! send a FAILURE message to the sink,
 
                        // and henceforth don't time out on polling.
 
                        deadline = None;
 
                        match self.inner.family.parent_ekey {
 
                            None => {
 
                                // I am the sink! announce failure and return.
 
                                return self.end_round_with_decision(Decision::Failure);
 
                            }
 
                            Some(parent_ekey) => {
 
                                // I am not the sink! send a failure message.
 
                                let announcement = Msg::CommMsg(CommMsg {
 
                                    round_index: self.inner.round_index,
 
                                    contents: CommMsgContents::Failure,
 
                                });
 
                                log!(
 
                                    &mut self.inner.logger,
 
                                    "Forwarding {:?} to parent with ekey {:?}",
 
                                    &announcement,
 
                                    parent_ekey
 
                                );
 
                                self.inner
 
                                    .endpoint_exts
 
                                    .get_mut(parent_ekey)
 
                                    .expect("ss")
 
                                    .endpoint
 
                                    .send(announcement.clone())?;
 
                                continue; // poll some more
 
                            }
 
                        }
 
                    }
 
                },
 
            };
 
            log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(s) => {
 
                    // This occurs in the event the connector was malformed during connect()
 
                    println!("WASNT EXPECTING {:?}", s);
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "...and its OLD! :(");
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(&mut self.inner.logger, "... DELAY! :(");
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "... its a round-appropriate CommMsg with key {:?}",
 
                        received.recipient
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Failure => match self.inner.family.parent_ekey {
 
                    Some(parent_ekey) => {
 
                        let announcement = Msg::CommMsg(CommMsg {
 
                            round_index: self.inner.round_index,
 
                            contents: CommMsgContents::Failure,
 
                        });
 
                        log!(
 
                            &mut self.inner.logger,
 
                            "Forwarding {:?} to parent with ekey {:?}",
 
                            &announcement,
 
                            parent_ekey
 
                        );
 
                        self.inner
 
                            .endpoint_exts
 
                            .get_mut(parent_ekey)
 
                            .expect("ss")
 
                            .endpoint
 
                            .send(announcement.clone())?;
 
                    }
 
                    None => return self.end_round_with_decision(Decision::Failure),
 
                },
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { decision } => {
 
                    if self.inner.family.parent_ekey != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &decision
 
                    );
 
                    return self.end_round_with_decision(decision);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // check that we expect to be able to receive payloads from this sender
 
                    assert_eq!(
 
                        Getter,
 
                        self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
                    );
 

	
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = ekey_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    let channel_id = self
 
                        .inner
 
                        .endpoint_exts
 
                        .get(received.recipient)
 
                        .expect("UEHFU")
 
                        .info
 
                        .channel_id;
 
                    if payload_predicate.query(channel_id) != Some(true) {
 
                        // sender didn't preserve the invariant
 
                        return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
 
                    }
 
                    match subtree_id {
 
                        None => {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
                            let m_ctx = PolyPContext {
 
                                my_subtree_id: SubtreeId::PolyP { index: *index },
 
                                inner: &mut self.inner,
 
                                solution_storage: &mut self.ephemeral.solution_storage,
 
                            };
 
                            use SyncRunResult as Srr;
 
                            let blocker = poly_p.poly_recv_run(
 
                                m_ctx,
 
                                &self.protocol_description,
 
                                received.recipient,
0 comments (0 inline, 0 general)