Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class CanError(IOError):

from .message import Message
from .bus import BusABC
from .thread_safe_bus import ThreadSafeBus
from .notifier import Notifier
from .interfaces import VALID_INTERFACES
from . import interface
Expand Down
10 changes: 7 additions & 3 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ def send_periodic(self, msg, period, duration=None):
least *duration* seconds.

"""
if not hasattr(self, "_lock"):
if not hasattr(self, "_lock_send_periodic"):
# Create a send lock for this bus
self._lock = threading.Lock()
return ThreadBasedCyclicSendTask(self, self._lock, msg, period, duration)
self._lock_send_periodic = threading.Lock()
return ThreadBasedCyclicSendTask(self, self._lock_send_periodic, msg, period, duration)

def __iter__(self):
"""Allow iteration on messages as they are received.
Expand All @@ -191,6 +191,10 @@ def __iter__(self):

@property
def filters(self):
"""
Modify the filters of this bus. See :meth:`~can.BusABC.set_filters`
for details.
"""
return self._filters

@filters.setter
Expand Down
5 changes: 1 addition & 4 deletions can/interfaces/socketcan/socketcan_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import errno
import struct
import sys
if sys.version_info[0] < 3 and os.name == 'posix':
import subprocess32 as subprocess
else:
import subprocess
import subprocess
import re

from can.interfaces.socketcan.socketcan_constants import CAN_EFF_FLAG
Expand Down
102 changes: 102 additions & 0 deletions can/thread_safe_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python
# coding: utf-8

"""
"""

from __future__ import print_function, absolute_import

from abc import ABCMeta
from threading import RLock

from wrapt import ObjectProxy

from .interface import Bus
from .bus import BusABC


class NullContextManager(object):
"""
A context manager that does nothing at all.
"""

def __init__(self, resource=None):
self.resource = resource

def __enter__(self):
return self.resource

def __exit__(self, *args):
pass


class ThreadSafeBus(ObjectProxy):
"""
Contains a thread safe :class:`can.BusABC` implementation that
wraps around an existing interface instance. All public methods
of that base class are now safe to be called from multiple threads.
The send and receive methods are synchronized separately.

Use this as a drop-in replacement for :class:`~can.BusABC`.

.. note::

This approach assumes that both :meth:`~can.BusABC.send` and
:meth:`~can.BusABC._recv_internal` of the underlying bus instance can be
called simultaneously, and that the methods use :meth:`~can.BusABC._recv_internal`
instead of :meth:`~can.BusABC.recv` directly.
"""

# init locks for sending and receiving separately
_lock_send = RLock()
_lock_recv = RLock()

def __init__(self, *args, **kwargs):
super(ThreadSafeBus, self).__init__(Bus(*args, **kwargs))

# now, BusABC.send_periodic() does not need a lock anymore, but the
# implementation still requires a context manager
self.__wrapped__._lock_send_periodic = NullContextManager()

def recv(self, timeout=None, *args, **kwargs):
with self._lock_recv:
return self.__wrapped__.recv(timeout=timeout, *args, **kwargs)

def send(self, msg, timeout=None, *args, **kwargs):
with self._lock_send:
return self.__wrapped__.send(msg, timeout=timeout, *args, **kwargs)

# send_periodic does not need a lock, since the underlying
# `send` method is already synchronized

@property
def filters(self):
with self._lock_recv:
return self.__wrapped__.filters

@filters.setter
def filters(self, filters):
with self._lock_recv:
self.__wrapped__.filters = filters

def set_filters(self, can_filters=None, *args, **kwargs):
with self._lock_recv:
return self.__wrapped__.set_filters(can_filters=can_filters, *args, **kwargs)

def flush_tx_buffer(self, *args, **kwargs):
with self._lock_send:
return self.__wrapped__.flush_tx_buffer(*args, **kwargs)

def shutdown(self, *args, **kwargs):
with self._lock_send, self._lock_recv:
return self.__wrapped__.shutdown(*args, **kwargs)

@property
def state(self):
with self._lock_send, self._lock_recv:
return self.__wrapped__.state

@state.setter
def state(self, new_state):
with self._lock_send, self._lock_recv:
self.__wrapped__.state = new_state
6 changes: 4 additions & 2 deletions doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ Utilities
.. automodule:: can.util
:members:

.. automethod:: can.detect_available_configs

.. _notifier:




Notifier
--------

The Notifier object is used as a message distributor for a bus.

.. autoclass:: can.Notifier
:members:

24 changes: 22 additions & 2 deletions doc/bus.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ Bus
---

The :class:`~can.Bus` class, as the name suggests, provides an abstraction of a CAN bus.
The bus provides a wrapper around a physical or virtual CAN Bus.
The bus provides an abstract wrapper around a physical or virtual CAN Bus.

A thread safe bus wrapper is also available, see `Thread safe bus`_.


Filtering
Expand All @@ -14,7 +16,6 @@ Message filtering can be set up for each bus. Where the interface supports it, t
out in the hardware or kernel layer - not in Python.



API
''''

Expand All @@ -33,6 +34,7 @@ Transmitting
Writing to the bus is done by calling the :meth:`~can.BusABC.send()` method and
passing a :class:`~can.Message` object.


Receiving
'''''''''

Expand All @@ -44,3 +46,21 @@ by directly iterating over the bus::

Alternatively the :class:`~can.Listener` api can be used, which is a list of :class:`~can.Listener`
subclasses that receive notifications when new messages arrive.


Thread safe bus
---------------

This thread safe version of the :class:`~can.Bus` class can be used by multiple threads at once.
Sending and receiving is locked seperatly to avoid unnessesary delays.
Conflicting calls are executed by blocking until the bus is accessible.

It can be used exactly like the normal :class:`~can.Bus`:

# 'socketcan' is only an exemple interface, it works with all the others too
my_bus = can.ThreadSafeBus(interface='socketcan', channel='vcan0')
my_bus.send(...)
my_bus.recv(...)

.. autoclass:: can.ThreadSafeBus
:members:
13 changes: 7 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

# Dependencies
tests_require = [
'mock >= 2.0.0',
'nose >= 1.3.7',
'pytest-timeout >= 1.2.1',
'pyserial >= 3.0'
'mock ~= 2.0',
'nose ~= 1.3.7',
'pytest ~= 3.6',
'pytest-timeout ~= 1.2',
'pyserial ~= 3.0'
]

setup(
Expand Down Expand Up @@ -55,8 +56,8 @@
# see https://www.python.org/dev/peps/pep-0345/#version-specifiers
python_requires=">=2.7,!=3.0,!=3.1,!=3.2,!=3.3",
install_requires=[
'setuptools',
] + (['subprocess32 ~= 3.2.7'] if version_info.major < 3 else []),
'wrapt ~= 1.10',
],
extras_require={
'serial': ['pyserial >= 3.0'],
'neovi': ['python-ics >= 2.8'],
Expand Down
71 changes: 59 additions & 12 deletions test/back2back_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import sys
import unittest
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool

import can
import pytest

from .data.example_data import generate_message
import can

from .config import *
from .data.example_data import generate_message
Expand All @@ -35,16 +36,16 @@ class Back2BackTestCase(unittest.TestCase):
"""

def setUp(self):
self.bus1 = can.interface.Bus(channel=CHANNEL_1,
bustype=INTERFACE_1,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True)
self.bus2 = can.interface.Bus(channel=CHANNEL_2,
bustype=INTERFACE_2,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True)
self.bus1 = can.Bus(channel=CHANNEL_1,
bustype=INTERFACE_1,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True)
self.bus2 = can.Bus(channel=CHANNEL_2,
bustype=INTERFACE_2,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True)

def tearDown(self):
self.bus1.shutdown()
Expand Down Expand Up @@ -170,5 +171,51 @@ def test_basics(self):
notifier.stop()


class TestThreadSafeBus(Back2BackTestCase):
"""Does some testing that is better than nothing.
"""

def setUp(self):
self.bus1 = can.ThreadSafeBus(channel=CHANNEL_1,
bustype=INTERFACE_1,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True)
self.bus2 = can.ThreadSafeBus(channel=CHANNEL_2,
bustype=INTERFACE_2,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True)

@pytest.mark.timeout(5.0)
def test_concurrent_writes(self):
sender_pool = ThreadPool(100)
receiver_pool = ThreadPool(100)

message = can.Message(
arbitration_id=0x123,
extended_id=True,
timestamp=121334.365,
data=[254, 255, 1, 2]
)
workload = 1000 * [message]

def sender(msg):
self.bus1.send(msg)

def receiver(_):
result = self.bus2.recv(timeout=2.0)
self.assertIsNotNone(result)
self.assertEqual(result, message)

sender_pool.map_async(sender, workload)
receiver_pool.map_async(receiver, len(workload) * [None])

sender_pool.close()
sender_pool.join()
receiver_pool.close()
receiver_pool.join()


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions test/sockectan_helpers.py → test/test_socketcan_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class TestSocketCanHelpers(unittest.TestCase):
def test_error_code_to_str(self):
"""
Check that the function does not crash & always
returns a least one character.
returns at least one character.
"""

# all possible & also some invalid error codes
test_data = range(0, 256) + (-1, 256, 5235, 346264)
test_data = list(range(0, 256)) + [-1, 256, 5235, 346264]

for error_code in test_data:
string = error_code_to_str(error_code)
Expand Down