Skip to content
Snippets Groups Projects
Commit 0fac8cb0 authored by Jake Read's avatar Jake Read
Browse files

multiprocessing results

parent a78ad065
No related branches found
No related tags found
No related merge requests found
Showing
with 180 additions and 0 deletions
......@@ -90,6 +90,17 @@ For future-architecture, my assumption is that I would do something like... one
So, I presume this will be a lot heavier-handed programming wise, I'll put a stake down before carrying on. Neil also reminded me that Urumbu has implemented multiprocessing, so I should take a look at that as well.
So, here's the spin-up multiproc:
![01](images/2023-12-27_multi-01.png)
![02](images/2023-12-27_multi-02.png)
![03](images/2023-12-27_multi-03.png)
![04](images/2023-12-27_multi-04.png)
So - clearly the winner actually, althought the gains are small. We see tighter distributions, no fly-aways at the right side of the plot, and consistently a-little-bit-faster reception.
That's about enough of this for me, then - it will be multiprocessing with queues, that should be simple enough to spin up. I will get on with an ethernet test suite next, and then on to UART/PIO links, which I suspect will be (surprising!) faster than these USB links.
### Due Dilligence on Serial Sources
The underlying 0.4MBit/s is pretty underwhelming, I should:
......
from cobs import cobs
import serial
# since multiproc takes functions as targets,
# we'll org like this:
def serial_process(port, queue):
try:
ser = serial.Serial(port, baudrate=115200, timeout = 1)
buffer = bytearray()
while True:
byte = ser.read(1)
if not byte:
continue
if byte == b"\x00":
if len(buffer) > 0:
data = cobs.decode(buffer)
buffer = bytearray()
queue.put(data)
else:
continue
else:
buffer += byte
except serial.SerialException as e:
print(f"Serial exception on {port}: {e}")
# class CobsUsbSerial:
# def __init__(self, port, baudrate=115200):
# self.port = port
# self.ser = serial.Serial(port, baudrate=baudrate, timeout=1)
# self.buffer = bytearray()
# def write(self, data: bytes):
# data_enc = cobs.encode(data) + b"\x00"
# self.ser.write(data_enc)
# def read(self):
# byte = self.ser.read(1)
# if not byte:
# return
# if byte == b"\x00":
# if len(self.buffer) > 0:
# data = cobs.decode(self.buffer)
# self.buffer = bytearray()
# return data
# else:
# return
# else:
# self.buffer += byte
# async def attach(self, rx_func):
# while True:
# bts = self.read()
# if bts:
# rx_func(bts)
# await asyncio.sleep(0)
from cobs_usb_serial_multi import serial_process
from plot_stamps import plot_stamps
import struct, multiprocessing, time
import numpy as np
stamp_count = 1000
pck_len = 128
if __name__ == "__main__":
ports = ["COM23", "COM31", "COM33", "COM36"]
queues = [multiprocessing.Queue() for _ in ports]
processes = [multiprocessing.Process(target=serial_process, args=(port, queue)) for port, queue in zip(ports, queues)]
stamps = [np.zeros(stamp_count) for _ in ports]
stamps_lengths = [0 for _ in ports]
plot_states = [False for _ in ports]
for p in processes:
p.start()
try:
# infinite loop over process-comm objects;
while True:
for q, queue in enumerate(queues):
if not queue.empty():
data = queue.get()
if len(data) == pck_len:
if(stamps_lengths[q] >= stamp_count):
continue
# unpack each as an integer stamp and store in stamps
stamp = struct.unpack("=I", data[:4])[0]
stamps[q][stamps_lengths[q]] = stamp
stamps_lengths[q] += 1
# plot, if the list is done:
if stamps_lengths[q] >= stamp_count and not plot_states[q]:
plot_states[q] = True
plot_stamps(stamps[q], stamp_count, pck_len)
# to let the processor chill for 1us
time.sleep(0.000001)
except KeyboardInterrupt:
print("halting...")
for p in processes:
p.terminate()
p.join()
import pandas as pd
import matplotlib.pyplot as plt
def plot_stamps(stamps, stamp_count, pck_len):
# make df from stamps
df = pd.DataFrame({'timestamps': stamps})
# calculate deltas between stamps
df['deltas'] = df['timestamps'].diff()
# clean NaN's
df = df.dropna()
# wipe obviously-wrong deltas (i.e. the 1st, which goes 0-start-us)
df = df[df['deltas'] < 100000]
# Plotting
fig, ax1 = plt.subplots(figsize=(11, 3))
ax1.set_xlim([1750, 2750])
# Primary x-axis (time deltas)
df['deltas'].plot(kind='hist', bins=100, ax=ax1)
ax1.set_xlabel('Time-Stamp Deltas (us) and equivalent (MBits/s)')
ax1.set_ylabel(f'Frequency (of {stamp_count})')
# get axis ticks to calculate equivalent bandwidths
x_ticks = ax1.get_xticks()
ax1.set_xticks(x_ticks)
bandwidths = [((pck_len * 8) * (1e6 / x)) / 1e6 for x in x_ticks]
ticks = []
for i in range(len(x_ticks)):
print(i, x_ticks[i], bandwidths[i])
ticks.append(f"{x_ticks[i]:.0f} ({bandwidths[i]:.3f})")
ax1.set_xticklabels(ticks)
plt.title(f'Single-Source COBS Data Sink Deltas, pck_len={pck_len}')
plt.tight_layout()
plt.show()
\ No newline at end of file
import serial.tools.list_ports
def list_serial_ports():
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"Port: {port.device}")
print(f" - Description: {port.description}")
if port.serial_number:
print(f" - Serial Number: {port.serial_number}")
if port.manufacturer:
print(f" - Manufacturer: {port.manufacturer}")
if port.product:
print(f" - Product: {port.product}")
if port.vid is not None:
print(f" - VID: {port.vid:04X}")
if port.pid is not None:
print(f" - PID: {port.pid:04X}")
print()
list_serial_ports()
images/2023-12-27_multi-01.png

28.4 KiB

images/2023-12-27_multi-02.png

27.6 KiB

images/2023-12-27_multi-03.png

29 KiB

images/2023-12-27_multi-04.png

28.4 KiB

images/2023-12-27_multi-test.png

27.6 KiB

images/2023-12-27_multi-unp-01.png

29.4 KiB

images/2023-12-27_multi-unp-02.png

28.9 KiB

images/2023-12-27_multi-unp-03.png

29.4 KiB

images/2023-12-27_multi-unp-04.png

29.4 KiB

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment