From 9141305c636c93aa9b0d79549395ba4e9df66595 Mon Sep 17 00:00:00 2001 From: longhuan-cisco <84595962+longhuan-cisco@users.noreply.github.com> Date: Fri, 23 Feb 2024 06:42:10 -0800 Subject: [PATCH] Enhance port_update_event logic to be thread-safe (#430) * Enhance port_mapping to be multi-thread safe * Increase cov for handle_port_update_event test in test_xcvrd.py * Fix comment in test_xcvrd.py * Migrate to PortChangeObserver class to handle port update events * Rename port_mapping.py to port_event_helper.py * Remove unused import statement in port_event_helper.py * Resolve issue after update branch to latest master --- sonic-xcvrd/tests/test_xcvrd.py | 187 +++++++++++-- sonic-xcvrd/xcvrd/xcvrd.py | 38 +-- .../{port_mapping.py => port_event_helper.py} | 256 ++++++++++-------- 3 files changed, 329 insertions(+), 152 deletions(-) rename sonic-xcvrd/xcvrd/xcvrd_utilities/{port_mapping.py => port_event_helper.py} (52%) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 85fdccd0d..1f1479872 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -1,5 +1,5 @@ #from unittest.mock import DEFAULT -from xcvrd.xcvrd_utilities.port_mapping import * +from xcvrd.xcvrd_utilities.port_event_helper import * from xcvrd.xcvrd_utilities.sfp_status_helper import * from xcvrd.xcvrd_utilities.media_settings_parser import * from xcvrd.xcvrd_utilities.optics_si_parser import * @@ -157,7 +157,7 @@ def test_CmisManagerTask_task_run_with_exception(self): assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) assert("wait_for_port_config_done" in str(trace)) - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) def test_DomInfoUpdateTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -178,7 +178,7 @@ def test_DomInfoUpdateTask_task_run_with_exception(self): assert("subscribe_port_config_change" in str(trace)) @patch('xcvrd.xcvrd.SfpStateUpdateTask.init', MagicMock()) - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) def test_SfpStateUpdateTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -240,7 +240,7 @@ def test_is_cmis_api(self, mock_class, expected_return_value): assert is_cmis_api(mock_xcvr_api) == expected_return_value @patch('xcvrd.xcvrd._wrapper_get_sfp_type') - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_info', MagicMock(return_value={'temperature': '22.75', 'voltage': '0.5', @@ -277,7 +277,7 @@ def test_post_port_dom_info_to_db(self, mock_get_sfp_type): mock_get_sfp_type.return_value = 'QSFP_DD' post_port_dom_info_to_db(logical_port_name, port_mapping, dom_tbl, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_firmware_info', MagicMock(return_value={'active_firmware': '2.1.1', 'inactive_firmware': '1.2.4'})) @@ -299,7 +299,7 @@ def test_post_port_dom_threshold_info_to_db(self, mock_get_sfp_type): mock_get_sfp_type.return_value = 'QSFP_DD' post_port_dom_info_to_db(logical_port_name, port_mapping, dom_threshold_tbl, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_pm', MagicMock(return_value={'prefec_ber_avg': '0.0003407240007014899', 'prefec_ber_min': '0.0006814479342250317', @@ -316,7 +316,7 @@ def test_post_port_pm_info_to_db(self): post_port_pm_info_to_db(logical_port_name, port_mapping, pm_tbl, stop_event) assert pm_tbl.get_size_for_key(logical_port_name) == 6 - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) def test_del_port_sfp_dom_info_from_db(self): logical_port_name = "Ethernet0" @@ -364,7 +364,7 @@ def test_delete_port_from_status_table_sw(self): delete_port_from_status_table_sw(logical_port_name, status_tbl) assert status_tbl.get_size_for_key(logical_port_name) == 1 - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_threshold_info', MagicMock(return_value={'temphighalarm': '22.75', 'temphighwarning': '0.5', @@ -393,7 +393,7 @@ def test_post_port_dom_threshold_info_to_db(self): dom_threshold_tbl = Table("STATE_DB", TRANSCEIVER_DOM_THRESHOLD_TABLE) post_port_dom_threshold_info_to_db(logical_port_name, port_mapping, dom_threshold_tbl, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75', @@ -445,7 +445,7 @@ def test_post_port_sfp_info_to_db(self): transceiver_dict = {} post_port_sfp_info_to_db(logical_port_name, port_mapping, dom_tbl, transceiver_dict, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @@ -525,7 +525,7 @@ def test_post_port_sfp_info_and_dom_thr_to_db_once(self): task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event, sfp_error_event) task._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping, xcvr_table_helper, stop_event) - @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) @@ -802,19 +802,154 @@ def test_is_error_sfp_status(self): @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') def test_handle_port_update_event(self, mock_select, mock_sub_table): + class DummyPortChangeEventHandler: + def __init__(self): + self.port_event_cache = [] + + def handle_port_change_event(self, port_event): + self.port_event_cache.append(port_event) + + CONFIG_DB = 'CONFIG_DB' + PORT_TABLE = swsscommon.CFG_PORT_TABLE_NAME + port_change_event_handler = DummyPortChangeEventHandler() + expected_processed_event_count = 0 + mock_selectable = MagicMock() - mock_selectable.pop = MagicMock( - side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)]) + side_effect_list = [ + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '40000'), ('fec', 'rs'))), + (None, None, None) + ] + mock_selectable.pop = MagicMock(side_effect=side_effect_list) mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) mock_sub_table.return_value = mock_selectable logger = MagicMock() - - sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger) - port_mapping = PortMapping() stop_event = threading.Event() stop_event.is_set = MagicMock(return_value=False) - handle_port_update_event(sel, asic_context, stop_event, - logger, port_mapping.handle_port_change_event) + + observer = PortChangeObserver(DEFAULT_NAMESPACE, logger, stop_event, + port_change_event_handler.handle_port_change_event, + [{CONFIG_DB: PORT_TABLE}]) + + # Test basic single update event without filtering: + assert observer.handle_port_update_event() + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + # 'fec' should not be filtered out + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.SET_COMMAND, + 'asic_id': 0, + 'speed': '40000', + 'fec': 'rs' + } + } + assert observer.port_event_cache == expected_cache + + observer = PortChangeObserver(DEFAULT_NAMESPACE, logger, stop_event, + port_change_event_handler.handle_port_change_event, + [{CONFIG_DB: PORT_TABLE, 'FILTER': ['speed']}]) + mock_selectable.pop.side_effect = iter(side_effect_list) + + # Test basic single update event with filtering: + assert not observer.port_event_cache + assert observer.handle_port_update_event() + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + # 'fec' should be filtered out + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.SET_COMMAND, + 'asic_id': 0, + 'speed': '40000', + } + } + assert observer.port_event_cache == expected_cache + assert port_change_event_handler.port_event_cache[-1].port_name == 'Ethernet0' + assert port_change_event_handler.port_event_cache[-1].event_type == PortChangeEvent.PORT_SET + assert port_change_event_handler.port_event_cache[-1].port_index == 1 + assert port_change_event_handler.port_event_cache[-1].asic_id == 0 + assert port_change_event_handler.port_event_cache[-1].db_name == CONFIG_DB + assert port_change_event_handler.port_event_cache[-1].table_name == PORT_TABLE + assert port_change_event_handler.port_event_cache[-1].port_dict == \ + expected_cache[('Ethernet0', CONFIG_DB, PORT_TABLE)] + + # Test duplicate update event on the same key: + mock_selectable.pop.side_effect = iter(side_effect_list) + # return False when no new event is processed + assert not observer.handle_port_update_event() + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + assert observer.port_event_cache == expected_cache + + # Test soaking multiple different update events on the same key: + side_effect_list = [ + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '100000'))), + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '200000'))), + ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), ('speed', '400000'))), + (None, None, None) + ] + mock_selectable.pop.side_effect = iter(side_effect_list) + assert observer.handle_port_update_event() + # only the last event should be processed + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.SET_COMMAND, + 'asic_id': 0, + 'speed': '400000', + } + } + assert observer.port_event_cache == expected_cache + + # Test select timeout case: + mock_select.return_value = (swsscommon.Select.TIMEOUT, None) + assert not observer.handle_port_update_event() + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + mock_select.return_value = (swsscommon.Select.OBJECT, None) + + # Test update event for DEL case: + side_effect_list = [ + ('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), ('speed', '400000'))), + (None, None, None) + ] + mock_selectable.pop.side_effect = iter(side_effect_list) + assert observer.handle_port_update_event() + expected_processed_event_count +=1 + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.DEL_COMMAND, + 'asic_id': 0, + 'speed': '400000', + } + } + assert observer.port_event_cache == expected_cache + + # Test update event if it's a subset of cached event: + side_effect_list = [ + ('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), + (None, None, None) + ] + mock_selectable.pop.side_effect = iter(side_effect_list) + assert not observer.handle_port_update_event() + assert len(port_change_event_handler.port_event_cache) == expected_processed_event_count + expected_cache = { + ('Ethernet0', CONFIG_DB, PORT_TABLE): { + 'port_name': 'Ethernet0', + 'index': '1', + 'op': swsscommon.DEL_COMMAND, + 'asic_id': 0, + } + } + assert observer.port_event_cache == expected_cache @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @@ -952,8 +1087,7 @@ def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper assert task.get_configured_tx_power_from_db('Ethernet0') == -10 @patch('xcvrd.xcvrd.platform_chassis') - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) + @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_object = MagicMock() mock_object.get_presence = MagicMock(return_value=True) @@ -1180,8 +1314,7 @@ def test_CmisManagerTask_post_port_active_apsel_to_db(self): @patch('xcvrd.xcvrd.platform_chassis') - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) + @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) @patch('xcvrd.xcvrd.is_cmis_api', MagicMock(return_value=True)) @@ -1351,8 +1484,8 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw assert not task.port_mapping.logical_to_asic assert mock_del_status_tbl_hw.call_count == 1 - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_event_helper.handle_port_config_change', MagicMock()) def test_DomInfoUpdateTask_task_run_stop(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -1445,7 +1578,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw assert not task.port_mapping.logical_to_asic assert mock_update_status_hw.call_count == 1 - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None))) def test_SfpStateUpdateTask_task_run_stop(self): port_mapping = PortMapping() stop_event = threading.Event() @@ -1514,8 +1647,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): @patch('time.sleep', MagicMock()) @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock()) - @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) - @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) + @patch('xcvrd.xcvrd_utilities.port_event_helper.subscribe_port_config_change', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_event_helper.handle_port_config_change', MagicMock()) @patch('xcvrd.xcvrd.SfpStateUpdateTask.init', MagicMock()) @patch('os.kill') @patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event') diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 38d884ff4..7e45f05ee 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -26,7 +26,8 @@ from swsscommon import swsscommon from .xcvrd_utilities import sfp_status_helper - from .xcvrd_utilities import port_mapping + from .xcvrd_utilities import port_event_helper + from .xcvrd_utilities.port_event_helper import PortChangeObserver from .xcvrd_utilities import media_settings_parser from .xcvrd_utilities import optics_si_parser @@ -1206,7 +1207,7 @@ def wait_for_port_config_done(self, namespace): # Make sure this daemon started after all port configured while not self.task_stopping_event.is_set(): - (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) + (state, c) = sel.select(port_event_helper.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: continue if state != swsscommon.Select.OBJECT: @@ -1225,15 +1226,14 @@ def task_worker(self): self.wait_for_port_config_done(namespace) # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal - sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger) - while not self.task_stopping_event.is_set(): - # Handle port change event from main thread - port_mapping.handle_port_update_event(sel, - asic_context, + port_change_observer = PortChangeObserver(self.namespaces, helper_logger, self.task_stopping_event, - helper_logger, self.on_port_update_event) + while not self.task_stopping_event.is_set(): + # Handle port change event from main thread + port_change_observer.handle_port_update_event() + for lport, info in self.port_dict.items(): if self.task_stopping_event.is_set(): break @@ -1587,7 +1587,7 @@ def task_worker(self): dom_th_info_cache = {} transceiver_status_cache = {} pm_info_cache = {} - sel, asic_context = port_mapping.subscribe_port_config_change(self.namespaces) + sel, asic_context = port_event_helper.subscribe_port_config_change(self.namespaces) # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): @@ -1598,7 +1598,7 @@ def task_worker(self): pm_info_cache.clear() # Handle port change event from main thread - port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) + port_event_helper.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) logical_port_list = self.port_mapping.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs @@ -1655,7 +1655,7 @@ def join(self): raise self.exc def on_port_config_change(self, port_change_event): - if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + if port_change_event.event_type == port_event_helper.PortChangeEvent.PORT_REMOVE: self.on_remove_logical_port(port_change_event) self.port_mapping.handle_port_change_event(port_change_event) @@ -1792,7 +1792,7 @@ def _init_port_sfp_status_tbl(self, port_mapping, xcvr_table_helper, stop_event= update_port_transceiver_status_table_sw(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED) def init(self): - port_mapping_data = port_mapping.get_port_mapping(self.namespaces) + port_mapping_data = port_event_helper.get_port_mapping(self.namespaces) # Post all the current interface sfp/dom threshold info to STATE_DB self.retry_eeprom_set = self._post_port_sfp_info_and_dom_thr_to_db_once(port_mapping_data, self.xcvr_table_helper, self.main_thread_stop_event) @@ -1879,9 +1879,9 @@ def task_worker(self, stopping_event, sfp_error_event): state = STATE_INIT self.init() - sel, asic_context = port_mapping.subscribe_port_config_change(self.namespaces) + sel, asic_context = port_event_helper.subscribe_port_config_change(self.namespaces) while not stopping_event.is_set(): - port_mapping.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) + port_event_helper.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, self.on_port_config_change) # Retry those logical ports whose EEPROM reading failed or timeout when the SFP is inserted self.retry_eeprom_reading() @@ -2096,10 +2096,10 @@ def join(self): raise self.exc def on_port_config_change(self , port_change_event): - if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: + if port_change_event.event_type == port_event_helper.PortChangeEvent.PORT_REMOVE: self.on_remove_logical_port(port_change_event) self.port_mapping.handle_port_change_event(port_change_event) - elif port_change_event.event_type == port_mapping.PortChangeEvent.PORT_ADD: + elif port_change_event.event_type == port_event_helper.PortChangeEvent.PORT_ADD: self.port_mapping.handle_port_change_event(port_change_event) self.on_add_logical_port(port_change_event) @@ -2255,7 +2255,7 @@ def wait_for_port_config_done(self, namespace): # Make sure this daemon started after all port configured while not self.stop_event.is_set(): - (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) + (state, c) = sel.select(port_event_helper.SELECT_TIMEOUT_MSECS) if state == swsscommon.Select.TIMEOUT: continue if state != swsscommon.Select.OBJECT: @@ -2319,14 +2319,14 @@ def init(self): self.wait_for_port_config_done(namespace) self.log_notice("XCVRD INIT: After port config is done") - return port_mapping.get_port_mapping(self.namespaces) + return port_event_helper.get_port_mapping(self.namespaces) # Deinitialize daemon def deinit(self): self.log_info("Start daemon deinit...") # Delete all the information from DB and then exit - port_mapping_data = port_mapping.get_port_mapping(self.namespaces) + port_mapping_data = port_event_helper.get_port_mapping(self.namespaces) logical_port_list = port_mapping_data.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py similarity index 52% rename from sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py rename to sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py index 788dc8134..3c0dbffa3 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py @@ -4,6 +4,11 @@ from swsscommon import swsscommon SELECT_TIMEOUT_MSECS = 1000 +DEFAULT_PORT_TBL_MAP = [ + {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, + {'STATE_DB': 'TRANSCEIVER_INFO'}, + {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, +] class PortChangeEvent: @@ -11,9 +16,9 @@ class PortChangeEvent: PORT_REMOVE = 1 PORT_SET = 2 PORT_DEL = 3 - PORT_EVENT = {} - def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): + def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None, + db_name=None, table_name=None): # Logical port name, e.g. Ethernet0 self.port_name = port_name # Physical port index, equals to "index" field of PORT table in CONFIG_DB @@ -24,6 +29,8 @@ def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): self.event_type = event_type # Port config dict self.port_dict = port_dict + self.db_name = db_name + self.table_name = table_name def __str__(self): return '{} - name={} index={} asic_id={}'.format('Add' if self.event_type == self.PORT_ADD else 'Remove', @@ -31,6 +38,147 @@ def __str__(self): self.port_index, self.asic_id) +class PortChangeObserver: + """ + PortChangeObserver is a class to monitor port change events in DBs, and + notify callback function + """ + + def __init__(self, namespaces, logger, + stop_event, + port_change_event_handler, + port_tbl_map=DEFAULT_PORT_TBL_MAP): + """ + Args: + namespaces (list): List of namespaces to monitor + logger (Logger): Logger object + stop_event (threading.Event): Stop event to stop the observer + port_change_event_handler (function): Callback function to handle port change event + port_tbl_map (list): List of dictionaries, each dictionary contains + the DB name and table name to monitor + """ + # To avoid duplicate event processing, this dict stores the latest port + # change event for each key which is a tuple of + # (port_name, port_tbl.db_name, port_tbl.table_name) + self.port_event_cache = {} + self.namespaces = namespaces + self.logger = logger + self.stop_event = stop_event + self.port_change_event_handler = port_change_event_handler + self.port_tbl_map = port_tbl_map + self.subscribe_port_update_event() + + def apply_filter_to_fvp(self, filter, fvp): + if filter is not None: + for key in fvp.copy().keys(): + if key not in (set(filter) | set({'index', 'port_name', 'asic_id', 'op'})): + del fvp[key] + + def subscribe_port_update_event(self): + """ + Subscribe to a particular DB's table and listen to only interested fields + Format : + { : , , , .. } where only field update will be received + """ + sel = swsscommon.Select() + asic_context = {} + for d in self.port_tbl_map: + for namespace in self.namespaces: + db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0]) + port_tbl.db_name = list(d.keys())[0] + port_tbl.table_name = list(d.values())[0] + port_tbl.filter = d['FILTER'] if 'FILTER' in d else None + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) + self.logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format( + port_tbl, list(d.values())[0], namespace)) + self.sel, self.asic_context = sel, asic_context + + def handle_port_update_event(self): + """ + Select PORT update events, notify the observers upon a port update in CONFIG_DB + or a XCVR insertion/removal in STATE_DB + + Returns: + bool: True if there's at least one update event; False if there's no update event. + """ + has_event = False + if not self.stop_event.is_set(): + (state, _) = self.sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + return has_event + if state != swsscommon.Select.OBJECT: + self.logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') + return has_event + + port_event_cache = {} + for port_tbl in self.asic_context.keys(): + while True: + (port_name, op, fvp) = port_tbl.pop() + if not port_name: + break + if not validate_port(port_name): + continue + fvp = dict(fvp) if fvp is not None else {} + self.logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format( + port_name, op, port_tbl.db_name, port_tbl.table_name, fvp)) + if 'index' not in fvp: + fvp['index'] = '-1' + fvp['port_name'] = port_name + fvp['asic_id'] = self.asic_context[port_tbl] + fvp['op'] = op + fvp['FILTER'] = port_tbl.filter + # Soak duplicate events and consider only the last event + port_event_cache[(port_name, port_tbl.db_name, port_tbl.table_name)] = fvp + + # Now apply filter over soaked events + for key, fvp in port_event_cache.items(): + db_name = key[1] + table_name = key[2] + port_index = int(fvp['index']) + port_change_event = None + filter = fvp['FILTER'] + del fvp['FILTER'] + self.apply_filter_to_fvp(filter, fvp) + + if key in self.port_event_cache: + # Compare current event with last event on this key, to see if + # there's really a need to update. + diff = set(fvp.items()) - set(self.port_event_cache[key].items()) + # Ignore duplicate events + if not diff: + self.port_event_cache[key] = fvp + continue + # Update the latest event to the cache + self.port_event_cache[key] = fvp + + if fvp['op'] == swsscommon.SET_COMMAND: + port_change_event = PortChangeEvent(fvp['port_name'], + port_index, + fvp['asic_id'], + PortChangeEvent.PORT_SET, + fvp, + db_name, + table_name) + elif fvp['op'] == swsscommon.DEL_COMMAND: + port_change_event = PortChangeEvent(fvp['port_name'], + port_index, + fvp['asic_id'], + PortChangeEvent.PORT_DEL, + fvp, + db_name, + table_name) + # This is the final event considered for processing + self.logger.log_warning("*** {} handle_port_update_event() fvp {}".format( + key, fvp)) + if port_change_event is not None: + has_event = True + self.port_change_event_handler(port_change_event) + + return has_event + class PortMapping: def __init__(self): @@ -107,110 +255,6 @@ def subscribe_port_config_change(namespaces): sel.addSelectable(port_tbl) return sel, asic_context -def subscribe_port_update_event(namespaces, logger): - """ - Subscribe to a particular DB's table and listen to only interested fields - Format : - { :
, , , .. } where only field update will be received - """ - port_tbl_map = [ - {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, - {'STATE_DB': 'TRANSCEIVER_INFO'}, - {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, - ] - - sel = swsscommon.Select() - asic_context = {} - for d in port_tbl_map: - for namespace in namespaces: - db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace) - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0]) - port_tbl.db_name = list(d.keys())[0] - port_tbl.table_name = list(d.values())[0] - port_tbl.filter = d['FILTER'] if 'FILTER' in d else None - asic_context[port_tbl] = asic_id - sel.addSelectable(port_tbl) - logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format( - port_tbl, list(d.values())[0], namespace)) - return sel, asic_context - -def apply_filter_to_fvp(filter, fvp): - if filter is not None: - for key in fvp.copy().keys(): - if key not in (set(filter) | set({'index', 'key', 'asic_id', 'op'})): - del fvp[key] - -def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler): - """ - Select PORT update events, notify the observers upon a port update in CONFIG_DB - or a XCVR insertion/removal in STATE_DB - """ - if not stop_event.is_set(): - (state, _) = sel.select(SELECT_TIMEOUT_MSECS) - if state == swsscommon.Select.TIMEOUT: - return - if state != swsscommon.Select.OBJECT: - logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') - return - - port_event_cache = {} - for port_tbl in asic_context.keys(): - while True: - (key, op, fvp) = port_tbl.pop() - if not key: - break - if not validate_port(key): - continue - fvp = dict(fvp) if fvp is not None else {} - logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format( - key, op, port_tbl.db_name, port_tbl.table_name, fvp)) - - if 'index' not in fvp: - fvp['index'] = '-1' - fvp['key'] = key - fvp['asic_id'] = asic_context[port_tbl] - fvp['op'] = op - fvp['FILTER'] = port_tbl.filter - # Soak duplicate events and consider only the last event - port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp - - # Now apply filter over soaked events - for key, fvp in port_event_cache.items(): - port_index = int(fvp['index']) - port_change_event = None - diff = {} - filter = fvp['FILTER'] - del fvp['FILTER'] - apply_filter_to_fvp(filter, fvp) - - if key in PortChangeEvent.PORT_EVENT: - diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items())) - # Ignore duplicate events - if not diff: - PortChangeEvent.PORT_EVENT[key] = fvp - continue - PortChangeEvent.PORT_EVENT[key] = fvp - - if fvp['op'] == swsscommon.SET_COMMAND: - port_change_event = PortChangeEvent(fvp['key'], - port_index, - fvp['asic_id'], - PortChangeEvent.PORT_SET, - fvp) - elif fvp['op'] == swsscommon.DEL_COMMAND: - port_change_event = PortChangeEvent(fvp['key'], - port_index, - fvp['asic_id'], - PortChangeEvent.PORT_DEL, - fvp) - # This is the final event considered for processing - logger.log_warning("*** {} handle_port_update_event() fvp {}".format( - key, fvp)) - if port_change_event is not None: - port_change_event_handler(port_change_event) - - def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers """