"""
After initial setup, configure autopilot: create an autopilot directory and a prefs.json file
"""
import npyscreen as nps
import datetime
import _curses
import pprint
import json
import os
import subprocess
import argparse
import sys
import typing
import re
from pathlib import Path
from autopilot.setup.forms import Autopilot_Setup, DIRECTORY_STRUCTURE
from autopilot.setup.run_script import run_script, list_scripts, run_scripts
from autopilot.prefs import _DEFAULTS, Scopes
[docs]def make_dir(adir:Path, permissions:int=0o777):
"""
Make a directory if it doesn't exist and set its permissions to `0777`
Args:
adir (str): Path to the directory
permissions (int): an octal integer used to set directory permissions (default ``0o777``)
"""
adir = Path(adir)
if not adir.exists():
adir.mkdir(parents=True,exist_ok=True)
adir.chmod(permissions)
[docs]def make_alias(launch_script: Path, bash_profile: typing.Optional[str]=None) -> typing.Tuple[bool, str]:
"""
Make an alias so that calling ``autopilot`` calls ``autopilot_dir/launch_autopilot.sh``
Arguments:
launch_script (str): the path to the autopilot launch script to be aliased
bash_profile (str, None): Optional, location of shell profile to edit. if None, use ``.bashrc`` then ``.bash_profile`` if they exist
"""
result = True
message = 'alias for autopilot successfully created, open autopilot by calling upon it by its name like an old friend ;)'
# find bash file
if bash_profile is None:
if (Path.home() / '.bashrc').exists():
bash_profile = Path.home() / '.bashrc'
elif (Path.home() / '.bash_profile').exists():
bash_profile = Path.home() / '.bash_profile'
else:
result = False
message = 'No bash_profile provided and cant find in default locations! couldnt make alias'
return result, message
with open(bash_profile, 'r') as pfile:
profile = pfile.read()
# remove any previously set autopilot alias
re.sub('\n# autopilot alias generated by setup_autopilot.py.*\nalias autopilot.*', '', profile)
# make and append alias to profile
profile = profile + f"\n# autopilot alias generated by setup_autopilot.py on {datetime.datetime.now().isoformat()}\nalias autopilot={Path(launch_script).resolve()}\n"
try:
with open(bash_profile, 'w') as pfile:
pfile.write(profile)
except PermissionError as e:
result = False
message = f"Got a permission error trying to write into the .bashrc found at {bash_profile}"
return result, message
[docs]def parse_manual_prefs(manual_prefs:typing.List[str]) -> dict:
prefdict = {}
for p in manual_prefs:
key, val = p.split('=')
prefdict[key] = val
return prefdict
[docs]def parse_args():
parser = argparse.ArgumentParser(description="Setup an Autopilot Agent")
parser.add_argument('-f', '--prefs',
help="Location of .json prefs file (default: ~/autopilot/prefs.json")
parser.add_argument('-d', '--dir',
help="Autopilot directory (default: ~/autopilot)")
parser.add_argument('-p', '--pref',
help="Specify a pref manually like -p PREFNAME=value",
action='append')
parser.add_argument('-s', '--scripts',
help="Run setup scripts without entering a full setup routine. specify one or more like -s script1 -s script2 for available scripts see -l",
action='append')
parser.add_argument('-l', '--list_scripts',
help="list available setup scripts!",
action='store_true')
parser.add_argument('-q', '--quiet',
help="run setup in quiet mode, dont launch the interactive prompt, just use the prefs specified by -p and -f, and the scripts specified by -s",
action='store_true')
args = parser.parse_args()
if args.pref:
args.pref = parse_manual_prefs(args.pref)
if args.list_scripts:
list_scripts()
sys.exit()
elif args.scripts and not args.quiet:
run_scripts(args.scripts)
sys.exit()
return args
[docs]def locate_user_dir(args) -> Path:
if args.dir:
autopilot_dir = Path(args.dir).resolve()
else:
# check for a ~/.autopilot file that should point us to the autopilot directory if it exists
autopilot_conf_fn = Path().home() / '.autopilot'
if autopilot_conf_fn.exists():
with open(autopilot_conf_fn, 'r') as aconf:
autopilot_dir = Path(aconf.read()).resolve()
# autopilot_dir = autopilof_conf['AUTOPILOTDIR']
else:
autopilot_dir = Path().home() / 'autopilot/'
make_dir(autopilot_dir)
return autopilot_dir
[docs]def make_launch_script(prefs:dict, prefs_fn=None, launch_file=None, permissions:int=0o775) -> Path:
# Create a launch script
if prefs_fn is None:
prefs_fn = Path(prefs['BASEDIR']) / 'prefs.json'
else:
prefs_fn = Path(prefs_fn)
if launch_file is None:
launch_file = Path(prefs['BASEDIR']) / 'launch_autopilot.sh'
else:
launch_file = Path(launch_file)
if prefs['AGENT'] in ('PILOT', 'CHILD'):
with open(launch_file, 'w') as launch_file_open:
launch_file_open.write('#!/bin/bash\n')
launch_file_open.write('killall jackd\n')
launch_file_open.write('sudo killall pigpiod\n')
launch_file_open.write('sudo mount -o remount,size=128M /dev/shm\n')
if prefs['VENV']:
launch_file_open.write("source " + os.path.join(prefs['VENV'], 'bin', 'activate')+'\n')
launch_file_open.write('python3 -m autopilot.core.pilot -f {}'.format(prefs_fn))
elif prefs['AGENT'] == 'TERMINAL':
with open(launch_file, 'w') as launch_file_open:
launch_file_open.write('#!/bin/bash\n')
if prefs['VENV']:
launch_file_open.write("source " + os.path.join(prefs['VENV'], 'bin', 'activate')+'\n')
launch_file_open.write("python3 -m autopilot.core.terminal -f " + str(prefs_fn) + "\n")
os.chmod(launch_file, permissions)
return launch_file
[docs]def make_systemd(prefs:dict, launch_file:Path) -> typing.Tuple[bool, str]:
success = False
message = ''
systemd_string = '''
[Unit]
Description=autopilot
After=multi-user.target
[Service]
Type=idle
ExecStart={launch_pi}
Restart=on-failure
[Install]
WantedBy=multi-user.target'''.format(launch_pi=launch_file)
unit_loc = '/lib/systemd/system/autopilot.service'
try:
subprocess.call('sudo sh -c \"echo \'{}\' > {}\"'.format(systemd_string, unit_loc), shell=True)
# enable the service
subprocess.call(['sudo', 'systemctl', 'daemon-reload'])
sysd_result = subprocess.call(['sudo', 'systemctl', 'enable', 'autopilot.service'])
if sysd_result != 0:
message = 'Systemd service could not be enabled :('
else:
success = True
message = 'Systemd service installed and enabled, unit file written to {}'.format(unit_loc)
except PermissionError:
message = "systemd service could not be installed due to a permissions error.\n" + \
"create a unit file containing the following at {}\n\n{}".format(unit_loc, systemd_string)
return success, message
[docs]def results_string(env_results:dict,
config_msgs:typing.List[str],
error_msgs:typing.List[str],
prefs_fn:str,
prefs) -> str:
env_result = '\n----------------------------------------\n'
env_result += 'prefs.json has been created and saved to {}\n'.format(prefs_fn)
env_result += pprint.pformat(prefs)
env_result += '\n----------------------------------------\n'
env_result += "\033[0;32;40m\n--------------------------------\nEnvironment Configuration:\n"
for config, result in env_results.items():
if result:
env_result += " [ SUCCESS ] "
else:
env_result += " [ FAILURE ] "
env_result += config
env_result += '\n'
if prefs.get('VENV', False):
env_result +=f" [ SUCCESS ] virtualenv detected, path: {prefs['VENV']}\n"
else:
env_result += " [ CMONDOG ]\033[1;37;41m no virtualenv detected\u001b[0m, running autopilot outside a venv is not recommended but it might work who knows\n"
if len(config_msgs)>0:
env_result += '\nAdditional Messages:'
for msg in config_msgs:
env_result += ' '
env_result += msg
env_result += '\n'
if len(error_msgs)>0:
env_result += '\n--------------------------------\n'
for i, msg in enumerate(error_msgs):
env_result += '\033[1;37;41mSomething went wrong during setup, this is wrong thing #{}\u001b[0m'.format(i)
env_result += '\033[0;31;40m\n{}\n\u001b[0m'.format(msg)
env_result += "\u001b[0m"
return env_result
[docs]def make_ectopic_dirnames(basedir:Path) -> dict:
basedir = Path(basedir).resolve()
out = {}
out['BASEDIR'] = str(basedir)
for key, val in _DEFAULTS.items():
if val['scope'] == Scopes.DIRECTORY and key != "BASEDIR":
out[key] = str(basedir / Path(val['default']).stem)
return out
[docs]def main():
env = {}
env_results = {}
prefs = {}
extra_dirs = {}
error_msgs = []
config_msgs = []
args = parse_args()
autopilot_dir = locate_user_dir(args)
# if we were passed an explicit basedir, make a dict of the usual directory structure
# to use as defaults
if args.dir is not None:
extra_dirs = make_ectopic_dirnames(autopilot_dir)
# attempt to load .prefs from standard location (~/autopilot/prefs.json)
if args.prefs:
prefs_fn = Path(args.prefs).resolve()
else:
prefs_fn = autopilot_dir / 'prefs.json'
if prefs_fn.exists():
if not args.quiet:
print(f'Existing prefs found, loading from {prefs_fn}')
with open(prefs_fn, 'r') as prefs_f:
prefs = json.load(prefs_f)
else:
if not args.quiet:
print('No existing prefs found, starting from defaults')
# recombine default directories after loading prefs, but before
# incorporating any custom passed prefs.
prefs.update(extra_dirs)
# combine and manually given prefs
if args.pref is not None:
prefs.update(args.pref)
#################
# run interactive form unless we were passed one or both of the things it gives us (prefs and scripts)
if not args.quiet:
new_prefs, active_scripts = run_form(prefs)
prefs.update(new_prefs)
else:
if args.scripts is not None:
active_scripts = args.scripts
else:
active_scripts = []
####################################
# Configure Environment
# run our scripts
env_results.update(run_scripts(active_scripts, return_all=True, print_status=False))
# Create directory structure if needed
dirs_to_make = [path for dir_name, path in prefs.items() if dir_name in DIRECTORY_STRUCTURE.keys()]
print('Creating Directories: \n' + '\n'.join(dirs_to_make))
for make_this_dir in dirs_to_make:
make_dir(make_this_dir)
# make our launch script
launch_file = make_launch_script(prefs)
config_msgs.append("Launch file created at {}".format(launch_file))
# install as systemd service if requested
if 'systemd' in active_scripts:
made_systemd, msg = make_systemd(prefs, launch_file)
env_results['systemd'] = made_systemd
if made_systemd:
config_msgs.append(msg)
else:
error_msgs.append(msg)
if 'alias' in active_scripts:
# make an alias for autopilot!
made_alias, msg = make_alias(launch_file)
env_results['alias'] = made_alias
if made_alias:
config_msgs.append(msg)
else:
error_msgs.append(msg)
####################################
# save prefs and finalize environment
# save prefs
with open(prefs_fn, 'w') as prefs_f:
json.dump(prefs, prefs_f, indent=4, separators=(',', ': '), sort_keys=True)
# save basedir in autopilot user file
with open(os.path.join(os.path.expanduser('~'), '.autopilot'), 'w') as autopilot_f:
autopilot_f.write(prefs['BASEDIR'])
#####################################3
# User feedback
result_str = results_string(env_results, config_msgs, error_msgs, prefs_fn, prefs)
print(result_str)
if __name__ == "__main__":
main()