Easily create system daemons (and programs) that use an email-like syntax for communicating between threaded and forked processes.
- PyPI: pip install polycephaly
- Git repository
- Documentation
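To give a feel for what "email-like" messaging between threads can look like, here is a minimal sketch built only on the standard library. It is not Polycephaly's API: the `make_message` helper, the `from`/`to`/`subject`/`body` fields, and the `mailboxes` dictionary are all hypothetical names chosen for illustration.

```python
import queue
import threading


def make_message(sender, recipient, subject, body=None):
    """Build an email-like message as a plain dict (hypothetical format)."""
    return {"from": sender, "to": recipient, "subject": subject, "body": body}


def worker(name, inbox, mailboxes):
    """Answer every 'ping' with a 'pong' until a 'stop' message arrives."""
    while True:
        msg = inbox.get()  # Blocks until a message is delivered.
        if msg["subject"] == "stop":
            break
        if msg["subject"] == "ping":
            reply = make_message(name, msg["from"], "pong", msg["body"])
            mailboxes[msg["from"]].put(reply)


# One inbox per named participant, much like one mailbox per address.
mailboxes = {"main": queue.Queue(), "worker": queue.Queue()}

thread = threading.Thread(
    target=worker, args=("worker", mailboxes["worker"], mailboxes)
)
thread.start()

# Send a message, wait for the reply, then ask the worker to stop.
mailboxes["worker"].put(make_message("main", "worker", "ping", body=42))
reply = mailboxes["main"].get(timeout=5)
mailboxes["worker"].put(make_message("main", "worker", "stop"))
thread.join()

print(reply)
```

For forked processes, `multiprocessing.Queue` could play the same role as `queue.Queue` does here, since it offers the same `put` and `get` calls.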
Hardware
Polycephaly lowers the barrier to entry for creating parallelized system services (specifically for embedded systems and servers) that can be easily managed by systemd and other init systems.
The examples below show this module running with threading and locking applied to precision hardware. Each device is described by an easy-to-understand JSON profile that I’ve generated, which corresponds to the hardware vendor’s communications specification:
# Assumes `import time` at the top of the module.
def birth( self, *args, **kwargs ):

    self.registerCallbacks(
        connected = self.connected,
        disconnected = self.disconnected,
        boardPong = self.boardPong,
        commsError = self.commsError,
        deviceError = self.deviceError,
    )

    # Prepare keyword arguments that are passed from the main program.
    self.setupGlobalKwargs()

    # Add controller's JSON profile to each mapped device.
    self.setupDeviceProfiles()

    # Set up and run related device threads (e.g. communications and lighting).
    self.launchThreads()

    # Wait until the device path is found by analyzing the USB bus before proceeding.
    self.waitForBoardPath()

    # Turn on the board's green LED.
    self.device( self.getDeviceID( 'microcontroller' ) ).led( True )

    # Set Servo ID 7 to spin counter-clockwise as fast as possible, then abruptly change direction and speed.
    with self.device( 7 ) as device:
        device.spin( -1023 )
        time.sleep( 5 )
        device.spin( 100 )

    pass # END METHOD : Birth

def _threadLeds( self, **kwargs ):
    '''
    Used for controlling LEDs of all connected devices for animations.

    This simple thread iterates colors on all devices at the same time, and then reverses the color direction to reach the starting point, for a seamless transition.
    '''

    # Thread loop
    while self.isActive():

        for i in range( 1, 8 ):
            self.multiDevice( range( 1, 8 ) ).led( i )
            time.sleep( 0.1 )
            pass # END FOR

        for i in reversed( range( 0, 7 ) ):
            self.multiDevice( range( 1, 8 ) ).led( i )
            time.sleep( 0.1 )
            pass # END FOR

        time.sleep( 1 )

        pass # END WHILE LOOP

    pass # END Method : Thread LEDs
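
The LED thread above combines two ideas: a loop that runs while the process is active, and a color sweep that rises and then falls. The sketch below shows one way to reproduce that pattern with the standard library alone, using a `threading.Event` for shutdown and a `threading.Lock` to serialize access to a shared bus. `FakeLedBus`, `sweep_colors`, and `led_thread` are hypothetical names for illustration and are not part of Polycephaly. Unlike the method above, this sweep ends on the color it started with instead of continuing down to 0.

```python
import threading
import time


class FakeLedBus:
    """Hypothetical stand-in for a hardware bus; records every LED write."""

    def __init__(self):
        self.lock = threading.Lock()
        self.writes = []

    def led(self, device_ids, color):
        # The lock ensures only one thread talks to the bus at a time.
        with self.lock:
            self.writes.append((tuple(device_ids), color))


def sweep_colors(low=1, high=7):
    """Return colors rising from low to high, then falling back to low."""
    rising = list(range(low, high + 1))
    return rising + rising[-2::-1]


def led_thread(bus, stop_event, step_delay=0.01):
    """Animate the LEDs of devices 1 through 7 until stop_event is set."""
    device_ids = range(1, 8)
    while not stop_event.is_set():
        for color in sweep_colors():
            bus.led(device_ids, color)
            # Event.wait doubles as a sleep that ends early on shutdown.
            if stop_event.wait(step_delay):
                return


bus = FakeLedBus()
stop_event = threading.Event()
thread = threading.Thread(target=led_thread, args=(bus, stop_event))
thread.start()

time.sleep(0.2)   # Let the animation run briefly.
stop_event.set()  # Request shutdown; the thread exits within one step.
thread.join()

print(bus.writes[:3])
```

Waiting on the event instead of calling `time.sleep` keeps shutdown responsive, which matters when an init system such as systemd expects a service to stop promptly.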