Source code for faradayio.faraday

"""
.. module:: faraday
    :platform: Unix
    :synopsis: Main module for Faraday radios from FaradayRF.

.. moduleauthor:: Bryce Salmi <bryce@faradayrf.com>

"""

import sliplib
import pytun
import threading
import serial
import timeout_decorator

import serial.tools.list_ports


[docs]class Faraday(object): """A class that enables transfer of data between computer and Faraday This class interfaces a Faraday over a serial port. All it simply does is properly encode/decode SLIP packets. Attributes: _serialPort (serial instance): Pyserial serial port instance. """ def __init__(self, serialPort=None): self._serialPort = serialPort
[docs] def send(self, msg): """Encodes data to slip protocol and then sends over serial port Uses the SlipLib module to convert the message data into SLIP format. The message is then sent over the serial port opened with the instance of the Faraday class used when invoking send(). Args: msg (bytes): Bytes format message to send over serial port. Returns: int: Number of bytes transmitted over the serial port. """ # Create a sliplib Driver slipDriver = sliplib.Driver() # Package data in slip format slipData = slipDriver.send(msg) # Send data over serial port res = self._serialPort.write(slipData) # Return number of bytes transmitted over serial port return res
[docs] def receive(self, length): """Reads in data from a serial port (length bytes), decodes SLIP packets A function which reads from the serial port and then uses the SlipLib module to decode the SLIP protocol packets. Each message received is added to a receive buffer in SlipLib which is then returned. Args: length (int): Length to receive with serialPort.read(length) Returns: bytes: An iterator of the receive buffer """ # Create a sliplib Driver slipDriver = sliplib.Driver() # Receive data from serial port ret = self._serialPort.read(length) # Decode data from slip format, stores msgs in sliplib.Driver.messages temp = slipDriver.receive(ret) return iter(temp)
class TunnelServer(object): """ A class which creates a TUN/TAP device for Faraday uses. Creates a basic TUN/TAP adapter with generic values for use with Faraday. Also provides a method to close TUN/TAP when the class is destroyed. Attributes: addr: IP address of TUN/TAP adapter netmask: Netmask of TUN/TAP adapter mtu: Maximum Transmission Unit for TUN/TAP adapter name: Name of TUN/TAP adapter """ def __init__(self, addr, netmask, mtu, name): self._tun = pytun.TunTapDevice(name=name) self._tun.addr = addr self._tun.netmask = netmask self._tun.mtu = mtu # Set TUN persistance to True and bring TUN up self._tun.persist(True) self._tun.up() def __del__(self): """ Clean up TUN when the TunnelServer class is destroyed """ self._tun.down() print("TUN brought down...") class Monitor(threading.Thread): """ A class which inherits from threading.Thread to provide TUN/TAP monitors Inherits from threading.Thread and is designed to be run as a thread. This provides functions which can be used to monitor the TUN/TAP adapter and send/receive data when it arrives over TUN or serial. Attributes: serialPort: Pyserial instance for a serial port isRunning: Threading event to signal thread exit name: Name of TUN/TAP device to be created by the monitor addr: IP address of the TUN/TAP device to be created mtu: Maximum Transmission Unit of TUN/TAP adapter TODO delete? """ def __init__(self, serialPort, isRunning, name="Faraday", addr='10.0.0.1', netmask='255.255.255.0', mtu=1500): super().__init__() self.isRunning = isRunning self._serialPort = serialPort # Start a TUN adapter self._TUN = TunnelServer(name=name, addr=addr, netmask=netmask, mtu=mtu) # Create a Faraday instance self._faraday = Faraday(serialPort=serialPort) @timeout_decorator.timeout(1, use_signals=False) def checkTUN(self): """ Checks the TUN adapter for data and returns any that is found. Returns: packet: Data read from the TUN adapter """ packet = self._TUN._tun.read(self._TUN._tun.mtu) return(packet) def monitorTUN(self): """ Monitors the TUN adapter and sends data over serial port. Returns: ret: Number of bytes sent over serial port """ packet = self.checkTUN() if packet: try: # TODO Do I need to strip off [4:] before sending? ret = self._faraday.send(packet) return ret except AttributeError as error: # AttributeError was encounteredthreading.Event() print("AttributeError") def rxSerial(self, length): """ Checks the serial port for data and returns any that is found. Args: length: Number of bytes to read from serial port Returns: data: Data received from serial port """ return(self._faraday.receive(length)) def txSerial(self, data): """ Sends data over serial port. Args: data: Data to be sent over serial port Returns: length: Number of bytes sent over serial port """ return self._faraday.send(data) def checkSerial(self): """ Check the serial port for data to write to the TUN adapter. """ for item in self.rxSerial(self._TUN._tun.mtu): # print("about to send: {0}".format(item)) try: self._TUN._tun.write(item) except pytun.Error as error: print("pytun error writing: {0}".format(item)) print(error) def run(self): """ Wrapper function for TUN and serial port monitoring Wraps the necessary functions to loop over until self._isRunning threading.Event() is set(). This checks for data on the TUN/serial interfaces and then sends data over the appropriate interface. This function is automatically run when Threading.start() is called on the Monitor class. """ while self.isRunning.is_set(): try: try: # self.checkTUN() self.monitorTUN() except timeout_decorator.TimeoutError as error: # No data received so just move on pass self.checkSerial() except KeyboardInterrupt: break class SerialTestClass(object): """A mock serial port test class""" def __init__(self): """Creates a mock serial port which is a loopback object""" self._port = "loop://" self._timeout = 0 self._baudrate = 115200 self.serialPort = \ serial.serial_for_url(url=self._port, timeout=self._timeout, baudrate=self._baudrate) def isPortAvailable(port='/dev/ttyUSB0'): ''' Checks whether specified port is available. Source code derived from @lqdev suggestion per #38 Args: port: Serial port location i.e. 'COM1'. Default is /dev/ttyUSB0 Returns: available: Boolean value indicating presence of port ''' isPortAvailable = serial.tools.list_ports.grep(port) try: next(isPortAvailable) available = True except StopIteration: available = False return available