Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ def shutdown(self):
"""
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()

@property
def state(self):
"""
Expand Down
1 change: 1 addition & 0 deletions can/interfaces/pcan/pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def flash(self, flash):
self.m_objPCANBasic.SetValue(self.m_PcanHandle, PCAN_CHANNEL_IDENTIFYING, bool(flash))

def shutdown(self):
super(PcanBus, self).shutdown()
self.m_objPCANBasic.Uninitialize(self.m_PcanHandle)

@property
Expand Down
17 changes: 15 additions & 2 deletions can/interfaces/virtual.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from random import randint

from can.bus import BusABC
from can import CanError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(self, channel=None, receive_own_messages=False, **config):
self.channel_id = channel
self.channel_info = 'Virtual bus channel %s' % self.channel_id
self.receive_own_messages = receive_own_messages
self._open = True

with channels_lock:

Expand All @@ -60,24 +62,35 @@ def __init__(self, channel=None, receive_own_messages=False, **config):
self.queue = queue.Queue()
self.channel.append(self.queue)

def _check_if_open(self):
"""Raises CanError if the bus is not open.

Has to be called in every method that accesses the bus.
"""
if not self._open:
raise CanError('Operation on closed bus')

def _recv_internal(self, timeout):
self._check_if_open()
try:
msg = self.queue.get(block=True, timeout=timeout)
except queue.Empty:
return None, False
else:
#logger.log(9, 'Received message:\n%s', msg)
return msg, False

def send(self, msg, timeout=None):
self._check_if_open()
msg.timestamp = time.time()
# Add message to all listening on this channel
for bus_queue in self.channel:
if bus_queue is not self.queue or self.receive_own_messages:
bus_queue.put(msg)
#logger.log(9, 'Transmitted message:\n%s', msg)

def shutdown(self):
self._check_if_open()
self._open = False

with channels_lock:
self.channel.remove(self.queue)

Expand Down
98 changes: 98 additions & 0 deletions test/back2back_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,103 @@ def test_basics(self):
notifier.stop()


class Back2BackTestCaseWithContext(unittest.TestCase):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think this is needed. The other test should suffice.

"""
Use two interfaces connected to the same CAN bus and test them against
each other.
"""

def _setup_with_context_manager(self, cm):
"""Use a contextmanager to setUp a test case."""
val = cm.__enter__()
self.addCleanup(cm.__exit__, None, None, None) # Same as tearDown call
return val

def setUp(self):
self.bus1 = self._setup_with_context_manager(can.interface.Bus(channel=CHANNEL_1,
bustype=INTERFACE_1,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True))
self.bus2 = self._setup_with_context_manager(can.interface.Bus(channel=CHANNEL_2,
bustype=INTERFACE_2,
bitrate=BITRATE,
fd=TEST_CAN_FD,
single_handle=True))

def _check_received_message(self, recv_msg, sent_msg):
self.assertIsNotNone(recv_msg,
"No message was received on %s" % INTERFACE_2)
self.assertEqual(recv_msg.arbitration_id, sent_msg.arbitration_id)
self.assertEqual(recv_msg.id_type, sent_msg.id_type)
self.assertEqual(recv_msg.is_remote_frame, sent_msg.is_remote_frame)
self.assertEqual(recv_msg.is_error_frame, sent_msg.is_error_frame)
self.assertEqual(recv_msg.is_fd, sent_msg.is_fd)
self.assertEqual(recv_msg.bitrate_switch, sent_msg.bitrate_switch)
self.assertEqual(recv_msg.dlc, sent_msg.dlc)
if not sent_msg.is_remote_frame:
self.assertSequenceEqual(recv_msg.data, sent_msg.data)

def _send_and_receive(self, msg):
# Send with bus 1, receive with bus 2
self.bus1.send(msg)
recv_msg = self.bus2.recv(TIMEOUT)
self._check_received_message(recv_msg, msg)
# Some buses may receive their own messages. Remove it from the queue
self.bus1.recv(0)

# Send with bus 2, receive with bus 1
# Add 1 to arbitration ID to make it a different message
msg.arbitration_id += 1
self.bus2.send(msg)
recv_msg = self.bus1.recv(TIMEOUT)
self._check_received_message(recv_msg, msg)

def test_no_message(self):
self.assertIsNone(self.bus1.recv(0.1))

def test_standard_message(self):
msg = can.Message(extended_id=False,
arbitration_id=0x100,
data=[1, 2, 3, 4, 5, 6, 7, 8])
self._send_and_receive(msg)

def test_extended_message(self):
msg = can.Message(extended_id=True,
arbitration_id=0x123456,
data=[10, 11, 12, 13, 14, 15, 16, 17])
self._send_and_receive(msg)

def test_remote_message(self):
msg = can.Message(extended_id=False,
arbitration_id=0x200,
is_remote_frame=True,
dlc=4)
self._send_and_receive(msg)

def test_dlc_less_than_eight(self):
msg = can.Message(extended_id=False,
arbitration_id=0x300,
data=[4, 5, 6])
self._send_and_receive(msg)

@unittest.skipUnless(TEST_CAN_FD, "Don't test CAN-FD")
def test_fd_message(self):
msg = can.Message(is_fd=True,
extended_id=True,
arbitration_id=0x56789,
data=[0xff] * 64)
self._send_and_receive(msg)

@unittest.skipUnless(TEST_CAN_FD, "Don't test CAN-FD")
def test_fd_message_with_brs(self):
msg = can.Message(is_fd=True,
bitrate_switch=True,
extended_id=True,
arbitration_id=0x98765,
data=[0xff] * 48)
self._send_and_receive(msg)


if __name__ == '__main__':
unittest.main()
36 changes: 36 additions & 0 deletions test/contextmanager_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env python
# coding: utf-8

"""
This module tests the context manager of Bus and Notifier classes
"""

import unittest
import can


class ContextManagerTest(unittest.TestCase):

def setUp(self):
data = [0, 1, 2, 3, 4, 5, 6, 7]
self.msg_send = can.Message(extended_id=False, arbitration_id=0x100, data=data)

def test_open_buses(self):
with can.interface.Bus(bustype='virtual') as bus_send, can.interface.Bus(bustype='virtual') as bus_recv:
bus_send.send(self.msg_send)
msg_recv = bus_recv.recv()

# Receiving a frame with data should evaluate msg_recv to True
self.assertTrue(msg_recv)

def test_use_closed_bus(self):
with can.interface.Bus(bustype='virtual') as bus_send, can.interface.Bus(bustype='virtual') as bus_recv:
bus_send.send(self.msg_send)

# Receiving a frame after bus has been closed should raise a CanException
self.assertRaises(can.CanError, bus_recv.recv)
self.assertRaises(can.CanError, bus_send.send, self.msg_send)


if __name__ == '__main__':
unittest.main()