Changeset - e771fee620aa
[Not reviewed]
0 6 0
MH - 4 years ago 2021-11-15 12:57:41
contact@maxhenger.nl
Remove unnecessary layer of abstraction
6 files changed with 83 insertions and 53 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -171,60 +171,64 @@ impl Frame {
 
                            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                            self.serialize_expression(heap, *value_expr_id);
 
                        }
 
                    }
 
                }
 
            },
 
            Expression::Cast(expr) => {
 
                self.serialize_expression(heap, expr.subject);
 
            }
 
            Expression::Call(expr) => {
 
                for arg_expr_id in &expr.arguments {
 
                    self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                    self.serialize_expression(heap, *arg_expr_id);
 
                }
 
            },
 
            Expression::Variable(_expr) => {
 
                // No subexpressions
 
            }
 
        }
 
    }
 
}
 

	
 
type EvalResult = Result<EvalContinuation, EvalError>;
 

	
 
#[derive(Debug)]
 
pub enum EvalContinuation {
 
    // Returned in both sync and non-sync modes
 
    Stepping,
 
    Inconsistent,
 
    Terminal,
 
    SyncBlockStart,
 
    // Returned only in sync mode
 
    BranchInconsistent,
 
    SyncBlockEnd,
 
    NewComponent(DefinitionId, i32, ValueGroup),
 
    NewChannel,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, Value),
 
    Put(PortId, ValueGroup),
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
    SyncBlockStart,
 
    NewComponent(DefinitionId, i32, ValueGroup),
 
    NewChannel,
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
// we end up with valid "pointers" to heap regions.
 
#[derive(Debug, Clone)]
 
pub struct Prompt {
 
    pub(crate) frames: Vec<Frame>,
 
    pub(crate) store: Store,
 
}
 

	
 
impl Prompt {
 
    pub fn new(_types: &TypeTable, heap: &Heap, def: DefinitionId, monomorph_idx: i32, args: ValueGroup) -> Self {
 
        let mut prompt = Self{
 
            frames: Vec::new(),
 
            store: Store::new(),
 
        };
 

	
 
        // Maybe do typechecking in the future?
 
        debug_assert!((monomorph_idx as usize) < _types.get_base_definition(&def).unwrap().definition.procedure_monomorphs().len());
 
        let new_frame = Frame::new(heap, def, monomorph_idx);
 
        let max_stack_size = new_frame.max_stack_size;
 
        prompt.frames.push(new_frame);
 
        args.into_store(&mut prompt.store);
 
        prompt.store.reserve_stack(max_stack_size);
 
@@ -265,49 +269,49 @@ impl Prompt {
 
        fn array_inclusive_index_is_invalid(store: &Store, array_heap_pos: u32, idx: i64) -> bool {
 
            let array_len = store.heap_regions[array_heap_pos as usize].values.len();
 
            return idx < 0 || idx >= array_len as i64;
 
        }
 

	
 
        fn array_exclusive_index_is_invalid(store: &Store, array_heap_pos: u32, idx: i64) -> bool {
 
            let array_len = store.heap_regions[array_heap_pos as usize].values.len();
 
            return idx < 0 || idx > array_len as i64;
 
        }
 

	
 
        fn construct_array_error(prompt: &Prompt, modules: &[Module], heap: &Heap, expr_id: ExpressionId, heap_pos: u32, idx: i64) -> EvalError {
 
            let array_len = prompt.store.heap_regions[heap_pos as usize].values.len();
 
            return EvalError::new_error_at_expr(
 
                prompt, modules, heap, expr_id,
 
                format!("index {} is out of bounds: array length is {}", idx, array_len)
 
            )
 
        }
 

	
 
        // Checking if we're at the end of execution
 
        let cur_frame = self.frames.last_mut().unwrap();
 
        if cur_frame.position.is_invalid() {
 
            if heap[cur_frame.definition].is_function() {
 
                todo!("End of function without return, return an evaluation error");
 
            }
 
            return Ok(EvalContinuation::Terminal);
 
            return Ok(EvalContinuation::ComponentTerminated);
 
        }
 

	
 
        debug_log!("Taking step in '{}'", heap[cur_frame.definition].identifier().value.as_str());
 

	
 
        // Execute all pending expressions
 
        while !cur_frame.expr_stack.is_empty() {
 
            let next = cur_frame.expr_stack.pop_back().unwrap();
 
            debug_log!("Expr stack: {:?}", next);
 
            match next {
 
                ExprInstruction::PushValToFront => {
 
                    cur_frame.expr_values.rotate_right(1);
 
                },
 
                ExprInstruction::EvalExpr(expr_id) => {
 
                    let expr = &heap[expr_id];
 
                    match expr {
 
                        Expression::Assignment(expr) => {
 
                            let to = cur_frame.expr_values.pop_back().unwrap().as_ref();
 
                            let rhs = cur_frame.expr_values.pop_back().unwrap();
 

	
 
                            // Note: although not pretty, the assignment operator takes ownership
 
                            // of the right-hand side value when possible. So we do not drop the
 
                            // rhs's optionally owned heap data.
 
                            let rhs = self.store.read_take_ownership(rhs);
 
                            apply_assignment_operator(&mut self.store, to, expr.operation, rhs);
 
@@ -602,49 +606,50 @@ impl Prompt {
 
                                    }
 
                                },
 
                                Method::Put => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_port_value = self.store.maybe_read_ref(&port_value).clone();
 

	
 
                                    let port_id = if let Value::Output(port_id) = deref_port_value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'put' on value {:?}", deref_port_value)
 
                                    };
 

	
 
                                    let msg_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone();
 

	
 
                                    if ctx.performed_put(port_id) {
 
                                        // We're fine, deallocate in case the expression value stack
 
                                        // held an owned value
 
                                        self.store.drop_value(msg_value.get_heap_pos());
 
                                    } else {
 
                                        // Prepare to execute again
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        return Ok(EvalContinuation::Put(port_id, deref_msg_value));
 
                                        let value_group = ValueGroup::from_store(&self.store, &[deref_msg_value]);
 
                                        return Ok(EvalContinuation::Put(port_id, value_group));
 
                                    }
 
                                },
 
                                Method::Fires => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let port_value_deref = self.store.maybe_read_ref(&port_value).clone();
 

	
 
                                    let port_id = match port_value_deref {
 
                                        Value::Input(port_id) => port_id,
 
                                        Value::Output(port_id) => port_id,
 
                                        _ => unreachable!("executor calling 'fires' on value {:?}", port_value_deref),
 
                                    };
 

	
 
                                    match ctx.fires(port_id) {
 
                                        None => {
 
                                            cur_frame.expr_values.push_front(port_value);
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockFires(port_id));
 
                                        },
 
                                        Some(value) => {
 
                                            cur_frame.expr_values.push_back(value);
 
                                        }
 
                                    }
 
                                },
 
                                Method::Create => {
 
@@ -671,49 +676,49 @@ impl Prompt {
 
                                    values.resize(length as usize, Value::UInt8(0));
 
                                    cur_frame.expr_values.push_back(Value::Message(heap_pos));
 
                                },
 
                                Method::Length => {
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value_heap_pos = value.get_heap_pos();
 
                                    let value = self.store.maybe_read_ref(&value);
 

	
 
                                    let heap_pos = match value {
 
                                        Value::Array(pos) => *pos,
 
                                        Value::String(pos) => *pos,
 
                                        _ => unreachable!("length(...) on {:?}", value),
 
                                    };
 

	
 
                                    let len = self.store.heap_regions[heap_pos as usize].values.len();
 

	
 
                                    // TODO: @PtrInt
 
                                    cur_frame.expr_values.push_back(Value::UInt32(len as u32));
 
                                    self.store.drop_value(value_heap_pos);
 
                                },
 
                                Method::Assert => {
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value = self.store.maybe_read_ref(&value).clone();
 
                                    if !value.as_bool() {
 
                                        return Ok(EvalContinuation::Inconsistent)
 
                                        return Ok(EvalContinuation::BranchInconsistent)
 
                                    }
 
                                },
 
                                Method::Print => {
 
                                    // Convert the runtime-variant of a string
 
                                    // into an actual string.
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value_heap_pos = value.as_string();
 
                                    let elements = &self.store.heap_regions[value_heap_pos as usize].values;
 

	
 
                                    let mut message = String::with_capacity(elements.len());
 
                                    for element in elements {
 
                                        message.push(element.as_char());
 
                                    }
 

	
 
                                    // Drop the heap-allocated value from the
 
                                    // store
 
                                    self.store.drop_heap_pos(value_heap_pos);
 
                                    println!("{}", message);
 
                                },
 
                                Method::UserComponent => {
 
                                    // This is actually handled by the evaluation
 
                                    // of the statement.
 
                                    debug_assert_eq!(heap[expr.definition].parameters().len(), cur_frame.expr_values.len());
 
                                    debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this)
 
@@ -921,49 +926,49 @@ impl Prompt {
 

	
 
                // The preceding frame has executed a call, so is expecting the
 
                // return expression on its expression value stack. Note that
 
                // we may be returning a reference to something on our stack,
 
                // so we need to read that value and clone it.
 
                let return_value = cur_frame.expr_values.pop_back().unwrap();
 
                let return_value = match return_value {
 
                    Value::Ref(value_id) => self.store.read_copy(value_id),
 
                    _ => return_value,
 
                };
 

	
 
                // Pre-emptively pop our stack frame
 
                self.frames.pop();
 

	
 
                // Clean up our section of the stack
 
                self.store.clear_stack(0);
 
                self.store.stack.truncate(self.store.cur_stack_boundary + 1);
 
                let prev_stack_idx = self.store.stack.pop().unwrap().as_stack_boundary();
 

	
 
                // TODO: Temporary hack for testing, remove at some point
 
                if self.frames.is_empty() {
 
                    debug_assert!(prev_stack_idx == -1);
 
                    debug_assert!(self.store.stack.len() == 0);
 
                    self.store.stack.push(return_value);
 
                    return Ok(EvalContinuation::Terminal);
 
                    return Ok(EvalContinuation::ComponentTerminated);
 
                }
 

	
 
                debug_assert!(prev_stack_idx >= 0);
 
                // Return to original state of stack frame
 
                self.store.cur_stack_boundary = prev_stack_idx as usize;
 
                let cur_frame = self.frames.last_mut().unwrap();
 
                cur_frame.expr_values.push_back(return_value);
 

	
 
                // We just returned to the previous frame, which might be in
 
                // the middle of evaluating expressions for a particular
 
                // statement. So we don't want to enter the code below.
 
                return Ok(EvalContinuation::Stepping);
 
            },
 
            Statement::Goto(stmt) => {
 
                cur_frame.position = stmt.target.unwrap().upcast();
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::New(stmt) => {
 
                let call_expr = &heap[stmt.expression];
 
                debug_assert!(heap[call_expr.definition].is_component());
 
                debug_assert_eq!(
 
                    cur_frame.expr_values.len(), heap[call_expr.definition].parameters().len(),
 
                    "mismatch in expr stack size and number of arguments for new statement"
src/protocol/mod.rs
Show inline comments
 
@@ -145,95 +145,93 @@ impl ProtocolDescription {
 
    }
 

	
 
    // expects port polarities to be correct
 
    #[deprecated]
 
    pub(crate) fn new_component(&self, module_name: &[u8], identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(module_name, identifier).unwrap()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(x)),
 
                Polarity::Putter => args.push(Value::Output(x)),
 
            }
 
        }
 

	
 
        let module_root = self.lookup_module_root(module_name).unwrap();
 
        let root = &self.heap[module_root];
 
        let def = root.get_definition_ident(&self.heap, identifier).unwrap();
 
        // TODO: Check for polymorph
 
        ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) }
 
    }
 

	
 
    // TODO: Ofcourse, rename this at some point, perhaps even remove it in its
 
    //  entirety. Find some way to interface with the parameter's types.
 
    pub(crate) fn new_component_v2(
 
        &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup
 
    ) -> Result<ComponentState, ComponentCreationError> {
 
    ) -> Result<Prompt, ComponentCreationError> {
 
        // Find the module in which the definition can be found
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(ComponentCreationError::ModuleDoesntExist);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&self.heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
        let definition_id = definition_id.unwrap();
 

	
 
        let definition = &self.heap[definition_id];
 
        if !definition.is_component() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // Make sure that the types of the provided value group matches that of
 
        // the expected types.
 
        let definition = definition.as_component();
 
        if !definition.poly_vars.is_empty() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // - check number of arguments
 
        let expr_data = self.types.get_procedure_expression_data(&definition_id, 0);
 
        if expr_data.arg_types.len() != arguments.values.len() {
 
            return Err(ComponentCreationError::InvalidNumArguments);
 
        }
 

	
 
        // - for each argument try to make sure the types match
 
        for arg_idx in 0..arguments.values.len() {
 
            let expected_type = &expr_data.arg_types[arg_idx];
 
            let provided_value = &arguments.values[arg_idx];
 
            if !self.verify_same_type(expected_type, 0, &arguments, provided_value) {
 
                return Err(ComponentCreationError::InvalidArgumentType(arg_idx));
 
            }
 
        }
 

	
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(ComponentState{
 
            prompt: Prompt::new(&self.types, &self.heap, definition_id, 0, arguments),
 
        });
 
        return Ok(Prompt::new(&self.types, &self.heap, definition_id, 0, arguments));
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
                Some(name) => if name.as_bytes() == module_name {
 
                    return Some(module.root_id);
 
                },
 
                None => if module_name.is_empty() {
 
                    return Some(module.root_id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
 
            CTP::Bool => if let Value::Bool(_) = argument { true } else { false },
 
            CTP::UInt8 => if let Value::UInt8(_) = argument { true } else { false },
 
@@ -302,89 +300,88 @@ pub enum RunResult {
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchGet(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchFork,
 
    BranchPut(PortId, ValueGroup),
 
}
 

	
 
impl ComponentState {
 
    pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult {
 
        use EvalContinuation as EC;
 
        use RunResult as RR;
 

	
 
        loop {
 
            let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            match step_result {
 
                Err(reason) => {
 
                    // TODO: @temp
 
                    println!("Evaluation error:\n{}", reason);
 
                    todo!("proper error handling/bubbling up");
 
                },
 
                Ok(continuation) => match continuation {
 
                    // TODO: Probably want to remove this translation
 
                    EC::Stepping => continue,
 
                    EC::Inconsistent => return RR::BranchInconsistent,
 
                    EC::Terminal => return RR::ComponentTerminated,
 
                    EC::BranchInconsistent => return RR::BranchInconsistent,
 
                    EC::ComponentTerminated => return RR::ComponentTerminated,
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, args),
 
                    EC::NewChannel =>
 
                        return RR::NewChannel,
 
                    EC::NewFork =>
 
                        return RR::BranchFork,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchGet(port_id),
 
                    EC::Put(port_id, value) => {
 
                        let value_group = ValueGroup::from_store(&self.prompt.store, &[value]);
 
                    EC::Put(port_id, value_group) => {
 
                        return RR::BranchPut(port_id, value_group);
 
                    },
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
// TODO: @remove the old stuff
 
impl ComponentState {
 
    pub(crate) fn nonsync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut NonsyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> NonsyncBlocker {
 
        let mut context = EvalContext::Nonsync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context);
 
            match result {
 
                Err(err) => {
 
                    println!("Evaluation error:\n{}", err);
 
                    panic!("proper error handling when component fails");
 
                },
 
                Ok(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return NonsyncBlocker::Inconsistent,
 
                    EvalContinuation::Terminal => return NonsyncBlocker::ComponentExit,
 
                    EvalContinuation::BranchInconsistent => return NonsyncBlocker::Inconsistent,
 
                    EvalContinuation::ComponentTerminated => return NonsyncBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return NonsyncBlocker::SyncBlockStart,
 
                    // Not possible to end sync block if never entered one
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(definition_id, monomorph_idx, args) => {
 
                        // Look up definition (TODO for now, assume it is a definition)
 
                        let mut moved_ports = HashSet::new();
 
                        for arg in args.values.iter() {
 
                            match arg {
 
                                Value::Output(port) => {
 
                                    moved_ports.insert(*port);
 
                                }
 
                                Value::Input(port) => {
 
                                    moved_ports.insert(*port);
 
                                }
 
                                _ => {}
 
                            }
 
                        }
 
                        for region in args.regions.iter() {
 
                            for arg in region {
 
                                match arg {
 
                                    Value::Output(port) => { moved_ports.insert(*port); },
 
                                    Value::Input(port) => { moved_ports.insert(*port); },
 
                                    _ => {},
 
                                }
 
@@ -406,73 +403,75 @@ impl ComponentState {
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(_) => unreachable!(),
 
                    EvalContinuation::BlockGet(_) => unreachable!(),
 
                    EvalContinuation::Put(_, _) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn sync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut SyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> SyncBlocker {
 
        let mut context = EvalContext::Sync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context);
 
            match result {
 
                Err(err) => {
 
                    println!("Evaluation error:\n{}", err);
 
                    panic!("proper error handling when component fails");
 
                },
 
                Ok(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return SyncBlocker::Inconsistent,
 
                    EvalContinuation::BranchInconsistent => return SyncBlocker::Inconsistent,
 
                    // First need to exit synchronous block before definition may end
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    EvalContinuation::ComponentTerminated => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _, _) => unreachable!(),
 
                    EvalContinuation::NewChannel => unreachable!(),
 
                    EvalContinuation::NewFork => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        return SyncBlocker::CouldntCheckFiring(port);
 
                    },
 
                    EvalContinuation::BlockGet(port) => {
 
                        return SyncBlocker::CouldntReadMsg(port);
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let payload;
 
                        match message {
 

	
 
                        // Extract bytes from `put`
 
                        match &message.values[0] {
 
                            Value::Null => {
 
                                return SyncBlocker::Inconsistent;
 
                            },
 
                            Value::Message(heap_pos) => {
 
                                // Create a copy of the payload
 
                                let values = &self.prompt.store.heap_regions[heap_pos as usize].values;
 
                                let values = &message.regions[*heap_pos as usize];
 
                                let mut bytes = Vec::with_capacity(values.len());
 
                                for value in values {
 
                                    bytes.push(value.as_uint8());
 
                                }
 
                                payload = Payload(Arc::new(bytes));
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return SyncBlocker::PutMsg(port, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
impl RunContext for EvalContext<'_> {
 
    fn performed_put(&mut self, port: PortId) -> bool {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                ctx.did_put_or_get(port)
 
            }
src/runtime2/branch.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 
use crate::protocol::eval::{Value, ValueGroup};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 

	
 
use super::port::PortIdLocal;
 

	
 
// To share some logic between the FakeTree and ExecTree implementation
 
trait BranchListItem {
 
    fn get_id(&self) -> BranchId;
 
    fn set_next_id(&mut self, id: BranchId);
 
    fn get_next_id(&self) -> BranchId;
 
}
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32
 
}
 

	
 
impl BranchId {
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ index: 0 };
 
    }
 

	
 
@@ -67,64 +66,64 @@ impl PreparedStatement {
 
            return true;
 
        } else {
 
            return false;
 
        }
 
    }
 

	
 
    pub(crate) fn take(&mut self) -> PreparedStatement {
 
        if let PreparedStatement::None = self {
 
            return PreparedStatement::None;
 
        } else {
 
            let mut replacement = PreparedStatement::None;
 
            std::mem::swap(self, &mut replacement);
 
            return replacement;
 
        }
 
    }
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
pub(crate) struct Branch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: ComponentState,
 
    pub code_state: Prompt,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub prepared: PreparedStatement,
 
}
 

	
 
impl BranchListItem for Branch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
 
    pub(crate) fn new_non_sync(component_state: Prompt) -> Self {
 
        Branch {
 
            id: BranchId::new_invalid(),
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        // debug_assert!(
 
        //     (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
        //     (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        // ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
@@ -179,49 +178,49 @@ impl QueueKind {
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ExecTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Execution tree of branches. Tries to keep the extra information stored
 
/// herein to a minimum. So the execution tree is aware of the branches, their
 
/// execution state and the way they're dependent on each other, but the
 
/// execution tree should not be aware of e.g. sync algorithms.
 
///
 
/// Note that the tree keeps track of multiple lists of branches. Each list
 
/// contains branches that ended up in a particular execution state. The lists
 
/// are described by the various `BranchQueue` instances and the `next_in_queue`
 
/// field in each branch.
 
pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
    /// Constructs a new execution tree with a single non-sync branch.
 
    pub fn new(component: ComponentState) -> Self {
 
    pub fn new(component: Prompt) -> Self {
 
        return Self {
 
            branches: vec![Branch::new_non_sync(component)],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    // --- Generic branch (queue) management
 

	
 
    /// Returns if tree is in speculative mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return self.branches.len() != 1;
 
    }
 

	
 
    /// Returns true if the particular queue is empty
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
src/runtime2/connector.rs
Show inline comments
 
@@ -8,81 +8,82 @@
 
// a consensus algorithm, etc. But on the other hand, our data is rather
 
// simple: we have a speculative execution tree, a set of ports that we own,
 
// and a bit of code that we should run.
 
//
 
// So currently the code is organized as following:
 
// - The scheduler that is running the component is the authoritative source on
 
//     ports during *non-sync* mode. The consensus algorithm is the
 
//     authoritative source during *sync* mode. They retrieve each other's
 
//     state during the transitions. Hence port data exists duplicated between
 
//     these two datastructures.
 
// - The execution tree is where executed branches reside. But the execution
 
//     tree is only aware of the tree shape itself (and keeps track of some
 
//     queues of branches that are in a particular state), and tends to store
 
//     the PDL program state. The consensus algorithm is also somewhat aware
 
//     of the execution tree, but only in terms of what is needed to complete
 
//     a sync round (for now, that means the port mapping in each branch).
 
//     Hence once more we have properties conceptually associated with branches
 
//     in two places.
 
// - TODO: Write about handling messages, consensus wrapping data
 
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::PortId;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
#[derive(Debug)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,          // Run again, immediately
 
    Later,              // Schedule for running, at some later point in time
 
    NotNow,             // Do not reschedule for running
 
    Exit,               // Connector has exited
 
    Error(EvalError),   // Connector has experienced a fatal error
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
// TODO: Remove remaining fields once 'fires()' is removed from language.
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
@@ -128,49 +129,49 @@ impl Connector for ConnectorPDL {
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 

	
 
                if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Actually found a solution
 
                    self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 

	
 
                self.last_finished_handled = Some(branch_id);
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: ComponentState) -> Self {
 
    pub fn new(initial: Prompt) -> Self {
 
        Self{
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
 
        while let Some(message) = ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
 
                Message::Control(_) => unreachable!("control message in component"),
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
@@ -204,221 +205,242 @@ impl ConnectorPDL {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            return ConnectorScheduling::Error(eval_error);
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
            EvalContinuation::BranchInconsistent => {
 
                // Branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
            EvalContinuation::BlockFires(port_id) => {
 
                // Branch called `fires()` on a port that has not been used yet.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                // Create two forks, one that assumes the port will fire, and
 
                // one that assumes the port remains silent
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 

	
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchGet(port_id) => {
 
            EvalContinuation::BlockGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
 
                // it reaches the `get` call. But we might have already received
 
                // a message that targets this branch, so check now.
 
                let mut any_message_received = false;
 
                for message in comp_ctx.get_read_data_messages(port_id) {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            RunResult::BranchAtSyncEnd => {
 
            EvalContinuation::SyncBlockEnd => {
 
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
                if consistency == Consistency::Valid {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchFork => {
 
            EvalContinuation::NewFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            RunResult::BranchPut(port_id, content) => {
 
            EvalContinuation::Put(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                }));
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 

	
 
        // If here then the run result did not require a particular action. We
 
        // return whether we have more active branches to run or not.
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            return ConnectorScheduling::Error(eval_error);
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
            EvalContinuation::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 

	
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
            EvalContinuation::SyncBlockStart => {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                debug_assert!(self.last_finished_handled.is_none());
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
            EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
 
                // we are in non-sync mode the scheduler will handle and check
 
                // port ownership transfer.
 
                debug_assert!(comp_ctx.workspace_ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
 

	
 
                let new_state = ComponentState {
 
                    prompt: Prompt::new(
 
                let new_prompt = Prompt::new(
 
                    &sched_ctx.runtime.protocol_description.types,
 
                    &sched_ctx.runtime.protocol_description.heap,
 
                    definition_id, monomorph_idx, arguments
 
                    ),
 
                };
 
                let new_component = ConnectorPDL::new(new_state);
 
                );
 
                let new_component = ConnectorPDL::new(new_prompt);
 
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
 
                comp_ctx.workspace_ports.clear();
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
            EvalContinuation::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
 
        let mut fake_vec = Vec::new();
 
        self.tree.end_sync(solution_branch_id);
 
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            // TODO: Handle sent/received ports
 
            debug_assert!(ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        ctx.notify_sync_end(&[]);
 
        self.last_finished_handled = None;
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
 
    /// condition appears.
 
    #[inline]
 
    fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result<EvalContinuation, EvalError> {
 
        loop {
 
            let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            if let Ok(EvalContinuation::Stepping) = result {
 
                continue;
 
            }
 

	
 
            return result;
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
@@ -402,50 +402,50 @@ impl ApplicationInterface {
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        if self.is_in_sync {
 
            return Err(ComponentCreationError::InSync);
 
        }
 

	
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for initial_port in &initial_ports {
 
            if !self.owned_ports.iter().any(|(_, v)| v == initial_port) {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(state);
 
        let prompt = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(prompt);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Queues up a description of a synchronous round to run. Will not actually
 
    /// run the synchronous behaviour in blocking fashion. The results *must* be
 
    /// retrieved using `try_wait` or `wait` for the interface to be considered
 
    /// in non-sync mode.
 
    // TODO: Maybe change API in the future. For now it does the job
 
    pub fn perform_sync_round(&mut self, actions: Vec<ApplicationSyncAction>) -> Result<(), ApplicationStartSyncError> {
 
        if self.is_in_sync {
 
            return Err(ApplicationStartSyncError::AlreadyInSync);
 
        }
 

	
 
        // Check the action ports for consistency
 
        for action in &actions {
src/runtime2/scheduler.rs
Show inline comments
 
@@ -27,49 +27,49 @@ impl Scheduler {
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            self.debug("Waiting for work");
 
            let connector_key = self.runtime.wait_for_work();
 
            if connector_key.is_none() {
 
                // We should exit
 
                self.debug(" ... No more work, quitting");
 
                break 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
            while let ConnectorScheduling::Immediate = cur_schedule {
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, "Finished running");
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
@@ -94,48 +94,53 @@ impl Scheduler {
 
                    // we're running it must currently be `false`.
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Error(eval_error) => {
 
                    // Display error. Then exit
 
                    println!("Oh oh!\n{}", eval_error);
 
                    panic!("Abort!");
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
0 comments (0 inline, 0 general)