Skip to content
Snippets Groups Projects
Commit 8629c5be authored by Jake Read's avatar Jake Read
Browse files

asyncio and blocking multi-device tests

parent 94b45ee4
No related branches found
No related tags found
No related merge requests found
Showing
with 436 additions and 90 deletions
......@@ -32,12 +32,63 @@ This is also not accounting for i.e. multiple devices, flow control, flow going
This also doesn't properly inspect whether / not there is significant performance dings due to i.e. cobs, which is [some looping python, anyways](https://github.com/cmcqueen/cobs-python/blob/main/src/cobs/cobs/_cobs_py.py) - so, maybe there is real evidence that we want to i.e. ethernet to the first thing, etc.
## 2023 12 20
## 2023 12 21
So, we want to... consume and produce data, as fast as possible in either direction, and open multiple ports.
I think that the multiple ports thing is going to teach me what I want to know about asyncio, and is simple enough that I can try to replicate it with multiprocessing as well.
... we're into that, we just need to run 'em both and plot two-ups, this should be simple next step, then compare to multiprocessing !
## 2023 12 27
OK, first let's test multi-sink using blocking codes. I'll modify the original to not "read-until" but instead consume bytes at a time, then simply open two devices and develop another plot.
### Multi-Device, Blocking
OK: current tech, here's one device with ye old' blocking code:
![blocking](images/2023-12-27_blocking-1-device.png)
Now let's get two up, using this blocking code, no hub:
![one](images/2023-12-27_blocking-2-devices-01.png)
![two](images/2023-12-27_blocking-2-devices-02.png)
With a hub, **note that some packets (in this sample of just 1000) land out of the normal distribution, up above 4500us per-packet-delay!**
![one](images/2023-12-27_blocking-2-hub-devices-01.png)
![two](images/2023-12-27_blocking-2-hub-devices-02.png)
and four devices on the hub, where **we catch a few more long-tail slow packets**
![01](images/2023-12-27_blocking-4-devices-01.png)
![02](images/2023-12-27_blocking-4-devices-02.png)
![03](images/2023-12-27_blocking-4-devices-03.png)
![04](images/2023-12-27_blocking-4-devices-04.png)
So, it seems basically that we are capped around 0.4MBit/sec in all of these scenarios, but introducing the hub sometimes casues packets to be delayed roughly 2x their normal delivery time. I figure that the hub is doing some switching / buffering that leads to this outcome...
### Multi-Device, Asyncio
"under the hood" asyncio should have basically the same performance as the blocking codes above, but I should test a prototype, if no other reason than to understand the design patterns.
![01](images/2023-12-27_asyncio-4-devices-01.png)
![02](images/2023-12-27_asyncio-4-devices-02.png)
![03](images/2023-12-27_asyncio-4-devices-03.png)
![04](images/2023-12-27_asyncio-4-devices-04.png)
So, once again no difference here **and** we still see some stragglers on the third and fourth plots.
### Multi-Device, Multiprocessing
To finish running through these tests, I want to try multi-processing which is true parallellism...
There's a nice natural alignment with this and the rest of these systems (which are all serialized-codes to begin with), so it might be that this approach just suits: it allows us to carry on doing comms-stuff (like not missing acks and timeouts) while users write potentially blocking-codes (which seems to be semi-common in python).
For future-architecture, my assumption is that I would do something like... one process does oversight, then we build one process per link layer, and probably even one per port, with user-application code going somewhere else.
So, I presume this will be a lot heavier-handed programming wise, I'll put a stake down before carrying on.
---
......
from cobs_usb_serial import CobsUsbSerial
import asyncio
import struct
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
pck_len = 32
async def read_ser(cobs_serial, rx_func):
while True:
data = cobs_serial.read()
if len(data) == pck_len:
stamp = struct.unpack('=I', data[:4])
rx_func(stamp)
await asyncio.sleep(0)
def port_handler(stamp):
print(stamp)
async def main():
port_one = CobsUsbSerial("COM23")
task_one = asyncio.create_task(read_ser(port_one, port_handler))
await asyncio.gather(task_one)
asyncio.run(main())
# from cobs_usb_serial import CobsUsbSerial
# import struct
# import numpy as np
# import pandas as pd
# import matplotlib.pyplot as plt
# ser = CobsUsbSerial("COM23")
# stamp_count = 1000
# pck_len = 250
# stamps = np.zeros(stamp_count)
# for i in range(stamp_count):
# bts = ser.read()
# if len(bts) == pck_len:
# stamp = struct.unpack('=I', bts[:4])
# stamps[i] = stamp[0]
# print("stamps, ", stamps)
# df = pd.DataFrame({'timestamps': stamps})
# df['deltas'] = df['timestamps'].diff()
# # clean NaN's
# df = df.dropna()
# # wipe obviously-wrong deltas (i.e. the 1st, which goes 0-start-us)
# df = df[df['deltas'] < 100000]
# # Plotting
# fig, ax1 = plt.subplots(figsize=(11, 5))
# # Primary x-axis (time deltas)
# df['deltas'].plot(kind='hist', bins=100, ax=ax1)
# ax1.set_xlabel('Time-Stamp Deltas (us)')
# ax1.set_ylabel(f'Frequency (of {stamp_count})')
# # Secondary x-axis (bandwidth)
# ax2 = ax1.twiny()
# ax2.set_xlabel('Estimated Bandwidth (Mbits/s)')
# # Set the limits of the secondary axis based on the primary axis
# # new_tick_locations = np.linspace(df['deltas'].min(), df['deltas'].max(), num=len(ax1.get_xticks()))
# # Convert tick locations to bandwidth
# # bandwidths = [(pck_len * 8) * (1e6 / x) for x in new_tick_locations]
# x_ticks = ax1.get_xticks()
# bandwidth_ticks = [((pck_len * 8) * (1e6 / x)) / 1e6 for x in x_ticks]
# ax2.set_xlim(max(bandwidth_ticks), min(bandwidth_ticks))
# plt.title(f'Single-Source COBS Data Sink Deltas, pck_len={pck_len}')
# plt.tight_layout()
# plt.show()
import asyncio
from cobs import cobs
import serial
class CobsUsbSerial:
def __init__(self, port, baudrate=115200):
self.port = port
self.ser = serial.Serial(port, baudrate=baudrate, timeout=1)
self.buffer = bytearray()
def write(self, data: bytes):
data_enc = cobs.encode(data) + b"\x00"
self.ser.write(data_enc)
def read(self):
byte = self.ser.read(1)
if not byte:
return
if byte == b"\x00":
if len(self.buffer) > 0:
data = cobs.decode(self.buffer)
self.buffer = bytearray()
return data
else:
return
else:
self.buffer += byte
async def attach(self, rx_func):
while True:
bts = self.read()
if bts:
rx_func(bts)
await asyncio.sleep(0)
from cobs_usb_serial_async import CobsUsbSerial
import asyncio
import struct
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
stamp_count = 1000
pck_len = 128
stamps_one = np.zeros(stamp_count)
counter_one = 0
plot_one = False
stamps_two = np.zeros(stamp_count)
counter_two = 0
plot_two = False
stamps_three = np.zeros(stamp_count)
counter_three = 0
plot_three = False
stamps_four = np.zeros(stamp_count)
counter_four = 0
plot_four = False
def plot_stamps(stamps):
# make df from stamps
df = pd.DataFrame({'timestamps': stamps})
# calculate deltas between stamps
df['deltas'] = df['timestamps'].diff()
# clean NaN's
df = df.dropna()
# wipe obviously-wrong deltas (i.e. the 1st, which goes 0-start-us)
df = df[df['deltas'] < 100000]
# Plotting
fig, ax1 = plt.subplots(figsize=(11, 3))
ax1.set_xlim([1750, 4750])
# Primary x-axis (time deltas)
df['deltas'].plot(kind='hist', bins=100, ax=ax1)
ax1.set_xlabel('Time-Stamp Deltas (us) and equivalent (MBits/s)')
ax1.set_ylabel(f'Frequency (of {stamp_count})')
# get axis ticks to calculate equivalent bandwidths
x_ticks = ax1.get_xticks()
ax1.set_xticks(x_ticks)
bandwidths = [((pck_len * 8) * (1e6 / x)) / 1e6 for x in x_ticks]
ticks = []
for i in range(len(x_ticks)):
print(i, x_ticks[i], bandwidths[i])
ticks.append(f"{x_ticks[i]:.0f} ({bandwidths[i]:.3f})")
ax1.set_xticklabels(ticks)
plt.title(f'Single-Source COBS Data Sink Deltas, pck_len={pck_len}')
plt.tight_layout()
plt.show()
def cycle(data, stamps, counter, plot):
if counter >= stamp_count:
if not plot:
plot = True
plot_stamps(stamps)
return counter, plot
if len(data) == pck_len:
stamp = struct.unpack("=I", data[:4])
stamps[counter] = stamp[0]
counter += 1
return counter, plot
def port_one_handler(data):
global counter_one, plot_one
counter_one, plot_one = cycle(data, stamps_one, counter_one, plot_one)
def port_two_handler(data):
global counter_two, plot_two
counter_two, plot_two = cycle(data, stamps_two, counter_two, plot_two)
def port_three_handler(data):
global counter_three, plot_three
counter_three, plot_three = cycle(data, stamps_three, counter_three, plot_three)
def port_four_handler(data):
global counter_four, plot_four
counter_four, plot_four = cycle(data, stamps_four, counter_four, plot_four)
async def main():
port_one = CobsUsbSerial("COM23")
port_two = CobsUsbSerial("COM31")
port_three = CobsUsbSerial("COM33")
port_four = CobsUsbSerial("COM36")
task_one = asyncio.create_task(port_one.attach(port_one_handler))
task_two = asyncio.create_task(port_two.attach(port_two_handler))
task_three = asyncio.create_task(port_three.attach(port_three_handler))
task_four = asyncio.create_task(port_four.attach(port_four_handler))
await asyncio.gather(task_one, task_two, task_three, task_four)
asyncio.run(main())
\ No newline at end of file
from cobs_usb_serial import CobsUsbSerial
import struct
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
ser_one = CobsUsbSerial("COM23")
ser_two = CobsUsbSerial("COM31")
ser_three = CobsUsbSerial("COM33")
ser_four = CobsUsbSerial("COM35")
stamp_count = 1000
pck_len = 128
stamps_one = np.zeros(stamp_count)
counter_one = 0
stamps_two = np.zeros(stamp_count)
counter_two = 0
stamps_three = np.zeros(stamp_count)
counter_three = 0
stamps_four = np.zeros(stamp_count)
counter_four = 0
def cycle(ser, stamps, counter):
if counter >= stamp_count:
return counter
bts = ser.read()
if bts:
if len(bts) == pck_len:
stamp = struct.unpack("=I", bts[:4])
stamps[counter] = stamp[0]
counter += 1
return counter
while True:
counter_one = cycle(ser_one, stamps_one, counter_one)
counter_two = cycle(ser_two, stamps_two, counter_two)
counter_three = cycle(ser_three, stamps_three, counter_three)
counter_four = cycle(ser_four, stamps_four, counter_four)
if counter_one == stamp_count and counter_two == stamp_count and counter_three == stamp_count and counter_four == stamp_count:
break
# print("stamps, ", stamps_one, stamps_two)
def plot_stamps(stamps):
# make df from stamps
df = pd.DataFrame({'timestamps': stamps})
# calculate deltas between stamps
df['deltas'] = df['timestamps'].diff()
# clean NaN's
df = df.dropna()
# wipe obviously-wrong deltas (i.e. the 1st, which goes 0-start-us)
df = df[df['deltas'] < 100000]
# Plotting
fig, ax1 = plt.subplots(figsize=(11, 3))
ax1.set_xlim([1750, 4750])
# Primary x-axis (time deltas)
df['deltas'].plot(kind='hist', bins=100, ax=ax1)
ax1.set_xlabel('Time-Stamp Deltas (us) and equivalent (MBits/s)')
ax1.set_ylabel(f'Frequency (of {stamp_count})')
# get axis ticks to calculate equivalent bandwidths
x_ticks = ax1.get_xticks()
ax1.set_xticks(x_ticks)
bandwidths = [((pck_len * 8) * (1e6 / x)) / 1e6 for x in x_ticks]
ticks = []
for i in range(len(x_ticks)):
print(i, x_ticks[i], bandwidths[i])
ticks.append(f"{x_ticks[i]:.0f} ({bandwidths[i]:.3f})")
ax1.set_xticklabels(ticks)
plt.title(f'Single-Source COBS Data Sink Deltas, pck_len={pck_len}')
plt.tight_layout()
plt.show()
plot_stamps(stamps_one)
plot_stamps(stamps_two)
plot_stamps(stamps_three)
plot_stamps(stamps_four)
\ No newline at end of file
......@@ -6,12 +6,25 @@ class CobsUsbSerial:
def __init__(self, port, baudrate=115200):
self.port = port
self.ser = serial.Serial(port, baudrate=baudrate, timeout=1)
self.buffer = bytearray()
def write(self, data: bytes):
data_enc = cobs.encode(data) + b"\x00"
self.ser.write(data_enc)
def read(self):
data_enc = self.ser.read_until(b"\x00")
data = cobs.decode(data_enc[:-1])
byte = self.ser.read(1)
if not byte:
return
if byte == b"\x00":
if len(self.buffer) > 0:
data = cobs.decode(self.buffer)
self.buffer = bytearray()
return data
else:
return
else:
self.buffer += byte
# data_enc = self.ser.read_until(b"\x00")
# data = cobs.decode(data_enc[:-1])
# return data
from cobs_usb_serial import CobsUsbSerial
import struct
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
ser_one = CobsUsbSerial("COM23")
ser_two = CobsUsbSerial("COM31")
ser_three = CobsUsbSerial("COM33")
ser_four = CobsUsbSerial("COM35")
stamp_count = 1000
pck_len = 128
stamps_one = np.zeros(stamp_count)
counter_one = 0
stamps_two = np.zeros(stamp_count)
counter_two = 0
stamps_three = np.zeros(stamp_count)
counter_three = 0
stamps_four = np.zeros(stamp_count)
counter_four = 0
def cycle(ser, stamps, counter):
if counter >= stamp_count:
return counter
bts = ser.read()
if bts:
if len(bts) == pck_len:
stamp = struct.unpack("=I", bts[:4])
stamps[counter] = stamp[0]
counter += 1
return counter
while True:
counter_one = cycle(ser_one, stamps_one, counter_one)
counter_two = cycle(ser_two, stamps_two, counter_two)
counter_three = cycle(ser_three, stamps_three, counter_three)
counter_four = cycle(ser_four, stamps_four, counter_four)
if counter_one == stamp_count and counter_two == stamp_count and counter_three == stamp_count and counter_four == stamp_count:
break
# print("stamps, ", stamps_one, stamps_two)
def plot_stamps(stamps):
# make df from stamps
df = pd.DataFrame({'timestamps': stamps})
# calculate deltas between stamps
df['deltas'] = df['timestamps'].diff()
# clean NaN's
df = df.dropna()
# wipe obviously-wrong deltas (i.e. the 1st, which goes 0-start-us)
df = df[df['deltas'] < 100000]
# Plotting
fig, ax1 = plt.subplots(figsize=(11, 3))
ax1.set_xlim([1750, 4750])
# Primary x-axis (time deltas)
df['deltas'].plot(kind='hist', bins=100, ax=ax1)
ax1.set_xlabel('Time-Stamp Deltas (us) and equivalent (MBits/s)')
ax1.set_ylabel(f'Frequency (of {stamp_count})')
# get axis ticks to calculate equivalent bandwidths
x_ticks = ax1.get_xticks()
ax1.set_xticks(x_ticks)
bandwidths = [((pck_len * 8) * (1e6 / x)) / 1e6 for x in x_ticks]
ticks = []
for i in range(len(x_ticks)):
print(i, x_ticks[i], bandwidths[i])
ticks.append(f"{x_ticks[i]:.0f} ({bandwidths[i]:.3f})")
ax1.set_xticklabels(ticks)
plt.title(f'Single-Source COBS Data Sink Deltas, pck_len={pck_len}')
plt.tight_layout()
plt.show()
plot_stamps(stamps_one)
plot_stamps(stamps_two)
plot_stamps(stamps_three)
plot_stamps(stamps_four)
\ No newline at end of file
import serial.tools.list_ports
def list_serial_ports():
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"Port: {port.device}")
print(f" - Description: {port.description}")
if port.serial_number:
print(f" - Serial Number: {port.serial_number}")
if port.manufacturer:
print(f" - Manufacturer: {port.manufacturer}")
if port.product:
print(f" - Product: {port.product}")
if port.vid is not None:
print(f" - VID: {port.vid:04X}")
if port.pid is not None:
print(f" - PID: {port.pid:04X}")
print()
list_serial_ports()
......@@ -40,7 +40,7 @@ void loop() {
// tx a stamp AFAP
if(cobs.clearToSend()){
chunk.u = micros();
cobs.send(chunk.bytes, 250);
cobs.send(chunk.bytes, 128);
digitalWrite(PIN_LED_G, !digitalRead(PIN_LED_G));
}
// blink to see hangups
......
images/2023-12-27_asyncio-4-devices-01.png

28.1 KiB

images/2023-12-27_asyncio-4-devices-02.png

27.6 KiB

images/2023-12-27_asyncio-4-devices-03.png

29.2 KiB

images/2023-12-27_asyncio-4-devices-04.png

28.4 KiB

images/2023-12-27_blocking-1-device.png

27.6 KiB

images/2023-12-27_blocking-2-devices-01.png

29 KiB

images/2023-12-27_blocking-2-devices-02.png

27.7 KiB

images/2023-12-27_blocking-2-hub-devices-01.png

25.3 KiB

images/2023-12-27_blocking-2-hub-devices-02.png

25.2 KiB

images/2023-12-27_blocking-4-devices-01.png

28.1 KiB

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment