diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 4ddefc9f54cef6af8c49f37b6db0e5cd6cf5eb4c..3b949ca7953721d334e1fcc5b74545d9110f07d6 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,67 +1,153 @@ use super::*; use crate::common::*; + +//////////////// +struct BranchingNative { + branches: HashMap, +} +struct NativeBranch { + index: usize, + gotten: HashMap, + to_get: HashSet, +} + +//////////////// impl NonsyncProtoContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { - let component = &mut self.connector.proto_components[self.proto_component_index]; - assert!(component.ports.is_subset(&moved_ports)); - // let polarities = self.proto_description.component_polarities(identifier).expect("BAD "); - // if polarities.len() != ports.len() { - // return Err(WrongNumberOfParamaters { expected: polarities.len() }); - // } - // for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - // if !self.native_ports.contains(port) { - // return Err(UnknownPort(*port)); - // } - // if expected_polarity != self.check_polarity(port) { - // return Err(WrongPortPolarity { port: *port, expected_polarity }); - // } - // } - // // ok! - // let state = self.proto_description.new_main_component(identifier, ports); - // let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; - // let proto_component_index = self.proto_components.len(); - // self.proto_components.push(proto_component); - // for port in ports.iter() { - // if let Polarity::Getter = self.check_polarity(port) { - // self.inp_to_route.insert( - // *port, - // InpRoute::LocalComponent(LocalComponentId::Proto { - // index: proto_component_index, - // }), - // ); - // } - // } + pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { + // called by a PROTO COMPONENT. moves its own ports. + // 1. sanity check: this component owns these ports + log!( + self.logger, + "Component {:?} added new component with state {:?}, moving ports {:?}", + self.proto_component_id, + &state, + &moved_ports + ); + assert!(self.proto_component_ports.is_subset(&moved_ports)); + // 2. remove ports from old component & update port->route + let new_id = self.id_manager.new_proto_component_id(); + for port in moved_ports.iter() { + self.proto_component_ports.remove(port); + self.port_info + .routes + .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); + } + // 3. create a new component + self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); } - pub fn new_channel(&mut self) -> [PortId; 2] { - self.connector.add_port_pair() + pub fn new_port_pair(&mut self) -> [PortId; 2] { + // adds two new associated ports, related to each other, and exposed to the proto component + let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; + self.proto_component_ports.insert(o); + self.proto_component_ports.insert(i); + // {polarity, peer, route} known. {} unknown. + self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id)); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); + log!( + self.logger, + "Component {:?} port pair (out->in) {:?} -> {:?}", + self.proto_component_id, + o, + i + ); + [o, i] } } impl SyncProtoContext<'_> { pub fn is_firing(&mut self, port: PortId) -> Option { - todo!() + self.predicate.query(port) } pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - todo!() + self.inbox.get(&port) + } +} + +impl Connector { + pub fn sync(&mut self) -> Result { + use SyncError::*; + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Communication { native_batches, .. } => { + // 1. run all proto components to Nonsync blockers + let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = + self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + while let Some((proto_component_id, mut proto_component)) = unrun_components.pop() { + let mut ctx = NonsyncProtoContext { + logger: &mut *self.logger, + port_info: &mut self.port_info, + id_manager: &mut self.id_manager, + proto_component_id, + unrun_components: &mut unrun_components, + proto_component_ports: &mut proto_component.ports, + }; + match proto_component.state.nonsync_run(&mut ctx, &self.proto_description) { + NonsyncBlocker::ComponentExit => drop(proto_component), + NonsyncBlocker::Inconsistent => { + return Err(InconsistentProtoComponent(proto_component_id)) + } + NonsyncBlocker::SyncBlockStart => { + self.proto_components.insert(proto_component_id, proto_component); + } + } + } + + // all ports are GETTER + let mut mem_inbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + + // 2. kick off the native + let mut branching_native = BranchingNative { branches: Default::default() }; + for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate() + { + let mut predicate = Predicate::default(); + // assign trues + for &port in to_get.iter().chain(to_put.keys()) { + predicate.assigned.insert(port, true); + } + // assign falses + for &port in self.native_ports.iter() { + predicate.assigned.entry(port).or_insert(false); + } + // put all messages + for (port, payload) in to_put { + let getter = *self.port_info.peers.get(&port).unwrap(); + mem_inbox.push(( + getter, + SendPayloadMsg { payload_predicate: predicate.clone(), payload }, + )); + } + let branch = NativeBranch { index, gotten: Default::default(), to_get }; + if let Some(existing) = branching_native.branches.insert(predicate, branch) { + return Err(IndistinguishableBatches([index, existing.index])); + } + } + todo!() + } + } } } // impl Connector { // fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> { -// log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); +// log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); // let ret = match &decision { // Decision::Success(predicate) => { // // overwrite MonoN/P -// self.inner.mono_n = { +// self.mono_n = { // let poly_n = self.ephemeral.poly_n.take().unwrap(); // poly_n.choose_mono(predicate).unwrap_or_else(|| { // panic!( // "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", -// &predicate, &poly_n.branches, &self.inner.logger +// &predicate, &poly_n.branches, &self.logger // ); // }) // }; -// self.inner.mono_ps.clear(); -// self.inner.mono_ps.extend( +// self.mono_ps.clear(); +// self.mono_ps.extend( // self.ephemeral // .poly_ps // .drain(..) @@ -71,46 +157,46 @@ impl SyncProtoContext<'_> { // } // Decision::Failure => Err(SyncError::Timeout), // }; -// let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); -// for &child_port in self.inner.family.children_ports.iter() { +// let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index); +// for &child_port in self.family.children_ports.iter() { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Forwarding {:?} to child with port {:?}", // &announcement, // child_port // ); -// self.inner +// self // .endpoint_exts // .get_mut(child_port) // .expect("eefef") // .endpoint // .send(announcement.clone())?; // } -// self.inner.round_index += 1; +// self.round_index += 1; // self.ephemeral.clear(); // ret // } // // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found // fn handle_locals_maybe_decide(&mut self) -> Result { -// if let Some(parent_port) = self.inner.family.parent_port { +// if let Some(parent_port) = self.family.parent_port { // // I have a parent -> I'm not the leader // let parent_endpoint = -// &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; +// &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; // for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { // let msg = -// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); -// log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); +// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index); +// log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port); // parent_endpoint.send(msg)?; // } // Ok(false) // } else { // // I have no parent -> I'm the leader -// assert!(self.inner.family.parent_port.is_none()); +// assert!(self.family.parent_port.is_none()); // let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); // Ok(if let Some(predicate) = maybe_predicate { // let decision = Decision::Success(predicate); -// log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); +// log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision); // self.end_round_with_decision(decision)?; // true // } else { @@ -123,7 +209,7 @@ impl SyncProtoContext<'_> { // &mut self, // sync_batches: impl Iterator, // ) -> Result { -// let MonoN { ports, .. } = self.inner.mono_n.clone(); +// let MonoN { ports, .. } = self.mono_n.clone(); // let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; // let mut branches = HashMap::<_, _>::default(); // for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { @@ -152,7 +238,7 @@ impl SyncProtoContext<'_> { // let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; // for (port, payload) in puts { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... ... Initial native put msg {:?} pred {:?} batch {:?}", // &payload, // &predicate, @@ -164,14 +250,14 @@ impl SyncProtoContext<'_> { // endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; // } // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... Initial native branch batch index={} with pred {:?}", // sync_batch_index, // &predicate // ); // if branch.to_get.is_empty() { // self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.inner.logger, +// &mut self.logger, // SubtreeId::PolyN, // predicate.clone(), // ); @@ -194,7 +280,7 @@ impl SyncProtoContext<'_> { // // Must set unrecoverable error! and tear down our net channels // self.unrecoverable_error = Some(e); // self.ephemeral.clear(); -// self.inner.endpoint_exts = Default::default(); +// self.endpoint_exts = Default::default(); // e // } // }) @@ -208,9 +294,9 @@ impl SyncProtoContext<'_> { // sync_batches: Option>, // ) -> Result<(), SyncError> { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", -// self.inner.round_index +// self.round_index // ); // assert!(self.ephemeral.is_clear()); // assert!(self.unrecoverable_error.is_none()); @@ -218,17 +304,17 @@ impl SyncProtoContext<'_> { // // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). // // Some actors are dropped. some new actors are created. // // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors -// self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); -// log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); +// self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned()); +// log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); // while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { // let mut m_ctx = ProtoSyncContext { // ports: &mut mono_p.ports, // mono_ps: &mut self.ephemeral.mono_ps, -// inner: &mut self.inner, +// inner: &mut self, // }; // // cross boundary into crate::protocol // let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); -// log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); +// log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); // match blocker { // NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), // NonsyncBlocker::ComponentExit => drop(mono_p), @@ -236,7 +322,7 @@ impl SyncProtoContext<'_> { // } // } // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Finished running all MonoPs! Have {} PolyPs waiting", // self.ephemeral.poly_ps.len() // ); @@ -247,7 +333,7 @@ impl SyncProtoContext<'_> { // // TODO: store and update this mapping rather than rebuilding it each round. // let port_to_holder: HashMap = { // use PolyId::*; -// let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); +// let n = self.mono_n.ports.iter().map(move |&e| (e, N)); // let p = self // .ephemeral // .poly_ps @@ -257,7 +343,7 @@ impl SyncProtoContext<'_> { // n.chain(p).collect() // }; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "SET OF PolyPs and MonoPs final! port lookup map is {:?}", // &port_to_holder // ); @@ -268,14 +354,14 @@ impl SyncProtoContext<'_> { // let n = std::iter::once(SubtreeId::PolyN); // let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); // let c = self -// .inner +// // .family // .children_ports // .iter() // .map(|&port| SubtreeId::ChildController { port }); // let subtree_id_iter = n.chain(m).chain(c); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Solution Storage has subtree Ids: {:?}", // &subtree_id_iter.clone().collect::>() // ); @@ -284,49 +370,49 @@ impl SyncProtoContext<'_> { // // 5. kick off the synchronous round of the native actor if it exists -// log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); +// log!(&mut self.logger, "Kicking off native's synchronous round..."); // self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { // // using if let because of nested ? operator // // TODO check that there are 1+ branches or NO SOLUTION // let poly_n = self.kick_off_native(sync_batches)?; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "PolyN kicked off, and has branches with predicates... {:?}", // poly_n.branches.keys().collect::>() // ); // Some(poly_n) // } else { -// log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); +// log!(&mut self.logger, "NO NATIVE COMPONENT"); // None // }; // // 6. Kick off the synchronous round of each protocol actor // // If just one actor becomes inconsistent now, there can be no solution! // // TODO distinguish between completed and not completed poly_p's? -// log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); +// log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); // for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { // let my_subtree_id = SubtreeId::PolyP { index }; // let m_ctx = PolyPContext { // my_subtree_id, -// inner: &mut self.inner, +// inner: &mut self, // solution_storage: &mut self.ephemeral.solution_storage, // }; // use SyncRunResult as Srr; // let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; -// log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); +// log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker); // match blocker { // Srr::NoBranches => return Err(SyncError::Inconsistent), // Srr::AllBranchesComplete | Srr::BlockingForRecv => (), // } // } -// log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); +// log!(&mut self.logger, "All Poly machines have been kicked off!"); // // 7. `solution_storage` may have new solutions for this controller // // handle their discovery. LEADER => announce, otherwise => send to parent // { // let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Got {} controller-local solutions before a single RECV: {:?}", // peeked.len(), // peeked @@ -337,10 +423,10 @@ impl SyncProtoContext<'_> { // } // // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error -// log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); +// log!(&mut self.logger, "`No decision yet`. Time to recv messages"); // self.undelay_all(); // 'recv_loop: loop { -// log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); +// log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline); // let received = match deadline { // None => { // // we have personally timed out. perform a "long" poll. @@ -353,7 +439,7 @@ impl SyncProtoContext<'_> { // // timed out! send a FAILURE message to the sink, // // and henceforth don't time out on polling. // deadline = None; -// match self.inner.family.parent_port { +// match self.family.parent_port { // None => { // // I am the sink! announce failure and return. // return self.end_round_with_decision(Decision::Failure); @@ -361,16 +447,16 @@ impl SyncProtoContext<'_> { // Some(parent_port) => { // // I am not the sink! send a failure message. // let announcement = Msg::CommMsg(CommMsg { -// round_index: self.inner.round_index, +// round_index: self.round_index, // contents: CommMsgContents::Failure, // }); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Forwarding {:?} to parent with port {:?}", // &announcement, // parent_port // ); -// self.inner +// self // .endpoint_exts // .get_mut(parent_port) // .expect("ss") @@ -382,7 +468,7 @@ impl SyncProtoContext<'_> { // } // }, // }; -// log!(&mut self.inner.logger, "::: message {:?}...", &received); +// log!(&mut self.logger, "::: message {:?}...", &received); // let current_content = match received.msg { // Msg::SetupMsg(s) => { // // This occurs in the event the connector was malformed during connect() @@ -390,45 +476,45 @@ impl SyncProtoContext<'_> { // return Err(SyncError::UnexpectedSetupMsg); // } // Msg::CommMsg(CommMsg { round_index, .. }) -// if round_index < self.inner.round_index => +// if round_index < self.round_index => // { // // Old message! Can safely discard -// log!(&mut self.inner.logger, "...and its OLD! :("); +// log!(&mut self.logger, "...and its OLD! :("); // drop(received); // continue 'recv_loop; // } // Msg::CommMsg(CommMsg { round_index, .. }) -// if round_index > self.inner.round_index => +// if round_index > self.round_index => // { // // Message from a next round. Keep for later! -// log!(&mut self.inner.logger, "... DELAY! :("); +// log!(&mut self.logger, "... DELAY! :("); // self.delay(received); // continue 'recv_loop; // } // Msg::CommMsg(CommMsg { contents, round_index }) => { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... its a round-appropriate CommMsg with port {:?}", // received.recipient // ); -// assert_eq!(round_index, self.inner.round_index); +// assert_eq!(round_index, self.round_index); // contents // } // }; // match current_content { -// CommMsgContents::Failure => match self.inner.family.parent_port { +// CommMsgContents::Failure => match self.family.parent_port { // Some(parent_port) => { // let announcement = Msg::CommMsg(CommMsg { -// round_index: self.inner.round_index, +// round_index: self.round_index, // contents: CommMsgContents::Failure, // }); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Forwarding {:?} to parent with port {:?}", // &announcement, // parent_port // ); -// self.inner +// self // .endpoint_exts // .get_mut(parent_port) // .expect("ss") @@ -439,18 +525,18 @@ impl SyncProtoContext<'_> { // }, // CommMsgContents::Elaborate { partial_oracle } => { // // Child controller submitted a subtree solution. -// if !self.inner.family.children_ports.contains(&received.recipient) { +// if !self.family.children_ports.contains(&received.recipient) { // return Err(SyncError::ElaborateFromNonChild); // } // let subtree_id = SubtreeId::ChildController { port: received.recipient }; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Received elaboration from child for subtree {:?}: {:?}", // subtree_id, // &partial_oracle // ); // self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.inner.logger, +// &mut self.logger, // subtree_id, // partial_oracle, // ); @@ -459,11 +545,11 @@ impl SyncProtoContext<'_> { // } // } // CommMsgContents::Announce { decision } => { -// if self.inner.family.parent_port != Some(received.recipient) { +// if self.family.parent_port != Some(received.recipient) { // return Err(SyncError::AnnounceFromNonParent); // } // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Received ANNOUNCEMENT from from parent {:?}: {:?}", // received.recipient, // &decision @@ -474,21 +560,21 @@ impl SyncProtoContext<'_> { // // check that we expect to be able to receive payloads from this sender // assert_eq!( // Getter, -// self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity +// self.endpoint_exts.get(received.recipient).unwrap().info.polarity // ); // // message for some actor. Feed it to the appropriate actor // // and then give them another chance to run. // let subtree_id = port_to_holder.get(&received.recipient); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", // subtree_id, // &payload_predicate, // &payload // ); // let channel_id = self -// .inner +// // .endpoint_exts // .get(received.recipient) // .expect("UEHFU") @@ -508,7 +594,7 @@ impl SyncProtoContext<'_> { // // Message for NativeMachine // self.ephemeral.poly_n.as_mut().unwrap().sync_recv( // received.recipient, -// &mut self.inner.logger, +// &mut self.logger, // payload, // payload_predicate, // &mut self.ephemeral.solution_storage, @@ -523,7 +609,7 @@ impl SyncProtoContext<'_> { // let m_ctx = PolyPContext { // my_subtree_id: SubtreeId::PolyP { index: *index }, -// inner: &mut self.inner, +// inner: &mut self, // solution_storage: &mut self.ephemeral.solution_storage, // }; // use SyncRunResult as Srr; @@ -535,7 +621,7 @@ impl SyncProtoContext<'_> { // payload, // )?; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", // subtree_id, // blocker @@ -550,7 +636,7 @@ impl SyncProtoContext<'_> { // .peek_new_locals() // .collect::>(); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Got {} new controller-local solutions from RECV: {:?}", // peeked.len(), // peeked @@ -712,10 +798,10 @@ impl SyncProtoContext<'_> { // fn is_firing(&mut self, port: PortId) -> Option { // assert!(self.ports.contains(&port)); -// let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; +// let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id; // let val = self.predicate.query(channel_id); // log!( -// &mut self.m_ctx.inner.logger, +// &mut self.m_ctx.logger, // "!! PolyContext callback to is_firing by {:?}! returning {:?}", // self.m_ctx.my_subtree_id, // val, @@ -726,7 +812,7 @@ impl SyncProtoContext<'_> { // assert!(self.ports.contains(&port)); // let val = self.inbox.get(&port); // log!( -// &mut self.m_ctx.inner.logger, +// &mut self.m_ctx.logger, // "!! PolyContext callback to read_msg by {:?}! returning {:?}", // self.m_ctx.my_subtree_id, // val,