Source code for ur_cb2.receive.cb2_receive

"""A module to receive data from UR CB2 robots."""

# The MIT License (MIT)
#
# Copyright (c) 2016 GTRC.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import socket
import struct
import array
import threading


[docs]class URReceiver(object): """A class to receive and process data from a UR Robot The receiving and processing can be run in a separate thread by calling start(). The stop() command must be called before exiting to halt the additional thread. Alternatively, receive(), decode(), and print_parsed_data() can be called in sequence in order to receive, decode, and print data. One should not call receive(), decode(), or any of the print methods, if a separate thread is being used. You should never write to any of the data fields externally, however you can read from them. Python's atomic read/write architecture should prevent you from getting any half baked results from basic types, for all lists and tuples, you must lock using lock (recommend that you use `with lock:` paradigm. Attributes: clean_data: Double array of length 101 for all of the data returned by the robot raw_data: String of complete raw data packet __socket: The socket for communications clean_packets: The Integer number of packets which have been received cleanly stub_packets: The Integer number of packets which have been received as stubs received: The total Integer number of complete data sets which have been received waiting_data: String to hold incomplete data sets new_data: Boolean whether new data is available for processing time: Double of time elapsed since the controller was started target_joint_positions: 6 member Double list of target joint positions target_joint_velocities: 6 member Double list of target joint velocities target_joint_accelerations: 6 member Double list of target joint accelerations target_joint_currents: 6 member Double list of target joint currents target_joint_moments: 6 member Double list of target joint moments as torques actual_joint_positions: 6 member Double list of actual joint positions actual_joint_velocities: 6 member Double list of actual joint velocities actual_joint_currents: 6 member Double list of actual joint currents tool_accelerometer: 3 member Double list of ool x,y and z accelerometer values (software version 1.7) force_tcp: 6 member Double list of generalised forces in the TCP position: 6 member Double list of cartesian coordinates of the tool: (x,y,z,rx,ry,rz), where rx, ry and rz is a rotation vector representation of the tool orientation tool_speed: 6 member Double list of speed of the tool given in cartesian coordinates digital_inputs: Current state of the digital inputs. NOTE: these are bits encoded as int64_t, e.g. a value of 5 corresponds to bit 0 and bit 2 set high joint_temperature: 6 member Double list of temperature of each joint in degrees celsius controller_period: Double of controller real time thread execution time robot_control_mode: Double of robot control mode (see PolyScopeProgramServer on the "How to" page joint_control_modes: 6 member Double list of joint control modes (see PolyScopeProgramServer on the "How to" page) (only from software version 1.8 and on) run: Boolean on whether to run or not __receiving_thread: Thread object for running the receiving and parsing loops verbose: Boolean defining whether or not to print data lock: A threading lock which is used to protect data from race conditions _is_stopped: A boolean specifying whether the robot is stopped """ # Format specifier: # ! : network (big endian) # I : unsigned int, message size # 85d : 85 doubles # q : int64_t for digital inputs # 15d : 15 doubles #: Format spec for complete data packet format = struct.Struct('! I 85d q 15d') #: The format spec for the packet length field formatLength = struct.Struct('! I') #: The width to be given to name items when printing out name_width = 30 #: The precision for printing data precision = 7 double_format_string = "{:+0"+str(precision+4)+"."+str(precision)+"f}"
[docs] def __init__(self, open_socket, verbose=False): """Construct a UR Robot connection given connection parameters Args: open_socket (socket.socket): The socket to use for communications. verbose (bool): Whether to print received data in main loop """ self.clean_data = array.array('d', [0] * 101) self.raw_data = '' self.__socket = open_socket self.clean_packets = 0 self.stub_packets = 0 self.received = 0 self.waiting_data = '' self.new_data = False self.time = 0.0 self.target_joint_positions = [0.0]*6 self.target_joint_velocities = [0.0]*6 self.target_joint_accelerations = [0.0]*6 self.target_joint_currents = [0.0]*6 self.target_joint_moments = [0.0]*6 self.actual_joint_positions = [0.0]*6 self.actual_joint_velocities = [0.0]*6 self.actual_joint_currents = [0.0]*6 self.tool_accelerometer = [0.0]*3 self.force_tcp = [0.0]*6 self.position = [0.0]*6 self.tool_speed = [0.0]*6 self.digital_inputs = 0 self.joint_temperature = [0.0]*6 self.controller_period = 0.0 self.robot_control_mode = 0.0 self.joint_control_modes = [0.0]*6 self.run = False self.__receiving_thread = None self.verbose = verbose self.lock = threading.Lock() self._is_stopped = False if verbose: print "\033[2J" # Clear screen
def __del__(self): """Shutdown side thread and print aggregated connection stats""" self.stop() print "Received: "+str(self.received) + " data sets" print "Received: "+str(self.clean_packets) + " clean packets" print "Received: "+str(self.stub_packets) + " stub packets"
[docs] def decode(self): """Decode the data stored in the class's rawData field. Only process the data if there is new data available. Unset the self.newData flag upon completion. Note, this will lock the data set and block execution in a number of other functions """ with self.lock: if self.new_data: self.clean_data = self.format.unpack(self.raw_data) self.time = self.clean_data[1] self.target_joint_positions = self.clean_data[2:8] self.target_joint_velocities = self.clean_data[8:14] self.target_joint_accelerations = self.clean_data[14:20] self.target_joint_currents = self.clean_data[20:26] self.target_joint_moments = self.clean_data[26:32] self.actual_joint_positions = self.clean_data[32:38] self.actual_joint_velocities = self.clean_data[38:44] self.actual_joint_currents = self.clean_data[44:50] self.tool_accelerometer = self.clean_data[50:53] # unused = self.clean_data[53:68] self.force_tcp = self.clean_data[68:74] self.position = self.clean_data[74:80] self.tool_speed = self.clean_data[80:86] self.digital_inputs = self.clean_data[86] self.joint_temperature = self.clean_data[87:93] self.controller_period = self.clean_data[93] # test value = self.clean_data[94] self.robot_control_mode = self.clean_data[95] self.joint_control_modes = self.clean_data[96:102] self.new_data = False self._is_stopped = self.is_stopped()
[docs] def receive(self): """Receive data from the UR Robot. If an entire data set is not received, then store the data in a temporary location (self.waitingData). Once a complete packet is received, place the complete packet into self.rawData and set the newData flag. Note, this will lock the data set and block execution in a number of other functions once a full data set is built. """ incoming_data = self.__socket.recv(812) # expect to get 812 bytes if len(incoming_data) == 812: self.clean_packets += 1 else: self.stub_packets += 1 if self.formatLength.unpack(incoming_data[0:4])[0] == 812: self.waiting_data = incoming_data else: self.waiting_data += incoming_data if len(self.waiting_data) == 812: with self.lock: self.raw_data = self.waiting_data self.received += 1 self.new_data = True
[docs] def print_raw_data(self): """Print the raw data which is stored in self.raw_data. Note, this will lock the data set and block execution in a number of other functions """ with self.lock: print "Received (raw): "+self.raw_data + "\n"
[docs] def print_data(self): """Print the processed data stored in self.clean_data Note, this will lock the data set and block execution in a number of other functions """ with self.lock: print "Received (unpacked):\n " print self.clean_data print "\n"
[docs] def output_data_item(self, name, values): """Output item with name and values. Formatting is specified by self.name_width and self.precision. Args: name (str): The name of the value values (float, int, tuple of float, list of float): The list of values """ to_print = ("%-"+str(self.name_width)+"s") % name if isinstance(values, (list, tuple)): to_print += ": [%s]" % ', '.join(self.double_format_string.format(x) for x in values) elif isinstance(values, (int, bool)): to_print += ": [%s]" % str(values) elif isinstance(values, float): to_print += ": [%s]" % self.double_format_string.format(values) else: print "I don't know that data type: " + str(type(values)) print to_print
[docs] def print_parsed_data(self): """Print the parsed data Note, this will lock the data set and block execution in a number of other functions """ with self.lock: print "\033[H" self.output_data_item("Time since controller turn on", self.time) self.output_data_item("Target joint positions", self.target_joint_positions) self.output_data_item("Target joint velocities", self.target_joint_velocities) self.output_data_item("Target joint accelerations", self.target_joint_accelerations) self.output_data_item("Target joint currents", self.target_joint_currents) self.output_data_item("Target joint moments (torque)", self.target_joint_moments) self.output_data_item("Actual joint positions", self.actual_joint_positions) self.output_data_item("Actual joint velocities", self.actual_joint_velocities) self.output_data_item("Actual joint currents", self.actual_joint_currents) self.output_data_item("Tool accelerometer values", self.tool_accelerometer) self.output_data_item("Generalised forces in the TCP", self.force_tcp) self.output_data_item("Cartesian tool position", self.position) self.output_data_item("Cartesian tool speed", self.tool_speed) self.output_data_item("Joint temperatures (deg C)", self.joint_temperature) self.output_data_item("Controller period", self.controller_period) self.output_data_item("Robot control mode", self.robot_control_mode) self.output_data_item("Joint control modes", self.joint_control_modes) print ((("%-"+str(self.name_width)+"s") % "Digital Input Number") + ": " + '|'.join('{:^2d}'.format(x) for x in range(0, 18))) print ((("%-"+str(self.name_width)+"s") % "Digital Input Value: ") + ": " + '|'.join('{:^2s}'.format(x) for x in '{:018b}'.format( self.digital_inputs)[::-1])) self.output_data_item("Is Stopped:", self._is_stopped)
[docs] def start(self): """Spawn a new thread for receiving and run it""" if (self.__receiving_thread is None or not self.__receiving_thread.is_alive()): self.run = True self.__receiving_thread = threading.Thread(group=None, target=self.loop, name='receiving_thread', args=(), kwargs={}) self.__receiving_thread.start()
[docs] def loop(self): """The main loop which receives, decodes, and optionally prints data""" while self.run: self.receive() self.decode() if self.verbose: self.print_parsed_data()
[docs] def stop(self): """Stops execution of the auxiliary receiving thread""" if self.__receiving_thread is not None: if self.__receiving_thread.is_alive(): self.verbose_print('attempting to shutdown auxiliary thread', '*') self.run = False # Python writes like this are atomic self.__receiving_thread.join() self.verbose_print('\033[500D') self.verbose_print('\033[500C') self.verbose_print('-', '-', 40) if self.__receiving_thread.is_alive(): self.verbose_print('failed to shutdown auxiliary thread', '*') else: self.verbose_print('shutdown auxiliary thread', '*') else: self.verbose_print('auxiliary thread already shutdown', '*') else: self.verbose_print('no auxiliary threads exist', '*')
[docs] def verbose_print(self, string_input, emphasis='', count=5): """Print input if verbose is set Args: string_input (str): The input string to be printed. emphasis (str): Emphasis character to be placed around input. count (int): Number of emphasis characters to use. """ if self.verbose: if emphasis == '': print string_input else: print (emphasis*count + " " + string_input + " " + emphasis * count)
[docs] def is_stopped(self, error=0.005): """Check whether the robot is stopped. Check whether the joint velocities are all below some error. Note, this will lock the data set and block execution in a number of other functions Args: error (float): The error range to define "stopped" Returns: Boolean, whether the robot is stopped. """ with self.lock: to_return = ( all(v == 0 for v in self.target_joint_velocities) and all(v < error for v in self.actual_joint_velocities)) return to_return
[docs] def at_goal(self, goal, cartesian, error=0.005): """Check whether the robot is at a goal point. Check whether the differences between the joint or cartesian coordinates are all below some error. This can be used to determine if a move has been completed. It can also be used to create blends by beginning the next move prior to the current one reaching its goal. Note, this will lock the data set and block execution in a number of other functions. Args: goal (6 member tuple or list of floats): The goal to check against cartesian (bool): Whether the goal is in cartesian coordinates or not (in which case joint coordinates) error (float): The error range in which to consider an object at its goal, in meters for cartesian space and radians for axis space. Returns: Boolean, whether the current position is within the error range of the goal. """ with self.lock: to_return = ( all(abs(g-a) < error for g, a in zip(self.position, goal)) if cartesian else all(abs(g-a) < error for g, a in zip(self.actual_joint_positions, goal))) return to_return
def __enter__(self): """Enters the URRobot receiver from a with statement""" return self def __exit__(self, *_): """Exits at the end of a context manager statement by destructing.""" self.stop()