| # Copyright 2023 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Functionalities to reliably reboot the device.""" |
| |
| import enum |
| import json |
| import logging |
| import subprocess |
| import time |
| |
| from typing import Optional |
| |
| from common import run_continuous_ffx_command, run_ffx_command, get_ssh_address |
| from compatible_utils import get_ssh_prefix |
| |
| |
class TargetState(enum.Enum):
    """States a target can be reported in."""
    # Values are spelled out explicitly; they are the same 1..5 sequence
    # that enum.auto() would assign in this declaration order.
    UNKNOWN = 1
    DISCONNECTED = 2
    PRODUCT = 3
    FASTBOOT = 4
    ZEDBOOT = 5
| |
| |
class BootMode(enum.Enum):
    """Boot modes a device can be asked to boot into."""
    # Explicit values matching what enum.auto() assigns in this order.
    REGULAR = 1
    RECOVERY = 2
    BOOTLOADER = 3
| |
| |
# Boot mode associated with each target state that has one. UNKNOWN and
# DISCONNECTED have no entry.
_STATE_TO_BOOTMODE = dict((
    (TargetState.PRODUCT, BootMode.REGULAR),
    (TargetState.FASTBOOT, BootMode.BOOTLOADER),
    (TargetState.ZEDBOOT, BootMode.RECOVERY),
))

# Reverse lookup: the target state a device is expected to reach for each
# boot mode.
_BOOTMODE_TO_STATE = {
    boot_mode: target_state
    for target_state, boot_mode in _STATE_TO_BOOTMODE.items()
}
| |
| |
class StateNotFoundError(Exception):
    """Signals that no state could be determined for the target."""
| |
| |
class StateTransitionError(Exception):
    """Signals that the target failed to reach the desired state."""
| |
| |
def _state_string_to_state(state_str: str) -> TargetState:
    """Translate a textual target state into its TargetState member.

    The text is stripped of surrounding whitespace and lower-cased before it
    is looked up.

    Args:
        state_str: State text to translate.

    Returns:
        The TargetState member matching |state_str|.

    Raises:
        NotImplementedError: If the normalized text is not a known state.
    """
    normalized = state_str.strip().lower()
    known_states = {
        'product': TargetState.PRODUCT,
        'zedboot (r)': TargetState.ZEDBOOT,
        'fastboot': TargetState.FASTBOOT,
        'unknown': TargetState.UNKNOWN,
        'disconnected': TargetState.DISCONNECTED,
    }
    matched_state = known_states.get(normalized)
    if matched_state is None:
        raise NotImplementedError(f'State {normalized} not supported')
    return matched_state
| |
| |
def _get_target_state(target_id: Optional[str],
                      serial_num: Optional[str],
                      num_attempts: int = 1) -> TargetState:
    """Return state of target or the default target.

    Each attempt runs `ffx target list` with JSON output and scans the
    returned entries for the first one that matches. Between attempts (but
    not after the last one) the function sleeps for 10 seconds.

    Args:
        target_id: Optional nodename of the target. If not given, default
            target is used.
        serial_num: Optional serial number of target. Only usable if device
            is in fastboot. Ignored when None.
        num_attempts: Optional number of times to attempt getting status.

    Returns:
        TargetState of the given node, if found.

    Raises:
        StateNotFoundError: If target cannot be found, or default target is
            not defined if |target_id| is not given.
    """
    for attempt in range(num_attempts):
        targets = json.loads(
            run_ffx_command(cmd=('target', 'list'),
                            check=True,
                            capture_output=True,
                            json_out=True).stdout.strip())
        for target in targets:
            # An identifier that was not supplied must never match: without
            # the `is not None` guards a None |serial_num| (or |target_id|)
            # would compare equal to a null field in the listing and the
            # state of an unrelated target would be returned.
            is_match = (
                (target_id is None and target['is_default'])
                or (target_id is not None
                    and target_id == target['nodename'])
                # A serial number match should only yield Fastboot.
                or (serial_num is not None
                    and serial_num == target['serial']))
            if is_match:
                return _state_string_to_state(target['target_state'])
        # Do not sleep for last attempt.
        if attempt < num_attempts - 1:
            time.sleep(10)

    # Could not find a state for given target.
    error_target = 'default target' if target_id is None else target_id
    raise StateNotFoundError(f'Could not find state for {error_target}.')
| |
| |
def boot_device(target_id: Optional[str],
                mode: BootMode,
                serial_num: Optional[str] = None,
                must_boot: bool = False) -> None:
    """Boot device into desired mode, with fallback to SSH on failure.

    The request is first handed to serial_boot_device.boot_device(); when
    that call returns a truthy value nothing else is done. Otherwise the
    reboot is issued through ffx and, if the device does not end up in the
    wanted state, once more through `dm` over SSH.

    Args:
        target_id: Optional target_id of device.
        mode: Desired boot mode.
        serial_num: Optional serial number of the device. It is forwarded to
            the state lookup and to the reboot helpers.
        must_boot: Forces device to boot, regardless of current state.
    Raises:
        StateNotFoundError: When the state of the device cannot be found
            before rebooting, or the device is seen as disconnected after
            the ffx reboot attempt.
        StateTransitionError: When final state of device is not desired.
    """
    # Avoid cycle dependency.
    # This file will soon be replaced by serial_boot_device, which should be
    # much more reliable than ffx target list and ssh, so restructuring the
    # files is not worthwhile in the current situation.
    # pylint: disable=cyclic-import, import-outside-toplevel
    # pylint: disable=wrong-import-position
    import serial_boot_device
    if serial_boot_device.boot_device(target_id, serial_num, mode, must_boot):
        return

    # Skip the reboot if the device is already in the wanted state, unless
    # the caller forces it with |must_boot|.
    state = _get_target_state(target_id, serial_num, num_attempts=3)
    wanted_state = _BOOTMODE_TO_STATE.get(mode)
    if not must_boot:
        logging.debug('Current state %s. Want state %s', str(state),
                      str(wanted_state))
        must_boot = state != wanted_state

    if not must_boot:
        logging.debug('Skipping boot - already in good state')
        return

    def _wait_for_state_transition(current_state: TargetState) -> TargetState:
        """Poll the target until it leaves |current_state| and reaches the
        wanted state.

        Returns the wanted state once it is observed. Otherwise returns the
        most recently observed state, or DISCONNECTED when no state was ever
        observed.
        """
        local_state = None
        # Check that we transition out of current state. Up to 30 polls; the
        # `finally` clause sleeps 2 seconds after every poll, including the
        # one that breaks out of the loop.
        for _ in range(30):
            try:
                local_state = _get_target_state(target_id, serial_num)
                if local_state != current_state:
                    # Changed states - can continue
                    break
            except StateNotFoundError:
                logging.debug('Device disconnected...')
                if current_state != TargetState.DISCONNECTED:
                    # Changed states - can continue
                    break
            finally:
                time.sleep(2)
        else:
            # for/else: only reached when the loop above never hit `break`.
            logging.warning(
                'Device did not change from initial state. Exiting early')
            return local_state or TargetState.DISCONNECTED

        # Now we want to transition to the new state. Up to 90 polls, again
        # with a 2 second sleep after each one (also after the successful
        # poll, since `finally` runs before the return completes).
        for _ in range(90):
            try:
                local_state = _get_target_state(target_id, serial_num)
                if local_state == wanted_state:
                    return local_state
            except StateNotFoundError:
                logging.warning('Could not find target state.'
                                ' Sleeping then retrying...')
            finally:
                time.sleep(2)
        return local_state or TargetState.DISCONNECTED

    # First attempt: reboot through ffx and wait for the state to change.
    _boot_device_ffx(target_id, serial_num, state, mode)
    state = _wait_for_state_transition(state)

    if state == TargetState.DISCONNECTED:
        raise StateNotFoundError('Target could not be found!')

    if state == wanted_state:
        return

    logging.warning(
        'Booting with FFX to %s did not succeed. Attempting with DM', mode)

    # Fall back to DM over SSH. This is attempted only once, with no further
    # retry after the ffx attempt above.
    _boot_device_dm(target_id, serial_num, state, mode)
    state = _wait_for_state_transition(state)

    if state != wanted_state:
        raise StateTransitionError(
            f'Could not get device to desired state. Wanted {wanted_state},'
            f' got {state}')
    logging.debug('Got desired state: %s', state)
| |
| |
def _boot_device_ffx(target_id: Optional[str], serial_num: Optional[str],
                     current_state: TargetState, mode: BootMode):
    """Start an `ffx target reboot` for the requested boot mode.

    Args:
        target_id: Optional target_id of device; used to address the device
            unless it is currently in fastboot.
        serial_num: Optional serial number; used to address the device when
            it is currently in fastboot.
        current_state: State the device is currently in.
        mode: Desired boot mode.

    Raises:
        NotImplementedError: If |mode| is not a supported BootMode.
    """
    if mode == BootMode.REGULAR:
        logging.info('Triggering regular boot')
        mode_flags = []
    elif mode == BootMode.RECOVERY:
        mode_flags = ['-r']
    elif mode == BootMode.BOOTLOADER:
        mode_flags = ['-b']
    else:
        raise NotImplementedError(f'BootMode {mode} not supported')
    reboot_args = ['target', 'reboot'] + mode_flags

    logging.debug('FFX reboot with command [%s]', ' '.join(reboot_args))
    # TODO(crbug.com/1432405): We need to wait for the state transition or kill
    # the process if it fails.
    # A device in fastboot is addressed by its serial number, any other
    # state by the target id.
    ffx_target = (serial_num
                  if current_state == TargetState.FASTBOOT else target_id)
    run_continuous_ffx_command(cmd=reboot_args,
                               target_id=ffx_target,
                               configs=['product.reboot.use_dm=true'])
| |
| |
def _boot_device_dm(target_id: Optional[str], serial_num: Optional[str],
                    current_state: TargetState, mode: BootMode):
    """Reboot the device by running a `dm` reboot command over SSH.

    When the device is not in the PRODUCT state, an ffx reboot to regular
    mode is triggered first, unless regular mode is itself the goal, which
    is reported as an error.

    Args:
        target_id: Optional target_id of device.
        serial_num: Optional serial number, forwarded to the ffx reboot.
        current_state: State the device is currently in.
        mode: Desired boot mode.

    Raises:
        StateTransitionError: If the device is not in PRODUCT and |mode| is
            REGULAR.
        NotImplementedError: If |mode| is not a supported BootMode.
    """
    # Can only use DM if device is in regular boot.
    if current_state != TargetState.PRODUCT:
        if mode == BootMode.REGULAR:
            raise StateTransitionError('Cannot boot to Regular via DM - '
                                       'FFX already failed to do so.')
        # Boot to regular.
        # TODO(crbug.com/1432405): After changing to run_continuous_ffx_command,
        # this behavior becomes invalid, we need to wait for the state
        # transition.
        _boot_device_ffx(target_id, serial_num, current_state,
                         BootMode.REGULAR)

    # The SSH prefix is resolved before the mode is validated, as before.
    ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))

    dm_verbs = (
        (BootMode.REGULAR, 'reboot'),
        (BootMode.RECOVERY, 'reboot-recovery'),
        (BootMode.BOOTLOADER, 'reboot-bootloader'),
    )
    for candidate_mode, dm_verb in dm_verbs:
        if mode == candidate_mode:
            break
    else:
        # No entry matched the requested mode.
        raise NotImplementedError(f'BootMode {mode} not supported')

    # Boot commands can fail due to SSH connections timeout, so the exit
    # status is not checked.
    full_cmd = ssh_prefix + ['--', 'dm', dm_verb]
    logging.debug('DM reboot with command [%s]', ' '.join(full_cmd))
    subprocess.run(full_cmd, check=False)