Asynchronous Stepper Motion

Hi,

I’m working on a project that requires being able to move a stepper mid-print in Klipper. For testing, I’m using a manual_stepper with the following extras module to test asynchronous motion.

class StepperTest:
    def __init__(self, config):
        self.printer = config.get_printer()
        self.reactor = self.printer.get_reactor()

        self.printer.register_event_handler('klippy:ready', self.handle_ready)

    def handle_ready(self):
        self.stepper = self.printer.lookup_object('manual_stepper test')
        self.reactor.register_timer(self._test_loop, self.reactor.monotonic() + 1)
    
    def _test_loop(self, eventtime=None):
        self.stepper.do_set_position(0)
        self.stepper.do_move(100, 100, 1000, sync=False)

        return eventtime + 3

def load_config(config):
    return StepperTest(config)

When the printer is idle, the stepper moves 100mm every 3s, as expected. However, during printing, the stepper takes much longer than 3s between moves. Eventually, Klipper errors out with a TTC.

For my “print”, I’m using a G-code to repeatedly move the extruder 5mm forward and backwards (min_extrude_temp: 0).

M83
G1 F9000
G1 E5
G1 E-5
...

Is there a proper way to asynchronously control stepper motors in Klipper?

Thanks in advance.

class StepperTest:
    def __init__(self, config):
        self.printer = config.get_printer()
        self.reactor = self.printer.get_reactor()
+       self.gcode = self.printer.lookup_object('gcode')
+       self.mutex = self.gcode.get_mutex()

        self.printer.register_event_handler('klippy:ready', self.handle_ready)

    def handle_ready(self):
        self.stepper = self.printer.lookup_object('manual_stepper test')
        self.reactor.register_timer(self._test_loop, self.reactor.monotonic() + 1)

    def _test_loop(self, eventtime=None):
+       with self.mutex:
            self.stepper.do_set_position(0)
            self.stepper.do_move(100, 100, 1000, sync=False)
            return eventtime + 3

def load_config(config):
    return StepperTest(config)

It looks like using gcode.mutex allows for mid-print moves. However, the G-code freezes for ~0.5s the first time this runs, and after that it the contents of the with block seem to execute in batches

Here’s what I mean. Expected behavior:

  1. Stepper moves
  2. Wait 3s
  3. Stepper moves
  4. Wait 3s
  5. Repeat

What actually happens mid-print:

  1. Wait
  2. Stepper moves
  3. Stepper moves
  4. Wait
  5. Stepper moves
  6. Stepper moves
  7. Repeat

EDIT: It seems like homing moves halt printer moves for the duration of the homing move.

The design of the Klipper software is based around the idea of “scheduling actions” in advance. This design has advantages and disadvantages. Advantages include being able to implement most logic on a general purpose host computer and being able to use multiple microcontrollers. Disadvantages include challenges responding to input with low latency and higher complexity code to generate a schedule.

At a very high-level I get the impression you are trying to take certain actions in the “now” instead of designing a “schedule of upcoming actions”. For what it is worth, I suspect this approach will be challenging and will result in an ongoing set of confusing errors.

All motor movements are intended to traverse through the lookahead queue (toolhead.py), the iterative solver ( chelper/itersolve.c ), the step synchronization system (chelper/steppersync.c), the serial queues (chelper/serialqueue.c), the mcu move queue ( src/basecmd.c ), and finally the steppers ( src/stepper.c ). This is why you need to take the gcode mutex prior to submitting movements - as it’s not valid to queue submit movements without coordinating with other users of those systems.

As you’ve noticed, however, there is only one user of the gcode mutex at any given time. So, if some long running command is running, then your code wont be running. This is the primary goal of the gcode mutex - only one user at a time.

It’s not entirely clear to me what your ultimate goal is, so it’s hard to give advice. If you really do want to run a stepper asynchronously every 3 seconds, then the way I’d probably tackle it is by utilizing toolhead.note_mcu_movequeue_activity() and mcu.register_flush_callback(). You can take a look at how klippy/extras/pwm_tool.py uses this mechanism. Note that this is a different way of looking at the problem - instead of trying to “move a motor now”, you’re telling the system that you have actions that are occuring in the near future (note_mcu_movequeue_activity()), then you hook into the system as it “advances its schedule time” (register_flush_callback()) to add in your own schedule of desired actions that are aligned with the current schedule time.

Maybe that helps a little,
-Kevin

Thank you for those tips!

While my system will utilize a sort of “lookahead”, the time before the action needs to be taken can vary significantly, so those actions do need to take priority over print moves.

Considering I’m working towards a sequence like:

  1. Home stepperA
  2. Move stepperA x mm
  3. Move stepperB (already homed)
  4. Move servo
  5. Home stepperA
  6. Move stepperA x mm

I think it would be best to stick with a synchronous system here, especially since I want real-time error handling.

I’m thinking a sequence like this would work better considering how complex it is to schedule tasks ahead of time.

  1. Pause print virtual_sdcard.do_pause()
  2. Lift Z and retract with gcode_move.move_with_transform()
  3. Perform actions
  4. Lower Z and unretract with gcode_move.move_with_transform()
  5. Resume print virtual_sdcard.do_resume()

Thank you!