diff --git a/firmware/rust1/src/bin/heizung.rs b/firmware/rust1/src/bin/heizung.rs index 7e65f6b9812db190d05f424c5958cf12e4eb1eea..7e2e11f401734212a39b17c1d7daa2c6dc81a190 100644 --- a/firmware/rust1/src/bin/heizung.rs +++ b/firmware/rust1/src/bin/heizung.rs @@ -646,8 +646,12 @@ async fn main2(spawner: Spawner) { static OUTPUTS_ACTIVE: AtomicU8 = AtomicU8::new(0); unwrap!(spawner.spawn(adc_task(&ADC_DATA, p.ADC, en_measure_current, analog_in1, current2, current1, measure_vcc, &OUTPUTS_ACTIVE))); + static mut RX_BUFFER: [u8; 256] = [0; 256]; + //SAFETY: We only borrow it once because this function will only be called once. + let rx_buffer_ref = unsafe { &mut RX_BUFFER }; let rs485 = RS485::new( - p.UART0, rx, tx, tx_en, interrupt::take!(UART0_IRQ), p.DMA_CH1, default_modbus_config(), + p.UART0, rx, tx, tx_en, interrupt::take!(UART0_IRQ), + p.DMA_CH1, default_modbus_config(), rx_buffer_ref, p.PIO0, p.DMA_CH0, p.DMA_CH2, ModbusServer::new(ModBusRegs { led_g, diff --git a/firmware/rust1/src/modbus_server.rs b/firmware/rust1/src/modbus_server.rs index 8afa7b46637b5807530889c5daf5ad9e8721319d..aa6d6bf7f0f8a67176c5e3e166c82b4222f4e78b 100644 --- a/firmware/rust1/src/modbus_server.rs +++ b/firmware/rust1/src/modbus_server.rs @@ -1,10 +1,10 @@ use defmt::*; use {defmt_rtt as _, panic_probe as _}; use heapless::Vec; -use crc::{Crc, CRC_16_MODBUS, Digest}; +use crc::{Crc, CRC_16_MODBUS}; use embassy_rp::uart; -use crate::rs485::{RS485Handler}; +use crate::rs485::{self, RS485Handler}; struct Cursor<'a, E>(&'a [u8], usize, E); @@ -96,6 +96,7 @@ pub enum ModbusErrorCode { GatewayTargetDeviceFailedToRespond = 0xb, } +#[derive(PartialEq, Eq, Format, Clone)] pub enum ModbusAdressMatch { NotOurAddress, OurAddress, @@ -117,67 +118,35 @@ pub trait ModbusRegisters { -> Result<(), ModbusErrorCode>; } -#[derive(PartialEq, Eq, Format)] -enum ModbusFrameLength { - NeedMoreData(u16), - Length(u16), - Unknown, -} - -//FIXME This won't work if this is a response frame! -fn get_modbus_frame_length(rxbuf: &[u8]) -> ModbusFrameLength { - use ModbusFrameLength::*; - - if rxbuf.len() < 3 { - return NeedMoreData(3); - } - - match rxbuf[1] { - 0x01..=0x06 => Length(8), - 0x0f | 0x10 => if rxbuf.len() == 7 { Length(9 + rxbuf[6] as u16) } else { NeedMoreData(7) }, - 0x14 | 0x15 => if rxbuf.len() == 3 { Length(5 + rxbuf[2] as u16) } else { NeedMoreData(3) }, - 0x16 => Length(10), - 0x17 => if rxbuf.len() == 11 { Length(13 + rxbuf[10] as u16) } else { NeedMoreData(11) }, - _ => Unknown, - } -} - const CRC: Crc<u16> = Crc::<u16>::new(&CRC_16_MODBUS); -const BUF_LENGTH: usize = 256; +const BUF_LENGTH: usize = 256; // rs485::RX_BUF_LEN must be the same. 256 is the right value for Modbus. pub struct ModbusServer<REGS: ModbusRegisters> { - rxbuf: Vec<u8, BUF_LENGTH>, - rxcrc: Digest<'static, u16>, - rx_expected_bytes: ModbusFrameLength, - rx_received_bytes: u16, txbuf: Vec<u8, BUF_LENGTH>, regs: REGS, } impl<REGS: ModbusRegisters> ModbusServer<REGS> { pub fn new(regs: REGS) -> ModbusServer<REGS> { - ModbusServer { - rxbuf: Vec::new(), - rxcrc: CRC.digest(), - rx_expected_bytes: ModbusFrameLength::NeedMoreData(3), - rx_received_bytes: 0, + defmt::assert!(rs485::RX_BUF_LEN == 256); + + ModbusServer { txbuf: Vec::new(), regs, } } - fn modbus_reply_error(self: &mut Self, code: ModbusErrorCode) { + fn modbus_reply_error(self: &mut Self, rxbuf: &[u8], code: ModbusErrorCode) { self.txbuf.clear(); - self.txbuf.push(self.rxbuf[0]).unwrap(); - self.txbuf.push(self.rxbuf[1] | 0x80).unwrap(); + self.txbuf.push(rxbuf[0]).unwrap(); + self.txbuf.push(rxbuf[1] | 0x80).unwrap(); self.txbuf.push(code as u8).unwrap(); } - fn handle_modbus_frame2(self: &mut Self, should_reply: bool) -> Result<(), ModbusErrorCode> { + fn handle_modbus_frame2(self: &mut Self, rxbuf: &[u8], should_reply: bool) -> Result<(), ModbusErrorCode> { use ModbusErrorCode::*; - let rxbuf = &self.rxbuf; let mut rx = Cursor::new(&rxbuf[0..rxbuf.len()-2], ModbusErrorCode::IllegalDataValue); let txbuf = &mut self.txbuf; //info!("Modbus frame: {:?}", rxbuf.as_slice()); @@ -460,23 +429,23 @@ impl<REGS: ModbusRegisters> ModbusServer<REGS> { } } - fn handle_modbus_frame(self: &mut Self) { - let should_reply = match self.regs.is_address_match(self.rxbuf[0]) { + fn handle_modbus_frame(self: &mut Self, rx_data: &[u8]) { + let should_reply = match self.regs.is_address_match(rx_data[0]) { ModbusAdressMatch::NotOurAddress => return, ModbusAdressMatch::OurAddress => true, ModbusAdressMatch::BroadcastNoReply => false, }; - match self.handle_modbus_frame2(should_reply) { + match self.handle_modbus_frame2(rx_data, should_reply) { Ok(()) => { if should_reply && self.txbuf.capacity() - self.txbuf.len() < 2 { // We don't have enough space for the CRC so reply with error instead. - self.modbus_reply_error(ModbusErrorCode::ServerDeviceFailure); + self.modbus_reply_error(rx_data, ModbusErrorCode::ServerDeviceFailure); } }, Err(code) => { if should_reply { - self.modbus_reply_error(code); + self.modbus_reply_error(rx_data, code); } else { warn!("Error result when processing Modbus frame but we won't reply because it is a broadcast: {:?}", code); } @@ -496,82 +465,40 @@ impl<REGS: ModbusRegisters> RS485Handler for ModbusServer<REGS> { //type CommandFuture = !; const TX_BUF_LENGTH: usize = BUF_LENGTH; - fn on_rx<F>(self: &mut Self, rx: Result<u8, uart::Error>, reply: Option<F>) + fn on_rx_frame<F>(self: &mut Self, rx: Result<&[u8], uart::Error>, reply: Option<F>) where F: FnOnce(&[u8]) { match rx { - Ok(rx_char) => { - //info!("RX {:?}", rx_char); - - self.rxcrc.update(&[rx_char]); - if !self.rxbuf.is_full() { - self.rxbuf.push(rx_char).unwrap_or_default(); - } - self.rx_received_bytes += 1; - - if let ModbusFrameLength::NeedMoreData(x) = self.rx_expected_bytes { - if x == self.rx_received_bytes { - self.rx_expected_bytes = get_modbus_frame_length(self.rxbuf.as_slice()); - match self.rx_expected_bytes { - ModbusFrameLength::Unknown => { - //FIXME Wait for pause. - }, - _ => {} - } - } + Ok(rx_data) if rx_data.len() == 0 => { + // This is what used to be the on_idle() call. + }, + Ok(rx_data) => { + //info!("RX {:?}", rx_data); + + let calculated_crc = CRC.checksum(rx_data); + const CORRECT_CRC: u16 = 0; // because we include the CRC bytes in our calculation + if calculated_crc != CORRECT_CRC { + info!("CRC: {:04x} (should be zero)", calculated_crc); } - if let ModbusFrameLength::Length(x) = self.rx_expected_bytes { - if x == self.rx_received_bytes { - let calculated_crc = self.rxcrc.clone().finalize(); - const CORRECT_CRC: u16 = 0; // because we include the CRC bytes in our calculation - if calculated_crc != CORRECT_CRC { - info!("CRC: {:04x} (should be zero)", calculated_crc); - } - //FIXME In case of CRC mismatch, wait for gap/idle of >=1.5 chars. - const OUR_ADDRESS: u8 = 1; - if self.rxbuf[0] == OUR_ADDRESS && calculated_crc == CORRECT_CRC { - self.txbuf.clear(); - self.handle_modbus_frame(); - - if !self.txbuf.is_empty() { - //info!("Modbus reply: {:?}", self.txbuf); - match reply { - Option::Some(reply) => reply(self.txbuf.as_slice()), - Option::None => warn!("Cannot send reply because a reply is already in progress!"), - } - } - } + if calculated_crc == CORRECT_CRC { + self.txbuf.clear(); + self.handle_modbus_frame(rx_data); - self.rxbuf.clear(); - self.rxcrc = CRC.digest(); - self.rx_expected_bytes = ModbusFrameLength::NeedMoreData(3); - self.rx_received_bytes = 0; + if !self.txbuf.is_empty() { + //info!("Modbus reply: {:?}", self.txbuf); + match reply { + Option::Some(reply) => reply(self.txbuf.as_slice()), + Option::None => warn!("Cannot send reply because a reply is already in progress!"), + } } } }, Err(e) => { info!("RX error {:?}", e); - - //FIXME wait for gap/idle of >=1.5 chars. - self.rxbuf.clear(); - self.rxcrc = CRC.digest(); - self.rx_expected_bytes = ModbusFrameLength::NeedMoreData(3); - self.rx_received_bytes = 0; } } } - fn on_idle(self: &mut Self) { - if !self.rxbuf.is_empty() { - warn!("Partial frame in rx buffer, cut short by inter-byte gap: {:?}, {:?}", self.rx_expected_bytes, self.rxbuf); - } - - self.rxbuf.clear(); - self.rxcrc = CRC.digest(); - self.rx_expected_bytes = ModbusFrameLength::NeedMoreData(3); - self.rx_received_bytes = 0; - } - fn on_tx_done(self: &mut Self) { //TODO } diff --git a/firmware/rust1/src/rs485.rs b/firmware/rust1/src/rs485.rs index cbb7b4fa4c08b7ed3cc8795248c988aab1f8234a..771255e016aa49df9be32f880b55d6fc66d7c7d0 100644 --- a/firmware/rust1/src/rs485.rs +++ b/firmware/rust1/src/rs485.rs @@ -3,15 +3,18 @@ use embassy_futures::select::*; use embassy_rp::gpio; use embassy_rp::pac; use embassy_rp::interrupt::UART0_IRQ; +use embassy_rp::uart::BufferedUartRx; use embassy_rp::uart::Parity; use embassy_time::{Duration, Timer}; use embassy_embedded_hal::SetConfig; use embassy_rp::peripherals::{self, UART0}; -use embassy_rp::uart::{self, UartRx}; +use embassy_rp::uart::{self}; use embassy_rp::pio::{Config, Pio, ShiftConfig, ShiftDirection, FifoJoin}; use embassy_rp::relocate::RelocatedProgram; use embassy_rp::Peripheral; +use embedded_io::asynch::BufRead; use {defmt_rtt as _, panic_probe as _}; +use embedded_io::asynch::Read; use fixed::traits::ToFixed; use fixed::types::U24F8; use fixed::types::U56F8; @@ -23,13 +26,14 @@ pub trait RS485Handler { //type CommandFuture: Future<Output = ()>; const TX_BUF_LENGTH: usize; - fn on_rx<F>(self: &mut Self, rx: Result<u8, uart::Error>, reply: Option<F>) + fn on_rx_frame<F>(self: &mut Self, rx: Result<&[u8], uart::Error>, reply: Option<F>) where F: FnOnce(&[u8]); - fn on_idle(self: &mut Self); fn on_tx_done(self: &mut Self); fn on_autobaud_success(self: &mut Self, baudrate: f32); } +pub const RX_BUF_LEN: usize = 256; + pub struct RS485<H: RS485Handler> { pio: peripherals::PIO0, rx_pin: peripherals::PIN_17, @@ -37,7 +41,8 @@ pub struct RS485<H: RS485Handler> { tx_en_pin: peripherals::PIN_15, dma_channel: peripherals::DMA_CH0, tx_dma_channel: peripherals::DMA_CH2, - rx: UartRx<'static, UART0, uart::Async>, + rx: BufferedUartRx<'static, UART0>, + rx_buffer: [u8; RX_BUF_LEN], uart_baud_rate: u32, uart_parity: Parity, handler: H @@ -79,23 +84,27 @@ async fn debug_print_pio_addr(sm: pac::pio::StateMachine) { impl<H: RS485Handler> RS485<H> { pub fn new( uart: UART0, rx_pin: peripherals::PIN_17, tx_pin: peripherals::PIN_16, tx_en_pin: peripherals::PIN_15, - rx_irq: UART0_IRQ, rx_dma_channel: peripherals::DMA_CH1, uart_config: uart::Config, + rx_irq: UART0_IRQ, _rx_dma_channel: peripherals::DMA_CH1, uart_config: uart::Config, rx_buffer: &'static mut [u8; RX_BUF_LEN], pio: peripherals::PIO0, dma_channel: peripherals::DMA_CH0, tx_dma_channel: peripherals::DMA_CH2, handler: H ) -> RS485<H> { // SAFETY: The auto-baud will only read from this pin and we will set it back to UART mode after initialising the PIO. let rx_pin_for_autobaud = unsafe { rx_pin.clone_unchecked() }; - let uart_rx = UartRx::new( + core::assert!(rx_buffer.len() == RX_BUF_LEN, + "The buffers must have the same length because BufferedUartRx::try_read uses pop() and then expects to write all data to the 2nd buffer."); + + let uart_rx = BufferedUartRx::new( uart, - rx_pin, rx_irq, - rx_dma_channel, + rx_pin, + rx_buffer, uart_config, ); RS485 { pio, rx_pin: rx_pin_for_autobaud, tx_pin, tx_en_pin, dma_channel, tx_dma_channel, rx: uart_rx, + rx_buffer: [0; RX_BUF_LEN], uart_baud_rate: uart_config.baudrate, uart_parity: uart_config.parity, handler, @@ -245,7 +254,8 @@ impl<H: RS485Handler> RS485<H> { let timeout_seconds = 1.5 * symbols_per_byte as f32 / self.uart_baud_rate as f32; let timeout_start_value = (SM_FREQ as f32 * timeout_seconds / 2.) as u32; sm.tx().push(timeout_start_value); - info!("timeout_start_value: {} = 0x{:08x}, {} sec, clkdiv 0x{:08x}", timeout_start_value, timeout_start_value, timeout_seconds, cfg.clock_divider.to_bits()); + info!("timeout_start_value: {} = 0x{:08x}, {} sec, clkdiv 0x{:08x}", + timeout_start_value, timeout_start_value, timeout_seconds, cfg.clock_divider.to_bits()); // switch to the real program and join FIFOs unsafe { common.free_instr(loaded_program.used_memory); }; @@ -268,9 +278,23 @@ impl<H: RS485Handler> RS485<H> { // This switching of RX function might have caused an error to be received. Let's skip that. if true { - let mut rx_buf_one = [0; 1]; - let rx_future = self.rx.read(&mut rx_buf_one); - let _ = select(rx_future, Timer::after(Duration::from_micros(10))).await; + //NOTE We were using self.rx.fill_buf() but that somehow caused us to not receive any more data afterwards. + let mut buf = [0; 1]; + let x = select(self.rx.read(&mut buf), Timer::after(Duration::from_micros(10))).await; + if false { + match x { + Either::First(x) => + info!("DEBUG: first rx: {:?}", x), + Either::Second(x) => + info!("DEBUG: first rx: {:?}", x), + } + } + match x { + Either::First(Ok(_len)) => { + //self.rx.consume(len) + }, + _ => (), + } } info!("Program size for auto-baud: {}, for tx: {}, wait_for_not_idle is at: {}", @@ -308,8 +332,6 @@ impl<H: RS485Handler> RS485<H> { let mut dma_tx_ref = self.tx_dma_channel.into_ref(); let mut din = [42u32; 9]; let mut bit_index = 0; - let mut rx_buf_one = [0; 1]; - let mut rx_future = DontAbort::new(self.rx.read(&mut rx_buf_one), PanicIfReused); let mut tx_future = DontAbort::hang(); loop { let x = select4( @@ -318,7 +340,8 @@ impl<H: RS485Handler> RS485<H> { //debug_print_pio_addr(embassy_rp::pac::PIO0.sm(0)), tx_future.continue_wait(), select3( - rx_future.continue_wait(), + //rx_future.continue_wait(), + core::future::pending::<()>(), //debug_print_pio_addr(embassy_rp::pac::PIO0.sm(1)), core::future::pending::<()>(), irq1.wait(), @@ -419,7 +442,8 @@ impl<H: RS485Handler> RS485<H> { tx_in_progress = false; self.handler.on_tx_done(); }, - Either4::Fourth(Either3::First(x)) => { + Either4::Fourth(Either3::First(_)) => { + /* drop(rx_future); let rx_char = x.map(|_| -> u8 { rx_buf_one[0] }); if Err(embassy_rp::uart::Error::Overrun) == x { @@ -455,9 +479,60 @@ impl<H: RS485Handler> RS485<H> { tx_in_progress = true; } } + */ }, Either4::Fourth(Either3::Third(())) => { - self.handler.on_idle(); + let rx_data = match select(self.rx.read(&mut self.rx_buffer), core::future::ready(())).await { + Either::First(Ok(len)) => { + // The UART uses a ringbuffer and it only pops data once so we need to ask a second + // time if the frame wraps around the end of the ringbuffer and we want all data. + //FIXME There is a race condition: If the ringbuffer was full, it will exactly fill our buffer. + // However, we could be receiving another character right now, which will probably cause a panic in read(). + // -> Actually, pop() has a sane API so I think this cannot happen: The closure returns how + // much bytes have been processed and I think embassy-rp does that correctly. + match select(self.rx.read(&mut self.rx_buffer[len..]), core::future::ready(())).await { + Either::First(Ok(len2)) => { + info!("DEBUG: We needed two calls to self.rx.read(): {} + {}", len, len2); + Ok(&self.rx_buffer[0..len+len2]) + }, + Either::First(Err(err)) => Err(err), // oops, we are ignoring the previous data + Either::Second(()) => Ok(&self.rx_buffer[0..len]), + } + }, + Either::First(Err(err)) => Err(err), + Either::Second(()) => { + Ok(&self.rx_buffer[0..0]) + }, + }; + + if tx_in_progress { + // use a dummy closure to fix the type of the option because I don't know a better way... + let dummy = |_txdata: &[u8]| {}; + self.handler.on_rx_frame(rx_data, if false {Option::Some(dummy)} else {Option::None}); + } else { + drop(tx_future); + tx_future = DontAbort::hang(); + let mut txlen = Option::None; + + self.handler.on_rx_frame(rx_data, Option::Some(|txdata: &[u8]| { + tx_data[0] = (txdata.len() - 1) as u32; + for i in 0..txdata.len() { + let x = txdata[i] & 0xff; + let mut parity = 0; + for j in 0..8 { + parity ^= x>>j; + } + tx_data[i + 1] = x as u32 | (((parity as u32) & 1) << 8); + } + txlen = Option::Some(txdata.len()); + })); + if let Option::Some(len) = txlen { + //FIXME Wait for idle and then wait again from `wait_idle` before sending. We should have a gap of 3.5 chars. + tx_future = DontAbort::new(sm1.tx().dma_push(dma_tx_ref.reborrow(), + &tx_data[0..(len+1)]), HangIfReused); + tx_in_progress = true; + } + } }, _ => { },