From 80ce091d67cf03c0659cbf3efd01e336a3317ef3 2022-01-24 13:37:10 From: mh Date: 2022-01-24 13:37:10 Subject: [PATCH] WIP: Integrating select, fixing bugs on tests --- diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 03dd5523e71f6d15f47ad885cc4fd19d9ebe235c..412526fa881f5b902fa8cbb023a8b80070be17e5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -241,7 +241,7 @@ impl CompPDL { sched_ctx.log(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&message) { let mut target = sched_ctx.runtime.get_component_public(new_target); - target.send_message(sched_ctx, message, true); + target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(!_should_remove); return; @@ -474,6 +474,18 @@ impl CompPDL { } fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { + // Little local utility to send an Ack + fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) { + let peer_info = comp_ctx.get_peer(peer_comp_id); + peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ + id: causer_id, + sender_comp_id: comp_ctx.id, + target_port_id: None, + content: ControlMessageContent::Ack, + }), true); + } + + // Handle the content of the control message, and optionally Ack it match message.content { ControlMessageContent::Ack => { let mut to_ack = message.id; @@ -482,14 +494,18 @@ impl CompPDL { match action { AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => { // FIX @NoDirectHandle - let handle = sched_ctx.runtime.get_component_public(target_comp); + let mut handle = sched_ctx.runtime.get_component_public(target_comp); handle.send_message(sched_ctx, Message::Control(message), true); + let _should_remove = handle.decrement_users(); + debug_assert!(!_should_remove); to_ack = new_to_ack; }, AckAction::ScheduleComponent(to_schedule) => { // FIX @NoDirectHandle - let handle = sched_ctx.runtime.get_component_public(to_schedule); + let mut handle = sched_ctx.runtime.get_component_public(to_schedule); wake_up_if_sleeping(sched_ctx, to_schedule, &handle); + let _should_remove = handle.decrement_users(); + debug_assert!(!_should_remove); break; }, AckAction::None => { @@ -513,6 +529,7 @@ impl CompPDL { // the component handle as well let port_index = comp_ctx.get_port_index(port_id).unwrap(); let port_info = &mut comp_ctx.ports[port_index]; + let peer_port_id = port_info.peer_id; let peer_comp_id = port_info.peer_comp_id; port_info.state = PortState::Closed; @@ -529,6 +546,8 @@ impl CompPDL { comp_ctx.peers.remove(peer_index); } + + send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id); } ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) @@ -551,11 +570,9 @@ impl CompPDL { port_info.state = PortState::Blocked; } - let peer_info = comp_ctx.get_peer(port_info.peer_comp_id); - // TODO: Continue here. Send ack, but think about whether we - // always have the peer in our list of peers? Quickly thinking - // about it, I think so, but we may have a series of port - // transfers. Does that change things? + let peer_port_id = port_info.peer_id; + let peer_comp_id = port_info.peer_comp_id; + send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id); }, ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { debug_assert_eq!(message.target_port_id, Some(port_id)); diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 0d12f51fae13996eeb669a122d263887ce7181b2..e3b2fe6ed59fa15590522db25db480dca6c7dfe8 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -96,9 +96,7 @@ impl ControlLayer { ) }; let to_ack = content.schedule_entry_id; - self.entries.remove(entry_index); - self.handle_ack(to_ack, sched_ctx, comp_ctx); return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); },