Commit 655afdea authored by Chetan Sharma's avatar Chetan Sharma
Browse files

Polishing repo

parent eec2d671
......@@ -42,6 +42,8 @@ def F_exp_lin(conditions: Conditions, K_TC, K_te, K_RC, K_re, p, q):
return np.array([Fx, Fy])
# linear models, easier to fit and more simple
def T_lin(conditions, K_tc, K_te, K_rc, K_re):
D, W, _, _, endmill = conditions.unpack()
N, R, _, _, _ = endmill.unpack()
......@@ -60,6 +62,8 @@ def F_lin(conditions, K_tc, K_te, K_rc, K_re):
return np.array([Fx, Fy])
# linear models that account for spindle speed
def T_lin_full(conditions, K_tc, K_tc0, K_te, K_rc, K_rc0, K_re):
"""
Linear model that also attempts to account for variations in cutting force from spindle speeds. Assumes a linear (w/ intercept) relation between the two.
......@@ -73,16 +77,19 @@ def T_lin_full(conditions, K_tc, K_tc0, K_te, K_rc, K_rc0, K_re):
N, R, _, _, _ = endmill.unpack()
return (D*N*R*(K_te*np.arccos(1 - W/R) + (2*np.pi*K_tc0*W*f_r + 2*np.pi*K_tc*R*W*f_r*w)/(N*R*w)))/(2*np.pi)
def F_lin_full(conditions, K_tc, K_tc0, K_te, K_rc, K_rc0, K_re):
"""
Linear model that also attempts to account for variations in cutting force from spindle speeds. Assumes a linear (w/ intercept) relation between the two.
"""
D, W, f_r, w, endmill = conditions.unpack()
N, R, _, _, _ = endmill.unpack()
return [- ((D*K_re*N*R*W)/2 + (D*K_rc*R**3*f_r*np.pi*np.arccos((R - W)/R))/2 - (D*K_te*N*R*W**(1/2)*(2*R - W)**(1/2))/2 - (D*K_tc*R*W*f_r*np.pi*(2*R - W))/2 - (D*K_rc*R**2*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2 + (D*K_rc*R*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*np.pi) - ((D*K_rc0*R**2*f_r*np.pi*np.arccos((R - W)/R))/2 + (D*K_rc0*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2 - (D*K_tc0*W*f_r*np.pi*(2*R - W))/2 - (D*K_rc0*R*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*w*np.pi),
((D*K_te*N*R*W)/2 + (D*K_tc*R**3*f_r*np.pi*np.arccos((R - W)/R))/2 + (D*K_re*N*R*W**(1/2)*(2*R - W)**(1/2))/2 + (D*K_rc*R*W*f_r*np.pi*(2*R - W))/2 - (D*K_tc*R**2*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2 + (D*K_tc*R*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*np.pi) + ((D*K_tc0*R**2*f_r*np.pi*np.arccos((R - W)/R))/2 + (D*K_tc0*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2 + (D*K_rc0*W*f_r*np.pi*(2*R - W))/2 - (D*K_tc0*R*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*w*np.pi)]
return [- ((D*K_re*N*R*W)/2 + (D*K_rc*R**3*f_r*np.pi*np.arccos((R - W)/R))/2 - (D*K_te*N*R*W**(1/2)*(2*R - W)**(1/2))/2 - (D*K_tc*R*W*f_r*np.pi*(2*R - W))/2 - (D*K_rc*R**2*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2 + (D*K_rc*R*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*np.pi) - ((D*K_rc0*R**2*f_r*np.pi*np.arccos((R - W)/R))/2 + (D*K_rc0*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2 - (D*K_tc0*W*f_r*np.pi*(2*R - W))/2 - (D*K_rc0*R*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*w*np.pi),
((D*K_te*N*R*W)/2 + (D*K_tc*R**3*f_r*np.pi*np.arccos((R - W)/R))/2 + (D*K_re*N*R*W**(1/2)*(2*R - W)**(1/2))/2 + (D*K_rc*R*W*f_r*np.pi*(2*R - W))/2 - (D*K_tc*R**2*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2 + (D*K_tc*R*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*np.pi) + ((D*K_tc0*R**2*f_r*np.pi*np.arccos((R - W)/R))/2 + (D*K_tc0*W**(3/2)*f_r*np.pi*(2*R - W)**(1/2))/2 + (D*K_rc0*W*f_r*np.pi*(2*R - W))/2 - (D*K_tc0*R*W**(1/2)*f_r*np.pi*(2*R - W)**(1/2))/2)/(R**2*w*np.pi)]
# coefficient calculators for linear regression
def T_x_vector(conditions: Conditions):
"""
Outputs vector that corresponds to coefficients in order:
......@@ -94,6 +101,7 @@ def T_x_vector(conditions: Conditions):
return[(D * N * W * f_t) / (2 * np.pi), (D * N * R * np.arccos(1 - (W / R))) / (2 * np.pi)]
def T_x_vector_padded(conditions: Conditions):
"""
Outputs vector that corresponds to coefficients in order:
......@@ -106,23 +114,27 @@ def T_x_vector_padded(conditions: Conditions):
return[(D * N * W * f_t) / (2 * np.pi), (D * N * R * np.arccos(1 - (W / R))) / (2 * np.pi), 0, 0]
def Fy_x_vector(conditions: Conditions):
D, W, _, _, endmill = conditions.unpack()
N, R, _, _, _ = endmill.unpack()
f_t = calc_f_t(conditions)
return [(D*N*f_t*(W**(3/2)*(2*R - W)**(1/2) + R**2*np.arccos((R - W)/R) - R*W**(1/2)*(2*R - W)**(1/2)))/(4*R**2*np.pi), (D*N*W)/(2*R*np.pi), (D*N*W*f_t*(2*R - W))/(4*R**2*np.pi), (D*N*W**(1/2)*(2*R - W)**(1/2))/(2*R*np.pi)]
def T_x_vector_full(conditions: Conditions):
D, W, f_r, w, endmill = conditions.unpack()
N, R, _, _, _ = endmill.unpack()
return [ D*R*W*f_r, (D*W*f_r)/w, (D*N*R*np.arccos(1 - W/R))/(2*np.pi), 0, 0, 0]
return [D*R*W*f_r, (D*W*f_r)/w, (D*N*R*np.arccos(1 - W/R))/(2*np.pi), 0, 0, 0]
def Fy_x_vector_full(conditions: Conditions):
D, W, f_r, w, endmill = conditions.unpack()
N, R, _, _, _ = endmill.unpack()
return [ (D*f_r*(W**(3/2)*(2*R - W)**(1/2) + R**2*np.arccos((R - W)/R) - R*W**(1/2)*(2*R - W)**(1/2)))/(2*R), (D*f_r*(W**(3/2)*(2*R - W)**(1/2) + R**2*np.arccos((R - W)/R) - R*W**(1/2)*(2*R - W)**(1/2)))/(2*R**2*w), (D*N*W)/(2*R*np.pi), (D*W*f_r*(2*R - W))/(2*R), (D*W*f_r*(2*R - W))/(2*R**2*w), (D*N*W**(1/2)*(2*R - W)**(1/2))/(2*R*np.pi)]
return [(D*f_r*(W**(3/2)*(2*R - W)**(1/2) + R**2*np.arccos((R - W)/R) - R*W**(1/2)*(2*R - W)**(1/2)))/(2*R), (D*f_r*(W**(3/2)*(2*R - W)**(1/2) + R**2*np.arccos((R - W)/R) - R*W**(1/2)*(2*R - W)**(1/2)))/(2*R**2*w), (D*N*W)/(2*R*np.pi), (D*W*f_r*(2*R - W))/(2*R), (D*W*f_r*(2*R - W))/(2*R**2*w), (D*N*W**(1/2)*(2*R - W)**(1/2))/(2*R*np.pi)]
def deflection_load(D_a, prediction: Prediction, machinechar : MachineChar, E_e=650e9):
def deflection_load(D_a, prediction: Prediction, machinechar: MachineChar, E_e=650e9):
"""
Calculates ratio of tool deflection to allowed deflection.
Uses FEA model described in doi:10.1016/j.ijmachtools.2005.09.009.
......@@ -165,7 +177,7 @@ def deflection_load(D_a, prediction: Prediction, machinechar : MachineChar, E_e=
# calculate tool bending
D_t = C * (Fy / E) * ((l_1 ** 3 / d_1 ** 4) +
((l_2 ** 3 - l_1 ** 3) / d_2 ** 4)) ** G * 1e-3
((l_2 ** 3 - l_1 ** 3) / d_2 ** 4)) ** G * 1e-3
# calculate machine deflection
D_m = Fy / K_machine
......@@ -173,7 +185,7 @@ def deflection_load(D_a, prediction: Prediction, machinechar : MachineChar, E_e=
return (D_t + D_m) / D_a
def deflection_load_simple(prediction: Prediction, machinechar : MachineChar, E_e = 600e9, a_c = 0.8):
def deflection_load_simple(prediction: Prediction, machinechar: MachineChar, E_e=600e9, a_c=0.8):
"""
Calculates ratio of deflection to allowable deflection.
Uses simple approximation that assumes endmill cutter can be approx. as 0.8 of its diameter.
......@@ -199,13 +211,16 @@ def deflection_load_simple(prediction: Prediction, machinechar : MachineChar, E_
I_ceq = np.pi / 4 * r_ceq ** 4 # equivalent cutter 2nd inertia
I_s = np.pi / 4 * r_s ** 4 # shank 2nd inertia
M_s = F_y * (l_c - D / 2) # find moment at end of shank
D_s = (M_s * l_s ** 2) / (2 * E_e * I_s) + (F_y * l_s ** 3) / (3 * E_e * I_s) # deflection of shank from moment and force
theta_s = (M_s * l_s) / (E_e * I_s) # slope at end of shank
D_c = (F_y * (l_c - D / 2) ** 2 * (3 * l_c - (l_c - D / 2))) / (6 * E_e * I_ceq) + D_s + theta_s * l_c # deflection at cutter tip from everything
M_s = F_y * (l_c - D / 2) # find moment at end of shank
D_s = (M_s * l_s ** 2) / (2 * E_e * I_s) + (F_y * l_s ** 3) / \
(3 * E_e * I_s) # deflection of shank from moment and force
theta_s = (M_s * l_s) / (E_e * I_s) # slope at end of shank
D_c = (F_y * (l_c - D / 2) ** 2 * (3 * l_c - (l_c - D / 2))) / (6 * E_e *
I_ceq) + D_s + theta_s * l_c # deflection at cutter tip from everything
return (D_m + D_c) / D_a
def failure_prob_milling(prediction: Prediction, m=4, o=1500e6, a_c=0.8):
"""
Calculates failure probability according to Weibull distribution. Method adapted from https://doi.org/10.1016/S0007-8506(07)62072-1
......@@ -257,14 +272,12 @@ def motor_torque_load(prediction: Prediction, machinechar: MachineChar):
Returns:
Ratio of current motor torque to max achievable torque
"""
_, _, _, w, _, T, _ = prediction.unpack()
r_e, K_T, R_w, V_max, I_max, T_nom, _, _, _ = machinechar.unpack()
w_m = w * r_e
_, _, _, _, _, T, _ = prediction.unpack()
r_e, K_T, _, _, I_max, T_nom, _, _, _ = machinechar.unpack()
T_m = (T + T_nom) / r_e
# store K_V for convenience
K_V = 1 / K_T
# max torque is either determined by max current that can be supplied or max current achievable given winding resistance and whatnot
return T_m / min(K_T * I_max, (V_max - K_V * w_m) / R_w)
# max torque is either determined by max current that can be supplied
return T_m / (K_T * I_max)
def motor_speed_load(prediction: Prediction, machinechar: MachineChar):
......
......@@ -98,7 +98,7 @@ class Conditions:
"""
return [self.D, self.W, self.f_r, self.w, self.endmill]
def compromise(self, other, pref_to_other : float):
def compromise(self, other, pref_to_other: float):
"""
Returns a linear interpolation between this condition and another (as decided by preference_to_other).
Endmill doesn't change.
......@@ -109,7 +109,8 @@ class Conditions:
"""
self_conditions = self.unpack()[:-1]
other_conditions = other.unpack()[:-1]
between = [s * (1-pref_to_other) + o * pref_to_other for s, o in zip(self_conditions, other_conditions)]
between = [s * (1-pref_to_other) + o * pref_to_other for s,
o in zip(self_conditions, other_conditions)]
return Conditions(*between, self.endmill)
def conditions(self):
......
......@@ -14,7 +14,7 @@ from models import (
from objects import MachineChar, Prediction, Conditions
from ml import Model
# import pyswarms as ps
import pyswarms as ps
import logging
log = logging.getLogger(__name__)
......@@ -32,7 +32,7 @@ class Optimizer:
W and f_r are only considered as initial guesses for the optimizer.
"""
MAX_TOOL_BREAKAGE_PROB = 0.20
MAX_TOOL_BREAKAGE_PROB = 0.05
def __init__(self, model: Model, machinechar: MachineChar, D_a, fixed_conditions: Conditions):
......@@ -41,7 +41,7 @@ class Optimizer:
self.D_a = D_a
self.fixed_conditions = fixed_conditions
def optimize(self, verbose = False):
def optimize(self, verbose=False):
"""
Uses the current state of the model to predict the optimum milling conditions.
"""
......@@ -79,15 +79,17 @@ class Optimizer:
"""
return 1 - (1 / (1 + np.exp(-20 * (x - 0.9))))
def failure(self, prediction: Prediction, verbose = False):
def failure(self, prediction: Prediction, verbose=False):
"""
Failure metric. Slightly arbitrary. At 1 when failure is improbable and at 0 when failure is likely.
Failure is defined as the geometric combination of all failure modes
(each normalized between 0-1 using the logistic function.)
"""
# pull out all of these
failure_deflection = deflection_load_simple(prediction, self.machinechar)
failure_breakage = failure_prob_milling(prediction) / Optimizer.MAX_TOOL_BREAKAGE_PROB
failure_deflection = deflection_load_simple(
prediction, self.machinechar)
failure_breakage = failure_prob_milling(
prediction) / Optimizer.MAX_TOOL_BREAKAGE_PROB
failure_torque = motor_torque_load(prediction, self.machinechar)
failure_speed = motor_speed_load(prediction, self.machinechar)
......@@ -97,93 +99,74 @@ class Optimizer:
failure_speed]
if verbose:
log.info("Ind. failures: " + ", ".join([name + ": " + "{:.3f}".format(value) for name, value in zip(['deflection', 'breakage', 'torque', 'speed'], failures)]))
log.info("Ind. failures: " + ", ".join([name + ": " + "{:.3f}".format(
value) for name, value in zip(['deflection', 'breakage', 'torque', 'speed'], failures)]))
failure_logistic = reduce(
lambda x, y: x*y, map(self.inv_logistic, failures))
return failure_logistic
# class OptimizerFull(Optimizer):
# def optimize(self, verbose = False):
# """
# Uses the current state of the model to predict the optimum milling conditions.
# """
# x0 = [self.fixed_conditions.W, self.fixed_conditions.f_r, self.fixed_conditions.w]
# bounds = ((0, self.fixed_conditions.endmill.r_c * 1.8),
# (0, self.machinechar.f_r_max),
# (0, None))
# minimum = minimize(self._optimize_func, x0, bounds=bounds)
# W, f_r, w = minimum.x
# D, _, _, _, endmill = self.fixed_conditions.unpack()
# optimized = Conditions(D, W, f_r, w, endmill)
# if verbose:
# prediction = self.model.predict_one(optimized)
# self.failure(prediction, verbose = True)
# return optimized
# def _optimize_func(self, X):
# W, f_r, w = X
# D, _, _, _, endmill = self.fixed_conditions.unpack()
# conditions = Conditions(D, W, f_r, w, endmill)
# prediction = self.model.predict_one(conditions)
# loss = self.loss(prediction)
# return loss
# class OptimizerPSOFull(Optimizer):
# def optimize(self, verbose = False):
# """
# Uses the current state of the model to predict the optimum milling conditions.
# """
# min_bound = np.array([0,0,0])
# max_bound = np.array([self.fixed_conditions.endmill.r_c * 1.8,
# self.machinechar.f_r_max,
# 1000])
# options = {'c1': 0.5, 'c2': 0.3, 'w':0.9}
# optimizer = ps.single.global_best.GlobalBestPSO(100, 3, options = options, bounds = (min_bound, max_bound))
# func = lambda x: np.apply_along_axis(self._optimize_func, 1, x)
# _, pos = optimizer.optimize(func, iters = 500)
# W, f_r, w = pos
# D, _, _, _, endmill = self.fixed_conditions.unpack()
# optimized = Conditions(D, W, f_r, w, endmill)
# if verbose:
# prediction = self.model.predict_one(optimized)
# self.failure(prediction, verbose = True)
# return optimized
# def _optimize_func(self, X):
# W, f_r, w = X
# D, _, _, _, endmill = self.fixed_conditions.unpack()
# conditions = Conditions(D, W, f_r, w, endmill)
# prediction = self.model.predict_one(conditions)
# loss = self.loss(prediction)
# return loss
# class OptimizerPSO(Optimizer):
# def optimize(self, verbose = False):
# """
# Uses the current state of the model to predict the optimum milling conditions.
# """
# min_bound = np.array([0,0])
# max_bound = np.array([self.fixed_conditions.endmill.r_c * 1.8,
# self.machinechar.f_r_max])
# options = {'c1': 0.5, 'c2': 0.3, 'w':0.9}
# optimizer = ps.single.global_best.GlobalBestPSO(100, 2, options = options, bounds = (min_bound, max_bound))
# func = lambda x: np.apply_along_axis(self._optimize_func, 1, x)
# _, pos = optimizer.optimize(func, iters = 500)
# W, f_r = pos
# D, _, _, w, endmill = self.fixed_conditions.unpack()
# optimized = Conditions(D, W, f_r, w, endmill)
# if verbose:
# prediction = self.model.predict_one(optimized)
# self.failure(prediction, verbose = True)
# return optimized
# def _optimize_func(self, X):
# W, f_r = X
# D, _, _, w, endmill = self.fixed_conditions.unpack()
# conditions = Conditions(D, W, f_r, w, endmill)
# prediction = self.model.predict_one(conditions)
# loss = self.loss(prediction)
# return loss
class OptimizerFull(Optimizer):
"""Optimizer that uses cutting models that try to account for variations in cutting pressures due to different cutting speeds. Doesn't really work...
"""
def optimize(self, verbose=False):
"""
Uses the current state of the model to predict the optimum milling conditions.
"""
x0 = [self.fixed_conditions.W,
self.fixed_conditions.f_r, self.fixed_conditions.w]
bounds = ((0, self.fixed_conditions.endmill.r_c * 1.8),
(0, self.machinechar.f_r_max),
(0, None))
minimum = minimize(self._optimize_func, x0, bounds=bounds)
W, f_r, w = minimum.x
D, _, _, _, endmill = self.fixed_conditions.unpack()
optimized = Conditions(D, W, f_r, w, endmill)
if verbose:
prediction = self.model.predict_one(optimized)
self.failure(prediction, verbose=True)
return optimized
def _optimize_func(self, X):
W, f_r, w = X
D, _, _, _, endmill = self.fixed_conditions.unpack()
conditions = Conditions(D, W, f_r, w, endmill)
prediction = self.model.predict_one(conditions)
loss = self.loss(prediction)
return loss
class OptimizerPSO(Optimizer):
"""Uses a particle swarm optimizer. Doesn't really work.
"""
def optimize(self, verbose=False):
"""
Uses the current state of the model to predict the optimum milling conditions.
"""
min_bound = np.array([0, 0])
max_bound = np.array([self.fixed_conditions.endmill.r_c * 1.8,
self.machinechar.f_r_max])
options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
optimizer = ps.single.global_best.GlobalBestPSO(
100, 2, options=options, bounds=(min_bound, max_bound))
def func(x): return np.apply_along_axis(self._optimize_func, 1, x)
_, pos = optimizer.optimize(func, iters=500)
W, f_r = pos
D, _, _, w, endmill = self.fixed_conditions.unpack()
optimized = Conditions(D, W, f_r, w, endmill)
if verbose:
prediction = self.model.predict_one(optimized)
self.failure(prediction, verbose=True)
return optimized
def _optimize_func(self, X):
W, f_r = X
D, _, _, w, endmill = self.fixed_conditions.unpack()
conditions = Conditions(D, W, f_r, w, endmill)
prediction = self.model.predict_one(conditions)
loss = self.loss(prediction)
return loss
import serial
import pyvesc
import time
import numpy as np
import shelve
......@@ -15,6 +14,7 @@ class Sensor(ABC):
"""
Represents a sensor used for data collection. Implements a basic set of multithreaded data collection subroutines.
"""
def __init__(self):
self.sensorThread = None
self.measure = threading.Event()
......@@ -46,11 +46,12 @@ class Sensor(ABC):
class Machine(Sensor):
"""
Initialize Machine interface.
Args:
port: port to use
"""
def __init__(self, port: serial.Serial, graceful_shutdown = False):
def __init__(self, port: serial.Serial, graceful_shutdown=False):
super().__init__()
self._logger = logging.getLogger(__name__ + ".machine")
self.port = serial.Serial(port, 115200,
......@@ -74,7 +75,7 @@ class Machine(Sensor):
def __del__(self):
if self.graceful_shutdown:
self.rapid({'z': 1})
self.rapid({'z': 1e-3})
self.rapid({'x': 0, 'y': 0})
self.rapid({'z': 0})
self.hold_until_still()
......@@ -176,67 +177,12 @@ class Machine(Sensor):
return ('machine', self.get_state())
class Spindle(Sensor):
REDUCTION = 4
def __init__(self, port):
super().__init__()
self._logger = logging.getLogger(__name__ + ".spindle")
self._logger.info("Waking up VESC")
self.spindle = pyvesc.VESC(port)
self._logger.info("VESC Ready")
self.spindle_lock = threading.Lock()
self.avg_current = 0
def close(self):
self.spindle_lock.acquire()
self.spindle.set_rpm(0)
self.spindle.stop_heartbeat()
self.spindle_lock.release()
def find_avg_current(self):
self.avg_current = 0
self.spindle_lock.acquire()
time.sleep(1)
samples = 300
for _ in range(samples):
time.sleep(0.01)
measurements = self.spindle.get_measurements()
self.avg_current += measurements.avg_motor_current
self.spindle_lock.release()
self.avg_current /= samples
def set_w(self, w):
self.spindle_lock.acquire()
rpm = int(w * (60 / (2 * np.pi))) * 4
self.spindle.set_rpm(rpm)
time.sleep(1)
confirmation = self.spindle.get_measurements()
self.spindle_lock.release()
self._logger.info("Attempted to set RPM of " + str(rpm) +
", actual RPM is " + str(confirmation.rpm))
if rpm > 0 and confirmation.rpm < rpm * 0.75:
self.spindle.set_rpm(0)
raise Exception("Spindle did not approach requested speed of " +
str(rpm) + ", actual speed is " + str(confirmation.rpm))
def sensorFunction(self):
self.spindle_lock.acquire()
measurements = self.spindle.get_measurements()
self.spindle_lock.release()
all_measurements = {attr: getattr(measurements, attr) for attr in dir(
measurements) if (attr[0] != '_' and "fields" not in attr)}
all_measurements['avg_current'] = self.avg_current
all_measurements['torque'] = (
measurements.avg_motor_current - self.avg_current) * 0.0348134
return ('spindle', all_measurements)
class TFD(Sensor):
"""
Represents a tool force dynamometer.
Args:
port: Port to use
"""
BITS_TO_N = -1714
......@@ -262,7 +208,8 @@ class TFD(Sensor):
return float(resp) / TFD.BITS_TO_N
except Exception as ex:
e = ex
self._logger.warn("Read #" + str(i + 1) + " failed, trying again... Specific error: " + str(e))
self._logger.warn(
"Read #" + str(i + 1) + " failed, trying again... Specific error: " + str(e))
self.port.close()
self.port = serial.Serial(self.port_id, 115200, timeout=0.5)
self.port.reset_input_buffer()
......@@ -282,7 +229,8 @@ class TFD(Sensor):
except Exception as ex:
e = ex
# sensor failed to calibrate, retry
self._logger.warn("Calibrate #" + str(i + 1) + " failed, trying again... Specific error: " + str(e))
self._logger.warn("Calibrate #" + str(i + 1) +
" failed, trying again... Specific error: " + str(e))
self.port.close()
self.port = serial.Serial(self.port_id, 115200, timeout=0.5)
self.port.reset_input_buffer()
......@@ -296,6 +244,12 @@ class TFD(Sensor):
class Spindle_Applied(Sensor):
"""
Represents an Applied Motion spindle.
Args:
port: Port to use
"""
RS485_ADDRESS = 0
I_TO_T = 0.10281
......@@ -327,8 +281,8 @@ class Spindle_Applied(Sensor):
def write(self, msg):
self.port_lock.acquire()
full_msg = str(Spindle_Applied.RS485_ADDRESS) +\
msg +\
"\r"
msg +\
"\r"
self.port.write(full_msg.encode('ascii'))
self.port.flush()
self.port_lock.release()
......@@ -339,7 +293,8 @@ class Spindle_Applied(Sensor):
try:
self.port_lock.acquire()
self.write(msg)
response = self.port.read_until('\r'.encode('ascii')).decode('ascii').strip()
response = self.port.read_until(
'\r'.encode('ascii')).decode('ascii').strip()
self.port_lock.release()
ack_byte = response[1] if response else None
if ack_byte in ('%', '*'):
......@@ -349,11 +304,13 @@ class Spindle_Applied(Sensor):
elif not response:
self._logger.warn("Command was not understood by drive")
else:
self._logger.warn("Unknown ack received: " + response.strip())
self._logger.warn(
"Unknown ack received: " + response.strip())
# trying again
except Exception as ex:
e = ex
self._logger.warn("Write-ack #" + str(i + 1) + " failed, trying again. Specific error: " + str(e))
self._logger.warn("Write-ack #" + str(i + 1) +
" failed, trying again. Specific error: " + str(e))
# closing and reopening the port to unbork any issues
self.port_lock.acquire()
......@@ -367,7 +324,6 @@ class Spindle_Applied(Sensor):
self.port_lock.release()
time.sleep(0.1)
raise IOError("Failed to write command to spindle...")
def write_get_response(self, msg):
......@@ -376,16 +332,19 @@ class Spindle_Applied(Sensor):
try:
self.port_lock.acquire()
self.write(msg)
response = self.port.read_until("\r".encode('ascii')).decode('ascii').strip()
response = self.port.read_until(
"\r".encode('ascii')).decode('ascii').strip()
self.port_lock.release()
assert response[0] == str(Spindle_Applied.RS485_ADDRESS), "Address of response incorrect, was actually " + response[0]
assert response[0] == str(
Spindle_Applied.RS485_ADDRESS), "Address of response incorrect, was actually " + response[0]
assert response[1:3] == msg[0:2], "Command of response incorrect, was actually " + response[1:3]
assert response[3] == "=", "No '=' in response, was actually " + response[4]
return response[4:]
except Exception as ex:
e = ex
self._logger.warn("Write-ack #" + str(i + 1) + " failed, trying again. Specific error: " + str(e))
self._logger.warn("Write-ack #" + str(i + 1) +
" failed, trying again. Specific error: " + str(e))
self.port_lock.acquire()
self.port.close()
self.port = serial.Serial(self.port_id, 115200, timeout=0.5)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment