diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmis.py b/sonic_platform_base/sonic_xcvr/api/public/cmis.py index 2dd8acb6f..4bd106032 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmis.py @@ -1115,6 +1115,174 @@ def get_loopback_capability(self): loopback_capability['media_side_output_loopback_supported'] = bool((allowed_loopback_result >> 0) & 0x1) return loopback_capability + def set_host_input_loopback(self, lane_mask, enable): + """Sets the host-side input loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + """ + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['host_side_input_loopback_supported'] is False: + txt = 'Host input loopback is not supported' + logger.error(txt) + return False + + if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff: + txt = f'Per-lane host input loopback is not supported, lane_mask:{lane_mask:02x}' + logger.error(txt) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + if media_input_val or media_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}' + logger.error(txt) + return False + + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask) + + return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask) + + def set_host_output_loopback(self, lane_mask, enable): + """Sets the host-side output loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + """ + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['host_side_output_loopback_supported'] is False: + txt = 'Host output loopback is not supported' + logger.error(txt) + return False + + if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff: + txt = f'Per-lane host output loopback is not supported, lane_mask:{lane_mask:02x}' + logger.error(txt) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + if media_input_val or media_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}' + logger.error(txt) + return False + + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask) + + return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask) + + def set_media_input_loopback(self, lane_mask, enable): + """Sets the media-side input loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + """ + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['media_side_input_loopback_supported'] is False: + txt = 'Media input loopback is not supported' + logger.error(txt) + return False + + if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff: + txt = f'Per-lane media input loopback is not supported, lane_mask:{lane_mask:02x}' + logger.error(txt) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + if host_input_val or host_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}' + logger.error(txt) + return False + + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask) + + return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask) + + def set_media_output_loopback(self, lane_mask, enable): + """Sets the media-side output loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + """ + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['media_side_output_loopback_supported'] is False: + txt = 'Media output loopback is not supported' + logger.error(txt) + return False + + if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff: + txt = f'Per-lane media input loopback is not supported, lane_mask:{lane_mask:02x}' + logger.error(txt) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + if host_input_val or host_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}' + logger.error(txt) + return False + + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask) + + return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask) + def set_loopback_mode(self, loopback_mode, lane_mask = 0xff): ''' This function sets the module loopback mode. @@ -1131,76 +1299,42 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff): lane_mask: A bitmask representing which lanes to apply the loopback mode to. The default value of 0xFF indicates that the mode should be applied to all lanes. - The function will look at 13h:128 to check advertized loopback capabilities. - Return True if the provision succeeds, False if it fails + Return True if the operation succeeds, False otherwise. ''' - loopback_capability = self.get_loopback_capability() - if loopback_capability is None: - logger.info('Failed to get loopback capabilities') - return False - - host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) - host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) - media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) - media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) - host_input_support = loopback_capability['host_side_input_loopback_supported'] - host_output_support = loopback_capability['host_side_output_loopback_supported'] - media_input_support = loopback_capability['media_side_input_loopback_supported'] - media_output_support = loopback_capability['media_side_output_loopback_supported'] - - if lane_mask != 0xff: - if any([loopback_capability['per_lane_host_loopback_supported'] is False and 'host' in loopback_mode, - loopback_capability['per_lane_media_loopback_supported'] is False and 'media' in loopback_mode]): - txt = f'Per-lane {loopback_mode} loopback is not supported, lane_mask:{lane_mask:02x}\n' - logger.error(txt) - return False - if 'none' in loopback_mode: if loopback_mode == 'none': - status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0) - status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0) - status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0) - status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0) - return all([status_host_input, status_host_output, status_media_input, status_media_output]) + return all([ + self.set_host_input_loopback(0xff, False), + self.set_host_output_loopback(0xff, False), + self.set_media_input_loopback(0xff, False), + self.set_media_output_loopback(0xff, False) + ]) if loopback_mode == 'host-side-input-none': - return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask) + return self.set_host_input_loopback(lane_mask, False) if loopback_mode == 'host-side-output-none': - return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask) + return self.set_host_output_loopback(lane_mask, False) if loopback_mode == 'media-side-input-none': - return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask) + return self.set_media_input_loopback(lane_mask, False) if loopback_mode == 'media-side-output-none': - return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask) + return self.set_media_output_loopback(lane_mask, False) else: - if loopback_capability['simultaneous_host_media_loopback_supported'] is False: - if any(['host' in loopback_mode and (media_input_val or media_output_val), - 'media' in loopback_mode and (host_input_val or host_output_val)]): - txt = 'Simultaneous host media loopback is not supported\n' - txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}, ' - txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n' - logger.error(txt) - return False - - if loopback_mode == 'host-side-input' and host_input_support: - return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask) + if loopback_mode == 'host-side-input': + return self.set_host_input_loopback(lane_mask, True) - if loopback_mode == 'host-side-output' and host_output_support: - return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask) + if loopback_mode == 'host-side-output': + return self.set_host_output_loopback(lane_mask, True) - if loopback_mode == 'media-side-input' and media_input_support: - return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask) + if loopback_mode == 'media-side-input': + return self.set_media_input_loopback(lane_mask, True) - if loopback_mode == 'media-side-output' and media_output_support: - return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask) + if loopback_mode == 'media-side-output': + return self.set_media_output_loopback(lane_mask, True) txt = f'Failed to set {loopback_mode} loopback, lane_mask:{lane_mask:02x}\n' - txt += f'host_input_support:{host_input_support}, host_output_support:{host_output_support}, ' - txt += f'media_input_support:{media_input_support}, media_output_support:{media_output_support}\n' - txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}, ' - txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n' logger.error(txt) return False diff --git a/tests/sonic_xcvr/test_cmis.py b/tests/sonic_xcvr/test_cmis.py index eaac98703..95a1778f0 100644 --- a/tests/sonic_xcvr/test_cmis.py +++ b/tests/sonic_xcvr/test_cmis.py @@ -1053,76 +1053,179 @@ def test_get_loopback_capability(self, mock_response, expected): assert result == expected @pytest.mark.parametrize("input_param, mock_response, expected",[ - (['none',0x0f], { + ([0xf, True], None, False), + ([0xf, True], { + 'host_side_input_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, False), + ([0xf, True], { 'host_side_input_loopback_supported': True, - 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': False, + }, False), + ([0xf, True], { + 'host_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': False, 'per_lane_host_loopback_supported': True, - 'per_lane_media_loopback_supported': True - }, True), - (['host-side-input', 0x0f], { + }, False), + ([0xf, True], { 'host_side_input_loopback_supported': True, - 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - 'per_lane_media_loopback_supported': True }, True), - (['host-side-output', 0x0f], { + ([0xf, False], { 'host_side_input_loopback_supported': True, - 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True, - 'simultaneous_host_media_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, True), + ]) + def test_set_host_input_loopback(self, input_param, mock_response, expected): + self.api.get_loopback_capability = MagicMock() + self.api.get_loopback_capability.return_value = mock_response + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_host_input_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { + 'host_side_output_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - 'per_lane_media_loopback_supported': True }, False), - (['host-side-output', 0x0f], { - 'host_side_input_loopback_supported': True, + ([0xf, True], { 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': False, - 'per_lane_media_loopback_supported': True }, False), - (['media-side-input', 0x0f], { - 'host_side_input_loopback_supported': True, + ([0xf, True], { + 'host_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_host_loopback_supported': True, + }, False), + ([0xf, True], { 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - 'per_lane_media_loopback_supported': True }, True), - (['media-side-output', 0x0f], { - 'host_side_input_loopback_supported': True, + ([0xf, False], { 'host_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, True), + ]) + def test_set_host_output_loopback(self, input_param, mock_response, expected): + self.api.get_loopback_capability = MagicMock() + self.api.get_loopback_capability.return_value = mock_response + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_host_output_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { + 'media_side_input_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, False), + ([0xf, True], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': False, + }, False), + ([0xf, True], { 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': False, - 'per_lane_host_loopback_supported': False, - 'per_lane_media_loopback_supported': False + 'per_lane_media_loopback_supported': True, }, False), - (['media-side-output', 0x0f], { - 'host_side_input_loopback_supported': False, - 'host_side_output_loopback_supported': False, - 'media_side_input_loopback_supported': False, + ([0xf, True], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), + ([0xf, False], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), + ]) + def test_set_media_input_loopback(self, input_param, mock_response, expected): + self.api.get_loopback_capability = MagicMock() + self.api.get_loopback_capability.return_value = mock_response + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_media_input_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { 'media_side_output_loopback_supported': False, 'simultaneous_host_media_loopback_supported': True, - 'per_lane_host_loopback_supported': True, - 'per_lane_media_loopback_supported': True + 'per_lane_media_loopback_supported': True, }, False), - (['none', 0x0F], None, False) + ([0xf, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': False, + }, False), + ([0xf, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_media_loopback_supported': True, + }, False), + ([0xf, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), + ([0xf, False], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), ]) - def test_set_loopback_mode(self, input_param, mock_response, expected): + def test_set_media_output_loopback(self, input_param, mock_response, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response self.api.xcvr_eeprom.read = MagicMock() - self.api.xcvr_eeprom.read.side_effect = [0xf0,0,0xf0,0] + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_media_output_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + (['none', 0], True, True), + (['host-side-input', 0x0F], True, True), + (['host-side-output', 0x0F], True, True), + (['media-side-input', 0x0F], True, True), + (['media-side-output', 0x0F], True, True), + (['host-side-input-none', 0xF0], True, True), + (['host-side-output-none', 0xF0], True, True), + (['media-side-input-none', 0xF0], True, True), + (['media-side-output-none', 0xF0], True, True), + (['', 0xF0], True, False), + + ]) + def test_set_loopback_mode(self, input_param, mock_response, expected): + self.api.set_host_input_loopback = MagicMock() + self.api.set_host_input_loopback.return_value = mock_response + self.api.set_host_output_loopback = MagicMock() + self.api.set_host_output_loopback.return_value = mock_response + self.api.set_media_input_loopback = MagicMock() + self.api.set_media_input_loopback.return_value = mock_response + self.api.set_media_output_loopback = MagicMock() + self.api.set_media_output_loopback.return_value = mock_response result = self.api.set_loopback_mode(input_param[0], input_param[1]) assert result == expected