diff --git a/dali/driver/base.py b/dali/driver/base.py index efbf8bb..714df18 100644 --- a/dali/driver/base.py +++ b/dali/driver/base.py @@ -1,7 +1,3 @@ -import threading -import usb - - ############################################################################### # driver contracts ############################################################################### @@ -124,113 +120,3 @@ def listen(self): """ raise NotImplementedError( 'Abstract ``Listener`` does not implement ``listen``') - - -############################################################################### -# USB backends -############################################################################### - -class USBBackend(Backend): - """Backend implementation for communicating with USB devices. - """ - - def __init__(self, vendor, product, bus=None, - port_numbers=None, interface=0): - self._device = None - # lookup devices by vendor and product - devices = [dev for dev in usb.core.find( - find_all=True, - idVendor=vendor, - idProduct=product - )] - # use first device if bus or port_numers not defined - if bus is None or port_numbers is None: - self._device = devices[0] - else: - for dev in devices: - if dev.bus == bus and dev.port_numbers == port_numbers: - self._device = dev - break - # if queried device not found, raise - if not self._device: - raise usb.core.USBError('Device not found') - # detach kernel driver if necessary - if self._device.is_kernel_driver_active(interface) is True: - self._device.detach_kernel_driver(interface) - # set device configuration - self._device.set_configuration() - # claim interface - usb.util.claim_interface(self._device, interface) - # get active configuration - cfg = self._device.get_active_configuration() - intf = cfg[(0, 0)] - # get write end point - def match_ep_out(e): - return usb.util.endpoint_direction(e.bEndpointAddress) == \ - usb.util.ENDPOINT_OUT - self._ep_write = usb.util.find_descriptor( - intf, custom_match=match_ep_out) - # get read end point - def match_ep_in(e): - return usb.util.endpoint_direction(e.bEndpointAddress) == \ - usb.util.ENDPOINT_IN - self._ep_read = usb.util.find_descriptor( - intf, custom_match=match_ep_in) - - def read(self, timeout=None): - """Read data from USB device. - """ - return self._ep_read.read(self._ep_read.wMaxPacketSize, timeout=timeout) - - def write(self, data): - """Write data to USB device. - """ - return self._ep_write.write(data) - - def close(self): - """Close connection to USB device. - """ - usb.util.dispose_resources(self._device) - - -class USBListener(USBBackend, Listener): - """Listener implementation for communicating with USB devices. - """ - - def __init__(self, driver, vendor, product, bus=None, - port_numbers=None, interface=0): - super(USBListener, self).__init__( - vendor, - product, - bus=bus, - port_numbers=port_numbers, - interface=interface - ) - self.driver = driver - # flag whether actually disconnecting from device - self._disconnecting = False - # event to stop listening - self._stop_listening = threading.Event() - # create and start listener thread - self._listener = threading.Thread(target=self.listen) - self._listener.start() - - def listen(self): - """Poll data from USB device. - """ - while not self._stop_listening.is_set(): - try: - self.driver.receive(self.read()) - except usb.core.USBError as e: - # read timeout - if e.errno == 110: - continue - if not self._disconnecting: - self.close() - - def close(self): - """Close connection to USB device. - """ - self._disconnecting = True - self._stop_listening.set() - super(USBListener, self).close() diff --git a/dali/driver/tridonic.py b/dali/driver/tridonic.py index 5fa9285..b8515af 100644 --- a/dali/driver/tridonic.py +++ b/dali/driver/tridonic.py @@ -2,8 +2,8 @@ from dali.driver.base import AsyncDALIDriver from dali.driver.base import DALIDriver from dali.driver.base import SyncDALIDriver -from dali.driver.base import USBBackend -from dali.driver.base import USBListener +from dali.driver.usb import USBBackend +from dali.driver.usb import USBListener from dali.frame import BackwardFrame from dali.frame import ForwardFrame import logging diff --git a/dali/driver/usb.py b/dali/driver/usb.py new file mode 100644 index 0000000..0ef07b0 --- /dev/null +++ b/dali/driver/usb.py @@ -0,0 +1,114 @@ +import threading +import usb + +from dali.driver.base import Backend, Listener + + +############################################################################### +# USB backends +############################################################################### + +class USBBackend(Backend): + """Backend implementation for communicating with USB devices. + """ + + def __init__(self, vendor, product, bus=None, + port_numbers=None, interface=0): + self._device = None + # lookup devices by vendor and product + devices = [dev for dev in usb.core.find( + find_all=True, + idVendor=vendor, + idProduct=product + )] + # use first device if bus or port_numers not defined + if bus is None or port_numbers is None: + self._device = devices[0] + else: + for dev in devices: + if dev.bus == bus and dev.port_numbers == port_numbers: + self._device = dev + break + # if queried device not found, raise + if not self._device: + raise usb.core.USBError('Device not found') + # detach kernel driver if necessary + if self._device.is_kernel_driver_active(interface) is True: + self._device.detach_kernel_driver(interface) + # set device configuration + self._device.set_configuration() + # claim interface + usb.util.claim_interface(self._device, interface) + # get active configuration + cfg = self._device.get_active_configuration() + intf = cfg[(0, 0)] + # get write end point + def match_ep_out(e): + return usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_OUT + self._ep_write = usb.util.find_descriptor( + intf, custom_match=match_ep_out) + # get read end point + def match_ep_in(e): + return usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_IN + self._ep_read = usb.util.find_descriptor( + intf, custom_match=match_ep_in) + + def read(self, timeout=None): + """Read data from USB device. + """ + return self._ep_read.read(self._ep_read.wMaxPacketSize, timeout=timeout) + + def write(self, data): + """Write data to USB device. + """ + return self._ep_write.write(data) + + def close(self): + """Close connection to USB device. + """ + usb.util.dispose_resources(self._device) + + +class USBListener(USBBackend, Listener): + """Listener implementation for communicating with USB devices. + """ + + def __init__(self, driver, vendor, product, bus=None, + port_numbers=None, interface=0): + super(USBListener, self).__init__( + vendor, + product, + bus=bus, + port_numbers=port_numbers, + interface=interface + ) + self.driver = driver + # flag whether actually disconnecting from device + self._disconnecting = False + # event to stop listening + self._stop_listening = threading.Event() + # create and start listener thread + self._listener = threading.Thread(target=self.listen) + self._listener.start() + + def listen(self): + """Poll data from USB device. + """ + while not self._stop_listening.is_set(): + try: + self.driver.receive(self.read()) + except usb.core.USBError as e: + # read timeout + if e.errno == 110: + continue + if not self._disconnecting: + self.close() + + def close(self): + """Close connection to USB device. + """ + self._disconnecting = True + self._stop_listening.set() + super(USBListener, self).close()