Skip to content

Commit

Permalink
Enhance port_update_event logic to be thread-safe (sonic-net#430)
Browse files Browse the repository at this point in the history
* Enhance port_mapping to be multi-thread safe

* Increase cov for handle_port_update_event test in test_xcvrd.py

* Fix comment in test_xcvrd.py

* Migrate to PortChangeObserver class to handle port update events

* Rename port_mapping.py to port_event_helper.py

* Remove unused import statement in port_event_helper.py

* Resolve issue after update branch to latest master
  • Loading branch information
longhuan-cisco committed Feb 23, 2024
1 parent f1511c2 commit 9141305
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 152 deletions.
187 changes: 160 additions & 27 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#from unittest.mock import DEFAULT
from xcvrd.xcvrd_utilities.port_mapping import *
from xcvrd.xcvrd_utilities.port_event_helper import *
from xcvrd.xcvrd_utilities.sfp_status_helper import *
from xcvrd.xcvrd_utilities.media_settings_parser import *
from xcvrd.xcvrd_utilities.optics_si_parser import *
Expand Down Expand Up @@ -157,7 +157,7 @@ def test_CmisManagerTask_task_run_with_exception(self):
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("wait_for_port_config_done" in str(trace))

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
@patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
def test_DomInfoUpdateTask_task_run_with_exception(self):
port_mapping = PortMapping()
stop_event = threading.Event()
Expand All @@ -178,7 +178,7 @@ def test_DomInfoUpdateTask_task_run_with_exception(self):
assert("subscribe_port_config_change" in str(trace))

@patch('xcvrd.xcvrd.SfpStateUpdateTask.init', MagicMock())
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
@patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
def test_SfpStateUpdateTask_task_run_with_exception(self):
port_mapping = PortMapping()
stop_event = threading.Event()
Expand Down Expand Up @@ -240,7 +240,7 @@ def test_is_cmis_api(self, mock_class, expected_return_value):
assert is_cmis_api(mock_xcvr_api) == expected_return_value

@patch('xcvrd.xcvrd._wrapper_get_sfp_type')
@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_info', MagicMock(return_value={'temperature': '22.75',
'voltage': '0.5',
Expand Down Expand Up @@ -277,7 +277,7 @@ def test_post_port_dom_info_to_db(self, mock_get_sfp_type):
mock_get_sfp_type.return_value = 'QSFP_DD'
post_port_dom_info_to_db(logical_port_name, port_mapping, dom_tbl, stop_event)

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_get_transceiver_firmware_info', MagicMock(return_value={'active_firmware': '2.1.1',
'inactive_firmware': '1.2.4'}))
Expand All @@ -299,7 +299,7 @@ def test_post_port_dom_threshold_info_to_db(self, mock_get_sfp_type):
mock_get_sfp_type.return_value = 'QSFP_DD'
post_port_dom_info_to_db(logical_port_name, port_mapping, dom_threshold_tbl, stop_event)

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_get_transceiver_pm', MagicMock(return_value={'prefec_ber_avg': '0.0003407240007014899',
'prefec_ber_min': '0.0006814479342250317',
Expand All @@ -316,7 +316,7 @@ def test_post_port_pm_info_to_db(self):
post_port_pm_info_to_db(logical_port_name, port_mapping, pm_tbl, stop_event)
assert pm_tbl.get_size_for_key(logical_port_name) == 6

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
def test_del_port_sfp_dom_info_from_db(self):
logical_port_name = "Ethernet0"
Expand Down Expand Up @@ -364,7 +364,7 @@ def test_delete_port_from_status_table_sw(self):
delete_port_from_status_table_sw(logical_port_name, status_tbl)
assert status_tbl.get_size_for_key(logical_port_name) == 1

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_threshold_info', MagicMock(return_value={'temphighalarm': '22.75',
'temphighwarning': '0.5',
Expand Down Expand Up @@ -393,7 +393,7 @@ def test_post_port_dom_threshold_info_to_db(self):
dom_threshold_tbl = Table("STATE_DB", TRANSCEIVER_DOM_THRESHOLD_TABLE)
post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, dom_threshold_tbl, stop_event)

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75',
Expand Down Expand Up @@ -445,7 +445,7 @@ def test_post_port_sfp_info_to_db(self):
transceiver_dict = {}
post_port_sfp_info_to_db(logical_port_name, port_mapping, dom_tbl, transceiver_dict, stop_event)

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True))
Expand Down Expand Up @@ -525,7 +525,7 @@ def test_post_port_sfp_info_and_dom_thr_to_db_once(self):
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event)
task._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping, xcvr_table_helper, stop_event)

@patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0]))
@patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True))
@patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True))
Expand Down Expand Up @@ -802,19 +802,154 @@ def test_is_error_sfp_status(self):
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_handle_port_update_event(self, mock_select, mock_sub_table):
class DummyPortChangeEventHandler:
def __init__(self):
self.port_event_cache = []

def handle_port_change_event(self, port_event):
self.port_event_cache.append(port_event)

CONFIG_DB = 'CONFIG_DB'
PORT_TABLE = swsscommon.CFG_PORT_TABLE_NAME
port_change_event_handler = DummyPortChangeEventHandler()
expected_processed_event_count = 0

mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(
side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)])
side_effect_list = [
('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '40000'), ('fec', 'rs'))),
(None, None, None)
]
mock_selectable.pop = MagicMock(side_effect=side_effect_list)
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable
logger = MagicMock()

sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger)
port_mapping = PortMapping()
stop_event = threading.Event()
stop_event.is_set = MagicMock(return_value=False)
handle_port_update_event(sel, asic_context, stop_event,
logger, port_mapping.handle_port_change_event)

observer = PortChangeObserver(DEFAULT_NAMESPACE, logger, stop_event,
port_change_event_handler.handle_port_change_event,
[{CONFIG_DB: PORT_TABLE}])

# Test basic single update event without filtering:
assert observer.handle_port_update_event()
expected_processed_event_count +=1
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
# 'fec' should not be filtered out
expected_cache = {
('Ethernet0', CONFIG_DB, PORT_TABLE): {
'port_name': 'Ethernet0',
'index': '1',
'op': swsscommon.SET_COMMAND,
'asic_id': 0,
'speed': '40000',
'fec': 'rs'
}
}
assert observer.port_event_cache == expected_cache

observer = PortChangeObserver(DEFAULT_NAMESPACE, logger, stop_event,
port_change_event_handler.handle_port_change_event,
[{CONFIG_DB: PORT_TABLE, 'FILTER': ['speed']}])
mock_selectable.pop.side_effect = iter(side_effect_list)

# Test basic single update event with filtering:
assert not observer.port_event_cache
assert observer.handle_port_update_event()
expected_processed_event_count +=1
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
# 'fec' should be filtered out
expected_cache = {
('Ethernet0', CONFIG_DB, PORT_TABLE): {
'port_name': 'Ethernet0',
'index': '1',
'op': swsscommon.SET_COMMAND,
'asic_id': 0,
'speed': '40000',
}
}
assert observer.port_event_cache == expected_cache
assert port_change_event_handler.port_event_cache[-1].port_name == 'Ethernet0'
assert port_change_event_handler.port_event_cache[-1].event_type == PortChangeEvent.PORT_SET
assert port_change_event_handler.port_event_cache[-1].port_index == 1
assert port_change_event_handler.port_event_cache[-1].asic_id == 0
assert port_change_event_handler.port_event_cache[-1].db_name == CONFIG_DB
assert port_change_event_handler.port_event_cache[-1].table_name == PORT_TABLE
assert port_change_event_handler.port_event_cache[-1].port_dict == \
expected_cache[('Ethernet0', CONFIG_DB, PORT_TABLE)]

# Test duplicate update event on the same key:
mock_selectable.pop.side_effect = iter(side_effect_list)
# return False when no new event is processed
assert not observer.handle_port_update_event()
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
assert observer.port_event_cache == expected_cache

# Test soaking multiple different update events on the same key:
side_effect_list = [
('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '100000'))),
('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '200000'))),
('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '400000'))),
(None, None, None)
]
mock_selectable.pop.side_effect = iter(side_effect_list)
assert observer.handle_port_update_event()
# only the last event should be processed
expected_processed_event_count +=1
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
expected_cache = {
('Ethernet0', CONFIG_DB, PORT_TABLE): {
'port_name': 'Ethernet0',
'index': '1',
'op': swsscommon.SET_COMMAND,
'asic_id': 0,
'speed': '400000',
}
}
assert observer.port_event_cache == expected_cache

# Test select timeout case:
mock_select.return_value = (swsscommon.Select.TIMEOUT, None)
assert not observer.handle_port_update_event()
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
mock_select.return_value = (swsscommon.Select.OBJECT, None)

# Test update event for DEL case:
side_effect_list = [
('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), ('speed', '400000'))),
(None, None, None)
]
mock_selectable.pop.side_effect = iter(side_effect_list)
assert observer.handle_port_update_event()
expected_processed_event_count +=1
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
expected_cache = {
('Ethernet0', CONFIG_DB, PORT_TABLE): {
'port_name': 'Ethernet0',
'index': '1',
'op': swsscommon.DEL_COMMAND,
'asic_id': 0,
'speed': '400000',
}
}
assert observer.port_event_cache == expected_cache

# Test update event if it's a subset of cached event:
side_effect_list = [
('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )),
(None, None, None)
]
mock_selectable.pop.side_effect = iter(side_effect_list)
assert not observer.handle_port_update_event()
assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count
expected_cache = {
('Ethernet0', CONFIG_DB, PORT_TABLE): {
'port_name': 'Ethernet0',
'index': '1',
'op': swsscommon.DEL_COMMAND,
'asic_id': 0,
}
}
assert observer.port_event_cache == expected_cache

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
Expand Down Expand Up @@ -952,8 +1087,7 @@ def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper
assert task.get_configured_tx_power_from_db('Ethernet0') == -10

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock())
@patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock()))
def test_CmisManagerTask_task_run_stop(self, mock_chassis):
mock_object = MagicMock()
mock_object.get_presence = MagicMock(return_value=True)
Expand Down Expand Up @@ -1180,8 +1314,7 @@ def test_CmisManagerTask_post_port_active_apsel_to_db(self):


@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock())
@patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock()))
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
@patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock())
@patch('xcvrd.xcvrd.is_cmis_api', MagicMock(return_value=True))
Expand Down Expand Up @@ -1351,8 +1484,8 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw
assert not task.port_mapping.logical_to_asic
assert mock_del_status_tbl_hw.call_count == 1

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
@patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_event_helper.handle_port_config_change', MagicMock())
def test_DomInfoUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
stop_event = threading.Event()
Expand Down Expand Up @@ -1445,7 +1578,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
assert not task.port_mapping.logical_to_asic
assert mock_update_status_hw.call_count == 1

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None)))
def test_SfpStateUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
stop_event = threading.Event()
Expand Down Expand Up @@ -1514,8 +1647,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
@patch('time.sleep', MagicMock())
@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
@patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock())
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
@patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_event_helper.handle_port_config_change', MagicMock())
@patch('xcvrd.xcvrd.SfpStateUpdateTask.init', MagicMock())
@patch('os.kill')
@patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event')
Expand Down
Loading

0 comments on commit 9141305

Please sign in to comment.