diff --git a/can/__init__.py b/can/__init__.py index ce126ff95..3fa8c01d1 100644 --- a/can/__init__.py +++ b/can/__init__.py @@ -34,6 +34,7 @@ class CanError(IOError): from .message import Message from .bus import BusABC +from .thread_safe_bus import ThreadSafeBus from .notifier import Notifier from .interfaces import VALID_INTERFACES from . import interface diff --git a/can/bus.py b/can/bus.py index 9e70cb67e..ffe48280b 100644 --- a/can/bus.py +++ b/can/bus.py @@ -169,10 +169,10 @@ def send_periodic(self, msg, period, duration=None): least *duration* seconds. """ - if not hasattr(self, "_lock"): + if not hasattr(self, "_lock_send_periodic"): # Create a send lock for this bus - self._lock = threading.Lock() - return ThreadBasedCyclicSendTask(self, self._lock, msg, period, duration) + self._lock_send_periodic = threading.Lock() + return ThreadBasedCyclicSendTask(self, self._lock_send_periodic, msg, period, duration) def __iter__(self): """Allow iteration on messages as they are received. @@ -191,6 +191,10 @@ def __iter__(self): @property def filters(self): + """ + Modify the filters of this bus. See :meth:`~can.BusABC.set_filters` + for details. + """ return self._filters @filters.setter diff --git a/can/interfaces/socketcan/socketcan_common.py b/can/interfaces/socketcan/socketcan_common.py index 4e904f774..1b3cec9e5 100644 --- a/can/interfaces/socketcan/socketcan_common.py +++ b/can/interfaces/socketcan/socketcan_common.py @@ -10,10 +10,7 @@ import errno import struct import sys -if sys.version_info[0] < 3 and os.name == 'posix': - import subprocess32 as subprocess -else: - import subprocess +import subprocess import re from can.interfaces.socketcan.socketcan_constants import CAN_EFF_FLAG diff --git a/can/thread_safe_bus.py b/can/thread_safe_bus.py new file mode 100644 index 000000000..b161d47c9 --- /dev/null +++ b/can/thread_safe_bus.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" +""" + +from __future__ import print_function, absolute_import + +from abc import ABCMeta +from threading import RLock + +from wrapt import ObjectProxy + +from .interface import Bus +from .bus import BusABC + + +class NullContextManager(object): + """ + A context manager that does nothing at all. + """ + + def __init__(self, resource=None): + self.resource = resource + + def __enter__(self): + return self.resource + + def __exit__(self, *args): + pass + + +class ThreadSafeBus(ObjectProxy): + """ + Contains a thread safe :class:`can.BusABC` implementation that + wraps around an existing interface instance. All public methods + of that base class are now safe to be called from multiple threads. + The send and receive methods are synchronized separately. + + Use this as a drop-in replacement for :class:`~can.BusABC`. + + .. note:: + + This approach assumes that both :meth:`~can.BusABC.send` and + :meth:`~can.BusABC._recv_internal` of the underlying bus instance can be + called simultaneously, and that the methods use :meth:`~can.BusABC._recv_internal` + instead of :meth:`~can.BusABC.recv` directly. + """ + + # init locks for sending and receiving separately + _lock_send = RLock() + _lock_recv = RLock() + + def __init__(self, *args, **kwargs): + super(ThreadSafeBus, self).__init__(Bus(*args, **kwargs)) + + # now, BusABC.send_periodic() does not need a lock anymore, but the + # implementation still requires a context manager + self.__wrapped__._lock_send_periodic = NullContextManager() + + def recv(self, timeout=None, *args, **kwargs): + with self._lock_recv: + return self.__wrapped__.recv(timeout=timeout, *args, **kwargs) + + def send(self, msg, timeout=None, *args, **kwargs): + with self._lock_send: + return self.__wrapped__.send(msg, timeout=timeout, *args, **kwargs) + + # send_periodic does not need a lock, since the underlying + # `send` method is already synchronized + + @property + def filters(self): + with self._lock_recv: + return self.__wrapped__.filters + + @filters.setter + def filters(self, filters): + with self._lock_recv: + self.__wrapped__.filters = filters + + def set_filters(self, can_filters=None, *args, **kwargs): + with self._lock_recv: + return self.__wrapped__.set_filters(can_filters=can_filters, *args, **kwargs) + + def flush_tx_buffer(self, *args, **kwargs): + with self._lock_send: + return self.__wrapped__.flush_tx_buffer(*args, **kwargs) + + def shutdown(self, *args, **kwargs): + with self._lock_send, self._lock_recv: + return self.__wrapped__.shutdown(*args, **kwargs) + + @property + def state(self): + with self._lock_send, self._lock_recv: + return self.__wrapped__.state + + @state.setter + def state(self, new_state): + with self._lock_send, self._lock_recv: + self.__wrapped__.state = new_state diff --git a/doc/api.rst b/doc/api.rst index 3eca7ddc8..13adb2903 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -25,9 +25,12 @@ Utilities .. automodule:: can.util :members: +.. automethod:: can.detect_available_configs .. _notifier: - + + + Notifier -------- @@ -35,4 +38,3 @@ The Notifier object is used as a message distributor for a bus. .. autoclass:: can.Notifier :members: - diff --git a/doc/bus.rst b/doc/bus.rst index d00cd211e..4aca097bb 100644 --- a/doc/bus.rst +++ b/doc/bus.rst @@ -4,7 +4,9 @@ Bus --- The :class:`~can.Bus` class, as the name suggests, provides an abstraction of a CAN bus. -The bus provides a wrapper around a physical or virtual CAN Bus. +The bus provides an abstract wrapper around a physical or virtual CAN Bus. + +A thread safe bus wrapper is also available, see `Thread safe bus`_. Filtering @@ -14,7 +16,6 @@ Message filtering can be set up for each bus. Where the interface supports it, t out in the hardware or kernel layer - not in Python. - API '''' @@ -33,6 +34,7 @@ Transmitting Writing to the bus is done by calling the :meth:`~can.BusABC.send()` method and passing a :class:`~can.Message` object. + Receiving ''''''''' @@ -44,3 +46,21 @@ by directly iterating over the bus:: Alternatively the :class:`~can.Listener` api can be used, which is a list of :class:`~can.Listener` subclasses that receive notifications when new messages arrive. + + +Thread safe bus +--------------- + +This thread safe version of the :class:`~can.Bus` class can be used by multiple threads at once. +Sending and receiving is locked seperatly to avoid unnessesary delays. +Conflicting calls are executed by blocking until the bus is accessible. + +It can be used exactly like the normal :class:`~can.Bus`: + + # 'socketcan' is only an exemple interface, it works with all the others too + my_bus = can.ThreadSafeBus(interface='socketcan', channel='vcan0') + my_bus.send(...) + my_bus.recv(...) + +.. autoclass:: can.ThreadSafeBus + :members: diff --git a/setup.py b/setup.py index 0af5e8577..60fee70d4 100644 --- a/setup.py +++ b/setup.py @@ -21,10 +21,11 @@ # Dependencies tests_require = [ - 'mock >= 2.0.0', - 'nose >= 1.3.7', - 'pytest-timeout >= 1.2.1', - 'pyserial >= 3.0' + 'mock ~= 2.0', + 'nose ~= 1.3.7', + 'pytest ~= 3.6', + 'pytest-timeout ~= 1.2', + 'pyserial ~= 3.0' ] setup( @@ -55,8 +56,8 @@ # see https://www.python.org/dev/peps/pep-0345/#version-specifiers python_requires=">=2.7,!=3.0,!=3.1,!=3.2,!=3.3", install_requires=[ - 'setuptools', - ] + (['subprocess32 ~= 3.2.7'] if version_info.major < 3 else []), + 'wrapt ~= 1.10', + ], extras_require={ 'serial': ['pyserial >= 3.0'], 'neovi': ['python-ics >= 2.8'], diff --git a/test/back2back_test.py b/test/back2back_test.py index 304bec690..e9ebc698a 100644 --- a/test/back2back_test.py +++ b/test/back2back_test.py @@ -10,10 +10,11 @@ import sys import unittest from time import sleep +from multiprocessing.dummy import Pool as ThreadPool -import can +import pytest -from .data.example_data import generate_message +import can from .config import * from .data.example_data import generate_message @@ -35,16 +36,16 @@ class Back2BackTestCase(unittest.TestCase): """ def setUp(self): - self.bus1 = can.interface.Bus(channel=CHANNEL_1, - bustype=INTERFACE_1, - bitrate=BITRATE, - fd=TEST_CAN_FD, - single_handle=True) - self.bus2 = can.interface.Bus(channel=CHANNEL_2, - bustype=INTERFACE_2, - bitrate=BITRATE, - fd=TEST_CAN_FD, - single_handle=True) + self.bus1 = can.Bus(channel=CHANNEL_1, + bustype=INTERFACE_1, + bitrate=BITRATE, + fd=TEST_CAN_FD, + single_handle=True) + self.bus2 = can.Bus(channel=CHANNEL_2, + bustype=INTERFACE_2, + bitrate=BITRATE, + fd=TEST_CAN_FD, + single_handle=True) def tearDown(self): self.bus1.shutdown() @@ -170,5 +171,51 @@ def test_basics(self): notifier.stop() +class TestThreadSafeBus(Back2BackTestCase): + """Does some testing that is better than nothing. + """ + + def setUp(self): + self.bus1 = can.ThreadSafeBus(channel=CHANNEL_1, + bustype=INTERFACE_1, + bitrate=BITRATE, + fd=TEST_CAN_FD, + single_handle=True) + self.bus2 = can.ThreadSafeBus(channel=CHANNEL_2, + bustype=INTERFACE_2, + bitrate=BITRATE, + fd=TEST_CAN_FD, + single_handle=True) + + @pytest.mark.timeout(5.0) + def test_concurrent_writes(self): + sender_pool = ThreadPool(100) + receiver_pool = ThreadPool(100) + + message = can.Message( + arbitration_id=0x123, + extended_id=True, + timestamp=121334.365, + data=[254, 255, 1, 2] + ) + workload = 1000 * [message] + + def sender(msg): + self.bus1.send(msg) + + def receiver(_): + result = self.bus2.recv(timeout=2.0) + self.assertIsNotNone(result) + self.assertEqual(result, message) + + sender_pool.map_async(sender, workload) + receiver_pool.map_async(receiver, len(workload) * [None]) + + sender_pool.close() + sender_pool.join() + receiver_pool.close() + receiver_pool.join() + + if __name__ == '__main__': unittest.main() diff --git a/test/sockectan_helpers.py b/test/test_socketcan_helpers.py similarity index 91% rename from test/sockectan_helpers.py rename to test/test_socketcan_helpers.py index 846de8647..f33a4e28a 100644 --- a/test/sockectan_helpers.py +++ b/test/test_socketcan_helpers.py @@ -21,11 +21,11 @@ class TestSocketCanHelpers(unittest.TestCase): def test_error_code_to_str(self): """ Check that the function does not crash & always - returns a least one character. + returns at least one character. """ # all possible & also some invalid error codes - test_data = range(0, 256) + (-1, 256, 5235, 346264) + test_data = list(range(0, 256)) + [-1, 256, 5235, 346264] for error_code in test_data: string = error_code_to_str(error_code)