blob: 79151ad2e3adb9fb865b8f8c87380589d33b90cc [file] [log] [blame]
#! /usr/bin/env python3
# The Atmosic In-System Programming Tool (ISP) is for bundling all
# three types of binaries -- OTP NVDS, flash NVDS, and flash -- into a
# single binary archive. The ISP tool can then be used to unpack the
# components of the archive and download them on a device. This is
# particularly useful for separating firmware development from
# firmware downloading -- a developer will use the
# Atmosic SDK to build the flash/OTP content, archive it, and then
# hand it off to a non-SDK user who only has access to this ISP tool
# to program devices.
#
# Executing this atm_isp script with the -h option shows the available
# subcommands. Each subcommand also has a help menu that can be shown
# by adding -h after the subcommand name.
import argparse
import atm_isp_python.atm_isp_pb2 as ISP
import os
import os.path
import sys
import tempfile
import time
import pprint
import platform
def auto_int(x):
    """Convert the string *x* to an int, letting the literal prefix pick the base.

    Passing base 0 to ``int`` makes it accept plain decimal as well as
    ``0x``/``0o``/``0b`` prefixed literals. Used as an argparse ``type=``
    callable for address and size arguments.
    """
    return int(x, base=0)
def parse_args(args=None, namespace=None):
    """Build the atm_isp command-line parser and parse *args*.

    Subcommands (stored in ``opcode``): ``init``, ``loadFlash``,
    ``loadFlashNvds``, ``loadOtpNvds``, ``decode`` and ``burn``.

    Args:
        args: Argument list to parse; ``None`` makes argparse use ``sys.argv``.
        namespace: Optional namespace object to populate.

    Returns:
        The populated argparse namespace.

    Raises:
        SystemExit: On a usage error, including a missing subcommand.
    """
    # Option groups shared between subcommands through ``parents=``.
    output_base_parser = argparse.ArgumentParser(add_help=False)
    output_base_parser.add_argument(
        '-o', '--output',
        dest='output_path',
        metavar='NEW_ARCHIVE',
        help='Output archive file (default: stdout)')

    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-i', '--input',
        dest='input_path',
        metavar='ARCHIVE',
        default=os.devnull, help='Input archive file')

    verbose_parser = argparse.ArgumentParser(add_help=False)
    verbose_parser.add_argument('-v', '--verbose', dest='verbose', action="store_true", help="increase output verbosity")

    openocd_load_base_parser = argparse.ArgumentParser(add_help=False, parents=[base_parser, output_base_parser, verbose_parser])
    openocd_load_base_parser.add_argument('image', type=argparse.FileType('rb'), help='Path to image')

    openocd_load_flash_base_parser = argparse.ArgumentParser(
        add_help=False,
        parents=[openocd_load_base_parser]
    )
    openocd_load_flash_base_parser.add_argument('region_start', type=auto_int, nargs='?', help='Start address of flash region to erase')
    openocd_load_flash_base_parser.add_argument('region_size', type=auto_int, nargs='?', help='Size of flash region to erase')

    parser = argparse.ArgumentParser(description='Atmosic ISP Tool')
    subparsers = parser.add_subparsers(dest='opcode')
    # Without a subcommand the namespace has opcode=None and none of the
    # per-subcommand attributes (e.g. input_path), which the caller reads
    # unconditionally. Make the subcommand mandatory so argparse reports a
    # usage error instead. Set as an attribute (rather than the 3.7+
    # ``required=`` keyword) to stay compatible with older Python 3.
    subparsers.required = True

    init_parser = subparsers.add_parser('init', parents=[output_base_parser])
    init_parser.add_argument('family', help='Platform family (e.g. atm2)')
    init_parser.add_argument('name', help='Full platform name (e.g. ATM2xxx-x0x)')

    openocd_load_flash_parser = subparsers.add_parser(
        'loadFlash',
        parents=[openocd_load_flash_base_parser]
    )
    openocd_load_flash_parser.add_argument('address', type=auto_int, nargs='?',
                                           help='Address where image should be loaded')
    openocd_load_flash_parser.add_argument('-mpr_start', '--mpr_start',
                                           dest='mpr_start', metavar='MPR_START', type=auto_int, required=False, help='MPR_START')
    openocd_load_flash_parser.add_argument('-mpr_size', '--mpr_size',
                                           dest='mpr_size', metavar='MPR_SIZE', type=auto_int, required=False, help='MPR_SIZE')
    openocd_load_flash_parser.add_argument('-mpr_lock_size', '--mpr_lock_size',
                                           dest='mpr_lock_size', metavar='MPR_LOCK_SIZE', type=auto_int, required=False, help='MPR_LOCK_SIZE')

    subparsers.add_parser(
        'loadFlashNvds',
        parents=[openocd_load_flash_base_parser]
    )
    subparsers.add_parser(
        'loadOtpNvds',
        parents=[openocd_load_base_parser]
    )
    subparsers.add_parser('decode', parents=[base_parser])

    burn_parser = subparsers.add_parser('burn', parents=[base_parser])
    burn_parser.add_argument('-r', '--openocd_pkg_root', help='Path to directory where openocd and its scripts are found')
    burn_parser.add_argument('-E', '--openocd_script_only', action='store_true', help='Stop after preparing OpenOCD script')
    burn_parser.add_argument('-e', '--erase_workarounds', action='store_true', help='Erase workaround tags in OTP before loading OTP')
    burn_parser.add_argument('-v', '--verbose', action='store_true', help='Verbose mode')
    burn_parser.add_argument('-c', '--check_image', action='store_true', help='Verify OTP/flash image after burning/loading')
    burn_parser.add_argument('-t', '--tcl_script', help='Path to output Jim Tcl script for use by OpenOCD (generates Jim Tcl script only; delays all operations post-unpacking of archive to Tcl/OpenOCD); implies -E')
    burn_parser.add_argument('-d', '--dst_dir', help='Use this directory to dump openocd script in; implies -E')
    burn_parser.add_argument('-p', '--program_only', action='store_true', help='Program the device only (no reset hard on exit)')

    return parser.parse_args(args, namespace)
def strattr(a, attr=None, fmt=str, treat_0_as_undef=False):
    """Render the value *a* as a display token.

    Args:
        a: Value to render.
        attr: Optional attribute name; when given the token is ``attr=value``.
        fmt: Callable that turns *a* into its textual form.
        treat_0_as_undef: When true, an int equal to 0 yields an empty string.

    Returns:
        ``''`` for a suppressed zero, otherwise ``fmt(a)`` or ``attr=fmt(a)``.
    """
    # NOTE(review): nesting reconstructed from a whitespace-stripped source;
    # the zero suppression is taken to be the only int-specific branch.
    suppressed = treat_0_as_undef and type(a) is int and a == 0
    if suppressed:
        return ''
    rendered = fmt(a)
    return rendered if attr is None else '%s=%s' % (attr, rendered)
def imgstr(image):
    """Return a short printable preview of *image*.

    At most the first 8 elements are pretty-printed; a longer image gets a
    trailing ``...`` to show that it was truncated.
    """
    preview_len = 8
    if len(image) <= preview_len:
        return pprint.pformat(image)
    return pprint.pformat(image[:preview_len]) + '...'
def imgsz(img):
    """Describe *img* as ``(size=N,content=PREVIEW)`` using ``imgstr`` for the preview."""
    size = len(img)
    preview = imgstr(img)
    return '(size=%u,content=%s)' % (size, preview)
# Lists the contents of the archive
class PrintArchive:
    """Archive handler that writes a one-line description of each entry.

    Provides the ``Platform``/``LoadOtpNvds``/``LoadFlashNvds``/``LoadFlash``
    handler methods that the archive iteration code calls.
    """

    def __init__(self, out):
        """Remember *out*, a text stream with a ``write`` method."""
        self.out = out

    def print_cmd(self, opcode, image, op_specific_arg_names=(), op_specific_arg_values=()):
        """Write ``opcode image=(...) name=value ...`` followed by a newline.

        Args:
            opcode: Command name printed first.
            image: Image payload, summarized through ``imgsz``.
            op_specific_arg_names: Names of the command-specific arguments.
                Defaults to none, so commands carrying only an image can be
                printed.
            op_specific_arg_values: Values matching the names, position by
                position.

        Trailing arguments whose value is the int 0 are omitted; a zero that
        precedes a printed argument is kept.
        """
        tokens = [opcode, strattr(image, 'image', imgsz)]
        # Walk the arguments from last to first so that zeros are dropped only
        # while nothing after them has been printed. ``reversed`` leaves the
        # caller's sequences untouched.
        tail = []
        for arg_name, arg_value in zip(reversed(op_specific_arg_names),
                                       reversed(op_specific_arg_values)):
            token = strattr(arg_value, arg_name, str, not tail)
            if token:
                tail.append(token)
        tokens.extend(reversed(tail))
        self.out.write(' '.join(filter(None, tokens)))
        self.out.write('\n')

    def LoadOtpNvds(self, image):
        """Print an OTP NVDS load entry (image only)."""
        self.print_cmd('LoadOtpNvds', image)

    def LoadFlashNvds(self, image, region_start, region_size):
        """Print a flash NVDS load entry."""
        self.print_cmd('LoadFlashNvds', image, ['region_start', 'region_size'], [region_start, region_size])

    def LoadFlash(self, image, region_start, region_size, address):
        """Print a flash load entry."""
        self.print_cmd('LoadFlash', image, ['region_start', 'region_size', 'address'], [region_start, region_size, address])

    def Platform(self, family, name):
        """Print the platform name and family recorded in the archive."""
        self.out.write('Platform {} ({} family)\n'.format(name, family))
# Executes a loadFlash* command from the archive
def iter_archive_aux_flash(cmdName, cmd, handler):
    """Dispatch a ``loadFlash``/``loadFlashNvds`` command to *handler*.

    Args:
        cmdName: Name of the command held by *cmd*.
        cmd: Command message; only inspected when *cmdName* is a flash command.
        handler: Object providing ``LoadFlash`` and ``LoadFlashNvds``.

    Returns:
        True when the command was dispatched, False when *cmdName* is not a
        flash command.
    """
    # Check the name first: a non-flash command need not have a
    # ``commonLoadFlash`` field, and reading it would raise instead of
    # letting the caller report the unknown command.
    if cmdName not in ('loadFlashNvds', 'loadFlash'):
        return False
    common = cmd.commonLoadFlash
    image = common.commonLoad.image
    region_start = common.region_start
    region_size = common.region_size
    if cmdName == 'loadFlashNvds':
        handler.LoadFlashNvds(image, region_start, region_size)
    else:
        handler.LoadFlash(image, region_start, region_size, cmd.address)
    return True
# Executes a command from the archive
def iter_archive_aux(cmdName, cmd, handler):
    """Dispatch one archive command to *handler*.

    OTP NVDS loads are handled here; everything else is offered to
    ``iter_archive_aux_flash``.

    Returns:
        True when some handler method was invoked, False otherwise.
    """
    if cmdName == 'loadOtpNvds':
        handler.LoadOtpNvds(cmd.commonLoad.image)
        return True
    return bool(iter_archive_aux_flash(cmdName, cmd, handler))
# Executes all commands in the archive
def iter_archive(archive, handler):
    """Replay *archive* against *handler*.

    The handler first receives the platform family and name from the archive
    metadata, then each entry of ``archive.script`` in order.

    Raises:
        Exception: When an entry holds a command no handler path recognizes.
    """
    plat = archive.meta.platform
    handler.Platform(plat.family, plat.name)
    for entry in archive.script:
        # Each entry is a oneof; find out which command it actually carries.
        name = entry.WhichOneof("cmdUnion")
        payload = getattr(entry, name)
        handled = iter_archive_aux(name, payload, handler)
        if not handled:
            raise Exception('[iter_archive] Unknown command name: %s'%name)
def serialize_archive(archive, output_path=None):
    """Serialize *archive* and write the bytes out.

    Args:
        archive: Object exposing ``SerializeToString()``.
        output_path: Destination file; when empty or ``None`` the bytes go to
            the binary stdout stream.
    """
    payload = archive.SerializeToString()
    if output_path:
        with open(output_path, 'wb') as sink:
            sink.write(payload)
        return
    sys.stdout.buffer.write(payload)
def remove_image_if_exists(imagepath):
    """Delete the file at *imagepath*; do nothing when the path does not exist."""
    if not os.path.exists(imagepath):
        return
    os.remove(imagepath)
def this_file():
    """Return the path this program was started from.

    When ``sys`` carries both the ``frozen`` and ``_MEIPASS`` attributes the
    interpreter executable is returned; otherwise this module's ``__file__``.
    """
    bundled = all(hasattr(sys, marker) for marker in ('frozen', '_MEIPASS'))
    return sys.executable if bundled else __file__
# Utilities for locating and interacting with tools -- nvds_tool and openocd
class Tools:
    """Class-level registry of tool locations.

    All state lives on the class: the package root, the platform family and
    name, and an optional explicit OpenOCD executable path.
    """
    # Package root defaults to the parent of the directory holding this program.
    openocd_pkg_root = os.path.abspath(os.path.join(os.path.dirname(this_file()), '..'))
    platform_family = 'plat_family'
    platform_name = 'plat_name'
    # Explicit OpenOCD executable set through set_openocd(); None means
    # "derive it from the package root".
    _openocd_override = None

    @staticmethod
    def pfx(system = platform.system()):
        """Map a ``platform.system()`` name to the per-OS tool directory name.

        Raises:
            Exception: When *system* is not a recognized platform.
        """
        if system in ('Linux', 'Darwin'):
            return system
        if system == 'Windows' or system.startswith(('MSYS', 'MINGW')):
            return 'Windows_NT'
        raise Exception("Unknown platform '%s'"%system)

    @staticmethod
    def nvds_tool():
        """Return the path of the nvds_tool executable for the current platform."""
        return os.path.join(Tools.platform_dir(), 'tools', 'bin', Tools.pfx(), 'nvds_tool')

    @staticmethod
    def set_openocd(openocd):
        """Use *openocd* as the OpenOCD executable returned by ``openocd()``.

        The value is kept in a separate attribute; assigning it to
        ``Tools.openocd`` would replace the ``openocd()`` method itself and
        break every later call to it.
        """
        Tools._openocd_override = openocd

    @staticmethod
    def set_openocd_pkg_root(root):
        """Set the package root under which tools and platforms are looked up."""
        Tools.openocd_pkg_root = root

    @staticmethod
    def set_platform(platform_name, platform_family):
        """Record the platform name and family used to build platform paths."""
        Tools.platform_name = platform_name
        Tools.platform_family = platform_family

    @staticmethod
    def openocd_dir():
        """Return ``<pkg_root>/tools/openocd``."""
        return os.path.join(Tools.openocd_pkg_root, 'tools', 'openocd')

    @staticmethod
    def platform_dir():
        """Return ``<pkg_root>/platform/<family>/<name>``."""
        platforms_dir = os.path.join(Tools.openocd_pkg_root, 'platform')
        return os.path.join(platforms_dir, Tools.platform_family, Tools.platform_name)

    @staticmethod
    def openocd_flags():
        """Return the OpenOCD search-path and config-file flags as a list."""
        openocd_cfg_dir = os.path.join(Tools.platform_dir(), 'openocd')
        return [
            '-s', os.path.join(Tools.openocd_dir(), 'tcl'),
            '-s', openocd_cfg_dir,
            '-f', '{}x_openocd.cfg'.format(Tools.platform_family),
        ]

    @staticmethod
    def openocd():
        """Return the OpenOCD executable path.

        An explicit path set through ``set_openocd`` wins; otherwise the path
        is ``<openocd_dir>/bin/<pfx>/openocd``.
        """
        if Tools._openocd_override is not None:
            return Tools._openocd_override
        return os.path.join(Tools.openocd_dir(), 'bin', Tools.pfx(), 'openocd')
# Constructs nvds_tool commands
class NvdsTool:
    """Builder for nvds_tool command lines."""

    def __init__(self, in_nvds_path, out_nvds_path, new_nvds_paths = None, inverted = True):
        """Store the paths and options used by ``merge_cmd``.

        Args:
            in_nvds_path: Existing NVDS image, read first.
            out_nvds_path: Path the merged image is written to.
            new_nvds_paths: Additional NVDS images to read, in order. Defaults
                to a fresh empty list per instance (a literal ``[]`` default
                would be one list shared by every instance).
            inverted: When true, ``-i`` is passed to nvds_tool.
        """
        self.in_nvds_path = in_nvds_path
        self.out_nvds_path = out_nvds_path
        self.new_nvds_paths = [] if new_nvds_paths is None else new_nvds_paths
        self.inverted = inverted

    def merge_cmd(self, erase_workarounds=False):
        """Return the nvds_tool merge command as a single space-joined string.

        Args:
            erase_workarounds: When true, ``-E all`` is inserted after the
                input image and before the additional images.
        """
        args = [Tools.nvds_tool()]
        if self.inverted:
            args.append('-i')
        args.extend(['-r', self.in_nvds_path])
        if erase_workarounds:
            args.extend(['-E', 'all'])
        for p in self.new_nvds_paths:
            args.extend(['-r', p])
        args.extend(['-b', '-o', self.out_nvds_path])
        return ' '.join(args)
# Constructs OpenOCD scripts for executing the commands in the archive
class WriteOpenOcdScript:
    """Archive handler that unpacks images and writes an OpenOCD Tcl script.

    Each flash load handler call saves the image into the working directory
    and appends a matching command line to the script. OTP NVDS images are
    only saved; they are merged and burned by ``finalize_script``.

    The ``sydney_*`` names are written verbatim as script commands --
    presumably Tcl procedures supplied by the platform's OpenOCD
    configuration (not visible here).
    """
    otp_nvds = 'otp_nvds.nvm'
    old_otp_nvds = 'old_' + otp_nvds
    sydney_burn_nvm = 'sydney_burn_nvm'
    sydney_dump_nvm = 'sydney_dump_nvm'
    sydney_verify_nvm = 'sydney_verify_nvm'
    exit_code_failure_base = 0x10
    exit_code_failure_nvds_tool = exit_code_failure_base

    def __init__(self, dirpath, must_be_empty=True, verbose=True, check_image=True, tcl_script=None, program_only=False):
        """Prepare the working directory and open the script for writing.

        Args:
            dirpath: Working directory for images and the default script;
                created when missing.
            must_be_empty: Reject an existing, non-empty *dirpath*.
            verbose: Enable ``trace`` output and in-script trace lines.
            check_image: Emit verify commands after load/burn commands.
            tcl_script: Path of the script to write instead of
                ``<dirpath>/atm.tcl``; also defers OTP read/merge to the script.
            program_only: Omit the hard-reset-on-exit setting.

        Raises:
            Exception: When *dirpath* is a file, is not writable, or is
                non-empty while *must_be_empty* is set.
        """
        self.dirpath = dirpath
        self.verbose = verbose
        self.check_image = check_image
        self.tcl_script = tcl_script
        self.program_only = program_only
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)
        elif os.path.isfile(dirpath):
            raise Exception('"%s" is not a directory' % dirpath)
        elif not os.access(dirpath, os.W_OK):
            raise Exception('"%s" is not writable' % dirpath)
        elif must_be_empty and os.listdir(dirpath):
            raise Exception('"%s" is not empty' % dirpath)
        script_path = tcl_script if tcl_script else self.path('atm.tcl')
        self.script = open(script_path, 'w')
        if not tcl_script:
            self.script.write('init\n')
        # NOTE(review): source indentation was lost; this line is taken to be
        # unconditional -- confirm against the original file.
        self.script.write('set ::env(ERASE_UPGRADE_DATA) 1\n')# Make this opt'l?
        # Number of images saved so far per script command; also used to
        # derive the image file names.
        self.image_counts = {
            self.sydney_burn_nvm: 0,
            'sydney_load_nvds': 0,
            'sydney_load_flash': 0,
        }

    def finalize_script(self, erase_workarounds = False):
        """Append the OTP burn (when OTP images were saved) and close the script.

        Args:
            erase_workarounds: Forwarded to the OTP merge step.
        """
        if self.image_counts[self.sydney_burn_nvm] > 0:
            self.read_n_modify_otp(erase_workarounds)
            self.print_cmd(self.sydney_burn_nvm, self.path(self.otp_nvds))
            if self.check_image:
                self.print_cmd(self.sydney_verify_nvm, self.path(self.otp_nvds))
        if not self.tcl_script:
            if not self.program_only:
                self.script.write('set _RESET_HARD_ON_EXIT 1\n')
            self.script.write('exit\n')
        self.script.close()

    def path(self, f):
        """Return *f* joined onto the working directory."""
        return os.path.join(self.dirpath, f)

    def trace(self, msg):
        """Write *msg* to stderr when verbose."""
        if self.verbose:
            sys.stderr.write(msg + '\n')

    def openocd_cmd(self, args):
        """Return the OpenOCD argv: executable, standard flags, then *args*."""
        openocd_args = [Tools.openocd()]
        openocd_args.extend(Tools.openocd_flags())
        openocd_args.extend(args)
        return openocd_args

    def exec_openocd(self, args, env_override=''):
        """Run OpenOCD with *args* through the system shell.

        Args:
            args: Extra OpenOCD arguments.
            env_override: Optional ``KEY=VALUE ...`` text placed in front of
                the command line.

        Raises:
            Exception: When the command exits with a non-zero status.
        """
        openocd_cmd_args = self.openocd_cmd(args)
        openocd_cmd = ' '.join(openocd_cmd_args)
        self.trace('Executing "%s"'%openocd_cmd)
        exit_code = os.system(env_override + ' ' + openocd_cmd if env_override else openocd_cmd)
        if exit_code != 0:
            msg1 = '%s command returned %u status'%(openocd_cmd_args[0], exit_code)
            msg2 = 'Command was "%s"'%openocd_cmd
            raise Exception(msg1, msg2)

    def exec_openocd_cmd(self, cmd, env_override=''):
        """Run OpenOCD with ``-c cmd``, applying *env_override*.

        The override has to be forwarded explicitly; otherwise the environment
        settings requested by ``reset_target`` would never take effect.
        """
        self.exec_openocd(['-c', cmd], env_override)

    def exec_openocd_script(self, script):
        """Run OpenOCD with ``-f script``."""
        self.exec_openocd(['-f', script])

    def _try_openocd_cmd(self, cmd, env_override):
        """Run an OpenOCD command; trace and swallow any failure. Returns success."""
        try:
            self.exec_openocd_cmd(cmd, env_override)
        except Exception as e:
            self.trace(str(e))
            return False
        return True

    def reset_target(self):
        """Best-effort target reset; failures are traced, never raised.

        NOTE(review): the try/except nesting was reconstructed from a
        whitespace-stripped source as "attempt, then fall back on failure" --
        confirm against the original file.
        """
        init_only = "'init; exit'"
        if not self._try_openocd_cmd(init_only, 'FTDI_BENIGN_BOOT=1 FTDI_HARD_RESET=1'):
            self._try_openocd_cmd(init_only, 'FTDI_BENIGN_BOOT=1 FTDI_HARD_RESET=0')
        normal_boot = "'init; set_normal_boot; exit'"
        if not self._try_openocd_cmd(normal_boot, 'FTDI_BENIGN_BOOT=1'):
            # Give the target a moment before the single retry.
            time.sleep(1)
            self._try_openocd_cmd(normal_boot, 'FTDI_BENIGN_BOOT=1')

    def exec_script(self, reset_target=True):
        """Run the generated script with OpenOCD, optionally resetting first."""
        sys.stderr.write('calling exec_script\n')
        if reset_target:
            self.reset_target()
        self.exec_openocd_script(self.script.name)

    def read_n_modify_otp_aux(self, old_otp_nvds_path, erase_workarounds):
        """Dump the current OTP NVDS and merge the saved OTP images into it.

        In Tcl-script mode both steps are written into the script; otherwise
        they are executed immediately.

        Raises:
            Exception: In immediate mode, when OpenOCD or nvds_tool fails.
        """
        # Read
        if self.tcl_script:
            cmd = self.sydney_dump_nvm
            self.script.write('%s {%s}\n'%(cmd, old_otp_nvds_path))
        else:
            self.exec_openocd_cmd("'init; %s {%s}; exit'"%(self.sydney_dump_nvm, old_otp_nvds_path))
        # Modify
        otp_paths = []
        self.iter_images(self.sydney_burn_nvm, otp_paths.append)
        nvds_tool = NvdsTool(old_otp_nvds_path, self.path(self.otp_nvds), otp_paths)
        nvds_tool_cmd = nvds_tool.merge_cmd(erase_workarounds)
        if self.tcl_script:
            self.script_trace(nvds_tool_cmd)
            tcl_template = (
                'if {{ [catch {{exec {0}}} msg] }} {{\n'
                'puts stderr "nvds_tool merge failed: $msg"\n'
                'puts stderr "Command was \'{0}\'"\n'
                '}}\n'
            )
            self.script.write(tcl_template.format(nvds_tool_cmd))
            return
        self.trace('Executing "%s"'%nvds_tool_cmd)
        exit_code = os.system(nvds_tool_cmd)
        if exit_code != 0:
            msgs = [
                'nvds_tool merge returned %u status'%exit_code,
                'Command was %s'%nvds_tool_cmd
            ]
            raise Exception('\n'.join(msgs))

    def read_n_modify_otp(self, erase_workarounds, keep_old_otp_nvds=False):
        """Produce the merged OTP NVDS image in the working directory.

        Args:
            erase_workarounds: Forwarded to the merge command.
            keep_old_otp_nvds: In immediate mode, keep the temporary dump of
                the old OTP contents instead of deleting it.
        """
        if self.tcl_script:
            self.read_n_modify_otp_aux(self.path(self.old_otp_nvds), erase_workarounds)
            return
        pulled_otp, old_otp_nvds = tempfile.mkstemp()
        try:
            self.read_n_modify_otp_aux(old_otp_nvds, erase_workarounds)
        finally:
            # Release the descriptor and the temp file even when the dump or
            # merge fails.
            os.close(pulled_otp)
            if not keep_old_otp_nvds:
                os.unlink(old_otp_nvds)

    def script_trace(self, cmd):
        """When verbose, add a script line that prints *cmd* when it runs."""
        if self.verbose:
            self.script.write('puts {Executing \'%s\'}\n'%cmd)

    def print_cmd(self, openocd_fcn, imagepath, *op_specific_data):
        """Append ``openocd_fcn {imagepath} args...`` to the script.

        Trailing arguments equal to the int 0 are omitted; a zero that
        precedes a written argument is kept.
        """
        tokens = [openocd_fcn, '{%s}'%imagepath]
        tail = []
        for v in reversed(op_specific_data):
            token = strattr(v, None, str, not tail)
            if token:
                tail.append(token)
        tokens.extend(reversed(tail))
        cmd = ' '.join(tokens)
        self.script_trace(cmd)
        self.script.write(cmd + '\n')

    def iter_images(self, openocd_fcn, callback, num_images=None):
        """Call *callback* with the path of each image saved for *openocd_fcn*."""
        if num_images is None:
            num_images = self.image_counts.get(openocd_fcn)
        for i in range(num_images):
            callback(self.path(WriteOpenOcdScript.imgname(openocd_fcn, i)))

    def iter_images_all(self, callback):
        """Call *callback* with the path of every saved image."""
        for openocd_fcn, count in self.image_counts.items():
            self.iter_images(openocd_fcn, callback, count)

    def clean(self):
        """Remove the saved images, the script and the merged OTP image."""
        self.iter_images_all(remove_image_if_exists)
        # script.name is already the full path the script was opened with;
        # joining it onto dirpath again would point at the wrong file when
        # dirpath is relative.
        remove_image_if_exists(self.script.name)
        if self.image_counts[self.sydney_burn_nvm] > 0:
            remove_image_if_exists(self.path(self.otp_nvds))

    @staticmethod
    def imgname(openocd_fcn, count):
        """Return the image file name ``<openocd_fcn>.<count>``."""
        return '%s.%d'%(openocd_fcn, count)

    def save_image(self, openocd_fcn, image):
        """Write *image* to the next numbered file for *openocd_fcn*; return its path."""
        imagename = WriteOpenOcdScript.imgname(openocd_fcn, self.image_counts[openocd_fcn])
        self.image_counts[openocd_fcn] += 1
        imagepath = self.path(imagename)
        with open(imagepath, 'wb') as imagefile:
            imagefile.write(image)
        return imagepath

    def save_img_n_print_cmd(self, openocd_fcn, image, do_check_image, *op_specific_data):
        """Save *image*, emit its load command and, if requested, a verify command.

        The verify command name is *openocd_fcn* with ``load`` replaced by
        ``verify``; the third command-specific value, when present and
        non-zero, is passed to it as the address.
        """
        imagepath = self.save_image(openocd_fcn, image)
        self.print_cmd(openocd_fcn, imagepath, *op_specific_data)
        if self.check_image and do_check_image:
            openocd_fcn_verify = openocd_fcn.replace('load', 'verify')
            # Tolerate callers that pass fewer than three values.
            addr = op_specific_data[2] if len(op_specific_data) > 2 else None
            if addr:
                self.print_cmd(openocd_fcn_verify, imagepath, addr)
            else:
                self.print_cmd(openocd_fcn_verify, imagepath)

    def LoadOtpNvds(self, image):
        """Save an OTP NVDS image for the later merge-and-burn step."""
        self.save_image('sydney_burn_nvm', image)

    def LoadFlashNvds(self, image, region_start, region_size):
        """Save a flash NVDS image and emit its load command (never verified)."""
        self.save_img_n_print_cmd('sydney_load_nvds', image, False, region_start, region_size)

    def LoadFlash(self, image, region_start, region_size, address):
        """Save a flash image and emit its load (and optional verify) command."""
        self.save_img_n_print_cmd('sydney_load_flash', image, True, region_start, region_size, address)

    def Platform(self, family, name):
        """Record the archive's platform so tool paths resolve for it."""
        Tools.set_platform(name, family)
def burn_archive(archive, openocd_opts, verbose=False, check_image=False, tcl_script=None, dst_dir=None, program_only=False):
    """Unpack *archive* into an OpenOCD script and, unless told otherwise, run it.

    Args:
        archive: Parsed archive to replay.
        openocd_opts: Options object providing ``openocd_pkg_root``,
            ``openocd_script_only`` and ``erase_workarounds``.
        verbose: Forwarded to the script writer.
        check_image: Forwarded to the script writer.
        tcl_script: Script path to generate; implies script-only.
        dst_dir: Working directory to use; implies script-only.
        program_only: Forwarded to the script writer.

    In script-only mode without *dst_dir*, the temporary working directory
    is printed to stdout and left in place.
    """
    pkg_root = openocd_opts.openocd_pkg_root
    if pkg_root is not None:
        Tools.set_openocd_pkg_root(pkg_root)

    script_only = openocd_opts.openocd_script_only or tcl_script or dst_dir
    run_and_cleanup = not script_only

    if dst_dir:
        workdir = dst_dir
    else:
        workdir = tempfile.mkdtemp(None, 'burn_arch_', Tools.openocd_pkg_root)

    writer = WriteOpenOcdScript(workdir, run_and_cleanup, verbose, check_image, tcl_script, program_only)
    iter_archive(archive, writer)
    writer.finalize_script(openocd_opts.erase_workarounds)

    if run_and_cleanup:
        writer.exec_script()
        writer.clean()
        os.rmdir(workdir)
        return
    if not dst_dir:
        print(workdir)
def ext_and_serialize_archive(archive, opcode, op_specific_data):
    """Append the command described by *opcode* to *archive* and write it out.

    Returns:
        True when the archive was extended and serialized to
        ``op_specific_data.output_path``, False when extending reported
        nothing to do.
    """
    extended = extend_archive(archive, opcode, op_specific_data.image, op_specific_data)
    if not extended:
        return False
    serialize_archive(archive, op_specific_data.output_path)
    return True
def exec_cli_op(archive, opcode, op_specific_data):
    """Run the CLI subcommand *opcode* against *archive*.

    ``decode`` prints the archive, ``burn`` programs a device, and any other
    opcode is treated as a request to append a load command.

    Raises:
        Exception: When the opcode is neither of the above nor an
            extendable command.
    """
    if opcode == 'decode':
        decode_mpr_meta_info(archive)
        iter_archive(archive, PrintArchive(sys.stdout))
        return
    if opcode == 'burn':
        opts = op_specific_data
        burn_archive(archive, opts, opts.verbose, opts.check_image, opts.tcl_script, opts.dst_dir, opts.program_only)
        return
    if not ext_and_serialize_archive(archive, opcode, op_specific_data):
        raise Exception('Unknown openocd "%s"'%opcode)
def decode_mpr_meta_info(archive):
    """Write the archive's MPR start, size and lock size to stdout on one line."""
    meta = archive.meta
    summary = 'MPR start: {}, size: {}, lock_size: {}\n'.format(
        meta.mpr_start, meta.mpr_size, meta.mpr_lock_size)
    sys.stdout.write(summary)
def extend_archive(archive, opcode, image, op_specific_data):
    """Append a load command of type *opcode* to ``archive.script``.

    Args:
        archive: Archive message to extend in place.
        opcode: Command field name (``loadOtpNvds``, ``loadFlashNvds`` or
            ``loadFlash``).
        image: Readable binary file object; its whole content becomes the
            command's image.
        op_specific_data: Parsed CLI options. ``region_start``/``region_size``
            are read for flash commands; ``address``, ``mpr_*`` and
            ``verbose`` only for ``loadFlash``.

    Returns:
        The extended *archive*.

    Raises:
        KeyError: When *opcode* is not a field of the command message.
    """
    # Look up the message type behind the oneof field named by the opcode and
    # build an empty command of that type.
    field = ISP.Archive.Command.DESCRIPTOR.fields_by_name[opcode]
    cmdTy = field.message_type
    cmdUnion = ISP.Archive.Command()
    cmdAttr = getattr(cmdUnion, opcode)
    cmd = getattr(ISP.Archive, cmdTy.name)()
    if opcode == 'loadOtpNvds':
        commonLoad = cmd.commonLoad
    else:
        commonLoad = cmd.commonLoadFlash.commonLoad
    commonLoad.image = image.read()
    if opcode.startswith('loadFlash'):
        for attrName in ['region_start', 'region_size']:
            attr = getattr(op_specific_data, attrName)
            if attr is not None:
                setattr(cmd.commonLoadFlash, attrName, attr)
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; the mpr_* options exist only on the loadFlash subcommand,
        # so they are read only for that opcode.
        if opcode == 'loadFlash':
            if op_specific_data.address is not None:
                cmd.address = op_specific_data.address
            for attrName in ['mpr_start', 'mpr_size', 'mpr_lock_size']:
                attr = getattr(op_specific_data, attrName)
                if attr is not None:
                    if op_specific_data.verbose:
                        # Diagnostics go to stderr: stdout is the default
                        # destination of the serialized archive, and text
                        # mixed into it would corrupt the binary output.
                        sys.stderr.write('store meta {}={} \n'.format(attrName, attr))
                    setattr(archive.meta, attrName, attr)
    cmdAttr.CopyFrom(cmd)
    archive.script.append(cmdUnion)
    return archive
def main(args=None, namespace=None):
    """Entry point: parse the command line and run the chosen subcommand.

    ``init`` writes a fresh archive holding only the platform family and
    name. Every other subcommand first loads the input archive (from stdin
    when the input path is ``-``) and requires it to be initialized; if it
    is not, an error is written to stderr and the process exits with
    status 1.
    """
    args = parse_args(args, namespace)
    archive = ISP.Archive()

    if args.opcode == 'init':
        archive.meta.platform.family = args.family
        archive.meta.platform.name = args.name
        serialize_archive(archive, args.output_path)
        return

    if args.input_path != '-':
        with open(args.input_path, 'rb') as input_archive_file:
            raw_archive = input_archive_file.read()
    else:
        raw_archive = sys.stdin.buffer.read()
    archive.ParseFromString(raw_archive)

    plat = archive.meta.platform
    if not (plat.family and plat.name):
        sys.stderr.write('Uninitialized archive "{}"\n'.format(args.input_path))
        sys.exit(1)
    exec_cli_op(archive, args.opcode, args)
# Script entry point: refuse to run on anything older than Python 3.
if __name__ == '__main__':
    if sys.version_info < (3,):
        sys.stderr.write('%s requires Python 3\n'%__file__)
        sys.exit(2)
    main()