Files
@ c62d6f0cc48a
Branch filter:
Location: CSY/reowolf/src/runtime2/component/component_internet.rs
c62d6f0cc48a
5.5 KiB
application/rls-services+xml
WIP on implementing (figuring out) tcp component
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | use crate::protocol::eval::{ValueGroup, Value, EvalError};
use crate::runtime2::*;
use crate::runtime2::component::CompCtx;
use crate::runtime2::stdlib::internet::*;
use super::component::{self, *};
use super::control_layer::*;
use super::consensus::*;
enum SocketState {
Connected(SocketTcpClient),
Error,
}
enum SyncState {
Getting,
Putting
}
pub struct ComponentTcpClient {
// Properties for the tcp socket
socket_state: SocketState,
pending_recv: Vec<DataMessage>, // on the input port
pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
pdl_output_port_id: PortId, // output towards PDL, so received over socket
// Generic component state
exec_state: CompExecState,
control: ControlLayer,
consensus: Consensus,
}
impl Component for ComponentTcpClient {
fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
self.handle_incoming_data_message(message);
}
fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
match mesage {
Message::Data(message) => {
self.handle_incoming_data_message(message);
},
Message::Sync(message) => {
let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
},
Message::Control(message) => {
component::default_handle_control_message(
&mut self.exec_state, &mut self.control, &mut self.consensus,
message, sched_ctx, comp_ctx
);
}
}
}
fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode));
match self.exec_state.mode {
CompMode::BlockedGet | CompMode::BlockedSelect => {
// impossible for this component. We always accept the input
// values, and we never perform an explicit select.
unreachable!();
},
CompMode::NonSync => {
// When in non-sync mode
match &mut self.socket_state {
SocketState::Connected(socket) => {
if self.pending_tx
},
SocketState::Error => {
self.exec_state.mode = CompMode::StartExit;
return Ok(CompScheduling::Immediate);
}
}
},
CompMode::Sync => {
},
CompMode::SyncEnd | CompMode::BlockedPut =>
return Ok(CompScheduling::Sleep),
CompMode::StartExit =>
return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
CompMode::BusyExit =>
return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
CompMode::Exit =>
return Ok(component::default_handle_exit(&self.exec_state)),
}
return Ok(CompScheduling::Immediate);
}
}
impl ComponentTcpClient {
pub(crate) fn new(arguments: ValueGroup) -> Self {
use std::net::{IpAddr, Ipv4Addr};
debug_assert_eq!(arguments.values.len(), 4);
// Parsing arguments
let ip_heap_pos = arguments.values[0].as_array();
let ip_elements = &arguments.regions[ip_heap_pos as usize];
if ip_elements.len() != 4 {
todo!("friendly error reporting: ip contains 4 octects");
}
let ip_address = IpAddr::V4(Ipv4Addr::new(
ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
));
let port = arguments.values[1].as_uint16();
let input_port = component::port_id_from_eval(arguments.values[2].as_input());
let output_port = component::port_id_from_eval(arguments.values[3].as_output());
let socket = SocketTcpClient::new(ip_address, port);
if let Err(socket) = socket {
todo!("friendly error reporting: failed to open socket {:?}", socket);
}
return Self{
socket_state: SocketState::Connected(socket.unwrap()),
pending_tx: Vec::new(),
pdl_input_port_id: input_port,
pdl_output_port_id: output_port,
exec_state: CompExecState::new(),
control: ControlLayer::default(),
consensus: Consensus::new(),
}
}
// Handles incoming data from the PDL side (hence, going into the socket)
fn handle_incoming_data_message(&mut self, message: DataMessage) {
// Input message is an array of bytes (u8)
self.pending_recv.push(message);
}
fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
debug_assert_eq!(message.content.values.len(), 1);
if let Value::Array(array_pos) = message.content.values[0] {
let region = &message.content.regions[array_pos as usize];
bytes.reserve(region.len());
for value in region {
bytes.push(value.as_uint8());
}
} else {
unreachable!();
}
}
}
|