This repository has been archived on 2021-08-17. You can view files and clone it, but cannot push or open issues or pull requests.
libdagon/dagon_planner/planner.py

34 lines
642 B
Python

import cmds
from serial import Serial
class InUseException(Exception):
    """Raised when a command needs resources that the plan already reserved.

    Attributes:
        chs: The conflicting resources, exactly as passed to the constructor.
    """

    def __init__(self, chs):
        """Remember the conflicting resources *chs* for the error message."""
        super().__init__(chs)
        self.chs = chs

    def __str__(self):
        """Return a human-readable description of the conflict."""
        return f"{self.chs} is already used in plan"
class Planner:
    """Collect commands into a plan and run them in order over a serial port.

    Commands are duck-typed. The planner relies on three members:

    * ``accessing()`` -- returns an iterable of the resources the command uses;
    * ``is_async`` -- when truthy, those resources are recorded in ``used``
      so that later commands touching them are rejected;
    * ``exec()`` -- returns the text to send, or ``None`` to send nothing.
    """

    def __init__(self, ser=None):
        """Create an empty plan.

        Args:
            ser: Object exposing ``write(bytes)``; used by :meth:`exec`.
                Defaults to a fresh ``Serial()``. pySerial does not open a
                port that was created without a port name, so pass an
                already-opened port when the plan is meant to be executed.
        """
        self.cmds = []
        self.used = set()
        # Not read or written by any method of this class; kept because code
        # outside this class may rely on the attribute existing.
        self.waiting_on = {}
        self.ser = Serial() if ser is None else ser

    def add(self, cmd):
        """Append *cmd* to the plan after checking for resource conflicts.

        Args:
            cmd: Command object providing ``accessing()`` and ``is_async``.

        Raises:
            InUseException: If any resource reported by ``cmd.accessing()``
                is already in ``used``. The plan is left unchanged.
        """
        # Normalise to a set so accessing() may return any iterable.
        acc = set(cmd.accessing())
        conflict = self.used & acc
        if conflict:
            raise InUseException(conflict)
        # Only asynchronous commands reserve their resources; is_async is
        # consulted only when the command actually accesses something.
        if acc and cmd.is_async:
            self.used |= acc
        self.cmds.append(cmd)

    def exec(self):
        """Run every planned command in insertion order.

        Each command's ``exec()`` result is encoded as UTF-8 and written to
        the serial port; a ``None`` result writes nothing.
        """
        for cmd in self.cmds:
            payload = cmd.exec()
            if payload is not None:
                self.ser.write(bytes(payload, "UTF-8"))