From 16ab24974c19a4873d9c179c070053a1efc44619 Mon Sep 17 00:00:00 2001
From: Benjamin Koch <bbbsnowball@gmail.com>
Date: Wed, 28 Jun 2023 03:10:46 +0200
Subject: [PATCH] use BufferedUartRx

---
 firmware/rust1/src/bin/heizung.rs   |   6 +-
 firmware/rust1/src/modbus_server.rs | 145 +++++++---------------------
 firmware/rust1/src/rs485.rs         | 109 +++++++++++++++++----
 3 files changed, 133 insertions(+), 127 deletions(-)

diff --git a/firmware/rust1/src/bin/heizung.rs b/firmware/rust1/src/bin/heizung.rs
index 7e65f6b..7e2e11f 100644
--- a/firmware/rust1/src/bin/heizung.rs
+++ b/firmware/rust1/src/bin/heizung.rs
@@ -646,8 +646,12 @@ async fn main2(spawner: Spawner) {
     static OUTPUTS_ACTIVE: AtomicU8 = AtomicU8::new(0);
     unwrap!(spawner.spawn(adc_task(&ADC_DATA, p.ADC, en_measure_current, analog_in1, current2, current1, measure_vcc, &OUTPUTS_ACTIVE)));
 
+    static mut RX_BUFFER: [u8; 256] = [0; 256];
+    //SAFETY: We only borrow it once because this function will only be called once.
+    let rx_buffer_ref = unsafe { &mut RX_BUFFER };
     let rs485 = RS485::new(
-        p.UART0, rx, tx, tx_en, interrupt::take!(UART0_IRQ), p.DMA_CH1, default_modbus_config(),
+        p.UART0, rx, tx, tx_en, interrupt::take!(UART0_IRQ),
+        p.DMA_CH1, default_modbus_config(), rx_buffer_ref,
         p.PIO0, p.DMA_CH0, p.DMA_CH2,
         ModbusServer::new(ModBusRegs {
             led_g,
diff --git a/firmware/rust1/src/modbus_server.rs b/firmware/rust1/src/modbus_server.rs
index 8afa7b4..aa6d6bf 100644
--- a/firmware/rust1/src/modbus_server.rs
+++ b/firmware/rust1/src/modbus_server.rs
@@ -1,10 +1,10 @@
 use defmt::*;
 use {defmt_rtt as _, panic_probe as _};
 use heapless::Vec;
-use crc::{Crc, CRC_16_MODBUS, Digest};
+use crc::{Crc, CRC_16_MODBUS};
 use embassy_rp::uart;
 
-use crate::rs485::{RS485Handler};
+use crate::rs485::{self, RS485Handler};
 
 struct Cursor<'a, E>(&'a [u8], usize, E);
 
@@ -96,6 +96,7 @@ pub enum ModbusErrorCode {
     GatewayTargetDeviceFailedToRespond = 0xb,
 }
 
+#[derive(PartialEq, Eq, Format, Clone)]
 pub enum ModbusAdressMatch {
     NotOurAddress,
     OurAddress,
@@ -117,67 +118,35 @@ pub trait ModbusRegisters {
             -> Result<(), ModbusErrorCode>;
 }
 
-#[derive(PartialEq, Eq, Format)]
-enum ModbusFrameLength {
-    NeedMoreData(u16),
-    Length(u16),
-    Unknown,
-}
-
-//FIXME This won't work if this is a response frame!
-fn get_modbus_frame_length(rxbuf: &[u8]) -> ModbusFrameLength {
-    use ModbusFrameLength::*;
-
-    if rxbuf.len() < 3 {
-        return NeedMoreData(3);
-    }
-
-    match rxbuf[1] {
-        0x01..=0x06 => Length(8),
-        0x0f | 0x10 => if rxbuf.len() == 7 { Length(9 + rxbuf[6] as u16) } else { NeedMoreData(7) },
-        0x14 | 0x15 => if rxbuf.len() == 3 { Length(5 + rxbuf[2] as u16) } else { NeedMoreData(3) },
-        0x16 => Length(10),
-        0x17 => if rxbuf.len() == 11 { Length(13 + rxbuf[10] as u16) } else { NeedMoreData(11) },
-        _ => Unknown,
-    }
-}
-
 const CRC: Crc<u16> = Crc::<u16>::new(&CRC_16_MODBUS);
-const BUF_LENGTH: usize = 256;
+const BUF_LENGTH: usize = 256;  // rs485::RX_BUF_LEN must be the same. 256 is the right value for Modbus.
 
 pub struct ModbusServer<REGS: ModbusRegisters> {
-    rxbuf: Vec<u8, BUF_LENGTH>,
-    rxcrc: Digest<'static, u16>,
-    rx_expected_bytes: ModbusFrameLength,
-    rx_received_bytes: u16,
     txbuf: Vec<u8, BUF_LENGTH>,
     regs: REGS,
 }
 
 impl<REGS: ModbusRegisters> ModbusServer<REGS> {
     pub fn new(regs: REGS) -> ModbusServer<REGS> {
-        ModbusServer { 
-            rxbuf: Vec::new(),
-            rxcrc: CRC.digest(),
-            rx_expected_bytes: ModbusFrameLength::NeedMoreData(3),
-            rx_received_bytes: 0,
+        defmt::assert!(rs485::RX_BUF_LEN == 256);
+
+        ModbusServer {
             txbuf: Vec::new(),
             regs,
         }
     }
 
 
-    fn modbus_reply_error(self: &mut Self, code: ModbusErrorCode) {
+    fn modbus_reply_error(self: &mut Self, rxbuf: &[u8], code: ModbusErrorCode) {
         self.txbuf.clear();
-        self.txbuf.push(self.rxbuf[0]).unwrap();
-        self.txbuf.push(self.rxbuf[1] | 0x80).unwrap();
+        self.txbuf.push(rxbuf[0]).unwrap();
+        self.txbuf.push(rxbuf[1] | 0x80).unwrap();
         self.txbuf.push(code as u8).unwrap();
     }
     
-    fn handle_modbus_frame2(self: &mut Self, should_reply: bool) -> Result<(), ModbusErrorCode> {
+    fn handle_modbus_frame2(self: &mut Self, rxbuf: &[u8], should_reply: bool) -> Result<(), ModbusErrorCode> {
         use ModbusErrorCode::*;
 
-        let rxbuf = &self.rxbuf;
         let mut rx = Cursor::new(&rxbuf[0..rxbuf.len()-2], ModbusErrorCode::IllegalDataValue);
         let txbuf = &mut self.txbuf;
         //info!("Modbus frame: {:?}", rxbuf.as_slice());
@@ -460,23 +429,23 @@ impl<REGS: ModbusRegisters> ModbusServer<REGS> {
         }
     }
     
-    fn handle_modbus_frame(self: &mut Self) {
-        let should_reply = match self.regs.is_address_match(self.rxbuf[0]) {
+    fn handle_modbus_frame(self: &mut Self, rx_data: &[u8]) {
+        let should_reply = match self.regs.is_address_match(rx_data[0]) {
             ModbusAdressMatch::NotOurAddress => return,
             ModbusAdressMatch::OurAddress => true,
             ModbusAdressMatch::BroadcastNoReply => false,
         };
 
-        match self.handle_modbus_frame2(should_reply) {
+        match self.handle_modbus_frame2(rx_data, should_reply) {
             Ok(()) => {
                 if should_reply && self.txbuf.capacity() - self.txbuf.len() < 2 {
                     // We don't have enough space for the CRC so reply with error instead.
-                    self.modbus_reply_error(ModbusErrorCode::ServerDeviceFailure);
+                    self.modbus_reply_error(rx_data, ModbusErrorCode::ServerDeviceFailure);
                 }
             },
             Err(code) => {
                 if should_reply {
-                    self.modbus_reply_error(code);
+                    self.modbus_reply_error(rx_data, code);
                 } else {
                     warn!("Error result when processing Modbus frame but we won't reply because it is a broadcast: {:?}", code);
                 }
@@ -496,82 +465,40 @@ impl<REGS: ModbusRegisters> RS485Handler for ModbusServer<REGS> {
     //type CommandFuture = !;
     const TX_BUF_LENGTH: usize = BUF_LENGTH;
 
-    fn on_rx<F>(self: &mut Self, rx: Result<u8, uart::Error>, reply: Option<F>)
+    fn on_rx_frame<F>(self: &mut Self, rx: Result<&[u8], uart::Error>, reply: Option<F>)
         where F: FnOnce(&[u8]) {
         match rx {
-            Ok(rx_char) => {
-                //info!("RX {:?}", rx_char);
-
-                self.rxcrc.update(&[rx_char]);
-                if !self.rxbuf.is_full() {
-                    self.rxbuf.push(rx_char).unwrap_or_default();
-                }
-                self.rx_received_bytes += 1;
-
-                if let ModbusFrameLength::NeedMoreData(x) = self.rx_expected_bytes {
-                    if x == self.rx_received_bytes {
-                        self.rx_expected_bytes = get_modbus_frame_length(self.rxbuf.as_slice());
-                        match self.rx_expected_bytes {
-                            ModbusFrameLength::Unknown => {
-                                //FIXME Wait for pause.
-                            },
-                            _ => {}
-                        }
-                    }
+            Ok(rx_data) if rx_data.len() == 0 => {
+                // This is what used to be the on_idle() call.
+            },
+            Ok(rx_data) => {
+                //info!("RX {:?}", rx_data);
+
+                let calculated_crc = CRC.checksum(rx_data);
+                const CORRECT_CRC: u16 = 0;  // because we include the CRC bytes in our calculation
+                if calculated_crc != CORRECT_CRC {
+                    info!("CRC: {:04x} (should be zero)", calculated_crc);
                 }
-                if let ModbusFrameLength::Length(x) = self.rx_expected_bytes {
-                    if x == self.rx_received_bytes {
-                        let calculated_crc = self.rxcrc.clone().finalize();
-                        const CORRECT_CRC: u16 = 0;  // because we include the CRC bytes in our calculation
-                        if calculated_crc != CORRECT_CRC {
-                            info!("CRC: {:04x} (should be zero)", calculated_crc);
-                        }
 
-                        //FIXME In case of CRC mismatch, wait for gap/idle of >=1.5 chars.
-                        const OUR_ADDRESS: u8 = 1;
-                        if self.rxbuf[0] == OUR_ADDRESS && calculated_crc == CORRECT_CRC {
-                            self.txbuf.clear();
-                            self.handle_modbus_frame();
-
-                            if !self.txbuf.is_empty() {
-                                //info!("Modbus reply: {:?}", self.txbuf);
-                                match reply {
-                                    Option::Some(reply) => reply(self.txbuf.as_slice()),
-                                    Option::None => warn!("Cannot send reply because a reply is already in progress!"),
-                                }
-                            }
-                        }
+                if calculated_crc == CORRECT_CRC {
+                    self.txbuf.clear();
+                    self.handle_modbus_frame(rx_data);
 
-                        self.rxbuf.clear();
-                        self.rxcrc = CRC.digest();
-                        self.rx_expected_bytes = ModbusFrameLength::NeedMoreData(3);
-                        self.rx_received_bytes = 0;
+                    if !self.txbuf.is_empty() {
+                        //info!("Modbus reply: {:?}", self.txbuf);
+                        match reply {
+                            Option::Some(reply) => reply(self.txbuf.as_slice()),
+                            Option::None => warn!("Cannot send reply because a reply is already in progress!"),
+                        }
                     }
                 }
             },
             Err(e) => {
                 info!("RX error {:?}", e);
-        
-                //FIXME wait for gap/idle of >=1.5 chars.
-                self.rxbuf.clear();
-                self.rxcrc = CRC.digest();
-                self.rx_expected_bytes = ModbusFrameLength::NeedMoreData(3);
-                self.rx_received_bytes = 0;
             }
         }
     }
 
-    fn on_idle(self: &mut Self) {
-        if !self.rxbuf.is_empty() {
-            warn!("Partial frame in rx buffer, cut short by inter-byte gap: {:?}, {:?}", self.rx_expected_bytes, self.rxbuf);
-        }
-
-        self.rxbuf.clear();
-        self.rxcrc = CRC.digest();
-        self.rx_expected_bytes = ModbusFrameLength::NeedMoreData(3);
-        self.rx_received_bytes = 0;
-    }
-
     fn on_tx_done(self: &mut Self) {
         //TODO
     }
diff --git a/firmware/rust1/src/rs485.rs b/firmware/rust1/src/rs485.rs
index cbb7b4f..771255e 100644
--- a/firmware/rust1/src/rs485.rs
+++ b/firmware/rust1/src/rs485.rs
@@ -3,15 +3,18 @@ use embassy_futures::select::*;
 use embassy_rp::gpio;
 use embassy_rp::pac;
 use embassy_rp::interrupt::UART0_IRQ;
+use embassy_rp::uart::BufferedUartRx;
 use embassy_rp::uart::Parity;
 use embassy_time::{Duration, Timer};
 use embassy_embedded_hal::SetConfig;
 use embassy_rp::peripherals::{self, UART0};
-use embassy_rp::uart::{self, UartRx};
+use embassy_rp::uart::{self};
 use embassy_rp::pio::{Config, Pio, ShiftConfig, ShiftDirection, FifoJoin};
 use embassy_rp::relocate::RelocatedProgram;
 use embassy_rp::Peripheral;
+use embedded_io::asynch::BufRead;
 use {defmt_rtt as _, panic_probe as _};
+use embedded_io::asynch::Read;
 use fixed::traits::ToFixed;
 use fixed::types::U24F8;
 use fixed::types::U56F8;
@@ -23,13 +26,14 @@ pub trait RS485Handler {
     //type CommandFuture: Future<Output = ()>;
     const TX_BUF_LENGTH: usize;
 
-    fn on_rx<F>(self: &mut Self, rx: Result<u8, uart::Error>, reply: Option<F>)
+    fn on_rx_frame<F>(self: &mut Self, rx: Result<&[u8], uart::Error>, reply: Option<F>)
         where F: FnOnce(&[u8]);
-    fn on_idle(self: &mut Self);
     fn on_tx_done(self: &mut Self);
     fn on_autobaud_success(self: &mut Self, baudrate: f32);
 }
 
+pub const RX_BUF_LEN: usize = 256;
+
 pub struct RS485<H: RS485Handler> {
     pio: peripherals::PIO0,
     rx_pin: peripherals::PIN_17,
@@ -37,7 +41,8 @@ pub struct RS485<H: RS485Handler> {
     tx_en_pin: peripherals::PIN_15,
     dma_channel: peripherals::DMA_CH0,
     tx_dma_channel: peripherals::DMA_CH2,
-    rx: UartRx<'static, UART0, uart::Async>,
+    rx: BufferedUartRx<'static, UART0>,
+    rx_buffer: [u8; RX_BUF_LEN],
     uart_baud_rate: u32,
     uart_parity: Parity,
     handler: H
@@ -79,23 +84,27 @@ async fn debug_print_pio_addr(sm: pac::pio::StateMachine) {
 impl<H: RS485Handler> RS485<H> {
     pub fn new(
             uart: UART0, rx_pin: peripherals::PIN_17, tx_pin: peripherals::PIN_16, tx_en_pin: peripherals::PIN_15,
-            rx_irq: UART0_IRQ, rx_dma_channel: peripherals::DMA_CH1, uart_config: uart::Config,
+            rx_irq: UART0_IRQ, _rx_dma_channel: peripherals::DMA_CH1, uart_config: uart::Config, rx_buffer: &'static mut [u8; RX_BUF_LEN],
             pio: peripherals::PIO0, dma_channel: peripherals::DMA_CH0, tx_dma_channel: peripherals::DMA_CH2,
             handler: H
         ) -> RS485<H> {
         // SAFETY: The auto-baud will only read from this pin and we will set it back to UART mode after initialising the PIO.
         let rx_pin_for_autobaud = unsafe { rx_pin.clone_unchecked() };
 
-        let uart_rx = UartRx::new(
+        core::assert!(rx_buffer.len() == RX_BUF_LEN,
+            "The buffers must have the same length because BufferedUartRx::try_read uses pop() and then expects to write all data to the 2nd buffer.");
+
+        let uart_rx = BufferedUartRx::new(
             uart,
-            rx_pin,
             rx_irq,
-            rx_dma_channel,
+            rx_pin,
+            rx_buffer,
             uart_config,
         );
         RS485 {
             pio, rx_pin: rx_pin_for_autobaud, tx_pin, tx_en_pin, dma_channel, tx_dma_channel,
             rx: uart_rx,
+            rx_buffer: [0; RX_BUF_LEN],
             uart_baud_rate: uart_config.baudrate,
             uart_parity: uart_config.parity,
             handler,
@@ -245,7 +254,8 @@ impl<H: RS485Handler> RS485<H> {
         let timeout_seconds = 1.5 * symbols_per_byte as f32 / self.uart_baud_rate as f32;
         let timeout_start_value = (SM_FREQ as f32 * timeout_seconds / 2.) as u32;
         sm.tx().push(timeout_start_value);
-        info!("timeout_start_value: {} = 0x{:08x}, {} sec, clkdiv 0x{:08x}", timeout_start_value, timeout_start_value, timeout_seconds, cfg.clock_divider.to_bits());
+        info!("timeout_start_value: {} = 0x{:08x}, {} sec, clkdiv 0x{:08x}",
+            timeout_start_value, timeout_start_value, timeout_seconds, cfg.clock_divider.to_bits());
 
         // switch to the real program and join FIFOs
         unsafe { common.free_instr(loaded_program.used_memory); };
@@ -268,9 +278,23 @@ impl<H: RS485Handler> RS485<H> {
 
         // This switching of RX function might have caused an error to be received. Let's skip that.
         if true {
-            let mut rx_buf_one = [0; 1];
-            let rx_future = self.rx.read(&mut rx_buf_one);
-            let _ = select(rx_future, Timer::after(Duration::from_micros(10))).await;
+            //NOTE We were using self.rx.fill_buf() but that somehow caused us to not receive any more data afterwards.
+            let mut buf = [0; 1];
+            let x = select(self.rx.read(&mut buf), Timer::after(Duration::from_micros(10))).await;
+            if false {
+                match x {
+                    Either::First(x) =>
+                        info!("DEBUG: first rx: {:?}", x),
+                    Either::Second(x) =>
+                        info!("DEBUG: first rx: {:?}", x),
+                }
+            }
+            match x {
+                Either::First(Ok(_len)) => {
+                    //self.rx.consume(len)
+                },
+                _ => (),
+            }
         }
 
         info!("Program size for auto-baud: {}, for tx: {}, wait_for_not_idle is at: {}",
@@ -308,8 +332,6 @@ impl<H: RS485Handler> RS485<H> {
         let mut dma_tx_ref = self.tx_dma_channel.into_ref();
         let mut din = [42u32; 9];
         let mut bit_index = 0;
-        let mut rx_buf_one = [0; 1];
-        let mut rx_future = DontAbort::new(self.rx.read(&mut rx_buf_one), PanicIfReused);
         let mut tx_future = DontAbort::hang();
         loop {
             let x = select4(
@@ -318,7 +340,8 @@ impl<H: RS485Handler> RS485<H> {
                 //debug_print_pio_addr(embassy_rp::pac::PIO0.sm(0)),
                 tx_future.continue_wait(),
                 select3(
-                    rx_future.continue_wait(),
+                    //rx_future.continue_wait(),
+                    core::future::pending::<()>(),
                     //debug_print_pio_addr(embassy_rp::pac::PIO0.sm(1)),
                     core::future::pending::<()>(),
                     irq1.wait(),
@@ -419,7 +442,8 @@ impl<H: RS485Handler> RS485<H> {
                     tx_in_progress = false;
                     self.handler.on_tx_done();
                 },
-                Either4::Fourth(Either3::First(x)) => {
+                Either4::Fourth(Either3::First(_)) => {
+                    /*
                     drop(rx_future);
                     let rx_char = x.map(|_| -> u8 { rx_buf_one[0] });
                     if Err(embassy_rp::uart::Error::Overrun) == x {
@@ -455,9 +479,60 @@ impl<H: RS485Handler> RS485<H> {
                             tx_in_progress = true;
                         }
                     }
+                    */
                 },
                 Either4::Fourth(Either3::Third(())) => {
-                    self.handler.on_idle();
+                    let rx_data = match select(self.rx.read(&mut self.rx_buffer), core::future::ready(())).await {
+                        Either::First(Ok(len)) => {
+                            // The UART uses a ringbuffer and it only pops data once so we need to ask a second
+                            // time if the frame wraps around the end of the ringbuffer and we want all data.
+                            //FIXME There is a race condition: If the ringbuffer was full, it will exactly fill our buffer.
+                            //      However, we could be receiving another character right now, which will probably cause a panic in read().
+                            //      -> Actually, pop() has a sane API so I think this cannot happen: The closure returns how
+                            //         much bytes have been processed and I think embassy-rp does that correctly.
+                            match select(self.rx.read(&mut self.rx_buffer[len..]), core::future::ready(())).await {
+                                Either::First(Ok(len2)) => {
+                                    info!("DEBUG: We needed two calls to self.rx.read(): {} + {}", len, len2);
+                                    Ok(&self.rx_buffer[0..len+len2])
+                                },
+                                Either::First(Err(err)) => Err(err), // oops, we are ignoring the previous data
+                                Either::Second(()) => Ok(&self.rx_buffer[0..len]),
+                            }
+                        },
+                        Either::First(Err(err)) => Err(err),
+                        Either::Second(()) => {
+                            Ok(&self.rx_buffer[0..0])
+                        },
+                    };
+
+                    if tx_in_progress {
+                        // use a dummy closure to fix the type of the option because I don't know a better way...
+                        let dummy = |_txdata: &[u8]| {};
+                        self.handler.on_rx_frame(rx_data, if false {Option::Some(dummy)} else {Option::None});
+                    } else {
+                        drop(tx_future);
+                        tx_future = DontAbort::hang();
+                        let mut txlen = Option::None;
+
+                        self.handler.on_rx_frame(rx_data, Option::Some(|txdata: &[u8]| {
+                            tx_data[0] = (txdata.len() - 1) as u32;
+                            for i in 0..txdata.len() {
+                                let x = txdata[i] & 0xff;
+                                let mut parity = 0;
+                                for j in 0..8 {
+                                    parity ^= x>>j;
+                                }
+                                tx_data[i + 1] = x as u32 | (((parity as u32) & 1) << 8);
+                            }
+                            txlen = Option::Some(txdata.len());
+                        }));
+                        if let Option::Some(len) = txlen {
+                            //FIXME Wait for idle and then wait again from `wait_idle` before sending. We should have a gap of 3.5 chars.
+                            tx_future = DontAbort::new(sm1.tx().dma_push(dma_tx_ref.reborrow(),
+                                &tx_data[0..(len+1)]), HangIfReused);
+                            tx_in_progress = true;
+                        }
+                    }
                 },
                 _ => {
                 },
-- 
GitLab