diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmis.py b/sonic_platform_base/sonic_xcvr/api/public/cmis.py index 551ea8e76..a9f91c210 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmis.py @@ -1119,11 +1119,15 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff): ''' This function sets the module loopback mode. loopback_mode: Loopback mode has to be one of the five: - 1. "none" (default) - 2. "host-side-input" - 3. "host-side-output" - 4. "media-side-input" - 5. "media-side-output" + 1. "none" + 2. "host-side-input-none" + 3. "host-side-output-none", + 4. "media-side-input-none" + 5. "media-side-output-none" + 6. "host-side-input" + 7. "host-side-output" + 8. "media-side-input" + 9. "media-side-output" lane_mask: A bitmask representing which lanes to apply the loopback mode to. The default value of 0xFF indicates that the mode should be applied to all lanes. @@ -1132,42 +1136,84 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff): ''' loopback_capability = self.get_loopback_capability() if loopback_capability is None: + logger.info('Set loopback mode err, fail to get capability') return False - status_host_input = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) - status_host_output = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) - status_media_input = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) - status_media_output = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) - - if loopback_mode == 'none': - status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0) - status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0) - status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0) - status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0) - return all([status_host_input, status_host_output, status_media_input, status_media_output]) + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + + if loopback_capability['per_lane_host_loopback_supported'] is False: + if 'host' in loopback_mode and lane_mask != 0xff: + lane_mask = 0xff + txt = f'Set {loopback_mode} loopback, per lane host loopback is not supported' + logger.info(txt) + + if loopback_capability['per_lane_media_loopback_supported'] is False: + if 'media' in loopback_mode and lane_mask != 0xff: + lane_mask = 0xff + txt = f'Set {loopback_mode} loopback, per lane media loopback is not supported' + logger.info(txt) + + if 'none' in loopback_mode: + if loopback_mode == 'none': + status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0) + status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0) + status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0) + status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0) + return all([status_host_input, status_host_output, status_media_input, status_media_output]) + + if loopback_mode == 'host-side-input-none': + return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask) + + if loopback_mode == 'host-side-output-none': + return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask) + + if loopback_mode == 'media-side-input-none': + return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask) + + if loopback_mode == 'media-side-output-none': + return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask) else: if loopback_capability['simultaneous_host_media_loopback_supported'] is False: if loopback_mode in ['host-side-input', 'host-side-output'] and \ - (status_media_input or status_media_output): + (media_input_val or media_output_val): + txt = f'Set {loopback_mode} loopback mode err, simultaneous host media \ + loopback is not supported' + logger.error(txt) return False if loopback_mode in ['media-side-input', 'media-side-output'] and \ - (status_host_output or status_host_input): + (host_input_val or host_output_val): + txt = f'Set {loopback_mode} loopback mode err, simultaneous host media \ + loopback is not supported' + logger.error(txt) return False if loopback_mode == 'host-side-input': if loopback_capability['host_side_input_loopback_supported']: - return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, lane_mask) + return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask) elif loopback_mode == 'host-side-output': if loopback_capability['host_side_output_loopback_supported']: - return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, lane_mask) + return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask) elif loopback_mode == 'media-side-input': if loopback_capability['media_side_input_loopback_supported']: - return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, lane_mask) + return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask) elif loopback_mode == 'media-side-output': if loopback_capability['media_side_output_loopback_supported']: - return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, lane_mask) - + return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask) + + txt = f'Set {loopback_mode} loopback mode err, lane_mask:{lane_mask:02x} capability:\n' + host_input_support = loopback_capability['host_side_input_loopback_supported'] + host_output_support = loopback_capability['host_side_output_loopback_supported'] + media_input_support = loopback_capability['media_side_input_loopback_supported'] + media_output_support = loopback_capability['media_side_output_loopback_supported'] + txt += f'host_side_input:{host_input_support}, host_output_support:{host_output_support}\n' + txt += f'media_input_support:{media_input_support}, media_output_support:{media_output_support}\n' + txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x},\ + media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n' + logger.error(txt) return False def get_vdm(self, field_option=None): diff --git a/tests/sonic_xcvr/test_cmis.py b/tests/sonic_xcvr/test_cmis.py index 33f40964b..25319798a 100644 --- a/tests/sonic_xcvr/test_cmis.py +++ b/tests/sonic_xcvr/test_cmis.py @@ -1052,50 +1052,70 @@ def test_get_loopback_capability(self, mock_response, expected): result = self.api.get_loopback_capability() assert result == expected - @pytest.mark.parametrize("input_param, mock_response",[ - ('none', { + @pytest.mark.parametrize("input_param, mock_response, expected",[ + (['none',0x0f], { 'host_side_input_loopback_supported': True, 'host_side_output_loopback_supported': True, 'media_side_input_loopback_supported': True, 'media_side_output_loopback_supported': True, - 'simultaneous_host_media_loopback_supported': True - }), - ('host-side-input', { + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + 'per_lane_media_loopback_supported': True + }, True), + (['host-side-input', 0x0f], { 'host_side_input_loopback_supported': True, 'host_side_output_loopback_supported': True, 'media_side_input_loopback_supported': True, 'media_side_output_loopback_supported': True, - 'simultaneous_host_media_loopback_supported': True - }), - ('host-side-output', { + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + 'per_lane_media_loopback_supported': True + }, True), + (['host-side-output', 0x0f], { 'host_side_input_loopback_supported': True, 'host_side_output_loopback_supported': True, 'media_side_input_loopback_supported': True, 'media_side_output_loopback_supported': True, - 'simultaneous_host_media_loopback_supported': False - }), - ('media-side-input', { + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_host_loopback_supported': False, + 'per_lane_media_loopback_supported': False + }, False), + (['media-side-input', 0x0f], { 'host_side_input_loopback_supported': True, 'host_side_output_loopback_supported': True, 'media_side_input_loopback_supported': True, 'media_side_output_loopback_supported': True, - 'simultaneous_host_media_loopback_supported': False - }), - ('media-side-output', { + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + 'per_lane_media_loopback_supported': True + }, True), + (['media-side-output', 0x0f], { 'host_side_input_loopback_supported': True, 'host_side_output_loopback_supported': True, 'media_side_input_loopback_supported': True, 'media_side_output_loopback_supported': True, - 'simultaneous_host_media_loopback_supported': True - }), - ( - 'none', None - ) - ]) - def test_set_loopback_mode(self, input_param, mock_response): + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_host_loopback_supported': False, + 'per_lane_media_loopback_supported': False + }, False), + (['media-side-output', 0x0f], { + 'host_side_input_loopback_supported': False, + 'host_side_output_loopback_supported': False, + 'media_side_input_loopback_supported': False, + 'media_side_output_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + 'per_lane_media_loopback_supported': True + }, False), + (['none', 0x0F], None, False) + ]) + def test_set_loopback_mode(self, input_param, mock_response, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response - self.api.set_loopback_mode(input_param) + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0xf0,0,0xf0,0] + result = self.api.set_loopback_mode(input_param[0], input_param[1]) + assert result == expected @pytest.mark.parametrize("mock_response, expected",[ (