Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[action] [PR:462] Handle page select and remote access check after changing SFP target (#462) #477

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 59 additions & 12 deletions sonic_platform_base/sonic_xcvr/api/public/cmisTargetFWUpgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
upgrade of remote target from the local target itself.
"""

import struct
import sys
import traceback
from ...fields import consts
Expand Down Expand Up @@ -44,7 +43,38 @@

class CmisTargetFWUpgradeAPI(CmisApi):
def set_firmware_download_target_end(self, target):
return self.xcvr_eeprom.write(consts.TARGET_MODE, target)
"""
Sets the target mode to the specified target.
If the target mode is set to a remote target, then the page select byte is set to 0.
Also, the remote target is then checked to ensure that its accessible.
In case of any error, the target mode is restored to E0.
Returns:
True if the target mode is set successfully, False otherwise.
"""
try:
if not self.xcvr_eeprom.write(consts.TARGET_MODE, target):
logger.error("Failed to set target mode to {}".format(target))
return self._restore_target_to_E0()
if target != TARGET_E0_VALUE:
if not self.xcvr_eeprom.write(consts.PAGE_SELECT_BYTE, 0):
logger.error("Failed to set page select byte to {}".format(target))
return self._restore_target_to_E0()
if not self._is_remote_target_accessible():
logger.error("Remote target {} not accessible.".format(target))
return self._restore_target_to_E0()
except Exception as e:
logger.error("Exception occurred while setting target mode to {}: {}".format(target, repr(e)))
return self._restore_target_to_E0()

return True

def get_current_target_end(self):
"""
Reads the target mode and returns the target mode.
Returns:
The target mode.
"""
return self.xcvr_eeprom.read(consts.TARGET_MODE)

"""
Reads the active, inactive and server firmware version from all targets
Expand All @@ -67,14 +97,7 @@ def get_transceiver_info_firmware_versions(self):
for target in TARGET_LIST:
try:
if not self.set_firmware_download_target_end(target):
logging.error("Target mode change failed. Target: {}".format(target))
continue

# Any register apart from the TARGET_MODE register will have the value 0xff
# if the remote target is not accessible from the local target.
module_type = self.get_module_type()
if 'Unknown' in module_type:
logging.info("Remote target {} not accessible. Skipping.".format(target))
logger.error("Target mode change failed. Target: {}".format(target))
continue

firmware_versions = super().get_transceiver_info_firmware_versions()
Expand All @@ -86,17 +109,41 @@ def get_transceiver_info_firmware_versions(self):
else:
return_dict.update(firmware_versions)
except Exception as e:
logging.error("Exception occurred while handling target {} firmware version: {}".format(target, repr(e)))
logger.error("Exception occurred while handling target {} firmware version: {}".format(target, repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
logging.error(tb_line_split)
logger.error(tb_line_split)
continue

self.set_firmware_download_target_end(TARGET_E0_VALUE)
return return_dict

def _is_remote_target_accessible(self):
"""
Once the target is changed to remote, any register apart from the TARGET_MODE register
will have the value 0xff if the remote target is powered down.
Assumption:
The target mode has already been set to the desired remote target.
Returns:
True if the remote target is accessible from the local target, False otherwise.
"""
module_type = self.get_module_type()
if 'Unknown' in module_type:
return False

return True

def _restore_target_to_E0(self):
"""
Logs the error message and restores the target mode to E0.
Returns:
False always.
"""
self.xcvr_eeprom.write(consts.TARGET_MODE, TARGET_E0_VALUE)
return False

def _convert_firmware_info_to_target_firmware_info(self, firmware_info, firmware_info_map):
return_dict = {}
for key, value in firmware_info_map.items():
Expand Down
1 change: 1 addition & 0 deletions sonic_platform_base/sonic_xcvr/fields/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@
CDB_WRITE_MSG = "CdbWriteMessage"

#CMISTargetFWUpgrade
PAGE_SELECT_BYTE = "PageSelectByte"
CMIS_TARGET_SERVER_INFO = "CmisTargetServerInfo"
SERVER_FW_MAGIC_BYTE = "ServerFirmwareMagicByte"
SERVER_FW_CHECKSUM = "ServerFirmwareChecksum"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self, codes):
super().__init__(codes)

self.CMIS_TARGET_SERVER_INFO = RegGroupField(consts.CMIS_TARGET_SERVER_INFO,
NumberRegField(consts.PAGE_SELECT_BYTE, self.getaddr(0, 127), format="B", size=1, ro=False),
NumberRegField(consts.SERVER_FW_MAGIC_BYTE, self.getaddr(0x3, 128), format="B", size=1),
NumberRegField(consts.SERVER_FW_CHECKSUM, self.getaddr(0x3, 129), format="B", size=1),
ServerFWVersionRegField(consts.SERVER_FW_VERSION, self.getaddr(0x3, 130), size=16))
53 changes: 46 additions & 7 deletions tests/sonic_xcvr/test_cmisTargetFWUpgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,42 @@ class TestCmis(object):
eeprom = XcvrEeprom(reader, writer, mem_map)
api = CmisTargetFWUpgradeAPI(eeprom)

@pytest.mark.parametrize("set_firmware_result, module_type, exception_raised", [
(False, 'QSFP+ or later with CMIS', False),
(True, 'Unknown', False),
(True, 'QSFP+ or later with CMIS', True)
@pytest.mark.parametrize("target,write_results,accessible,exception,expected_result", [
(1, [False, True, True, True], True, None, False), # Failed to set target mode
(1, [True, False, True, True], True, None, False), # Failed to set page select byte
(1, [True, True, True, True], False, None, False), # Remote target not accessible
(1, [True, True, True, True], True, Exception("Simulated exception"), False), # Exception occurred
(1, [True, True, True, True], True, None, True), # All operations successful
(0, [True, True, True, True], True, None, True), # Target is E0, all operations successful
])
def test_set_firmware_download_target_end(self, target, write_results, accessible, exception, expected_result):
self.api.xcvr_eeprom.write = MagicMock()
self.api.xcvr_eeprom.write.side_effect = write_results
with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._is_remote_target_accessible', return_value=accessible):
with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._restore_target_to_E0', return_value=False):
if exception is not None:
self.api.xcvr_eeprom.write.side_effect = exception

result = self.api.set_firmware_download_target_end(target)
assert result == expected_result
if result:
expected_call_count = 0
else:
expected_call_count = 1
assert self.api._restore_target_to_E0.call_count == expected_call_count

@pytest.mark.parametrize("set_firmware_result, exception_raised", [
(False, False),
(True, True)
])
@patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info_firmware_versions', MagicMock(side_effect=({}, Exception('error'), {})))
@patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._get_server_firmware_version', MagicMock())
@patch('traceback.format_exception')
def test_get_transceiver_info_firmware_versions_failure(self, mock_format_exception, set_firmware_result, module_type, exception_raised):
def test_get_transceiver_info_firmware_versions_failure(self, mock_format_exception, set_firmware_result, exception_raised):
expected_output = {'active_firmware': 'N/A', 'inactive_firmware': 'N/A', 'e1_active_firmware': 'N/A',\
'e1_inactive_firmware': 'N/A', 'e2_active_firmware': 'N/A', 'e2_inactive_firmware': 'N/A',\
'e1_server_firmware': 'N/A', 'e2_server_firmware': 'N/A'}
self.api.set_firmware_download_target_end = MagicMock(return_value=set_firmware_result)
self.api.get_module_type = MagicMock(return_value=module_type)

result = self.api.get_transceiver_info_firmware_versions()
assert result == expected_output
Expand Down Expand Up @@ -58,12 +80,29 @@ def test_get_transceiver_info_firmware_versions_success(self, fw_info_dict, serv
with patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info_firmware_versions', side_effect=fw_info_dict):
with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._get_server_firmware_version', side_effect=server_fw_info_dict):
self.api.set_firmware_download_target_end = MagicMock(return_value=True)
self.api.get_module_type = MagicMock(return_value='QSFP+ or later with CMIS')

result = self.api.get_transceiver_info_firmware_versions()
assert result == expected_output
assert self.api.set_firmware_download_target_end.call_count == len(TARGET_LIST) + 1

@pytest.mark.parametrize("module_type, expected_result", [
('Unknown', False),
('QSFP+ or later with CMIS', True)
])
def test_is_remote_target_accessible(self, module_type, expected_result):
# Mock the get_module_type method to return the parameterized module_type
self.api.get_module_type = MagicMock(return_value=module_type)

# Call the method and check the result
result = self.api._is_remote_target_accessible()
assert result == expected_result

def test_restore_target_to_E0(self):
self.api.xcvr_eeprom.write = MagicMock()
assert self.api._restore_target_to_E0() == False
self.api.xcvr_eeprom.write.assert_called_once()


@pytest.mark.parametrize("magic_byte, checksum, server_fw_version_byte_array, expected", [
(0, 0, (), {'server_firmware': 'N/A'}),
(0, 0x98, [0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, 0x8d], {'server_firmware': 'N/A'}), # Magic byte is 0 but other values are valid
Expand Down
Loading