diff --git a/src/protocol/lexer.rs b/src/protocol/lexer.rs index 6718ca6da65deb1a82047a005ba8b4da7fe4406c..35a0a2f76a8dd0dbf3ed069f008dc94ad3cc1668 100644 --- a/src/protocol/lexer.rs +++ b/src/protocol/lexer.rs @@ -1031,7 +1031,7 @@ impl Lexer<'_> { self.source.consume(); next = self.source.next(); } - if next != Some(b'\'') || data.len() == 0 { + if next != Some(b'\'') || data.is_empty() { return Err(self.source.error("Expected character constant")); } self.source.consume(); @@ -1205,14 +1205,10 @@ impl Lexer<'_> { } let backup = self.source.clone(); let mut result = false; - match self.consume_type_annotation_spilled() { - Ok(_) => match self.consume_whitespace(false) { - Ok(_) => { - result = self.has_identifier(); - } - Err(_) => {} - }, - Err(_) => {} + if let Ok(_) = self.consume_type_annotation_spilled() { + if let Ok(_) = self.consume_whitespace(false) { + result = self.has_identifier(); + } } *self.source = backup; return result; @@ -1231,7 +1227,7 @@ impl Lexer<'_> { self.consume_whitespace(false)?; } self.consume_string(b"}")?; - if statements.len() == 0 { + if statements.is_empty() { Ok(h.alloc_skip_statement(|this| SkipStatement { this, position, next: None }).upcast()) } else { Ok(h.alloc_block_statement(|this| BlockStatement { @@ -1341,16 +1337,13 @@ impl Lexer<'_> { self.consume_whitespace(false)?; let true_body = self.consume_statement(h)?; self.consume_whitespace(false)?; - let false_body; - if self.has_keyword(b"else") { + let false_body = if self.has_keyword(b"else") { self.consume_keyword(b"else")?; self.consume_whitespace(false)?; - false_body = self.consume_statement(h)?; + self.consume_statement(h)? } else { - false_body = h - .alloc_skip_statement(|this| SkipStatement { this, position, next: None }) - .upcast(); - } + h.alloc_skip_statement(|this| SkipStatement { this, position, next: None }).upcast() + }; Ok(h.alloc_if_statement(|this| IfStatement { this, position, test, true_body, false_body })) } fn consume_while_statement(&mut self, h: &mut Heap) -> Result { @@ -1432,12 +1425,11 @@ impl Lexer<'_> { let position = self.source.pos(); self.consume_keyword(b"return")?; self.consume_whitespace(false)?; - let expression; - if self.has_string(b"(") { - expression = self.consume_paren_expression(h)?; + let expression = if self.has_string(b"(") { + self.consume_paren_expression(h) } else { - expression = self.consume_expression(h)?; - } + self.consume_expression(h) + }?; self.consume_whitespace(false)?; self.consume_string(b";")?; Ok(h.alloc_return_statement(|this| ReturnStatement { this, position, expression })) @@ -1446,12 +1438,11 @@ impl Lexer<'_> { let position = self.source.pos(); self.consume_keyword(b"assert")?; self.consume_whitespace(false)?; - let expression; - if self.has_string(b"(") { - expression = self.consume_paren_expression(h)?; + let expression = if self.has_string(b"(") { + self.consume_paren_expression(h) } else { - expression = self.consume_expression(h)?; - } + self.consume_expression(h) + }?; self.consume_whitespace(false)?; self.consume_string(b";")?; Ok(h.alloc_assert_statement(|this| AssertStatement { diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index a76dd76dfb6a964fb93ca42ea626d737d198f086..8f9465dcf03b75edb18c65fb83525bb3a61721cb 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -203,7 +203,7 @@ impl ComponentState { } Value::Message(MessageValue(Some(buffer))) => { // Create a copy of the payload - payload = buffer.clone(); + payload = buffer; } _ => unreachable!(), } diff --git a/src/protocol/parser.rs b/src/protocol/parser.rs index dd1bae03bcf1a1f6e8a6aac49c694c4bba94ab04..7a66d156d202beb48c7ce09d85ca6b798ca398eb 100644 --- a/src/protocol/parser.rs +++ b/src/protocol/parser.rs @@ -999,7 +999,7 @@ impl Visitor for ResolveVariables { let id = h[var].identifier; // First check whether variable with same identifier is in scope let check_duplicate = self.find_variable(h, id); - if !check_duplicate.is_none() { + if check_duplicate.is_some() { return Err(ParseError::new(h[id].position, "Declared variable clash")); } // Then check the expression's variables (this should not refer to own variable) @@ -1018,7 +1018,7 @@ impl Visitor for ResolveVariables { let var = h[stmt].from; let id = h[var].identifier; let check_duplicate = self.find_variable(h, id); - if !check_duplicate.is_none() { + if check_duplicate.is_some() { return Err(ParseError::new(h[id].position, "Declared variable clash")); } let mut block = &mut h[self.scope.unwrap().to_block()]; @@ -1029,7 +1029,7 @@ impl Visitor for ResolveVariables { let var = h[stmt].to; let id = h[var].identifier; let check_duplicate = self.find_variable(h, id); - if !check_duplicate.is_none() { + if check_duplicate.is_some() { return Err(ParseError::new(h[id].position, "Declared variable clash")); } let mut block = &mut h[self.scope.unwrap().to_block()]; diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b38ef2db7bb7e121b63361fafcced1793eacac0c..ae4c9985bebf731f91caeaf5bec0a4158c378339 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -42,26 +42,30 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { impl Connector { pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError::*; - match &mut self.phased { + let Self { phased, .. } = self; + match phased { ConnectorPhased::Setup { .. } => Err(NoPreviousRound), - ConnectorPhased::Communication { round_result, .. } => match round_result { - Err(_) => Err(PreviousSyncFailed), - Ok(None) => Err(NoPreviousRound), - Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet), - }, + ConnectorPhased::Communication(ConnectorCommunication { round_result, .. }) => { + match round_result { + Err(_) => Err(PreviousSyncFailed), + Ok(None) => Err(NoPreviousRound), + Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet), + } + } } } pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError::*; - if !self.native_ports.contains(&port) { + let Self { unphased, phased } = self; + if !unphased.native_ports.contains(&port) { return Err(PortUnavailable); } - if Putter != *self.port_info.polarities.get(&port).unwrap() { + if Putter != *unphased.port_info.polarities.get(&port).unwrap() { return Err(WrongPolarity); } - match &mut self.phased { + match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { let batch = native_batches.last_mut().unwrap(); if batch.to_put.contains_key(&port) { return Err(MultipleOpsOnPort); @@ -74,9 +78,10 @@ impl Connector { pub fn next_batch(&mut self) -> Result { // returns index of new batch use NextBatchError::*; - match &mut self.phased { + let Self { phased, .. } = self; + match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { native_batches.push(Default::default()); Ok(native_batches.len() - 1) } @@ -84,15 +89,16 @@ impl Connector { } pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { use PortOpError::*; - if !self.native_ports.contains(&port) { + let Self { unphased, phased } = self; + if !unphased.native_ports.contains(&port) { return Err(PortUnavailable); } - if Getter != *self.port_info.polarities.get(&port).unwrap() { + if Getter != *unphased.port_info.polarities.get(&port).unwrap() { return Err(WrongPolarity); } - match &mut self.phased { + match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { let batch = native_batches.last_mut().unwrap(); if !batch.to_get.insert(port) { return Err(MultipleOpsOnPort); @@ -101,417 +107,405 @@ impl Connector { } } } + // entrypoint for caller. overwrites round result enum, and returns what happened pub fn sync(&mut self, timeout: Option) -> Result { - use SyncError::*; - match &mut self.phased { - ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { - round_index, - neighborhood, - native_batches, - endpoint_manager, - round_result, - .. - } => { - let mut deadline = timeout.map(|to| Instant::now() + to); - let logger: &mut dyn Logger = &mut *self.logger; - // 1. run all proto components to Nonsync blockers - log!( - logger, - "~~~ SYNC called with timeout {:?}; starting round {}", - &timeout, - round_index - ); - let mut branching_proto_components = - HashMap::::default(); - let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = - self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); - log!(logger, "Nonsync running {} proto components...", unrun_components.len()); - while let Some((proto_component_id, mut component)) = unrun_components.pop() { - // TODO coalesce fields - log!( - logger, - "Nonsync running proto component with ID {:?}. {} to go after this", - proto_component_id, - unrun_components.len() - ); - let mut ctx = NonsyncProtoContext { - logger: &mut *logger, - port_info: &mut self.port_info, - id_manager: &mut self.id_manager, - proto_component_id, - unrun_components: &mut unrun_components, - proto_component_ports: &mut self - .proto_components - .get_mut(&proto_component_id) - .unwrap() - .ports, - }; - let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description); - log!( - logger, - "proto component {:?} ran to nonsync blocker {:?}", - proto_component_id, - &blocker - ); - use NonsyncBlocker as B; - match blocker { - B::ComponentExit => drop(component), - B::Inconsistent => { - return Err(InconsistentProtoComponent(proto_component_id)) - } - B::SyncBlockStart => { - branching_proto_components.insert( - proto_component_id, - BranchingProtoComponent::initial(component), - ); - } - } + let Self { unphased, phased } = self; + match phased { + ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), + ConnectorPhased::Communication(comm) => { + comm.round_result = Self::connected_sync(unphased, comm, timeout); + match &comm.round_result { + Ok(None) => unreachable!(), + Ok(Some(ok_result)) => Ok(ok_result.batch_index), + Err(sync_error) => Err(sync_error.clone()), } - log!( - logger, - "All {} proto components are now done with Nonsync phase", - branching_proto_components.len(), - ); + } + } + } - // NOTE: all msgs in outbox are of form (Getter, Payload) - let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; + // TODO make cu immutable + fn connected_sync( + cu: &mut ConnectorUnphased, + comm: &mut ConnectorCommunication, + timeout: Option, + ) -> Result, SyncError> { + use SyncError as Se; + let mut deadline = timeout.map(|to| Instant::now() + to); + // 1. run all proto components to Nonsync blockers + log!( + cu.logger, + "~~~ SYNC called with timeout {:?}; starting round {}", + &timeout, + comm.round_index + ); + let mut branching_proto_components = + HashMap::::default(); + let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = + cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len()); + while let Some((proto_component_id, mut component)) = unrun_components.pop() { + // TODO coalesce fields + log!( + cu.logger, + "Nonsync running proto component with ID {:?}. {} to go after this", + proto_component_id, + unrun_components.len() + ); + let mut ctx = NonsyncProtoContext { + logger: &mut *cu.logger, + port_info: &mut cu.port_info, + id_manager: &mut cu.id_manager, + proto_component_id, + unrun_components: &mut unrun_components, + proto_component_ports: &mut cu + .proto_components + .get_mut(&proto_component_id) + .unwrap() + .ports, + }; + let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description); + log!( + cu.logger, + "proto component {:?} ran to nonsync blocker {:?}", + proto_component_id, + &blocker + ); + use NonsyncBlocker as B; + match blocker { + B::ComponentExit => drop(component), + B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)), + B::SyncBlockStart => { + branching_proto_components + .insert(proto_component_id, BranchingProtoComponent::initial(component)); + } + } + } + log!( + cu.logger, + "All {} proto components are now done with Nonsync phase", + branching_proto_components.len(), + ); - // create the solution storage - let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); - let c = self - .proto_components - .keys() - .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); - let e = neighborhood.children.iter().map(|&index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) - }; - log!(logger, "Solution storage initialized"); + // NOTE: all msgs in outbox are of form (Getter, Payload) + let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; - // 2. kick off the native + // create the solution storage + let mut solution_storage = { + let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); + let c = cu + .proto_components + .keys() + .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); + let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }; + log!(cu.logger, "Solution storage initialized"); + + // 2. kick off the native + log!( + cu.logger, + "Translating {} native batches into branches...", + comm.native_batches.len() + ); + let mut branching_native = BranchingNative { branches: Default::default() }; + for (index, NativeBatch { to_get, to_put }) in comm.native_batches.drain(..).enumerate() { + let predicate = { + let mut predicate = Predicate::default(); + // assign trues + for &port in to_get.iter().chain(to_put.keys()) { + let var = cu.port_info.firing_var_for(port); + predicate.assigned.insert(var, true); + } + // assign falses + for &port in cu.native_ports.iter() { + let var = cu.port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); + } + predicate + }; + log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate); + + // put all messages + for (putter, payload) in to_put { + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; + log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); + // rely on invariant: sync batches respect port polarity + let getter = *cu.port_info.peers.get(&putter).unwrap(); + payloads_to_get.push((getter, msg)); + } + if to_get.is_empty() { log!( - logger, - "Translating {} native batches into branches...", - native_batches.len() + cu.logger, + "Native submitting solution for batch {} with {:?}", + index, + &predicate ); - let mut branching_native = BranchingNative { branches: Default::default() }; - for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate() - { - let predicate = { - let mut predicate = Predicate::default(); - // assign trues - for &port in to_get.iter().chain(to_put.keys()) { - let var = self.port_info.firing_var_for(port); - predicate.assigned.insert(var, true); - } - // assign falses - for &port in self.native_ports.iter() { - let var = self.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); - } - predicate - }; - log!(logger, "Native branch {} has pred {:?}", index, &predicate); + solution_storage.submit_and_digest_subtree_solution( + &mut *cu.logger, + Route::LocalComponent(LocalComponentId::Native), + predicate.clone(), + ); + } + let branch = NativeBranch { index, gotten: Default::default(), to_get }; + if let Some(existing) = branching_native.branches.insert(predicate, branch) { + // TODO + return Err(Se::IndistinguishableBatches([index, existing.index])); + } + } + log!(cu.logger, "Done translating native batches into branches"); + comm.native_batches.push(Default::default()); - // put all messages - for (putter, payload) in to_put { - let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!(logger, "Native branch {} sending msg {:?}", index, &msg); - // rely on invariant: sync batches respect port polarity - let getter = *self.port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, msg)); - } - if to_get.is_empty() { - log!( - logger, - "Native submitting solution for batch {} with {:?}", - index, - &predicate - ); - solution_storage.submit_and_digest_subtree_solution( - logger, - Route::LocalComponent(LocalComponentId::Native), - predicate.clone(), - ); + // run all proto components to their sync blocker + log!( + cu.logger, + "Running all {} proto components to their sync blocker...", + branching_proto_components.len() + ); + for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { + let ConnectorUnphased { port_info, proto_description, .. } = cu; + let BranchingProtoComponent { ports, branches } = proto_component; + let mut swap = HashMap::default(); + let mut blocked = HashMap::default(); + // drain from branches --> blocked + let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked); + BranchingProtoComponent::drain_branches_to_blocked( + cd, + cu, + &mut solution_storage, + &mut payloads_to_get, + proto_component_id, + ports, + ); + // swap the blocked branches back + std::mem::swap(&mut blocked, branches); + } + log!(cu.logger, "All proto components are blocked"); + + log!(cu.logger, "Entering decision loop..."); + comm.endpoint_manager.undelay_all(); + let decision = 'undecided: loop { + // drain payloads_to_get, sending them through endpoints / feeding them to components + while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { + assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); + match cu.port_info.routes.get(&getter).unwrap() { + Route::Endpoint { index } => { + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::SendPayload(send_payload_msg), + }); + comm.endpoint_manager.send_to(*index, &msg).unwrap(); } - let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if let Some(existing) = branching_native.branches.insert(predicate, branch) { - // TODO - return Err(IndistinguishableBatches([index, existing.index])); + Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg( + cu, + &mut solution_storage, + // &mut Pay + getter, + send_payload_msg, + ), + Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { + if let Some(branching_component) = + branching_proto_components.get_mut(proto_component_id) + { + let proto_component_id = *proto_component_id; + // let ConnectorUnphased { port_info, proto_description, .. } = cu; + branching_component.feed_msg( + cu, + &mut solution_storage, + proto_component_id, + &mut payloads_to_get, + getter, + send_payload_msg, + ) + } } } - log!(logger, "Done translating native batches into branches"); - native_batches.push(Default::default()); + } - // run all proto components to their sync blocker - log!( - logger, - "Running all {} proto components to their sync blocker...", - branching_proto_components.len() - ); - for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() - { - let Self { port_info, proto_description, .. } = self; - let BranchingProtoComponent { ports, branches } = proto_component; - let mut swap = HashMap::default(); - let mut blocked = HashMap::default(); - // drain from branches --> blocked - let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked); - BranchingProtoComponent::drain_branches_to_blocked( - cd, - logger, - port_info, - proto_description, - &mut solution_storage, - |putter, m| { - let getter = *port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, m)); - }, - proto_component_id, - ports, - ); - // swap the blocked branches back - std::mem::swap(&mut blocked, branches); + // check if we have a solution yet + log!(cu.logger, "Check if we have any local decisions..."); + for solution in solution_storage.iter_new_local_make_old() { + log!(cu.logger, "New local decision with solution {:?}...", &solution); + match comm.neighborhood.parent { + Some(parent) => { + log!(cu.logger, "Forwarding to my parent {:?}", parent); + let suggestion = Decision::Success(solution); + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Suggest { suggestion }, + }); + comm.endpoint_manager.send_to(parent, &msg).unwrap(); + } + None => { + log!(cu.logger, "No parent. Deciding on solution {:?}", &solution); + break 'undecided Decision::Success(solution); + } } - log!(logger, "All proto components are blocked"); + } - log!(logger, "Entering decision loop..."); - endpoint_manager.undelay_all(); - let decision = 'undecided: loop { - // drain payloads_to_get, sending them through endpoints / feeding them to components - while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { - assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter)); - match self.port_info.routes.get(&getter).unwrap() { - Route::Endpoint { index } => { + // stuck! make progress by receiving a msg + // try recv messages arriving through endpoints + log!(cu.logger, "No decision yet. Let's recv an endpoint msg..."); + { + let (endpoint_index, msg) = loop { + match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? { + None => { + log!(cu.logger, "Reached user-defined deadling without decision..."); + if let Some(parent) = comm.neighborhood.parent { + log!( + cu.logger, + "Sending failure request to parent index {}", + parent + ); let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::SendPayload(send_payload_msg), + round_index: comm.round_index, + contents: CommMsgContents::Suggest { + suggestion: Decision::Failure, + }, }); - endpoint_manager.send_to(*index, &msg).unwrap(); - } - Route::LocalComponent(LocalComponentId::Native) => branching_native - .feed_msg( - logger, - &self.port_info, - &mut solution_storage, - getter, - send_payload_msg, - ), - Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { - if let Some(branching_component) = - branching_proto_components.get_mut(proto_component_id) - { - let proto_component_id = *proto_component_id; - let Self { port_info, proto_description, .. } = self; - branching_component.feed_msg( - logger, - port_info, - proto_description, - &mut solution_storage, - proto_component_id, - |putter, m| { - let getter = *port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, m)); - }, - getter, - send_payload_msg, - ) - } + comm.endpoint_manager.send_to(parent, &msg).unwrap(); + } else { + log!(cu.logger, "As the leader, deciding on timeout"); + break 'undecided Decision::Failure; } + deadline = None; } + Some((endpoint_index, msg)) => break (endpoint_index, msg), } - - // check if we have a solution yet - log!(logger, "Check if we have any local decisions..."); - for solution in solution_storage.iter_new_local_make_old() { - log!(logger, "New local decision with solution {:?}...", &solution); - match neighborhood.parent { - Some(parent) => { - log!(logger, "Forwarding to my parent {:?}", parent); - let suggestion = Decision::Success(solution); - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Suggest { suggestion }, - }); - endpoint_manager.send_to(parent, &msg).unwrap(); - } - None => { - log!(logger, "No parent. Deciding on solution {:?}", &solution); - break 'undecided Decision::Success(solution); - } + }; + log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); + let comm_msg_contents = match msg { + Msg::SetupMsg(..) => { + log!(cu.logger, "Discarding setup message; that phase is over"); + continue 'undecided; + } + Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) { + Ordering::Equal => comm_msg.contents, + Ordering::Less => { + log!( + cu.logger, + "We are in round {}, but msg is for round {}. Discard", + comm_msg.round_index, + comm.round_index, + ); + drop(comm_msg); + continue 'undecided; } + Ordering::Greater => { + log!( + cu.logger, + "We are in round {}, but msg is for round {}. Buffer", + comm_msg.round_index, + comm.round_index, + ); + comm.endpoint_manager + .delayed_messages + .push((endpoint_index, Msg::CommMsg(comm_msg))); + continue 'undecided; + } + }, + }; + match comm_msg_contents { + CommMsgContents::SendPayload(send_payload_msg) => { + let getter = + comm.endpoint_manager.endpoint_exts[endpoint_index].getter_for_incoming; + assert!(cu.port_info.polarities.get(&getter) == Some(&Getter)); + log!( + cu.logger, + "Msg routed to getter port {:?}. Buffer for recv loop", + getter, + ); + payloads_to_get.push((getter, send_payload_msg)); } - - // stuck! make progress by receiving a msg - // try recv messages arriving through endpoints - log!(logger, "No decision yet. Let's recv an endpoint msg..."); - { - let (endpoint_index, msg) = loop { - match endpoint_manager.try_recv_any_comms(logger, deadline)? { - None => { - log!( - logger, - "Reached user-defined deadling without decision..." + CommMsgContents::Suggest { suggestion } => { + // only accept this control msg through a child endpoint + if comm.neighborhood.children.contains(&endpoint_index) { + match suggestion { + Decision::Success(predicate) => { + // child solution contributes to local solution + log!(cu.logger, "Child provided solution {:?}", &predicate); + let route = Route::Endpoint { index: endpoint_index }; + solution_storage.submit_and_digest_subtree_solution( + &mut *cu.logger, + route, + predicate, ); - if let Some(parent) = neighborhood.parent { - log!( - logger, - "Sending failure request to parent index {}", - parent - ); - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Suggest { - suggestion: Decision::Failure, - }, - }); - endpoint_manager.send_to(parent, &msg).unwrap(); - } else { - log!(logger, "As the leader, deciding on timeout"); - break 'undecided Decision::Failure; - } - deadline = None; } - Some((endpoint_index, msg)) => break (endpoint_index, msg), - } - }; - log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); - let comm_msg_contents = match msg { - Msg::SetupMsg(..) => { - log!(logger, "Discarding setup message; that phase is over"); - continue 'undecided; - } - Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) { - Ordering::Equal => comm_msg.contents, - Ordering::Less => { - log!( - logger, - "We are in round {}, but msg is for round {}. Discard", - comm_msg.round_index, - round_index, - ); - drop(comm_msg); - continue 'undecided; - } - Ordering::Greater => { - log!( - logger, - "We are in round {}, but msg is for round {}. Buffer", - comm_msg.round_index, - round_index, - ); - endpoint_manager - .delayed_messages - .push((endpoint_index, Msg::CommMsg(comm_msg))); - continue 'undecided; - } - }, - }; - match comm_msg_contents { - CommMsgContents::SendPayload(send_payload_msg) => { - let getter = endpoint_manager.endpoint_exts[endpoint_index] - .getter_for_incoming; - assert!(self.port_info.polarities.get(&getter) == Some(&Getter)); - log!( - logger, - "Msg routed to getter port {:?}. Buffer for recv loop", - getter, - ); - payloads_to_get.push((getter, send_payload_msg)); - } - CommMsgContents::Suggest { suggestion } => { - // only accept this control msg through a child endpoint - if neighborhood.children.contains(&endpoint_index) { - match suggestion { - Decision::Success(predicate) => { - // child solution contributes to local solution + Decision::Failure => { + match comm.neighborhood.parent { + None => { log!( - logger, - "Child provided solution {:?}", - &predicate - ); - let route = Route::Endpoint { index: endpoint_index }; - solution_storage.submit_and_digest_subtree_solution( - logger, route, predicate, + cu.logger, + "As sink, I decide on my child's failure" ); + // I am the sink. Decide on failed + break 'undecided Decision::Failure; + } + Some(parent) => { + log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent); + // I've got a parent. Forward the failure suggestion. + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Suggest { suggestion }, + }); + comm.endpoint_manager.send_to(parent, &msg).unwrap(); } - Decision::Failure => match neighborhood.parent { - None => { - log!( - logger, - "As sink, I decide on my child's failure" - ); - // I am the sink. Decide on failed - break 'undecided Decision::Failure; - } - Some(parent) => { - log!(logger, "Forwarding failure through my parent endpoint {:?}", parent); - // I've got a parent. Forward the failure suggestion. - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Suggest { - suggestion, - }, - }); - endpoint_manager.send_to(parent, &msg).unwrap(); - } - }, } - } else { - log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index); - } - } - CommMsgContents::Announce { decision } => { - if Some(endpoint_index) == neighborhood.parent { - // adopt this decision - break 'undecided decision; - } else { - log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index); } } + } else { + log!( + cu.logger, + "Discarding suggestion {:?} from non-child endpoint idx {:?}", + &suggestion, + endpoint_index + ); + } + } + CommMsgContents::Announce { decision } => { + if Some(endpoint_index) == comm.neighborhood.parent { + // adopt this decision + break 'undecided decision; + } else { + log!( + cu.logger, + "Discarding announcement {:?} from non-parent endpoint idx {:?}", + &decision, + endpoint_index + ); } } - log!(logger, "Endpoint msg recv done"); - }; - log!(logger, "Committing to decision {:?}!", &decision); - - // propagate the decision to children - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Announce { decision: decision.clone() }, - }); - log!( - logger, - "Announcing decision {:?} through child endpoints {:?}", - &msg, - &neighborhood.children - ); - for &child in neighborhood.children.iter() { - endpoint_manager.send_to(child, &msg).unwrap(); } + } + log!(cu.logger, "Endpoint msg recv done"); + }; + log!(cu.logger, "Committing to decision {:?}!", &decision); - *round_result = match decision { - Decision::Failure => Err(RoundFailure), - Decision::Success(predicate) => { - // commit changes to component states - self.proto_components.clear(); - self.proto_components.extend( - branching_proto_components - .into_iter() - .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), - ); - Ok(Some(branching_native.collapse_with(&predicate))) - } - }; - log!(logger, "Updated round_result to {:?}", round_result); + // propagate the decision to children + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Announce { decision: decision.clone() }, + }); + log!( + cu.logger, + "Announcing decision {:?} through child endpoints {:?}", + &msg, + &comm.neighborhood.children + ); + for &child in comm.neighborhood.children.iter() { + comm.endpoint_manager.send_to(child, &msg).unwrap(); + } - let returning = round_result - .as_ref() - .map(|option| option.as_ref().unwrap().0) - .map_err(|sync_error| sync_error.clone()); - log!(logger, "Returning {:?}", &returning); - returning + match decision { + Decision::Failure => Err(Se::RoundFailure), + Decision::Success(predicate) => { + // commit changes to component states + cu.proto_components.clear(); + cu.proto_components.extend( + branching_proto_components + .into_iter() + .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), + ); + Ok(Some(branching_native.collapse_with(&predicate))) } } } @@ -519,21 +513,21 @@ impl Connector { impl BranchingNative { fn feed_msg( &mut self, - logger: &mut dyn Logger, - port_info: &PortInfo, + cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, + // payloads_to_get: &mut Vec<(PortId, CommMsgContents)>, getter: PortId, send_payload_msg: SendPayloadMsg, ) { - log!(logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert!(port_info.polarities.get(&getter).copied() == Some(Getter)); + log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); + assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); let mut draining = HashMap::default(); let finished = &mut self.branches; std::mem::swap(&mut draining, finished); for (predicate, mut branch) in draining.drain() { - log!(logger, "visiting native branch {:?} with {:?}", &branch, &predicate); + log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = port_info.firing_var_for(getter); + let var = cu.port_info.firing_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); @@ -541,7 +535,7 @@ impl BranchingNative { if branch.to_get.is_empty() { let route = Route::LocalComponent(LocalComponentId::Native); solution_storage.submit_and_digest_subtree_solution( - logger, + &mut *cu.logger, route, predicate.clone(), ); @@ -550,7 +544,7 @@ impl BranchingNative { if predicate.query(var) != Some(true) { // optimization. Don't bother trying this branch log!( - logger, + cu.logger, "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); @@ -562,7 +556,7 @@ impl BranchingNative { Csr::Nonexistant => { // this branch does not receive the message log!( - logger, + cu.logger, "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); @@ -571,7 +565,7 @@ impl BranchingNative { Csr::Equivalent | Csr::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); - log!(logger, "branch pred covers it! Accept the msg"); + log!(cu.logger, "branch pred covers it! Accept the msg"); finished.insert(predicate, branch); } Csr::LatterNotFormer => { @@ -580,7 +574,7 @@ impl BranchingNative { let predicate2 = send_payload_msg.predicate.clone(); feed_branch(&mut branch2, &predicate2); log!( - logger, + cu.logger, "payload pred {:?} covers branch pred {:?}", &predicate2, &predicate @@ -593,7 +587,7 @@ impl BranchingNative { let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); log!( - logger, + cu.logger, "new subsuming pred created {:?}. forking and feeding", &predicate2 ); @@ -603,38 +597,40 @@ impl BranchingNative { } } } - fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { + fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk { for (branch_predicate, branch) in self.branches { if solution_predicate.satisfies(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; - return (index, gotten); + return RoundOk { batch_index: index, gotten }; } } panic!("Native had no branches matching pred {:?}", solution_predicate); } } + +// |putter, m| { +// let getter = *cu.port_info.peers.get(&putter).unwrap(); +// payloads_to_get.push((getter, m)); +// }, impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, - // - logger: &mut dyn Logger, - port_info: &PortInfo, - proto_description: &ProtocolDescription, + cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, - mut outbox_unqueue: impl FnMut(PortId, SendPayloadMsg), + payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>, proto_component_id: ProtoComponentId, ports: &HashSet, ) { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { - logger, + logger: &mut *cu.logger, predicate: &predicate, - port_info, + port_info: &cu.port_info, inbox: &branch.inbox, }; - let blocker = branch.state.sync_run(&mut ctx, proto_description); + let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); log!( - logger, + cu.logger, "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", proto_component_id, &predicate, @@ -649,12 +645,12 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // make concrete all variables for &port in ports.iter() { - let var = port_info.firing_var_for(port); + let var = cu.port_info.firing_var_for(port); predicate.assigned.entry(var).or_insert(false); } // submit solution for this component solution_storage.submit_and_digest_subtree_solution( - logger, + &mut *cu.logger, Route::LocalComponent(LocalComponentId::Proto(proto_component_id)), predicate.clone(), ); @@ -668,7 +664,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = port_info.firing_var_for(port); + let var = cu.port_info.firing_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); @@ -676,21 +672,20 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), port_info.polarities.get(&putter)); + assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); // overwrite assignment - let var = port_info.firing_var_for(putter); + let var = cu.port_info.firing_var_for(putter); let was = predicate.assigned.insert(var, true); if was == Some(false) { - log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" - log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); - outbox_unqueue( - putter, - SendPayloadMsg { predicate: predicate.clone(), payload }, - ); + log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + let getter = *cu.port_info.peers.get(&putter).unwrap(); + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; + payloads_to_get.push((getter, msg)); drainer.add_input(predicate, branch); } } @@ -699,15 +694,14 @@ impl BranchingProtoComponent { } fn feed_msg( &mut self, - logger: &mut dyn Logger, - port_info: &PortInfo, - proto_description: &ProtocolDescription, + cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, proto_component_id: ProtoComponentId, - outbox_unqueue: impl FnMut(PortId, SendPayloadMsg), + payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>, getter: PortId, send_payload_msg: SendPayloadMsg, ) { + let logger = &mut *cu.logger; log!( logger, "feeding proto component {:?} getter {:?} {:?}", @@ -760,17 +754,15 @@ impl BranchingProtoComponent { let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked); BranchingProtoComponent::drain_branches_to_blocked( cd, - logger, - port_info, - proto_description, + cu, solution_storage, - outbox_unqueue, + payloads_to_get, proto_component_id, ports, ); // swap the blocked branches back std::mem::swap(&mut blocked, branches); - log!(logger, "component settles down with branches: {:?}", branches.keys()); + log!(cu.logger, "component settles down with branches: {:?}", branches.keys()); } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; @@ -957,7 +949,6 @@ impl ProtoComponentBranch { assert!(was.is_none()) } } - impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { fn new( input: &'a mut HashMap, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 901835c3ad34d5be048affc1bca121407f36a163..b5f63c8958d8bc0e79d923734538dbd57c1ba0f8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,6 +10,11 @@ mod tests; use crate::common::*; use error::*; +#[derive(Debug)] +pub struct RoundOk { + batch_index: usize, + gotten: HashMap, +} #[derive(Debug)] pub struct VecSet { // invariant: ordered, deduplicated @@ -135,28 +140,31 @@ pub struct PortInfo { } #[derive(Debug)] pub struct Connector { + unphased: ConnectorUnphased, + phased: ConnectorPhased, +} +#[derive(Debug)] +pub struct ConnectorCommunication { + round_index: usize, + endpoint_manager: EndpointManager, + neighborhood: Neighborhood, + mem_inbox: Vec, + native_batches: Vec, + round_result: Result, SyncError>, +} +#[derive(Debug)] +pub struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, logger: Box, id_manager: IdManager, native_ports: HashSet, port_info: PortInfo, - phased: ConnectorPhased, } #[derive(Debug)] pub enum ConnectorPhased { - Setup { - endpoint_setups: Vec<(PortId, EndpointSetup)>, - surplus_sockets: u16, - }, - Communication { - round_index: usize, - endpoint_manager: EndpointManager, - neighborhood: Neighborhood, - mem_inbox: Vec, - native_batches: Vec, - round_result: Result)>, SyncError>, - }, + Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 }, + Communication(ConnectorCommunication), } #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { @@ -225,31 +233,32 @@ impl IdManager { } impl Drop for Connector { fn drop(&mut self) { - log!(&mut *self.logger, "Connector dropping. Goodbye!"); + log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!"); } } impl Connector { pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.logger, &mut new_logger); + std::mem::swap(&mut self.unphased.logger, &mut new_logger); new_logger } pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.logger + &mut *self.unphased.logger } pub fn new_port_pair(&mut self) -> [PortId; 2] { + let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; - self.native_ports.insert(o); - self.native_ports.insert(i); + let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()]; + cu.native_ports.insert(o); + cu.native_ports.insert(i); // {polarity, peer, route} known. {} unknown. - self.port_info.polarities.insert(o, Putter); - self.port_info.polarities.insert(i, Getter); - self.port_info.peers.insert(o, i); - self.port_info.peers.insert(i, o); + cu.port_info.polarities.insert(o, Putter); + cu.port_info.polarities.insert(i, Getter); + cu.port_info.peers.insert(o, i); + cu.port_info.peers.insert(i, o); let route = Route::LocalComponent(LocalComponentId::Native); - self.port_info.routes.insert(o, route); - self.port_info.routes.insert(i, route); - log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + cu.port_info.routes.insert(o, route); + cu.port_info.routes.insert(i, route); + log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } pub fn add_component( @@ -260,31 +269,32 @@ impl Connector { // called by the USER. moves ports owned by the NATIVE use AddComponentError::*; // 1. check if this is OK - let polarities = self.proto_description.component_polarities(identifier)?; + let cu = &mut self.unphased; + let polarities = cu.proto_description.component_polarities(identifier)?; if polarities.len() != ports.len() { return Err(WrongNumberOfParamaters { expected: polarities.len() }); } for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !self.native_ports.contains(port) { + if !cu.native_ports.contains(port) { return Err(UnknownPort(*port)); } - if expected_polarity != *self.port_info.polarities.get(port).unwrap() { + if expected_polarity != *cu.port_info.polarities.get(port).unwrap() { return Err(WrongPortPolarity { port: *port, expected_polarity }); } } // 3. remove ports from old component & update port->route - let new_id = self.id_manager.new_proto_component_id(); + let new_id = cu.id_manager.new_proto_component_id(); for port in ports.iter() { - self.port_info + cu.port_info .routes .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); } - self.native_ports.retain(|port| !ports.contains(port)); + cu.native_ports.retain(|port| !ports.contains(port)); // 4. add new component - self.proto_components.insert( + cu.proto_components.insert( new_id, ProtoComponent { - state: self.proto_description.new_main_component(identifier, ports), + state: cu.proto_description.new_main_component(identifier, ports), ports: ports.iter().copied().collect(), }, ); diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index d52109fd9a713dbce41509d0bdc965f08804bce5..abbbbabd1cd3d31924605d421103113d3f6449e0 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -20,12 +20,14 @@ impl Connector { ) -> Self { log!(&mut *logger, "Created with connector_id {:?}", connector_id); Self { - proto_description, - proto_components: Default::default(), - logger, - id_manager: IdManager::new(connector_id), - native_ports: Default::default(), - port_info: Default::default(), + unphased: ConnectorUnphased { + proto_description, + proto_components: Default::default(), + logger, + id_manager: IdManager::new(connector_id), + native_ports: Default::default(), + port_info: Default::default(), + }, phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, } } @@ -35,16 +37,17 @@ impl Connector { sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, ) -> Result { - match &mut self.phased { + let Self { unphased: up, phased } = self; + match phased { ConnectorPhased::Setup { endpoint_setups, .. } => { let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity }; - let p = self.id_manager.new_port_id(); - self.native_ports.insert(p); + let p = up.id_manager.new_port_id(); + up.native_ports.insert(p); // {polarity, route} known. {peer} unknown. - self.port_info.polarities.insert(p, polarity); - self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); + up.port_info.polarities.insert(p, polarity); + up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); log!( - self.logger, + up.logger, "Added net port {:?} with polarity {:?} and endpoint setup {:?} ", p, polarity, @@ -58,44 +61,45 @@ impl Connector { } pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { use ConnectError::*; - match &mut self.phased { + let Self { unphased: up, phased } = self; + match phased { ConnectorPhased::Communication { .. } => { - log!(self.logger, "Call to connecting in connected state"); + log!(up.logger, "Call to connecting in connected state"); Err(AlreadyConnected) } ConnectorPhased::Setup { endpoint_setups, .. } => { - log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(up.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *self.logger, + &mut *up.logger, endpoint_setups, - &mut self.port_info, + &mut up.port_info, deadline, )?; log!( - self.logger, + up.logger, "Successfully connected {} endpoints", endpoint_manager.endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( - self.id_manager.connector_id, - &mut *self.logger, + up.id_manager.connector_id, + &mut *up.logger, &mut endpoint_manager, deadline, )?; - log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); - log!(self.logger, "connect() finished. setup phase complete"); + log!(up.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(up.logger, "connect() finished. setup phase complete"); // TODO session optimization goes here - self.phased = ConnectorPhased::Communication { + self.phased = ConnectorPhased::Communication(ConnectorCommunication { round_index: 0, endpoint_manager, neighborhood, mem_inbox: Default::default(), native_batches: vec![Default::default()], round_result: Ok(None), - }; + }); Ok(()) } } @@ -144,8 +148,8 @@ fn new_endpoint_manager( // 1. Start to construct EndpointManager let mut poll = Poll::new().map_err(|_| PollInitFailed)?; - let mut events = Events::with_capacity(64); - let mut polled_undrained = IndexSet::::default(); + let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4); + let mut polled_undrained = IndexSet::default(); let mut delayed_messages = vec![]; // 2. create a registered (TcpListener/Endpoint) for passive / active respectively