Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class CanError(IOError):
pass

from .listener import Listener, BufferedReader, RedirectReader
try:
from .listener import AsyncBufferedReader
except ImportError:
pass

from .io import Logger, Printer, LogReader, MessageSync
from .io import ASCWriter, ASCReader
Expand Down
6 changes: 6 additions & 0 deletions can/interfaces/serial/serial_can.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,9 @@ def _recv_internal(self, timeout):

else:
return None, False

def fileno(self):
if hasattr(self.ser, 'fileno'):
return self.ser.fileno()
# Return an invalid file descriptor on Windows
return -1
30 changes: 21 additions & 9 deletions can/interfaces/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import time
import logging

import serial

from can import BusABC, Message

logger = logging.getLogger(__name__)

try:
import serial
except ImportError:
logger.warning("You won't be able to use the slcan can backend without "
"the serial module installed!")
serial = None


class slcanBus(BusABC):
"""
Expand All @@ -43,7 +48,7 @@ class slcanBus(BusABC):

_SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds

def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
rtscts=False, **kwargs):
"""
:param str channel:
Expand All @@ -55,8 +60,6 @@ def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
Bitrate in bit/s
:param float poll_interval:
Poll interval in seconds when reading messages
:param float timeout:
timeout in seconds when reading message
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
Expand All @@ -68,7 +71,7 @@ def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
(channel, ttyBaudrate) = channel.split('@')

self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, timeout=timeout, rtscts=rtscts)
channel, baudrate=ttyBaudrate, rtscts=rtscts)

time.sleep(self._SLEEP_AFTER_SERIAL_OPEN)

Expand All @@ -81,7 +84,7 @@ def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,

self.open()

super(slcanBus, self).__init__(channel, ttyBaudrate=115200, timeout=1,
super(slcanBus, self).__init__(channel, ttyBaudrate=115200,
bitrate=None, rtscts=False, **kwargs)

def write(self, string):
Expand All @@ -97,7 +100,7 @@ def close(self):
self.write('C')

def _recv_internal(self, timeout):
if timeout is not None:
if timeout != self.serialPortOrig.timeout:
self.serialPortOrig.timeout = timeout

canId = None
Expand Down Expand Up @@ -145,7 +148,10 @@ def _recv_internal(self, timeout):
else:
return None, False

def send(self, msg, timeout=None):
def send(self, msg, timeout=0):
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout

if msg.is_remote_frame:
if msg.is_extended_id:
sendStr = "R%08X0" % (msg.arbitration_id)
Expand All @@ -163,3 +169,9 @@ def send(self, msg, timeout=None):

def shutdown(self):
self.close()

def fileno(self):
if hasattr(self.serialPortOrig, 'fileno'):
return self.serialPortOrig.fileno()
# Return an invalid file descriptor on Windows
return -1
3 changes: 3 additions & 0 deletions can/interfaces/socketcan/socketcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,9 @@ def _apply_filters(self, filters):
else:
self._is_filtered = True

def fileno(self):
return self.socket.fileno()

@staticmethod
def _detect_available_configs():
return [{'interface': 'socketcan', 'channel': channel}
Expand Down
52 changes: 52 additions & 0 deletions can/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
# Python 2
from Queue import Queue as SimpleQueue, Empty

try:
import asyncio
except ImportError:
asyncio = None


class Listener(object):
"""The basic listener that can be called directly to handle some
Expand Down Expand Up @@ -47,6 +52,12 @@ def on_message_received(self, msg):
def __call__(self, msg):
return self.on_message_received(msg)

def on_error(self, exc):
"""This method is called to handle any exception in the receive thread.

:param Exception exc: The exception causing the thread to stop
"""

def stop(self):
"""
Override to cleanup any open resources.
Expand Down Expand Up @@ -116,3 +127,44 @@ def stop(self):
"""Prohibits any more additions to this reader.
"""
self.is_stopped = True


if asyncio is not None:
class AsyncBufferedReader(Listener):
"""A message buffer for use with :mod:`asyncio`.

See :ref:`asyncio` for how to use with :class:`can.Notifier`.

Can also be used as an asynchronous iterator::

async for msg in reader:
print(msg)
"""

def __init__(self, loop=None):
# set to "infinite" size
self.buffer = asyncio.Queue(loop=loop)

def on_message_received(self, msg):
"""Append a message to the buffer.

Must only be called inside an event loop!
"""
self.buffer.put_nowait(msg)

def get_message(self):
"""
Retrieve the latest message when awaited for::

msg = await reader.get_message()

:rtype: can.Message
:return: The CAN message.
"""
return self.buffer.get()

def __aiter__(self):
return self

def __anext__(self):
return self.buffer.get()
67 changes: 58 additions & 9 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,30 @@
import threading
import logging
import time
try:
import asyncio
except ImportError:
asyncio = None

logger = logging.getLogger('can.Notifier')


class Notifier(object):

def __init__(self, bus, listeners, timeout=1.0):
def __init__(self, bus, listeners, timeout=1.0, loop=None):
"""Manages the distribution of **Messages** from a given bus/buses to a
list of listeners.

:param can.BusABC bus: A :ref:`bus` or a list of buses to listen to.
:param list listeners: An iterable of :class:`~can.Listener`
:param float timeout: An optional maximum number of seconds to wait for any message.
:param asyncio.AbstractEventLoop loop:
An :mod:`asyncio` event loop to schedule listeners in.
"""
self.listeners = listeners
self.bus = bus
self.timeout = timeout
self._loop = loop

#: Exception raised in thread
self.exception = None
Expand All @@ -35,11 +42,24 @@ def __init__(self, bus, listeners, timeout=1.0):
self._readers = []
buses = self.bus if isinstance(self.bus, list) else [self.bus]
for bus in buses:
self.add_bus(bus)

def add_bus(self, bus):
"""Add a bus for notification.

:param can.BusABC bus:
CAN bus instance.
"""
if self._loop is not None and hasattr(bus, 'fileno') and bus.fileno() >= 0:
# Use file descriptor to watch for messages
reader = bus.fileno()
self._loop.add_reader(reader, self._on_message_available, bus)
else:
reader = threading.Thread(target=self._rx_thread, args=(bus,),
name='can.notifier for bus "{}"'.format(bus.channel_info))
name='can.notifier for bus "{}"'.format(bus.channel_info))
reader.daemon = True
reader.start()
self._readers.append(reader)
self._readers.append(reader)

def stop(self, timeout=5):
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
Expand All @@ -52,25 +72,54 @@ def stop(self, timeout=5):
self._running = False
end_time = time.time() + timeout
for reader in self._readers:
now = time.time()
if now < end_time:
reader.join(end_time - now)
if isinstance(reader, threading.Thread):
now = time.time()
if now < end_time:
reader.join(end_time - now)
else:
# reader is a file descriptor
self._loop.remove_reader(reader)
for listener in self.listeners:
listener.stop()
if hasattr(listener, 'stop'):
listener.stop()

def _rx_thread(self, bus):
msg = None
try:
while self._running:
if msg is not None:
with self._lock:
for callback in self.listeners:
callback(msg)
if self._loop is not None:
self._loop.call_soon_threadsafe(
self._on_message_received, msg)
else:
self._on_message_received(msg)
msg = bus.recv(self.timeout)
except Exception as exc:
self.exception = exc
if self._loop is not None:
self._loop.call_soon_threadsafe(self._on_error, exc)
else:
self._on_error(exc)
raise

def _on_message_available(self, bus):
msg = bus.recv(0)
if msg is not None:
self._on_message_received(msg)

def _on_message_received(self, msg):
for callback in self.listeners:
res = callback(msg)
if self._loop is not None and asyncio.iscoroutine(res):
# Schedule coroutine
self._loop.create_task(res)

def _on_error(self, exc):
for listener in self.listeners:
if hasattr(listener, 'on_error'):
listener.on_error(exc)

def add_listener(self, listener):
"""Add new Listener to the notification list.
If it is already present, it will be called two times
Expand Down
1 change: 1 addition & 0 deletions doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ A form of CAN interface is also required.
bus
message
listeners
asyncio
bcm


Expand Down
24 changes: 24 additions & 0 deletions doc/asyncio.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
.. _asyncio:

Asyncio support
===============

The :mod:`asyncio` module built into Python 3.4 and later can be used to write
asynchronos code in a single thread. This library supports receiving messages
asynchronosly in an event loop using the :class:`can.Notifier` class.
There will still be one thread per CAN bus but the user application will execute
entirely in the event loop, allowing simpler concurrency without worrying about
threading issues. Interfaces that have a valid file descriptor will however be
supported natively without a thread.

You can also use the :class:`can.AsyncBufferedReader` listener if you prefer
to write coroutine based code instead of using callbacks.


Example
-------

Here is an example using both callback and coroutine based code:

.. literalinclude:: ../examples/asyncio_demo.py
:language: python
3 changes: 3 additions & 0 deletions doc/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ BufferedReader
.. autoclass:: can.BufferedReader
:members:

.. autoclass:: can.AsyncBufferedReader
:members:


Logger
------
Expand Down
44 changes: 44 additions & 0 deletions examples/asyncio_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import asyncio
import can

def print_message(msg):
"""Regular callback function. Can also be a coroutine."""
print(msg)

async def main():
can0 = can.Bus('vcan0', bustype='virtual', receive_own_messages=True)
reader = can.AsyncBufferedReader()
logger = can.Logger('logfile.asc')

listeners = [
print_message, # Callback function
reader, # AsyncBufferedReader() listener
logger # Regular Listener object
]
# Create Notifier with an explicit loop to use for scheduling of callbacks
loop = asyncio.get_event_loop()
notifier = can.Notifier(can0, listeners, loop=loop)
# Start sending first message
can0.send(can.Message(arbitration_id=0))

print('Bouncing 10 messages...')
for _ in range(10):
# Wait for next message from AsyncBufferedReader
msg = await reader.get_message()
# Delay response
await asyncio.sleep(0.5)
msg.arbitration_id += 1
can0.send(msg)
# Wait for last message to arrive
await reader.get_message()
print('Done!')

# Clean-up
notifier.stop()
can0.shutdown()

# Get the default event loop
loop = asyncio.get_event_loop()
# Run until main coroutine finishes
loop.run_until_complete(main())
loop.close()
Loading