# Source code for autopilot.setup.setup_autopilot

"""
After initial setup, configure autopilot: create an autopilot directory and a prefs.json file

"""

import npyscreen as nps
import datetime
import _curses
import pprint
import json
import os
import subprocess
import argparse
import sys
import typing
import re
from pathlib import Path

from autopilot.setup.forms import Autopilot_Setup, DIRECTORY_STRUCTURE
from autopilot.setup.run_script import run_script, list_scripts, run_scripts
from autopilot.prefs import _DEFAULTS, Scopes


def make_dir(adir:Path, permissions:int=0o777):
    """
    Create a directory when it is missing, then apply ``permissions`` to it.

    Missing parent directories are created as well. The permissions are
    applied with an explicit ``chmod`` after the existence check.

    Args:
        adir (str, Path): Path to the directory
        permissions (int): an octal integer used to set directory permissions (default ``0o777``)
    """
    target = Path(adir)

    is_missing = not target.exists()
    if is_missing:
        target.mkdir(parents=True, exist_ok=True)

    target.chmod(permissions)
def make_alias(launch_script: Path, bash_profile: typing.Optional[str]=None) -> typing.Tuple[bool, str]:
    """
    Make an alias so that calling ``autopilot`` calls ``autopilot_dir/launch_autopilot.sh``

    Any alias previously written by this function is removed from the profile
    before the new one is appended, so repeated runs leave exactly one alias.

    Arguments:
        launch_script (str): the path to the autopilot launch script to be aliased
        bash_profile (str, None): Optional, location of shell profile to edit. if None,
            use ``.bashrc`` then ``.bash_profile`` if they exist

    Returns:
        tuple: ``(result, message)`` -- ``result`` is ``True`` when the alias was written,
        ``message`` is a human readable description of what happened.
    """
    result = True
    message = 'alias for autopilot successfully created, open autopilot by calling upon it by its name like an old friend ;)'

    # find bash file
    if bash_profile is None:
        if (Path.home() / '.bashrc').exists():
            bash_profile = Path.home() / '.bashrc'
        elif (Path.home() / '.bash_profile').exists():
            bash_profile = Path.home() / '.bash_profile'
        else:
            result = False
            message = 'No bash_profile provided and cant find in default locations! couldnt make alias'
            return result, message

    with open(bash_profile, 'r') as pfile:
        profile = pfile.read()

    # remove any previously set autopilot alias.
    # re.sub returns the new string, so it has to be assigned back. The optional
    # trailing newline is consumed too so blank lines don't pile up on every run.
    profile = re.sub(
        r'\n# autopilot alias generated by setup_autopilot\.py.*\nalias autopilot.*\n?',
        '',
        profile
    )

    # make and append alias to profile
    profile = profile + f"\n# autopilot alias generated by setup_autopilot.py on {datetime.datetime.now().isoformat()}\nalias autopilot={Path(launch_script).resolve()}\n"

    try:
        with open(bash_profile, 'w') as pfile:
            pfile.write(profile)
    except PermissionError:
        result = False
        message = f"Got a permission error trying to write into the .bashrc found at {bash_profile}"

    return result, message
def parse_manual_prefs(manual_prefs:typing.List[str]) -> dict:
    """
    Convert a list of ``PREFNAME=value`` strings into a dictionary of prefs.

    Only the first ``=`` separates the name from the value, so values may
    themselves contain ``=``. Values are kept as strings.

    Args:
        manual_prefs (list): strings of the form ``PREFNAME=value``

    Returns:
        dict: ``{PREFNAME: value}``; later duplicates overwrite earlier ones

    Raises:
        ValueError: if an entry does not contain ``=``
    """
    prefdict = {}
    for pref in manual_prefs:
        key, separator, val = pref.partition('=')
        if not separator:
            raise ValueError(f"Could not parse manual pref {pref!r}, expected the form PREFNAME=value")
        prefdict[key] = val

    return prefdict
def parse_args():
    """
    Parse the command line arguments of the setup routine.

    Manually specified prefs (``-p``) are converted to a dictionary with
    :func:`parse_manual_prefs`. Two options end the program from here:
    ``-l`` lists the available scripts and exits, and ``-s`` without ``-q``
    runs the requested scripts and exits.

    Returns:
        :class:`argparse.Namespace`: the parsed arguments
    """
    parser = argparse.ArgumentParser(description="Setup an Autopilot Agent")

    # (flags, keyword arguments) of every supported option, in display order
    options = (
        (('-f', '--prefs'),
         {'help': "Location of .json prefs file (default: ~/autopilot/prefs.json"}),
        (('-d', '--dir'),
         {'help': "Autopilot directory (default: ~/autopilot)"}),
        (('-p', '--pref'),
         {'help': "Specify a pref manually like -p PREFNAME=value",
          'action': 'append'}),
        (('-s', '--scripts'),
         {'help': "Run setup scripts without entering a full setup routine. specify one or more like -s script1 -s script2 for available scripts see -l",
          'action': 'append'}),
        (('-l', '--list_scripts'),
         {'help': "list available setup scripts!",
          'action': 'store_true'}),
        (('-q', '--quiet'),
         {'help': "run setup in quiet mode, dont launch the interactive prompt, just use the prefs specified by -p and -f, and the scripts specified by -s",
          'action': 'store_true'}),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)

    args = parser.parse_args()

    if args.pref:
        args.pref = parse_manual_prefs(args.pref)

    # listing scripts takes precedence over running them
    if args.list_scripts:
        list_scripts()
        sys.exit()

    # scripts requested outside of quiet mode are run on their own, then we stop
    if args.scripts and not args.quiet:
        run_scripts(args.scripts)
        sys.exit()

    return args
def locate_user_dir(args) -> Path:
    """
    Find the autopilot user directory and make sure it exists.

    The directory is chosen in this order:

    * ``args.dir``, if given
    * the path stored in ``~/.autopilot``, if that file exists and is not blank
    * ``~/autopilot``

    Args:
        args: parsed command line arguments, only the ``dir`` attribute is read

    Returns:
        :class:`pathlib.Path`: the autopilot directory, created with :func:`make_dir`
    """
    if args.dir:
        autopilot_dir = Path(args.dir).resolve()
    else:
        default_dir = Path.home() / 'autopilot/'

        # check for a ~/.autopilot file that should point us to the autopilot directory if it exists
        autopilot_conf_fn = Path.home() / '.autopilot'
        if autopilot_conf_fn.exists():
            with open(autopilot_conf_fn, 'r') as aconf:
                # strip surrounding whitespace: a trailing newline (eg. from a hand-edited
                # file) would otherwise become part of the directory name
                stored_dir = aconf.read().strip()

            # a blank file would resolve to the current working directory, use the default instead
            autopilot_dir = Path(stored_dir).resolve() if stored_dir else default_dir
        else:
            autopilot_dir = default_dir

    make_dir(autopilot_dir)

    return autopilot_dir
def run_form(prefs:dict) -> typing.Tuple[dict, typing.List[str]]:
    """
    Run the interactive npyscreen setup prompt and collect its results.

    If the prompt can't be opened (a curses error, or not enough space for a widget)
    a hint about the required window width is printed and the program exits.

    Args:
        prefs (dict): prefs passed to :class:`Autopilot_Setup`

    Returns:
        tuple: ``(new_prefs, active_scripts)`` -- the result of ``setup.unpack_prefs()``
        and the ``setup.active_scripts`` attribute
    """
    ####################################
    # Run the npyscreen prompt

    # bound before the try so the error handler can always refer to it,
    # even if constructing the form is what failed
    setup = None
    try:
        setup = Autopilot_Setup(prefs)
        setup.run()
    except (_curses.error, nps.wgwidget.NotEnoughSpaceForWidget) as e:
        # get minimum column count
        try:
            min_cols = setup.getForm(setup.STARTING_FORM).min_c
            print(f'Problem opening the Setup GUI!\nThis is most likely due to this window not being wide enough\n' + \
                  'The minimum width for the setup GUI is:\n\033[0;32;40m' + \
                  "-" * min_cols + '\u001b[0m\n\n' + f'Got error:\n{e}')
        except Exception:
            # best effort only: fall back to the generic message when the minimum width
            # can't be determined, without swallowing KeyboardInterrupt/SystemExit
            print(
                f'Problem opening the Setup GUI!\nThis is most likely due to this window not being wide enough\n\nGot Error:\n{e}')
        sys.exit()

    ####################################
    # Collect values
    new_prefs = setup.unpack_prefs()
    active_scripts = setup.active_scripts

    return new_prefs, active_scripts
def make_launch_script(prefs:dict, prefs_fn=None, launch_file=None, permissions:int=0o775) -> Path:
    """
    Write the bash script used to launch autopilot and set its permissions.

    For the ``'PILOT'`` and ``'CHILD'`` agents the script kills ``jackd`` and ``pigpiod``
    and remounts ``/dev/shm`` before starting ``autopilot.core.pilot``. For the
    ``'TERMINAL'`` agent it starts ``autopilot.core.terminal``. If ``prefs`` has a
    non-empty ``'VENV'``, the script activates that virtualenv first. For any other
    agent no script is written, and only the permissions of ``launch_file`` are set.

    Args:
        prefs (dict): prefs; ``'AGENT'`` is required, ``'BASEDIR'`` is required when either
            ``prefs_fn`` or ``launch_file`` is ``None``, ``'VENV'`` is optional
        prefs_fn (str, Path, None): prefs file handed to autopilot with ``-f``
            (default: ``prefs.json`` in ``BASEDIR``)
        launch_file (str, Path, None): where to write the script
            (default: ``launch_autopilot.sh`` in ``BASEDIR``)
        permissions (int): octal permissions of the launch script (default ``0o775``)

    Returns:
        :class:`pathlib.Path`: location of the launch script
    """
    if prefs_fn is None:
        prefs_fn = Path(prefs['BASEDIR']) / 'prefs.json'
    else:
        prefs_fn = Path(prefs_fn)

    if launch_file is None:
        launch_file = Path(prefs['BASEDIR']) / 'launch_autopilot.sh'
    else:
        launch_file = Path(launch_file)

    # VENV is optional (results_string reads it with .get as well),
    # so a missing key means "no virtualenv" rather than a KeyError
    venv = prefs.get('VENV')
    activate_lines = []
    if venv:
        activate_lines.append("source " + os.path.join(venv, 'bin', 'activate') + '\n')

    agent = prefs['AGENT']
    script_lines = None
    if agent in ('PILOT', 'CHILD'):
        script_lines = [
            '#!/bin/bash\n',
            'killall jackd\n',
            'sudo killall pigpiod\n',
            'sudo mount -o remount,size=128M /dev/shm\n',
            *activate_lines,
            # end with a newline, like the terminal script does
            'python3 -m autopilot.core.pilot -f {}\n'.format(prefs_fn),
        ]
    elif agent == 'TERMINAL':
        script_lines = [
            '#!/bin/bash\n',
            *activate_lines,
            "python3 -m autopilot.core.terminal -f " + str(prefs_fn) + "\n",
        ]

    if script_lines is not None:
        with open(launch_file, 'w') as launch_file_open:
            launch_file_open.writelines(script_lines)

    os.chmod(launch_file, permissions)

    return launch_file
def make_systemd(prefs:dict, launch_file:Path) -> typing.Tuple[bool, str]:
    """
    Install autopilot as a systemd service that runs the launch script.

    A unit file is written to ``/lib/systemd/system/autopilot.service`` with ``sudo``,
    then systemd is reloaded and the service is enabled.

    Args:
        prefs (dict): prefs, currently unused
        launch_file (Path): launch script used as the unit's ``ExecStart``

    Returns:
        tuple: ``(success, message)`` -- whether the service was installed and enabled,
        and a message describing the outcome
    """
    success = False
    message = ''

    systemd_string = '''[Unit]
Description=autopilot
After=multi-user.target

[Service]
Type=idle
ExecStart={launch_pi}
Restart=on-failure

[Install]
WantedBy=multi-user.target'''.format(launch_pi=launch_file)

    unit_loc = '/lib/systemd/system/autopilot.service'

    try:
        # hand the unit text to `sudo tee` on stdin instead of splicing it into a shell
        # command, so quotes or other shell characters in the path can't break the write
        write_result = subprocess.run(
            ['sudo', 'tee', unit_loc],
            input=(systemd_string + '\n').encode(),
            stdout=subprocess.DEVNULL
        ).returncode

        if write_result != 0:
            # a failed write is reported through the return code, not an exception
            message = "systemd unit file could not be written.\n" + \
                      "create a unit file containing the following at {}\n\n{}".format(unit_loc, systemd_string)
            return success, message

        # enable the service
        subprocess.call(['sudo', 'systemctl', 'daemon-reload'])
        sysd_result = subprocess.call(['sudo', 'systemctl', 'enable', 'autopilot.service'])

        if sysd_result != 0:
            message = 'Systemd service could not be enabled :('
        else:
            success = True
            message = 'Systemd service installed and enabled, unit file written to {}'.format(unit_loc)

    except PermissionError:
        message = "systemd service could not be installed due to a permissions error.\n" + \
                  "create a unit file containing the following at {}\n\n{}".format(unit_loc, systemd_string)
    except OSError as e:
        # eg. sudo or systemctl is not available on this machine
        message = "systemd service could not be installed: {}\n".format(e) + \
                  "create a unit file containing the following at {}\n\n{}".format(unit_loc, systemd_string)

    return success, message
def results_string(env_results:dict, config_msgs:typing.List[str], error_msgs:typing.List[str], prefs_fn:str, prefs) -> str:
    """
    Build the report shown to the user once setup has finished.

    The report contains the location and contents of the prefs file, one
    success/failure line per entry of ``env_results``, a line about the
    virtualenv, and any additional and error messages. ANSI escape codes
    are used for color.

    Args:
        env_results (dict): ``{name: succeeded}`` for each configuration step
        config_msgs (list): additional messages to display
        error_msgs (list): messages describing things that went wrong
        prefs_fn (str): where the prefs file was saved
        prefs (dict): the saved prefs

    Returns:
        str: the formatted report
    """
    pieces = [
        '\n----------------------------------------\n',
        'prefs.json has been created and saved to {}\n'.format(prefs_fn),
        pprint.pformat(prefs),
        '\n----------------------------------------\n',
        "\033[0;32;40m\n--------------------------------\nEnvironment Configuration:\n",
    ]

    # one status line per configuration step
    for step_name, succeeded in env_results.items():
        badge = " [ SUCCESS ] " if succeeded else " [ FAILURE ] "
        pieces.extend((badge, step_name, '\n'))

    if prefs.get('VENV', False):
        pieces.append(f" [ SUCCESS ] virtualenv detected, path: {prefs['VENV']}\n")
    else:
        pieces.append(" [ CMONDOG ]\033[1;37;41m no virtualenv detected\u001b[0m, running autopilot outside a venv is not recommended but it might work who knows\n")

    if config_msgs:
        pieces.append('\nAdditional Messages:')
        for note in config_msgs:
            pieces.extend((' ', note, '\n'))

    if error_msgs:
        pieces.append('\n--------------------------------\n')
        for number, problem in enumerate(error_msgs):
            pieces.append('\033[1;37;41mSomething went wrong during setup, this is wrong thing #{}\u001b[0m'.format(number))
            pieces.append('\033[0;31;40m\n{}\n\u001b[0m'.format(problem))

    # reset terminal colors
    pieces.append("\u001b[0m")

    return ''.join(pieces)
def make_ectopic_dirnames(basedir:Path) -> dict:
    """
    Build the set of directory prefs for a base directory in a custom location.

    Every pref in ``_DEFAULTS`` whose scope is ``Scopes.DIRECTORY`` (other than
    ``BASEDIR`` itself) is placed inside ``basedir``, named by the stem of the
    pref's default path.

    Args:
        basedir (Path): the base directory, resolved to an absolute path

    Returns:
        dict: ``{PREFNAME: directory}`` with directories as strings, including ``BASEDIR``
    """
    root = Path(basedir).resolve()

    dirnames = {'BASEDIR': str(root)}
    dirnames.update({
        pref_name: str(root / Path(spec['default']).stem)
        for pref_name, spec in _DEFAULTS.items()
        if spec['scope'] == Scopes.DIRECTORY and pref_name != "BASEDIR"
    })

    return dirnames
def main():
    """
    Run the setup routine from start to finish.

    Prefs are gathered from an existing prefs file, the directory layout implied by
    ``-d``, manually passed prefs, and (unless ``-q`` was given) the interactive
    form. The requested setup scripts are then run, the directories and the launch
    script are created, the systemd service and shell alias are installed when
    requested, the prefs and the ``~/.autopilot`` pointer file are saved, and a
    summary is printed.
    """
    error_msgs = []
    config_msgs = []

    args = parse_args()
    interactive = not args.quiet

    autopilot_dir = locate_user_dir(args)

    # if we were passed an explicit basedir, make a dict of the usual directory structure
    # to use as defaults
    extra_dirs = make_ectopic_dirnames(autopilot_dir) if args.dir is not None else {}

    # attempt to load .prefs from standard location (~/autopilot/prefs.json)
    prefs_fn = Path(args.prefs).resolve() if args.prefs else autopilot_dir / 'prefs.json'

    prefs = {}
    if prefs_fn.exists():
        if interactive:
            print(f'Existing prefs found, loading from {prefs_fn}')
        with open(prefs_fn, 'r') as prefs_f:
            prefs = json.load(prefs_f)
    elif interactive:
        print('No existing prefs found, starting from defaults')

    # recombine default directories after loading prefs, but before
    # incorporating any custom passed prefs.
    prefs.update(extra_dirs)

    # combine any manually given prefs
    if args.pref is not None:
        prefs.update(args.pref)

    #################
    # run interactive form unless we were passed one or both of the things it gives us (prefs and scripts)
    if interactive:
        new_prefs, active_scripts = run_form(prefs)
        prefs.update(new_prefs)
    else:
        active_scripts = [] if args.scripts is None else args.scripts

    ####################################
    # Configure Environment

    # run our scripts
    env_results = {}
    env_results.update(run_scripts(active_scripts, return_all=True, print_status=False))

    # Create directory structure if needed
    known_dirs = DIRECTORY_STRUCTURE.keys()
    dirs_to_make = []
    for dir_name, path in prefs.items():
        if dir_name in known_dirs:
            dirs_to_make.append(path)

    print('Creating Directories: \n' + '\n'.join(dirs_to_make))
    for make_this_dir in dirs_to_make:
        make_dir(make_this_dir)

    # make our launch script
    launch_file = make_launch_script(prefs)
    config_msgs.append("Launch file created at {}".format(launch_file))

    def _record(step_name, succeeded, msg):
        # store the outcome of an optional step and file its message accordingly
        env_results[step_name] = succeeded
        if succeeded:
            config_msgs.append(msg)
        else:
            error_msgs.append(msg)

    # install as systemd service if requested
    if 'systemd' in active_scripts:
        _record('systemd', *make_systemd(prefs, launch_file))

    # make an alias for autopilot!
    if 'alias' in active_scripts:
        _record('alias', *make_alias(launch_file))

    ####################################
    # save prefs and finalize environment

    # save prefs
    with open(prefs_fn, 'w') as prefs_f:
        json.dump(prefs, prefs_f,
                  indent=4, separators=(',', ': '), sort_keys=True)

    # save basedir in autopilot user file
    user_file = os.path.join(os.path.expanduser('~'), '.autopilot')
    with open(user_file, 'w') as autopilot_f:
        autopilot_f.write(prefs['BASEDIR'])

    #####################################
    # User feedback
    print(results_string(env_results, config_msgs, error_msgs, prefs_fn, prefs))
# run the setup routine when this module is executed as a script
if __name__ == "__main__":
    main()