Skip to content
41 changes: 32 additions & 9 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# coding: utf-8

"""
This module contains the implementation of `can.Notifier`.
This module contains the implementation of :class:`~can.Notifier`.
"""

import threading
Expand All @@ -18,7 +18,7 @@ def __init__(self, bus, listeners, timeout=None):
list of listeners.

:param bus: The :ref:`bus` to listen too.
:param listeners: An iterable of :class:`~can.Listeners`
:param listeners: An iterable of :class:`~can.Listener`s
:param timeout: An optional maximum number of seconds to wait for any message.
"""
self.listeners = listeners
Expand All @@ -28,23 +28,25 @@ def __init__(self, bus, listeners, timeout=None):
# exception raised in thread
self.exception = None

self.running = threading.Event()
self.running.set()
self._running = threading.Event()
self._running.set()

self._reader = threading.Thread(target=self.rx_thread, name="can.notifier")
self._reader = threading.Thread(target=self._rx_thread,
name='can.notifier for bus "{}"'.format(self.bus.channel_info))
self._reader.daemon = True
self._reader.start()

def stop(self):
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
and call :meth:`~can.Listener.stop` on each Listener."""
self.running.clear()
and call :meth:`~can.Listener.stop` on each Listener.
"""
self._running.clear()
if self.timeout is not None:
self._reader.join(self.timeout + 0.1)

def rx_thread(self):
def _rx_thread(self):
try:
while self.running.is_set():
while self._running.is_set():
msg = self.bus.recv(self.timeout)
if msg is not None:
for callback in self.listeners:
Expand All @@ -55,3 +57,24 @@ def rx_thread(self):
finally:
for listener in self.listeners:
listener.stop()

def add_listener(self, listener):
"""Add new Listener to the notification list.
If it is already present, it will be called two times
each time a message arrives.

:param listener: a :class:`~can.Listener` object to be added to
the list to be notified
"""
self.listeners.append(listener)

def remove_listener(self, listener):
"""Remove a listener from the notification list. This method
trows an exception if the given listener is not part of the
stored listeners.

:param listener: a :class:`~can.Listener` object to be removed from
the list to be notified
:raises ValueError: if `listener` was never added to this notifier
"""
self.listeners.remove(listener)
3 changes: 3 additions & 0 deletions test/back2back_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

import can

from .data.example_data import generate_message

from .config import *
from .data.example_data import generate_message


BITRATE = 500000
TIMEOUT = 0.1

Expand Down
16 changes: 16 additions & 0 deletions test/listener_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ def testBasicListenerCanBeAddedToNotifier(self):
notifier = can.Notifier(self.bus, [a_listener], 0.1)
notifier.stop()
self.assertIn(a_listener, notifier.listeners)

def testAddListenerToNotifier(self):
a_listener = can.Listener()
notifier = can.Notifier(self.bus, [], 0.1)
notifier.stop()
self.assertNotIn(a_listener, notifier.listeners)
notifier.add_listener(a_listener)
self.assertIn(a_listener, notifier.listeners)

def testRemoveListenerFromNotifier(self):
a_listener = can.Listener()
notifier = can.Notifier(self.bus, [a_listener], 0.1)
notifier.stop()
self.assertIn(a_listener, notifier.listeners)
notifier.remove_listener(a_listener)
self.assertNotIn(a_listener, notifier.listeners)

def testPlayerTypeResolution(self):
def test_filetype_to_instance(extension, klass):
Expand Down
2 changes: 1 addition & 1 deletion test/logformats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TestSqliteDatabaseFormat(unittest.TestCase):

def test_writer_and_reader(self):
_test_writer_and_reader(self, can.SqliteWriter, can.SqliteReader,
sleep_time=can.SqliteWriter.MAX_TIME_BETWEEN_WRITES,
sleep_time=can.SqliteWriter.MAX_TIME_BETWEEN_WRITES + 0.5,
check_comments=False)

def testSQLWriterWritesToSameFile(self):
Expand Down