Changeset - 0d46914f7c2e
[Not reviewed]
0 6 0
MH - 4 years ago 2021-11-22 22:20:53
contact@maxhenger.nl
WIP on fixing bug where only partial sync region is available after error
6 files changed with 54 insertions and 32 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -114,7 +114,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken),
 
        };
 
    }
 

	
 
@@ -386,18 +386,24 @@ impl ConnectorPDL {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content,
 
                })) {
 
                    // We don't own the port
 
                    let pd = &sched_ctx.runtime.protocol_description;
 
                    let eval_error = branch.code_state.new_error_at_expr(
 
                        &pd.modules, &pd.heap,
 
                        String::from("attempted to 'put' on port that is no longer owned")
 
                    );
 
                    self.eval_error = Some(eval_error);
 
                    self.mode = Mode::SyncError;
 
                let message = DataMessage{ sync_header, data_header, content };
 
                match comp_ctx.submit_message(Message::Data(message)) {
 
                    Ok(_) => {
 
                        // Message is underway
 
                        branch.prepared = PreparedStatement::PerformedPut;
 
                        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                        return ConnectorScheduling::Immediate;
 
                    },
 
                    Err(_) => {
 
                        // We don't own the port
 
                        let pd = &sched_ctx.runtime.protocol_description;
 
                        let eval_error = branch.code_state.new_error_at_expr(
 
                            &pd.modules, &pd.heap,
 
                            String::from("attempted to 'put' on port that is no longer owned")
 
                        );
 
                        self.eval_error = Some(eval_error);
 
                        self.mode = Mode::SyncError;
 
                    }
 
                }
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
src/runtime2/consensus.rs
Show inline comments
 
@@ -186,7 +186,12 @@ impl Consensus {
 
                target_port: port_info.peer_id,
 
                content: SyncPortContent::NotificationWave,
 
            };
 
            ctx.submit_message(Message::SyncPort(message));
 

	
 
            // Note: submitting the message might fail. But we're attempting to
 
            // handle the error anyway.
 
            // TODO: Think about this a second time: how do we make sure the
 
            //  entire network will fail if we reach this condition
 
            let _unused = ctx.submit_message(Message::SyncPort(message));
 
        }
 

	
 
        return maybe_conclusion;
 
@@ -265,7 +270,7 @@ impl Consensus {
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                ctx.submit_message(Message::SyncPort(SyncPortMessage {
 
                let message = SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
@@ -274,8 +279,17 @@ impl Consensus {
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(self_port_id);
 
                };
 
                match ctx.submit_message(Message::SyncPort(message)) {
 
                    Ok(_) => {
 
                        self.encountered_ports.push(self_port_id);
 
                    },
 
                    Err(_) => {
 
                        // Seems like we were done with this branch, but one of
 
                        // the silent ports (in scope) is actually closed
 
                        return self.notify_of_fatal_branch(branch_id, ctx);
 
                    }
 
                }
 
            }
 

	
 
            target_mapping.push((
 
@@ -469,7 +483,9 @@ impl Consensus {
 
                        target_port: port_desc.peer_id,
 
                        content: SyncPortContent::NotificationWave,
 
                    };
 
                    ctx.submit_message(Message::SyncPort(message)).unwrap();
 
                    // As with the other SyncPort where we throw away the
 
                    // result: we're dealing with an error here anyway
 
                    let _unused = ctx.submit_message(Message::SyncPort(message));
 
                }
 
            }
 
        }
 
@@ -566,7 +582,7 @@ impl Consensus {
 
                    target_component_id: peer.id,
 
                    content: SyncCompContent::Notification,
 
                };
 
                ctx.submit_message(Message::SyncComp(message));
 
                ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
            }
 

	
 
            // But also send our locally combined solution
 
@@ -579,7 +595,7 @@ impl Consensus {
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncCompContent::Notification
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        } // else: exactly equal, so do nothing
 

	
 
        return true;
 
@@ -665,7 +681,7 @@ impl Consensus {
 
                target_component_id: self.highest_connector_id,
 
                content,
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 

	
 
        return None;
 
@@ -690,7 +706,7 @@ impl Consensus {
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
@@ -716,7 +732,7 @@ impl Consensus {
 
                        target_component_id: presence.added_by,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
 
                    ctx.submit_message(Message::SyncComp(message));
 
                    ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
                }
 
            }
 
        }
 
@@ -748,7 +764,7 @@ impl Consensus {
 
                target_component_id: self.highest_connector_id,
 
                content: SyncCompContent::PartialSolution(partial_solution),
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 
    }
 
}
src/runtime2/mod.rs
Show inline comments
 
@@ -431,7 +431,7 @@ impl ConnectorStore {
 
            }
 
        }
 

	
 
        // println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        return key;
 
    }
 

	
 
@@ -444,7 +444,7 @@ impl ConnectorStore {
 
            // Note: but not deallocating!
 
        }
 

	
 
        // println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        self.free.push(key.index as usize);
 
    }
 
}
src/runtime2/scheduler.rs
Show inline comments
 
@@ -405,11 +405,11 @@ impl Scheduler {
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -15,9 +15,9 @@ use crate::runtime2::native::{ApplicationSyncAction};
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 5;
 
pub(crate) const NUM_LOOPS: u32 = 5;
 
pub(crate) const NUM_THREADS: u32 = 2;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
src/runtime2/tests/sync_failure.rs
Show inline comments
 
@@ -65,7 +65,7 @@ fn test_shared_sync_failure() {
 
    ";
 

	
 
    run_test_in_runtime(CODE, |api| {
 
        for variant in 0..4 { // all `Location` enum variants, except `Never`.
 
        for variant in 3..4 { // all `Location` enum variants, except `Never`.
 
            // Create the channels
 
            api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
0 comments (0 inline, 0 general)