diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 3899d7d4c6bd87e943fad19518069369204f4a99..03499cbc17803f00590555ee5de61b0553487c4d 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -114,7 +114,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ return match self.prepared.take() { PreparedStatement::None => None, PreparedStatement::CreatedChannel(ports) => Some(ports), - taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken), + taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken), }; } @@ -386,18 +386,24 @@ impl ConnectorPDL { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage { - sync_header, data_header, - content, - })) { - // We don't own the port - let pd = &sched_ctx.runtime.protocol_description; - let eval_error = branch.code_state.new_error_at_expr( - &pd.modules, &pd.heap, - String::from("attempted to 'put' on port that is no longer owned") - ); - self.eval_error = Some(eval_error); - self.mode = Mode::SyncError; + let message = DataMessage{ sync_header, data_header, content }; + match comp_ctx.submit_message(Message::Data(message)) { + Ok(_) => { + // Message is underway + branch.prepared = PreparedStatement::PerformedPut; + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; + }, + Err(_) => { + // We don't own the port + let pd = &sched_ctx.runtime.protocol_description; + let eval_error = branch.code_state.new_error_at_expr( + &pd.modules, &pd.heap, + String::from("attempted to 'put' on port that is no longer owned") + ); + self.eval_error = Some(eval_error); + self.mode = Mode::SyncError; + } } branch.prepared = PreparedStatement::PerformedPut;