blob: ee6f1ba72999f9621231be394b4cb8962d106f65 [file] [log] [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functionalities to reliably reboot the device."""
import enum
import json
import logging
import subprocess
import time
from typing import Optional
from common import run_continuous_ffx_command, run_ffx_command, get_ssh_address
from compatible_utils import get_ssh_prefix
class TargetState(enum.Enum):
"""State of a target."""
UNKNOWN = enum.auto()
DISCONNECTED = enum.auto()
PRODUCT = enum.auto()
FASTBOOT = enum.auto()
ZEDBOOT = enum.auto()
class BootMode(enum.Enum):
"""Specifies boot mode for device."""
REGULAR = enum.auto()
RECOVERY = enum.auto()
BOOTLOADER = enum.auto()
_STATE_TO_BOOTMODE = {
TargetState.PRODUCT: BootMode.REGULAR,
TargetState.FASTBOOT: BootMode.BOOTLOADER,
TargetState.ZEDBOOT: BootMode.RECOVERY
}
_BOOTMODE_TO_STATE = {value: key for key, value in _STATE_TO_BOOTMODE.items()}
class StateNotFoundError(Exception):
"""Raised when target's state cannot be found."""
class StateTransitionError(Exception):
"""Raised when target does not transition to desired state."""
def _state_string_to_state(state_str: str) -> TargetState:
state_str = state_str.strip().lower()
if state_str == 'product':
return TargetState.PRODUCT
if state_str == 'zedboot (r)':
return TargetState.ZEDBOOT
if state_str == 'fastboot':
return TargetState.FASTBOOT
if state_str == 'unknown':
return TargetState.UNKNOWN
if state_str == 'disconnected':
return TargetState.DISCONNECTED
raise NotImplementedError(f'State {state_str} not supported')
def _get_target_state(target_id: Optional[str],
serial_num: Optional[str],
num_attempts: int = 1) -> TargetState:
"""Return state of target or the default target.
Args:
target_id: Optional nodename of the target. If not given, default target
is used.
serial_num: Optional serial number of target. Only usable if device is
in fastboot.
num_attempts: Optional number of times to attempt getting status.
Returns:
TargetState of the given node, if found.
Raises:
StateNotFoundError: If target cannot be found, or default target is not
defined if |target_id| is not given.
"""
for i in range(num_attempts):
targets = json.loads(
run_ffx_command(cmd=('target', 'list'),
check=True,
capture_output=True,
json_out=True).stdout.strip())
for target in targets:
if target_id is None and target['is_default']:
return _state_string_to_state(target['target_state'])
if target_id == target['nodename']:
return _state_string_to_state(target['target_state'])
if serial_num == target['serial']:
# Should only return Fastboot.
return _state_string_to_state(target['target_state'])
# Do not sleep for last attempt.
if i < num_attempts - 1:
time.sleep(10)
# Could not find a state for given target.
error_target = target_id
if target_id is None:
error_target = 'default target'
raise StateNotFoundError(f'Could not find state for {error_target}.')
def boot_device(target_id: Optional[str],
mode: BootMode,
serial_num: Optional[str] = None,
must_boot: bool = False) -> None:
"""Boot device into desired mode, with fallback to SSH on failure.
Args:
target_id: Optional target_id of device.
mode: Desired boot mode.
must_boot: Forces device to boot, regardless of current state.
Raises:
StateTransitionError: When final state of device is not desired.
"""
# Avoid cycle dependency.
# This file will be replaced with serial_boot_device quite soon, later one
# should be much more reliable comparing to ffx target list and ssh. So
# changing the file structure is not necessary in the current situation.
# pylint: disable=cyclic-import, import-outside-toplevel
# pylint: disable=wrong-import-position
import serial_boot_device
if serial_boot_device.boot_device(target_id, serial_num, mode, must_boot):
return
# Skip boot call if already in the state and not skipping check.
state = _get_target_state(target_id, serial_num, num_attempts=3)
wanted_state = _BOOTMODE_TO_STATE.get(mode)
if not must_boot:
logging.debug('Current state %s. Want state %s', str(state),
str(wanted_state))
must_boot = state != wanted_state
if not must_boot:
logging.debug('Skipping boot - already in good state')
return
def _wait_for_state_transition(current_state: TargetState):
local_state = None
# Check that we transition out of current state.
for _ in range(30):
try:
local_state = _get_target_state(target_id, serial_num)
if local_state != current_state:
# Changed states - can continue
break
except StateNotFoundError:
logging.debug('Device disconnected...')
if current_state != TargetState.DISCONNECTED:
# Changed states - can continue
break
finally:
time.sleep(2)
else:
logging.warning(
'Device did not change from initial state. Exiting early')
return local_state or TargetState.DISCONNECTED
# Now we want to transition to the new state.
for _ in range(90):
try:
local_state = _get_target_state(target_id, serial_num)
if local_state == wanted_state:
return local_state
except StateNotFoundError:
logging.warning('Could not find target state.'
' Sleeping then retrying...')
finally:
time.sleep(2)
return local_state or TargetState.DISCONNECTED
_boot_device_ffx(target_id, serial_num, state, mode)
state = _wait_for_state_transition(state)
if state == TargetState.DISCONNECTED:
raise StateNotFoundError('Target could not be found!')
if state == wanted_state:
return
logging.warning(
'Booting with FFX to %s did not succeed. Attempting with DM', mode)
# Fallback to SSH, with no retry if we tried with ffx.:
_boot_device_dm(target_id, serial_num, state, mode)
state = _wait_for_state_transition(state)
if state != wanted_state:
raise StateTransitionError(
f'Could not get device to desired state. Wanted {wanted_state},'
f' got {state}')
logging.debug('Got desired state: %s', state)
def _boot_device_ffx(target_id: Optional[str], serial_num: Optional[str],
current_state: TargetState, mode: BootMode):
cmd = ['target', 'reboot']
if mode == BootMode.REGULAR:
logging.info('Triggering regular boot')
elif mode == BootMode.RECOVERY:
cmd.append('-r')
elif mode == BootMode.BOOTLOADER:
cmd.append('-b')
else:
raise NotImplementedError(f'BootMode {mode} not supported')
logging.debug('FFX reboot with command [%s]', ' '.join(cmd))
# TODO(crbug.com/1432405): We need to wait for the state transition or kill
# the process if it fails.
if current_state == TargetState.FASTBOOT:
run_continuous_ffx_command(cmd=cmd,
target_id=serial_num,
configs=['product.reboot.use_dm=true'])
else:
run_continuous_ffx_command(cmd=cmd,
target_id=target_id,
configs=['product.reboot.use_dm=true'])
def _boot_device_dm(target_id: Optional[str], serial_num: Optional[str],
current_state: TargetState, mode: BootMode):
# Can only use DM if device is in regular boot.
if current_state != TargetState.PRODUCT:
if mode == BootMode.REGULAR:
raise StateTransitionError('Cannot boot to Regular via DM - '
'FFX already failed to do so.')
# Boot to regular.
# TODO(crbug.com/1432405): After changing to run_continuous_ffx_command,
# this behavior becomes invalid, we need to wait for the state
# transition.
_boot_device_ffx(target_id, serial_num, current_state,
BootMode.REGULAR)
ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))
reboot_cmd = None
if mode == BootMode.REGULAR:
reboot_cmd = 'reboot'
elif mode == BootMode.RECOVERY:
reboot_cmd = 'reboot-recovery'
elif mode == BootMode.BOOTLOADER:
reboot_cmd = 'reboot-bootloader'
else:
raise NotImplementedError(f'BootMode {mode} not supported')
# Boot commands can fail due to SSH connections timeout.
full_cmd = ssh_prefix + ['--', 'dm', reboot_cmd]
logging.debug('DM reboot with command [%s]', ' '.join(full_cmd))
subprocess.run(full_cmd, check=False)