Hello everyone! I’m looking for guidance on integrating Flashforge’s proprietary load-cell module into Klipper.
Background
- Printer: Flashforge 5M
- Stock MCU: Notion N32G455 → Replaced with genuine STM32F103CBT6 & STM32F103RET6
(now flashes mainline Klipper flawlessly)
Flashforge added two sensors—
- Load-cell
- TVOC
…behind a closed MCU module that:
- Isn’t directly connected via UART to the host CPU
- Talks UART internally
- Emulates a Z-endstop signal( in case
Load-cell
sensor)
They also patched Klipper’s STM32 firmware so the mainboard thinks it’s talking to a normal spi temputure modules. The module’s UART lines are wired straight to the primary MCU UART pins.
The community has already reverse-engineered most of the firmware and hardware, so the MCU side is (mostly) covered. My task now is to implement the host-side driver in Klipper’s Python.
Original Klipper Config
[temperature_sensor Load_Cell]
sensor_type: MAX31856
sensor_pin: PD5
#spi_bus: spi4
spi_speed: 1000000
spi_software_sclk_pin: PD6
spi_software_mosi_pin: PD7
spi_software_miso_pin: PD8
min_temp: 0
max_temp: 2048
gcode_id: W
[temperature_sensor TVOC_Level]
sensor_type: MAX31865
sensor_pin: PD4
#spi_bus: spi4
#cs_pin: PD0
spi_speed: 500000
spi_software_sclk_pin: PE3
spi_software_mosi_pin: PE4
spi_software_miso_pin: PE5
min_temp: 0
max_temp: 2048
gcode_id: V
[output_pin _level_clear]
pin: PD10
value: 0
[gcode_button _check_level_pin]
pin: !PE0
press_gcode:
G4 P0
[output_pin _level_h1]
pin: PG2
value: 0
[output_pin _level_h2]
pin: PG3
value: 0
[output_pin _level_h3]
pin: PG4
value: 0
Note: Although pins
PG2
,PG3
, andPG4
are defined, they actually emulate commands that are sent over UART5 by MCU.
What I’ve Done
- Replaced the Notions MCU chips with STM32
- Written a (messy) prototype of the MCU-side code
MCU-Side prototype code
#include "autoconf.h"
#include "board/gpio.h"
#include "board/irq.h"
#include "board/misc.h"
#include "board/armcm_boot.h"
#include "command.h"
#include <stdint.h>
#include <string.h> // for memset
#include "sched.h"
#include "internal.h"
#define UARTx UART5
#define UARTx_IRQn UART5_IRQn
#define GPIO_AF_MODE 7
#define GPIO_Rx GPIO('D',2)
#define GPIO_Tx GPIO('C',12)
#define BAUDRATE 9600U
#define POLL_INTERVAL_US 200000U
#define TASK_INTERVAL_US 50000U
#define RXBUF_SIZE 128
#define TXBUF_SIZE 64
static const uint8_t CMD_H1[] = { 'H','1',' ','\0','\0','\0','\0','\0','\0' };
static const uint8_t CMD_H2[] = { 'H','2',' ','S','5','0','0',' ','\0' };
static const uint8_t CMD_H3[] = { 'H','3',' ','S','2','0','0',' ','\0' };
static const uint8_t CMD_H7[] = { 'H','7',' ','\0','\0','\0','\0','\0','\0' };
struct flashforge_loadcell_state {
struct timer timer;
uint8_t rxbuf[RXBUF_SIZE];
volatile uint16_t rx_head, rx_tail;
uint8_t txbuf[TXBUF_SIZE];
volatile uint16_t tx_head, tx_tail;
uint32_t last_poll;
volatile uint8_t line_ready;
volatile uint8_t rx_overflow;
};
static struct flashforge_loadcell_state state;
// Task wake-up token
static struct task_wake loadcell_wake;
// Custom atoi-like function
static int16_t
custom_atoi(const char *s)
{
int16_t res = 0;
int sign = 1;
if (*s == '-') {
sign = -1;
s++;
} else if (*s == '+') {
s++;
}
for (; *s != '\0'; s++) {
if (*s >= '0' && *s <= '9') {
res = res * 10 + (*s - '0');
} else {
break;
}
}
return res * sign;
}
// Simple strtok-like function
static char*
simple_strtok(char* str, const char* delim, char** saveptr)
{
char* token;
if (str == NULL) {
str = *saveptr;
}
// Skip leading delimiters
while (*str != '\0' && (strchr(delim, *str) != NULL)) {
str++;
}
if (*str == '\0') {
*saveptr = NULL;
return NULL;
}
token = str;
// Find end of token
while (*str != '\0' && (strchr(delim, *str) == NULL)) {
str++;
}
if (*str != '\0') {
*str = '\0';
*saveptr = str + 1;
} else {
*saveptr = NULL;
}
return token;
}
// ISR: read DR, fill buffer, set flags and wake the task
void UARTx_IRQHandler(void) {
uint32_t sr = UARTx->SR;
if (sr & USART_SR_ORE) {
(void)UARTx->DR; // clear overrun error
}
if (sr & USART_SR_RXNE) {
uint8_t d = UARTx->DR;
uint16_t next = (state.rx_head + 1) % RXBUF_SIZE;
if (next != state.rx_tail) {
state.rxbuf[state.rx_head] = d;
state.rx_head = next;
if (d == '\n') {
state.line_ready = 1;
sched_wake_task(&loadcell_wake);
}
} else {
state.rx_overflow = 1;
sched_wake_task(&loadcell_wake);
}
}
if ((sr & USART_SR_TXE) && (UARTx->CR1 & USART_CR1_TXEIE)) {
if (state.tx_tail != state.tx_head) {
UARTx->DR = state.txbuf[state.tx_tail];
state.tx_tail = (state.tx_tail + 1) % TXBUF_SIZE;
} else {
UARTx->CR1 &= ~USART_CR1_TXEIE;
}
}
}
void flashforge_loadcell_uart_init(void) {
memset(&state, 0, sizeof(state));
loadcell_wake.wake = 0;
enable_pclock((uint32_t)UARTx);
uint32_t pclk = get_pclock_frequency((uint32_t)UARTx);
uint32_t div = DIV_ROUND_CLOSEST(pclk, BAUDRATE);
UARTx->BRR = ((div/16)<<USART_BRR_DIV_Mantissa_Pos)
| ((div%16)<<USART_BRR_DIV_Fraction_Pos);
UARTx->CR1 = USART_CR1_UE | USART_CR1_RE | USART_CR1_TE | USART_CR1_RXNEIE;
armcm_enable_irq(UARTx_IRQHandler, UARTx_IRQn, 1);
gpio_peripheral(GPIO_Rx, GPIO_FUNCTION(GPIO_AF_MODE), 1);
gpio_peripheral(GPIO_Tx, GPIO_FUNCTION(GPIO_AF_MODE), 0);
}
DECL_INIT(flashforge_loadcell_uart_init);
void flashforge_loadcell_send_cmd(const uint8_t *cmd, size_t len) {
irq_disable();
for (size_t i = 0; i < len; i++) {
uint16_t next = (state.tx_head + 1) % TXBUF_SIZE;
if (next != state.tx_tail) {
state.txbuf[state.tx_head] = cmd[i];
state.tx_head = next;
} else {
break;
}
}
if (state.tx_head != state.tx_tail) {
UARTx->CR1 |= USART_CR1_TXEIE;
}
irq_enable();
}
static void parse_line(void) {
irq_disable();
uint16_t head = state.rx_head;
uint16_t tail = state.rx_tail;
state.line_ready = 0;
irq_enable();
char line[RXBUF_SIZE];
uint16_t pos = tail;
size_t len = 0;
uint8_t found_nl = 0;
// Copy until newline or buffer end
while (pos != head && len < sizeof(line)-1) {
char c = state.rxbuf[pos];
line[len++] = c;
pos = (pos+1) % RXBUF_SIZE;
if (c == '\n') {
found_nl = 1;
break;
}
}
line[len] = '\0';
if (found_nl) {
irq_disable();
state.rx_tail = pos;
irq_enable();
char *tok;
char *saveptr = NULL;
tok = simple_strtok(line, " \t", &saveptr);
// Skip first three tokens
for (int i = 0; i < 3 && tok; i++) {
tok = simple_strtok(NULL, " \t", &saveptr);
}
if (tok) {
int16_t w = custom_atoi(tok);
if (w >= 0) {
sendf("flashforge_loadcell_weight_update value=%hu", (uint16_t)w);
} else {
sendf("flashforge_loadcell_parse_error line=%s", line);
}
} else {
sendf("flashforge_loadcell_parse_error line=%s", line);
}
}
}
// Timer callback: poll periodically. Interrupts are disabled.
// Only sends CMD_H7 on schedule and requests reschedule.
static uint_fast8_t flashforge_loadcell_timer_cb(struct timer *tm) {
uint32_t now = timer_read_time();
if ((uint32_t)(now - state.last_poll) >= POLL_INTERVAL_US) {
flashforge_loadcell_send_cmd(CMD_H7, sizeof(CMD_H7));
state.last_poll = now;
}
tm->waketime = now + timer_from_us(TASK_INTERVAL_US);
return SF_RESCHEDULE;
}
// Task: runs on wake (ISR) and may call sendf
void flashforge_loadcell_task(void) {
if (state.rx_overflow) {
state.rx_overflow = 0;
// Clear RX buffer to discard garbage
irq_disable();
state.rx_head = state.rx_tail;
irq_enable();
}
// Parse line if ready
if (sched_check_wake(&loadcell_wake) && state.line_ready) {
parse_line();
}
}
DECL_TASK(flashforge_loadcell_task);
// Start command: registers timer and resets state
void command_start_flashforge_loadcell(uint32_t *args) {
(void)args;
// Initialize fields before start
state.last_poll = timer_read_time();
state.rx_overflow = 0;
state.line_ready = 0;
loadcell_wake.wake = 0;
state.timer.func = flashforge_loadcell_timer_cb;
state.timer.waketime = timer_read_time() + timer_from_us(1000U);
sched_add_timer(&state.timer);
}
DECL_COMMAND(command_start_flashforge_loadcell, "flashforge_loadcell_start");
// H1/H2/H3/H7 commands
void command_flashforge_loadcell_h1(uint32_t *args) { (void)args; flashforge_loadcell_send_cmd(CMD_H1, sizeof(CMD_H1)); }
DECL_COMMAND(command_flashforge_loadcell_h1, "flashforge_loadcell_cmd_h1");
void command_flashforge_loadcell_h2(uint32_t *args) { (void)args; flashforge_loadcell_send_cmd(CMD_H2, sizeof(CMD_H2)); }
DECL_COMMAND(command_flashforge_loadcell_h2, "flashforge_loadcell_cmd_h2");
void command_flashforge_loadcell_h3(uint32_t *args) { (void)args; flashforge_loadcell_send_cmd(CMD_H3, sizeof(CMD_H3)); }
DECL_COMMAND(command_flashforge_loadcell_h3, "flashforge_loadcell_cmd_h3");
void command_flashforge_loadcell_h7(uint32_t *args) { (void)args; flashforge_loadcell_send_cmd(CMD_H7, sizeof(CMD_H7)); }
DECL_COMMAND(command_flashforge_loadcell_h7, "flashforge_loadcell_cmd_h7");
// Shutdown: disable UART and timer
void flashforge_loadcell_shutdown(void) {
UARTx->CR1 &= ~USART_CR1_UE;
sched_del_timer(&state.timer);
}
DECL_SHUTDOWN(flashforge_loadcell_shutdown);
My Question
How should I best implement the host-side in Klipper (Python)? I’m considering:
- Extending/modifying the existing
LoadCell
class - Writing a new module that speaks the module’s UART protocol
- Other approaches?
I know this will never land in Klipper’s mainline—so I plan to maintain it as a fork with solution.
Any advice on:
- Structuring the Python driver
- Integrating into Klipper’s config system
- Handling G-code hooks, polling threads, etc.
would be hugely appreciated!
Thank you in advance for your help.