diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 64cf8f35d58c40467a2a7d157570f43f64a5f2b4..2873ce68b0775c786311f1ab12314c9fcb512d95 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -26,17 +26,20 @@ impl SocketState { /// States from the point of view of the component that is connecting to this /// TCP component (i.e. from the point of view of attempting to interface with /// a socket). +#[derive(PartialEq, Debug)] enum SyncState { AwaitingCmd, Getting, - Putting + Putting, + FinishSync, } pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, sync_state: SyncState, - pending_recv: Vec, // on the input port + inbox_main: Option, + inbox_backup: Vec, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket input_union_send_tag_value: i64, @@ -54,7 +57,8 @@ impl Component for ComponentTcpClient { fn on_creation(&mut self, sched_ctx: &SchedulerCtx) { let pd = &sched_ctx.runtime.protocol; let cmd_type = pd.find_type(b"std.internet", b"Cmd") - .expect("'Cmd' type in the 'std.internet' module") + .expect("'Cmd' type in the 'std.internet' module"); + let cmd_type = cmd_type .as_union(); self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); @@ -63,13 +67,17 @@ impl Component for ComponentTcpClient { } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { - self.handle_incoming_data_message(message); + if self.inbox_main.is_none() { + self.inbox_main = Some(message); + } else { + self.inbox_backup.push(message); + } } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { - match mesage { + match message { Message::Data(message) => { - self.handle_incoming_data_message(message); + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); @@ -80,7 +88,10 @@ impl Component for ComponentTcpClient { &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ); - } + }, + Message::Poll => { + sched_ctx.log("Received polling event"); + }, } } @@ -113,7 +124,7 @@ impl Component for ComponentTcpClient { // When in sync mode: wait for a command to come in match self.sync_state { SyncState::AwaitingCmd => { - if let Some(message) = self.pending_recv.pop() { + if let Some(message) = self.inbox_backup.pop() { if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { // Check which command we're supposed to execute. let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); @@ -186,13 +197,11 @@ impl Component for ComponentTcpClient { self.byte_buffer.resize(BUFFER_SIZE, 0); match socket.receive(&mut self.byte_buffer) { Ok(num_received) => { - self.byte_buffer.resize(num_received); + self.byte_buffer.resize(num_received, 0); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); - - let port_handle = comp_ctx.get_port_handle(self.pdl_output_port_id); - let port_info = comp_ctx.get_port(port_handle); - let message = self.consensus.annotate_data_message(comp_ctx, port_info, message_content); - + let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx); + self.sync_state = SyncState::FinishSync; + return Ok(scheduling); }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { @@ -203,12 +212,18 @@ impl Component for ComponentTcpClient { } } }, + SyncState::FinishSync => { + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.exec_state.mode = CompMode::SyncEnd; + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + return Ok(CompScheduling::Requeue); + } } }, CompMode::BlockedGet => { // Entered when awaiting a new command debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); - if self. + return Ok(CompScheduling::Sleep); }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), @@ -252,6 +267,9 @@ impl ComponentTcpClient { return Self{ socket_state: SocketState::Connected(socket.unwrap()), + sync_state: SyncState::AwaitingCmd, + inbox_main: None, + inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, input_union_finish_tag_value: -1, @@ -260,13 +278,24 @@ impl ComponentTcpClient { exec_state: CompExecState::new(), control: ControlLayer::default(), consensus: Consensus::new(), + byte_buffer: Vec::new(), } } // Handles incoming data from the PDL side (hence, going into the socket) - fn handle_incoming_data_message(&mut self, message: DataMessage) { - // Input message is an array of bytes (u8) - self.pending_recv.push(message); + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + if self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + } + + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => {}, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); + } + } } fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) {