Changeset - aebcee16d6bc
[Not reviewed]
1 7 6
Christopher Esterhuyse - 5 years ago 2020-06-26 11:16:39
christopher.esterhuyse@gmail.com
fixed cyclic routing bug. Was actually a one-liner
14 files changed with 151 insertions and 92 deletions:
0 comments (0 inline, 0 general)
examples/6_amy_log.txt
Show inline comments
 
deleted file
examples/6_atomic/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
	size_t pdl_len = strlen(pdl_ptr);
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
	char logpath[] = "./6_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Err %s\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	connector_print_debug(c);
 
	
 
	printf("Let's try to put without get\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	// connector_get(c, getter);
 
	
 
	int err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	
 
	printf("Let's try again, doing both\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	connector_get(c, getter);
 
	err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl_ptr);
 
	return 0;
 
}
 
\ No newline at end of file
examples/7_recovery/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
	size_t pdl_len = strlen(pdl_ptr);
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
	char logpath[] = "./7_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Err %s\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	connector_print_debug(c);
 
	
 
	printf("Let's try to put without get\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	// connector_get(c, getter);
 
	
 
	int err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	
 
	printf("\nLet's try again, doing both\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	connector_get(c, getter);
 
	err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl_ptr);
 
	return 0;
 
}
 
\ No newline at end of file
examples/7_recovery/make.sh
Show inline comments
 
new file 100644
 
#!/bin/sh
 

	
 
LIB_PATH="../../target/release"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH amy.c -o ./amy
examples/8_net_ports/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./8_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &putter, addr_str, sizeof(addr_str)-1, Putter, Active);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_add_net_port(
 
		c, &getter, addr_str, sizeof(addr_str)-1, Getter, Passive);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_connect(c, 4000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	return 0;
 
}
 
\ No newline at end of file
examples/8_net_ports/make.sh
Show inline comments
 
new file 100644
 
#!/bin/sh
 

	
 
LIB_PATH="../../target/release"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH amy.c -o ./amy
examples/9_net_self_putget/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./9_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &putter, addr_str, sizeof(addr_str)-1, Putter, Active);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_add_net_port(
 
		c, &getter, addr_str, sizeof(addr_str)-1, Getter, Passive);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_connect(c, 4000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, putter, "hi", 2);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, getter);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_sync(c, 4000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	return 0;
 
}
 
\ No newline at end of file
examples/9_net_self_putget/make.sh
Show inline comments
 
new file 100644
 
#!/bin/sh
 

	
 
LIB_PATH="../../target/release"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH amy.c -o ./amy
examples/reowolf_rs.dll
Show inline comments
 
binary diff not shown
reowolf.h
Show inline comments
 
/* CBindgen generated */
 

	
 
#ifndef REOWOLF_HEADER_DEFINED
 
#define REOWOLF_HEADER_DEFINED
 

	
 
#include <stdarg.h>
 
#include <stdbool.h>
 
#include <stdint.h>
 
#include <stdlib.h>
 

	
 
typedef enum {
 
  Active,
 
  Passive,
 
} EndpointPolarity;
 

	
 
typedef enum {
 
  Putter,
 
  Getter,
 
} Polarity;
 

	
 
typedef struct Arc_ProtocolDescription Arc_ProtocolDescription;
 

	
 
typedef struct Connector Connector;
 

	
 
typedef int32_t ErrorCode;
 

	
 
typedef uint32_t ConnectorId;
 

	
 
typedef uint32_t PortSuffix;
 

	
 
typedef struct {
 
  ConnectorId connector_id;
 
  PortSuffix u32_suffix;
 
} Id;
 

	
 
typedef Id PortId;
 

	
 
/**
 
 * Given
 
 * - an initialized connector in setup or connecting state,
 
 * - a string slice for the component's identifier in the connector's configured protocol description,
 
 * - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
 * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
 * Usable in {setup, communication} states.
 
 */
 
ErrorCode connector_add_component(Connector *connector,
 
                                  const uint8_t *ident_ptr,
 
                                  uintptr_t ident_len,
 
                                  const PortId *ports_ptr,
 
                                  uintptr_t ports_len);
 

	
 
/**
 
 * Given
 
 * - an initialized connector in setup or connecting state,
 
 * - a utf-8 encoded socket address,
 
 * - the logical polarity of P,
 
 * - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
 * returns P, a port newly added to the native interface.
 
 */
 
ErrorCode connector_add_net_port(Connector *connector,
 
                                 PortId *port,
 
                                 const uint8_t *addr_str_ptr,
 
                                 uintptr_t addr_str_len,
 
                                 Polarity port_polarity,
 
                                 EndpointPolarity endpoint_polarity,
 
                                 PortId *port);
 
                                 EndpointPolarity endpoint_polarity);
 

	
 
/**
 
 * Given an initialized connector in setup or connecting state,
 
 * - Creates a new directed port pair with logical channel putter->getter,
 
 * - adds the ports to the native component's interface,
 
 * - and returns them using the given out pointers.
 
 * Usable in {setup, communication} states.
 
 */
 
void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *out_getter);
 

	
 
/**
 
 * Connects this connector to the distributed system of connectors reachable through endpoints,
 
 * Usable in setup state, and changes the state to communication.
 
 */
 
ErrorCode connector_connect(Connector *connector, int64_t timeout_millis);
 

	
 
/**
 
 * Destroys the given a pointer to the connector on the heap, freeing its resources.
 
 * Usable in {setup, communication} states.
 
 */
 
void connector_destroy(Connector *connector);
 

	
 
ErrorCode connector_get(Connector *connector, PortId port);
 

	
 
const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *len);
 

	
 
/**
 
 * Initializes `out` with a new connector using the given protocol description as its configuration.
 
 * The connector uses the given (internal) connector ID.
 
 */
 
Connector *connector_new(const Arc_ProtocolDescription *pd);
 

	
 
Connector *connector_new_logging(const Arc_ProtocolDescription *pd,
 
                                 const uint8_t *path_ptr,
 
                                 uintptr_t path_len);
 

	
 
intptr_t connector_next_batch(Connector *connector);
 

	
 
void connector_print_debug(Connector *connector);
 

	
 
/**
 
 * Convenience function combining the functionalities of
 
 * "payload_new" with "connector_put_payload".
 
 */
 
ErrorCode connector_put_bytes(Connector *connector,
 
                              PortId port,
 
                              const uint8_t *bytes_ptr,
 
                              uintptr_t bytes_len);
 

	
 
intptr_t connector_sync(Connector *connector, int64_t timeout_millis);
 

	
 
/**
 
 * Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
 */
 
Arc_ProtocolDescription *protocol_description_clone(const Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Destroys the given initialized protocol description and frees its resources.
 
 */
 
void protocol_description_destroy(Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Parses the utf8-encoded string slice to initialize a new protocol description object.
 
 * - On success, initializes `out` and returns 0
 
 * - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
 */
 
Arc_ProtocolDescription *protocol_description_parse(const uint8_t *pdl, uintptr_t pdl_len);
 

	
 
/**
 
 * Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
 * - pointer is NULL iff there was no last error
 
 * - data at pointer is null-delimited
 
 * - len does NOT include the length of the null-delimiter
 
 * If len is NULL, it will not written to.
 
 */
 
const uint8_t *reowolf_error_peek(uintptr_t *len);
 

	
 
#endif /* REOWOLF_HEADER_DEFINED */
src/runtime/communication.rs
Show inline comments
 
@@ -304,195 +304,198 @@ impl Connector {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
            }
 
        };
 
        log!(cu.logger, "Sync round ending! Cleaning up");
 
        // dropping {solution_storage, payloads_to_get}
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        mut solution_storage: SolutionStorage,
 
        mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>,
 
        mut deadline: Option<Instant>,
 
    ) -> Result<Decision, SyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger, "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.logger, "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(cu.logger, "Decision loop! have {} messages to recv", payloads_to_get.len());
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                match cu.port_info.routes.get(&getter) {
 
                let route = cu.port_info.routes.get(&getter);
 
                log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route);
 
                match route {
 
                    None => {
 
                        log!(
 
                            cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                            getter,
 
                            &send_payload_msg
 
                        );
 
                    }
 
                    Some(Route::Endpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                &send_payload_msg,
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(
 
                                    cu.logger,
 
                                    "{:?} has become inconsistent!",
 
                                    proto_component_id
 
                                );
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
                                        Self::request_failure(cu, comm, parent)?
 
                                    } else {
 
                                        log!(cu.logger, "Already requested failure");
 
                                    }
 
                                } else {
 
                                    log!(cu.logger, "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
                                proto_component_id
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.logger, "Check if we have any local decisions...");
 
            for solution in solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger, "New local decision with solution {:?}...", &solution);
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::Suggest { suggestion },
 
                        });
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        return Ok(Decision::Success(solution));
 
                    }
 
                }
 
            }
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (endpoint_index, msg) = loop {
 
                    match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? {
 
                        None => {
 
                            log!(cu.logger, "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
src/runtime/ffi.rs
Show inline comments
 
@@ -124,197 +124,197 @@ pub unsafe extern "C" fn connector_new_logging(
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let connector_id = Connector::random_id();
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id, 8);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id(), 8);
 
    Box::into_raw(Box::new(c))
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_parts(ident_ptr, ident_len),
 
        &*slice_from_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    port: *mut PortId,
 
    addr_str_ptr: *const u8,
 
    addr_str_len: usize,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
    port: *mut PortId,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let addr_bytes = &*slice_from_parts(addr_str_ptr, addr_str_len);
 
    let addr_str = match std::str::from_utf8(addr_bytes) {
 
        Ok(addr_str) => addr_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -1;
 
        }
 
    };
 
    let sock_address: SocketAddr = match addr_str.parse() {
 
        Ok(addr) => addr,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -2;
 
        }
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            port.write(p);
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -3
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: *mut Payload,
 
// ) -> ErrorCode {
 
//     match connector.put(port, payload.read()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload_cloning(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: &Payload,
 
// ) -> ErrorCode {
 
//     match connector.put(port, payload.clone()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
/// Convenience function combining the functionalities of
 
/// "payload_new" with "connector_put_payload".
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let bytes = &*slice_from_parts(bytes_ptr, bytes_len);
 
    match connector.put(port, Payload::from(bytes)) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
src/runtime/setup.rs
Show inline comments
 
@@ -54,340 +54,354 @@ impl Connector {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    endpoint_setups,
 
                    &mut cu.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                session_optimize(cu, &mut comm, deadline)?;
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError::*;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if token == WAKER_TOKEN {
 
                log!(logger, "Notification from waker");
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
                        index,
 
                        todo.endpoint_setup.sock_addr
 
                    );
 
                    match &mut todo.todo_endpoint {
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
                        Ok((mut stream, peer_addr)) => {
 
                            poll.registry().deregister(listener).unwrap();
 
                            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                            log!(
 
                                logger,
 
                                "Endpoint[{}] accepted a connection from {:?}",
 
                                index,
 
                                peer_addr
 
                            );
 
                            let endpoint = Endpoint { stream, inbox: vec![] };
 
                            todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                        }
 
                        Err(e) if would_block(&e) => {
 
                            log!(logger, "Spurious wakeup on listener {:?}", index)
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(AcceptFailed(listener.local_addr().unwrap()));
 
                        }
 
                    }
 
                }
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        connect_failed.insert(index);
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    let _ = waker.wake();
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
 
                        }
 
                        continue;
 
                    }
 
                    if connect_failed.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    if !setup_incomplete.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                    if event.is_writable() && !todo.sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: todo.local_port,
 
                        }));
 
                        endpoint
 
                            .send(&msg)
 
                            .map_err(|e| {
 
                                EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        todo.sent_local_port = true;
 
                    }
 
                    if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                if peer_info.polarity == local_polarity {
 
                                    return Err(ConnectError::PortPeerPolarityMismatch(
 
                                        todo.local_port,
 
                                    ));
 
                                }
 
                                todo.recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(todo.local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, todo.local_port);
 
                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
 
                                if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                    // check just for logging purposes
 
                                    log!(
 
                                        logger,
 
                                        "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                        peer_info.port,
 
                                        route
 
                                    );
 
                                }
 
                                port_info
 
                                    .routes
 
                                    .entry(peer_info.port)
 
                                    .or_insert(Route::Endpoint { index });
 
                            }
 
                            Some(inappropriate_msg) => {
 
                                log!(
 
                                    logger,
 
                                    "delaying msg {:?} during channel setup phase",
 
                                    inappropriate_msg
 
                                );
 
                                delayed_messages.push((index, inappropriate_msg));
 
                            }
 
                        }
 
                    }
 
                    if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                        setup_incomplete.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(mut endpoint) => {
 
                    poll.registry()
 
                        .reregister(&mut endpoint.stream, Token(index), Interest::READABLE)
 
                        .unwrap();
 
                    endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    if let Some(wcs) = waker_continue_signal {
 
        log!(logger, "Sending waker the stop signal");
 
        wcs.store(false, std::sync::atomic::Ordering::SeqCst);
 
    }
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        endpoint_exts,
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct disstributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
        - Each connector initiates a wave tagged with their ID
 
        - Connectors participate in waves of GREATER ID, abandoning previous waves
 
        - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader
 
    2. Tree construction
 
        - The leader broadcasts their leadership with msg A
 
        - Upon receiving their first announcement, connectors reply B, and send A to all peers
 
        - A controller exits once they have received A or B from each neighbor
 

	
 
    The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1,
 
    so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun.
 

	
 
    NOTE the distinction between PARENT and LEADER
 
    */
 
    log!(logger, "beginning neighborhood construction");
src/runtime/tests.rs
Show inline comments
 
@@ -411,96 +411,109 @@ fn local_timeout() {
 
    let test_log_path = Path::new("./logs/local_timeout");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(Some(Duration::from_millis(200))) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
}
 

	
 
#[test]
 
fn parent_timeout() {
 
    let test_log_path = Path::new("./logs/parent_timeout");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn child_timeout() {
 
    let test_log_path = Path::new("./logs/child_timeout");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // child; times out
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn chain_connect() {
 
    let test_log_path = Path::new("./logs/chain_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn net_self_loop() {
 
    let test_log_path = Path::new("./logs/net_self_loop");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_millis(500))).unwrap();
 
}
0 comments (0 inline, 0 general)