commit
dea0ee119a
40 changed files with 1368 additions and 0 deletions
@ -0,0 +1,8 @@ |
||||
[run] |
||||
omit = |
||||
docs/* |
||||
scripttease/cli/__init__.py |
||||
sandbox |
||||
setup.py |
||||
tmp/* |
||||
tmp.* |
@ -0,0 +1,14 @@ |
||||
*.b |
||||
*.bak |
||||
*.pyc |
||||
*.swp |
||||
.coverage |
||||
.DS_Store |
||||
.idea |
||||
.pytest_cache |
||||
__pycache__ |
||||
_scraps |
||||
docs/build |
||||
htmlcov |
||||
tmp.* |
||||
tmp |
@ -0,0 +1 @@ |
||||
A collection of classes and commands for automated command line scripting using Python. |
@ -0,0 +1,27 @@ |
||||
Copyright (c) Pleasant Tents, LLC |
||||
|
||||
All rights reserved. |
||||
|
||||
Redistribution and use in source and binary forms, with or without modification, |
||||
are permitted provided that the following conditions are met: |
||||
|
||||
* Redistributions of source code must retain the above copyright notice, |
||||
this list of conditions and the following disclaimer. |
||||
* Redistributions in binary form must reproduce the above copyright notice, |
||||
this list of conditions and the following disclaimer in the documentation |
||||
and/or other materials provided with the distribution. |
||||
* Neither the name of Pleasant Tents, LLC nor the names of its contributors |
||||
may be used to endorse or promote products derived from this software |
||||
without specific prior written permission. |
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
||||
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
||||
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
||||
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
||||
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
||||
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
||||
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
||||
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
@ -0,0 +1,44 @@ |
||||
.PHONY: docs help tests |
||||
|
||||
# The path to source code to be counted with cloc.
|
||||
CLOC_PATH := scripttease
|
||||
|
||||
# The directory where test coverage is generated.
|
||||
COVERAGE_PATH := docs/build/html/coverage
|
||||
|
||||
# Attempt to load a local makefile which may override any of the values above.
|
||||
-include local.makefile |
||||
|
||||
#> help - Show help.
|
||||
help: |
||||
@echo ""
|
||||
@echo "Management Commands"
|
||||
@echo "------------------------------------------------------------------------------"
|
||||
@cat Makefile | grep "^#>" | sed 's/\#\> //g';
|
||||
@echo ""
|
||||
|
||||
#> docs - Generate documentation.
|
||||
docs: lines |
||||
cd docs && make html;
|
||||
cd docs && make coverage;
|
||||
open docs/build/coverage/python.txt;
|
||||
open docs/build/html/index.html;
|
||||
|
||||
#> clean - Remove pyc files.
|
||||
clean: |
||||
find . -name '*.pyc' -delete;
|
||||
|
||||
# lines - Generate lines of code report.
|
||||
lines: |
||||
rm -f docs/source/_data/cloc.csv;
|
||||
echo "files,language,blank,comment,code" > docs/source/_data/cloc.csv;
|
||||
cloc $(CLOC_PATH) --csv --quiet --unix --report-file=tmp.csv
|
||||
tail -n +2 tmp.csv >> docs/source/_data/cloc.csv;
|
||||
rm tmp.csv;
|
||||
|
||||
#> tests - Run unit tests and generate coverage report.
|
||||
tests: |
||||
coverage run --source=. -m pytest;
|
||||
coverage html --directory=$(COVERAGE_PATH);
|
||||
open $(COVERAGE_PATH)/index.html;
|
||||
|
@ -0,0 +1,7 @@ |
||||
# Python Script Tease |
||||
|
||||
![](https://img.shields.io/badge/status-active-green.svg) |
||||
![](https://img.shields.io/badge/stage-development-blue.svg) |
||||
![](https://img.shields.io/badge/coverage-55%25-yellow.svg) |
||||
|
||||
A collection of classes and commands for automated command line scripting using Python. |
@ -0,0 +1 @@ |
||||
5.8.18-d |
@ -0,0 +1,9 @@ |
||||
[project] |
||||
category = developer |
||||
description = A collection of classes and commands for automated command line scripting using Pythonn. |
||||
title = Python Script Tease |
||||
type = cli |
||||
|
||||
[business] |
||||
code = PTL |
||||
name = Pleasant Tents, LLC |
@ -0,0 +1,3 @@ |
||||
[pytest] |
||||
norecursedirs = .git _scraps docs sandbox tmp |
||||
testpaths = tests |
@ -0,0 +1,3 @@ |
||||
coverage |
||||
django |
||||
sphinx |
@ -0,0 +1,10 @@ |
||||
import os |
||||
|
||||
__all__ = ( |
||||
"LOGGER_NAME", |
||||
"PATH_TO_SCRIPT_TEASE", |
||||
) |
||||
|
||||
LOGGER_NAME = "script-tease" |
||||
|
||||
PATH_TO_SCRIPT_TEASE = os.path.abspath(os.path.dirname(__file__)) |
@ -0,0 +1,47 @@ |
||||
[apache] |
||||
disable_module = a2dismod {{ module_name }} |
||||
disable_site = a2dissite {{ domain_name }}.conf |
||||
enable_module = a2enmod {{ module_name }} |
||||
enable_site = a2ensite {{ domain_name }}.conf |
||||
reload = service apache2 reload |
||||
restart = service apache2 restart |
||||
start = service apache2 start |
||||
stop = service apache2 stop |
||||
test = apachectl configtest |
||||
|
||||
[package_install] |
||||
system = apt-get install -y {{ package_name }} |
||||
pip = pip3 install{% if upgrade %} --upgrade{% endif %} --quiet {{ package_name }} |
||||
|
||||
[package_remove] |
||||
system = apt-get uninstall -y {{ package_name }} |
||||
pip = pip3 uninstall --quiet {{ package_name }} |
||||
|
||||
[system] |
||||
install = apt-get install -y {{ package_name }} |
||||
reboot = reboot |
||||
remove = apt-get uninstall -y {{ package_name }} |
||||
update = apt-get update -y |
||||
upgrade = apt-get upgrade -y |
||||
|
||||
[python] |
||||
virtualenv = virtualenv {{ name }} |
||||
install = pip3 install{% if upgrade %} --upgrade{% endif %} --quiet {{ package_name }} |
||||
remove = pip3 uninstall --quiet {{ package_name }} |
||||
|
||||
[files] |
||||
append = echo "{{ content }}" >> {{ path }} |
||||
chgrp = chgrp{% if recursive %} -R{% endif %} {{ group }} {{ path }} |
||||
chmod = chmod{% if recursive %} -R{% endif %} {{ owner }} {{ path }} |
||||
chown = chown{% if recursive %} -R{% endif %} {{ mode }} {{ path }} |
||||
copy = cp{% if recursive %} -R{% endif %}{% if overwrite %} -n{% endif %} {{ from_path }} {{ to_path }} |
||||
mkdir = mkdir{% if mode %} -m {{ mode }}{% endif %}{% if recursive %} -p{% endif %} {{ path }} |
||||
move = move {{ from_path }} {{ to_path }} |
||||
rename = move {{ from_path }} {{ to_path }} |
||||
remove = rm{% if force %} -f{% endif %}{% if recursive %} -r{% endif %} {{ path }} |
||||
;rsync = ? |
||||
;scopy = ? |
||||
;sed = ? |
||||
symlink = ln -s{% if force %} -f{% endif %} {{ source }} {{ target }} |
||||
touch = touch {{ path }} |
||||
;write = ? |
@ -0,0 +1,50 @@ |
||||
# Imports |
||||
|
||||
import logging |
||||
# from ..scripts import Function |
||||
from ...constants import LOGGER_NAME |
||||
from .base import ItemizedCommand |
||||
from .mappings import MAPPING |
||||
|
||||
log = logging.getLogger(LOGGER_NAME) |
||||
|
||||
# Functions |
||||
|
||||
|
||||
def command_exists(name): |
||||
"""Indicates whether the named command exists. |
||||
|
||||
:param name: The name of the command to be checked. |
||||
:type name: str |
||||
|
||||
:rtype: bool |
||||
|
||||
""" |
||||
return name in MAPPING |
||||
|
||||
|
||||
def command_factory(name, comment, overlay, *args, **kwargs): |
||||
# if name in ("func", "function"): |
||||
# kwargs['comment'] = comment |
||||
# return Function(*args, **kwargs) |
||||
|
||||
if not command_exists(name): |
||||
log.warning("No mapping for command: %s" % name) |
||||
return None |
||||
|
||||
_args = list(args) |
||||
kwargs['comment'] = comment |
||||
kwargs['overlay'] = overlay |
||||
|
||||
log.debug("%s: %s" % (comment, kwargs)) |
||||
|
||||
command_class = MAPPING[name] |
||||
try: |
||||
items = kwargs.pop("items", None) |
||||
if items is not None: |
||||
return ItemizedCommand(command_class, items, *_args, **kwargs) |
||||
|
||||
return command_class(*_args, **kwargs) |
||||
except (KeyError, TypeError, ValueError) as e: |
||||
log.critical("Failed to load %s command: %s" % (name, e)) |
||||
return None |
@ -0,0 +1,9 @@ |
||||
# Classes |
||||
|
||||
|
||||
class Apache(object): |
||||
|
||||
def __init__(self, subcommand, *args, **kwargs): |
||||
if subcommand == "disable": |
||||
pass |
||||
|
@ -0,0 +1,185 @@ |
||||
# Classes |
||||
|
||||
|
||||
class Command(object): |
||||
|
||||
def __init__(self, statement, comment=None, condition=None, cd=None, environments=None, function=None, prefix=None, |
||||
register=None, shell=None, stop=False, sudo=None, tags=None, **kwargs): |
||||
self.comment = comment |
||||
self.condition = condition |
||||
self.cd = cd |
||||
self.environments = environments or list() |
||||
self.function = function |
||||
self.prefix = prefix |
||||
self.register = register |
||||
self.shell = shell |
||||
self.statement = statement |
||||
self.stop = stop |
||||
self.tags = tags or list() |
||||
|
||||
if isinstance(sudo, Sudo): |
||||
self.sudo = sudo |
||||
elif type(sudo) is str: |
||||
self.sudo = Sudo(enabled=True, user=sudo) |
||||
elif sudo is True: |
||||
self.sudo = Sudo(enabled=True) |
||||
else: |
||||
self.sudo = Sudo() |
||||
|
||||
self._attributes = kwargs |
||||
|
||||
def __getattr__(self, item): |
||||
return self._attributes.get(item) |
||||
|
||||
def __repr__(self): |
||||
if self.comment is not None: |
||||
return "<%s %s>" % (self.__class__.__name__, self.comment) |
||||
|
||||
return "<%s>" % self.__class__.__name__ |
||||
|
||||
def get_statement(self, cd=False): |
||||
"""Get the full statement. |
||||
|
||||
:param cd: Include the directory change, if given. |
||||
:type cd: bool |
||||
|
||||
:rtype: str |
||||
|
||||
""" |
||||
a = list() |
||||
|
||||
if cd and self.cd is not None: |
||||
a.append("( cd %s &&" % self.cd) |
||||
|
||||
if self.prefix is not None: |
||||
a.append("%s &&" % self.prefix) |
||||
|
||||
if self.sudo: |
||||
statement = "sudo -u %s %s" % (self.sudo.user, self._get_statement()) |
||||
else: |
||||
statement = self._get_statement() |
||||
|
||||
a.append("%s" % statement) |
||||
|
||||
if cd and self.cd is not None: |
||||
a.append(")") |
||||
|
||||
b = list() |
||||
if self.comment is not None: |
||||
b.append("# %s" % self.comment) |
||||
|
||||
if self.condition is not None: |
||||
b.append("if [[ %s ]]; then %s; fi;" % (self.condition, " ".join(a))) |
||||
else: |
||||
b.append(" ".join(a)) |
||||
|
||||
if self.register is not None: |
||||
b.append("%s=$?;" % self.register) |
||||
|
||||
if self.stop: |
||||
b.append("if [[ $%s -gt 0 ]]; exit 1; fi;" % self.register) |
||||
elif self.stop: |
||||
b.append("if [[ $? -gt 0 ]]; exit 1; fi;") |
||||
else: |
||||
pass |
||||
|
||||
return "\n".join(b) |
||||
|
||||
def _get_statement(self): |
||||
"""By default, get the statement passed upon command initialization. |
||||
|
||||
:rtype: str |
||||
|
||||
""" |
||||
return self.statement |
||||
|
||||
|
||||
class ItemizedCommand(object): |
||||
|
||||
def __init__(self, command_class, items, *args, **kwargs): |
||||
"""Initialize the command. |
||||
|
||||
:param command_class: The command class to be used. |
||||
:type command_class: class |
||||
|
||||
:param items: The command arguments. |
||||
:type items: list[str] |
||||
|
||||
:param args: The itemized arguments. ``$item`` should be included. |
||||
|
||||
:param kwargs: Keyword arguments are passed to the command class upon instantiation. |
||||
|
||||
""" |
||||
self.args = args |
||||
self.command_class = command_class |
||||
self.items = items |
||||
self.kwargs = kwargs |
||||
|
||||
def __getattr__(self, item): |
||||
return self.kwargs.get(item) |
||||
|
||||
def __repr__(self): |
||||
return "<%s %s>" % (self.__class__.__name__, self.command_class.__name__) |
||||
|
||||
def get_commands(self): |
||||
"""Get the commands to be executed. |
||||
|
||||
:rtype: list[BaseType(Command)] |
||||
|
||||
""" |
||||
kwargs = self.kwargs.copy() |
||||
|
||||
a = list() |
||||
for item in self.items: |
||||
args = list() |
||||
for arg in self.args: |
||||
args.append(arg.replace("$item", item)) |
||||
|
||||
command = self.command_class(*args, **kwargs) |
||||
a.append(command) |
||||
|
||||
return a |
||||
|
||||
def get_statement(self, cd=False): |
||||
"""Override to get multiple commands.""" |
||||
kwargs = self.kwargs.copy() |
||||
comment = kwargs.pop("comment", "execute multiple commands") |
||||
|
||||
a = list() |
||||
# a.append("# %s" % comment) |
||||
|
||||
commands = self.get_commands() |
||||
for c in commands: |
||||
a.append(c.get_statement(cd=cd)) |
||||
a.append("") |
||||
|
||||
# for item in self.items: |
||||
# args = list() |
||||
# for arg in self.args: |
||||
# args.append(arg.replace("$item", item)) |
||||
# |
||||
# command = self.command_class(*args, **kwargs) |
||||
# a.append(command.preview(cwd=cwd)) |
||||
# a.append("") |
||||
|
||||
return "\n".join(a) |
||||
|
||||
|
||||
class Sudo(object): |
||||
"""Helper class for defining sudo options.""" |
||||
|
||||
def __init__(self, enabled=False, user="root"): |
||||
"""Initialize the helper. |
||||
|
||||
:param enabled: Indicates sudo is enabled. |
||||
:type enabled: bool |
||||
|
||||
:param user: The user to be invoked. |
||||
:type user: str |
||||
|
||||
""" |
||||
self.enabled = enabled |
||||
self.user = user |
||||
|
||||
def __bool__(self): |
||||
return self.enabled |
@ -0,0 +1,4 @@ |
||||
from .python import MAPPING as PYTHON_MAPPING |
||||
|
||||
MAPPING = dict() |
||||
MAPPING.update(PYTHON_MAPPING) |
@ -0,0 +1,23 @@ |
||||
# Classes |
||||
|
||||
|
||||
class Install(object): |
||||
|
||||
def __init__(self, name, manager="pip", overlay=None, upgrade=False, **kwargs): |
||||
if overlay is not None: |
||||
statement = overlay.get("package_install", manager, package_name=name, upgrade=upgrade) |
||||
else: |
||||
statement = "%s install %s" % (manager, name) |
||||
|
||||
self.statement = statement |
||||
|
||||
|
||||
class Remove(object): |
||||
|
||||
def __init__(self, name, manager="pip", overlay=None): |
||||
if overlay is not None: |
||||
statement = overlay.get("package_remove", manager, package_name=name) |
||||
else: |
||||
statement = "%s uninstall %s" % (manager, name) |
||||
|
||||
|
@ -0,0 +1,43 @@ |
||||
# Imports |
||||
|
||||
from .base import Command |
||||
|
||||
# Exports |
||||
|
||||
__all__ = ( |
||||
"Pip", |
||||
"VirtualEnv", |
||||
) |
||||
|
||||
# Classes |
||||
|
||||
|
||||
class Pip(Command): |
||||
|
||||
def __init__(self, name, op="install", overlay=None, upgrade=False, venv=None, **kwargs): |
||||
if overlay is not None: |
||||
statement = overlay.get("python", op, package_name=name, upgrade=upgrade) |
||||
else: |
||||
statement = "pip %s -y %s" % (op, name) |
||||
|
||||
if venv is not None: |
||||
kwargs['prefix'] = "source %s/bin/activate" % venv |
||||
|
||||
kwargs.setdefault("comment", "%s %s" % (op, name)) |
||||
|
||||
super().__init__(statement, **kwargs) |
||||
|
||||
|
||||
class VirtualEnv(Command): |
||||
|
||||
def __init__(self, name="python", overlay=None, **kwargs): |
||||
kwargs.setdefault("comment", "create %s virtual environment" % name) |
||||
|
||||
statement = "virtualenv %s" % name |
||||
super().__init__(statement, **kwargs) |
||||
|
||||
|
||||
MAPPING = { |
||||
'pip': Pip, |
||||
'virtualenv': VirtualEnv, |
||||
} |
@ -0,0 +1,97 @@ |
||||
# Imports |
||||
|
||||
from configparser import RawConfigParser |
||||
import os |
||||
from superpython.utils import parse_jinja_string |
||||
from ..constants import PATH_TO_SCRIPT_TEASE |
||||
|
||||
# Exports |
||||
|
||||
__all__ = ( |
||||
"Overlay", |
||||
) |
||||
|
||||
# Classes |
||||
|
||||
|
||||
class Overlay(object): |
||||
"""An overlay applies commands specific to a given operating system or platform.""" |
||||
|
||||
def __init__(self, name): |
||||
self.is_loaded = False |
||||
self._name = name |
||||
self._path = os.path.join(PATH_TO_SCRIPT_TEASE, "data", "overlays", "%s.ini" % name) |
||||
self._sections = dict() |
||||
|
||||
def __repr__(self): |
||||
return "<%s %s>" % (self.__class__.__name__, self._name) |
||||
|
||||
@property |
||||
def exists(self): |
||||
"""Indicates whether the overlay file exists. |
||||
|
||||
:rtype: bool |
||||
|
||||
""" |
||||
return os.path.exists(self._path) |
||||
|
||||
def get(self, section, key, **kwargs): |
||||
"""Get the command statement for the given section and key. |
||||
|
||||
:param section: The section name. |
||||
:type section: str |
||||
|
||||
:param key: The key within the section. |
||||
:type key: str |
||||
|
||||
kwargs are used to parse the value of the key within the section. |
||||
|
||||
:rtype: str | None |
||||
|
||||
""" |
||||
if not self.has(section, key): |
||||
return None |
||||
|
||||
template = self._sections[section][key] |
||||
|
||||
return parse_jinja_string(template, kwargs) |
||||
|
||||
def has(self, section, key): |
||||
"""Determine whether the overlay contains a given section and key. |
||||
|
||||
:param section: The section name. |
||||
:type section: str |
||||
|
||||
:param key: The key within the section. |
||||
:type key: str |
||||
|
||||
:rtype: bool |
||||
|
||||
""" |
||||
if section not in self._sections: |
||||
return False |
||||
|
||||
if key not in self._sections[section]: |
||||
return False |
||||
|
||||
return True |
||||
|
||||
def load(self): |
||||
"""Load the overlay. |
||||
|
||||
:rtype: bool |
||||
|
||||
""" |
||||
if not self.exists: |
||||
return False |
||||
|
||||
ini = RawConfigParser() |
||||
ini.read(self._path) |
||||
|
||||
for section in ini.sections(): |
||||
self._sections[section] = dict() |
||||
for key, value in ini.items(section): |
||||
self._sections[section][key] = value |
||||
|
||||
self.is_loaded = True |
||||
return True |
@ -0,0 +1,69 @@ |
||||
# Classes |
||||
|
||||
|
||||
class Script(object): |
||||
"""A script is a collection of commands.""" |
||||
|
||||
def __init__(self, name, commands=None, functions=None, shell="bash"): |
||||
"""Initialize a script. |
||||
|
||||
:param name: The name of the script. Note: This becomes the file name. |
||||
:type name: str |
||||
|
||||
:param commands: The commands to be included. |
||||
:type commands: list[BaseType[Command]] |
||||
|
||||
:param functions: The functions to be included. |
||||
:type functions: list[Function] |
||||
|
||||
:param shell: The shell to use for the script. |
||||
:type shell: str |
||||
|
||||
""" |
||||
self.commands = commands or list() |
||||
self.functions = functions |
||||
self.name = name |
||||
self.shell = shell |
||||
|
||||
def __str__(self): |
||||
return self.to_string() |
||||
|
||||
def append(self, command): |
||||
"""Append a command instance to the script's commands. |
||||
|
||||
:param command: The command instance to be included. |
||||
:type command: BaseType[Command] | ItemizedCommand |
||||
|
||||
""" |
||||
self.commands.append(command) |
||||
|
||||
def to_string(self, shebang="#! /usr/bin/env %(shell)s"): |
||||
"""Export the script as a string. |
||||
|
||||
:param shebang: The shebang to be included. Set to ``None`` to omit the shebang. |
||||
:type shebang: str |
||||
|
||||
:rtype: str |
||||
|
||||
""" |
||||
a = list() |
||||
|
||||
if shebang is not None: |
||||
a.append("%s" % {'shell': self.shell}) |
||||
a.append("") |
||||
|
||||
if self.functions is not None: |
||||
for function in self.functions: |
||||
a.append(function.preview()) |
||||
a.append("") |
||||
|
||||
for function in self.functions: |
||||
a.append("%s;" % function.name) |
||||
|
||||
a.append("") |
||||
|
||||
for command in self.commands: |
||||
a.append(command.preview(cwd=True)) |
||||
a.append("") |
||||
|
||||
return "\n".join(a) |
@ -0,0 +1,85 @@ |
||||
# Imports |
||||
|
||||
import logging |
||||
from superpython.utils import any_list_item |
||||
from ..constants import LOGGER_NAME |
||||
from .ini import Config |
||||
|
||||
log = logging.getLogger(LOGGER_NAME) |
||||
|
||||
# Exports |
||||
|
||||
__all__ = ( |
||||
"filter_commands", |
||||
"load_commands", |
||||
) |
||||
|
||||
# Functions |
||||
|
||||
|
||||
def filter_commands(commands, environments=None, tags=None): |
||||
"""Filter commands based on the given criteria. |
||||
|
||||
:param commands: The commands to be filtered. |
||||
:type commands: list |
||||
|
||||
:param environments: Environment names to be matched. |
||||
:type environments: list[str] |
||||
|
||||
:param tags: Tag names to be matched. |
||||
:type tags |
||||
:return: |
||||
""" |
||||
filtered = list() |
||||
for command in commands: |
||||
if environments is not None: |
||||
if not any_list_item(environments, command.environments): |
||||
continue |
||||
|
||||
if tags is not None: |
||||
if not any_list_item(tags, command.tags): |
||||
continue |
||||
|
||||
filtered.append(command) |
||||
|
||||
return filtered |
||||
|
||||
|
||||
def load_commands(path, filters=None, overlay=None, **kwargs): |
||||
"""Load commands from a configuration file. |
||||
|
||||
:param path: The path to the configuration file. |
||||
:type path: str |
||||
|
||||
:param filters: Used to filter commands. |
||||
:type filters: dict |
||||
|
||||
:rtype: list[BaseType[Command] | ItemizedCommand] | None |
||||
|
||||
:returns: A list of command instances or ``None`` if the configuration could not be loaded. |
||||
|
||||
kwargs are passed to the configuration class for instantiation. |
||||
|
||||
""" |
||||
if path.endswith(".ini"): |
||||
_config = Config(path, overlay=overlay, **kwargs) |
||||
# elif path.endswith(".yml"): |
||||
# _config = YAML(path, **kwargs) |
||||
else: |
||||
log.warning("Input file format is not currently supported: %s" % path) |
||||
return None |
||||
|
||||
if _config.load(): |
||||
commands = _config.get_commands() |
||||
|
||||
if filters is not None: |
||||
criteria = dict() |
||||
for attribute, values in filters.items(): |
||||
criteria[attribute] = values |
||||
|
||||
commands = filter_commands(commands, **criteria) |
||||
|
||||
return commands |
||||
|
||||
log.error("Failed to load config file: %s" % path) |
||||
return None |
@ -0,0 +1,76 @@ |
||||
# Imports |
||||
|
||||
from superpython.utils import File |
||||
from ..library.overlays import Overlay |
||||
from ..library.scripts import Script |
||||
|
||||
# Exports |
||||
|
||||
__all__ = ( |
||||
"Parser" |
||||
) |
||||
|
||||
# Classes |
||||
|
||||
|
||||
class Parser(File): |
||||
"""Base class for implementing a command parser.""" |
||||
|
||||
def __init__(self, path, context=None, locations=None, options=None, overlay=None): |
||||
super().__init__(path) |
||||
|
||||
self.context = context |
||||
self.is_loaded = False |
||||
self.locations = locations or list() |
||||
self.options = options or dict() |
||||
self.overlay = overlay or Overlay("ubuntu") |
||||
self._commands = list() |
||||
self._functions = list() |
||||
|
||||
self.overlay.load() |
||||
|
||||
def as_script(self): |
||||
"""Convert loaded commands to a script. |
||||
|
||||
:rtype: Script |
||||
|
||||
""" |
||||
return Script( |
||||
"%s.sh" % self.name, |
||||
commands=self.get_commands(), |
||||
functions=self.get_functions() |
||||
) |
||||
|
||||
def get_commands(self): |
||||
"""Get the commands that have been loaded from the file. |
||||
|
||||
:rtype: list[BaseType[scripttease.library.commands.base.Command]] |
||||
|
||||
""" |
||||
a = list() |
||||
for c in self._commands: |
||||
if c.function is not None: |
||||
continue |
||||
|
||||
a.append(c) |
||||
|
||||
return a |
||||
|
||||
def get_functions(self): |
||||
"""Get the functions that have been loaded from the file. |
||||
|
||||
:rtype: list[scripttease.library.scripts.Function] |
||||
|
||||
""" |
||||
a = list() |
||||
for f in self._functions: |
||||
for c in self._commands: |
||||
if c.function is not None and f.name == c.function: |
||||
f.commands.append(c) |
||||
|
||||
a.append(f) |
||||
|
||||
return a |
||||
|
||||
def load(self): |
||||
raise NotImplementedError() |
@ -0,0 +1,150 @@ |
||||
# Imports |
||||
|
||||
from configparser import ConfigParser, ParsingError |
||||
import logging |
||||
from superpython.utils import parse_jinja_template, read_file, smart_cast, split_csv |
||||
import os |
||||
from ..library.commands import command_factory |
||||
from ..constants import LOGGER_NAME |
||||
from .base import Parser |
||||
|
||||
log = logging.getLogger(LOGGER_NAME) |
||||
|
||||
# Exports |
||||
|
||||
__all__ = ( |
||||
"Config", |
||||
) |
||||
|
||||
# Classes |
||||
|
||||
|
||||
class Config(Parser): |
||||
"""An INI configuration for loading commands.""" |
||||
|
||||
def load(self): |
||||
if not self.exists: |
||||
return False |
||||
|
||||
ini = self._load_ini() |
||||
if ini is None: |
||||
return False |
||||
|
||||
success = True |
||||
for comment in ini.sections(): |
||||
args = list() |
||||
command_name = None |
||||
count = 0 |
||||
kwargs = self.options.copy() |
||||
|
||||
for key, value in ini.items(comment): |
||||
# The first key/value pair is the command name and arguments. |
||||
if count == 0: |
||||
command_name = key |
||||
|
||||
if value[0] == '"': |
||||
args.append(value.replace('"', "")) |
||||
else: |
||||
args = value.split(" ") |
||||
else: |
||||
_key, _value = self._get_key_value(key, value) |
||||
|
||||
kwargs[_key] = _value |
||||
|
||||
count += 1 |
||||
|
||||
command = command_factory(command_name, comment, self.overlay, *args, **kwargs) |
||||
if command is not None: |
||||
# if isinstance(command, Function): |
||||
# self._functions.append(command) |
||||
# elif isinstance(command, Include): |
||||
# subcommands = self._load_include(command) |
||||
# if subcommands is not None: |
||||
# self._commands += subcommands |
||||
# elif isinstance(command, Template): |
||||
# self._load_template(command) |
||||
# self._commands.append(command) |
||||
# elif isinstance(command, ItemizedCommand) and issubclass(command.command_class, Template): |
||||
# for c in command.get_commands(): |
||||
# self._load_template(c) |
||||
# self._commands.append(c) |
||||
# else: |
||||
# self._commands.append(command) |
||||
self._commands.append(command) |
||||
else: |
||||
success = False |
||||
|
||||
self.is_loaded = success |
||||
return self.is_loaded |
||||
|
||||
# noinspection PyMethodMayBeStatic |
||||
def _get_key_value(self, key, value): |
||||
"""Process a key/value pair from an INI section. |
||||
|
||||
:param key: The key to be processed. |
||||
:type key: str |
||||
|
||||
:param value: The value to be processed. |
||||
|
||||
:rtype: tuple |
||||
:returns: The key and value, both of which may be modified from the originals. |
||||
|
||||
""" |
||||
if key in ("environments", "environs", "envs", "env"): |
||||
_key = "environments" |
||||
_value = split_csv(value) |
||||
elif key in ("func", "function"): |
||||
_key = "function" |
||||
_value = value |
||||
elif key == "items": |
||||
_key = "items" |
||||
_value = split_csv(value) |
||||
elif key == "tags": |
||||
_key = "tags" |
||||
_value = split_csv(value) |
||||
else: |
||||
_key = key |
||||
_value = smart_cast(value) |
||||
|
||||
return _key, _value |
||||
|
||||
def _load_ini(self): |
||||
"""Load the configuration file. |
||||
|
||||
:rtype: ConfigParser | None |
||||
|
||||
""" |
||||
ini = ConfigParser() |
||||
if self.context is not None: |
||||
try: |
||||
content = parse_jinja_template(self.path, self.context) |
||||
except Exception as e: |
||||
log.error("Failed to parse %s as template: %s" % (self.path, e)) |
||||
return None |
||||
else: |
||||
content = read_file(self.path) |
||||
|
||||
try: |
||||
ini.read_string(content) |
||||
return ini |
||||
except ParsingError as e: |
||||
log.error("Failed to parse %s: %s" % (self.path, e)) |
||||
return None |
||||
|
||||
def _load_template(self, command): |
||||
"""Load additional resources for a template command. |
||||
|
||||
:param command: The template command. |
||||
:type command: Template |
||||
|
||||
""" |
||||
# This may produce problems if template kwargs are the same as the given context. |
||||
if self.context is not None: |
||||
command.context.update(self.context) |
||||
|
||||
# Custom locations come before default locations. |
||||
command.locations += self.locations |
||||
|
||||
# This allows template files to be specified relative to the configuration file. |
||||
command.locations.append(os.path.join(self.directory, "templates")) |
||||
command.locations.append(self.directory) |
@ -0,0 +1,159 @@ |
||||
# Imports |
||||
|
||||
from pygments import highlight |
||||
from pygments.lexers import get_lexer_by_name |
||||
from pygments.formatters import get_formatter_by_name |
||||
|
||||
BashLexer = get_lexer_by_name("bash") |
||||
JSONLexer = get_lexer_by_name("json") |
||||
PythonLexer = get_lexer_by_name("python") |
||||
TerminalFormatter = get_formatter_by_name("terminal", linenos=True) |
||||
|
||||
# Exports |
||||
|
||||
__all__ = ( |
||||
"any_list_item", |
||||
"filter_commands", |
||||
"filter_objects", |
||||
"highlight_code", |
||||
"split_csv", |
||||
) |
||||
|
||||
# Functions |
||||
|
||||
|
||||
def any_list_item(a, b): |
||||
"""Determine whether any item in ``a`` also exists in ``b``. |
||||
|
||||
:param a: The first list to be compared. |
||||
:type a: list |
||||
|
||||
:param b: The second list to be compared. |
||||
:type b: list |
||||
|
||||
:rtype: bool |
||||
|
||||
""" |
||||
for i in a: |
||||
for j in b: |
||||
if i == j: |
||||
return True |
||||
|
||||
return False |
||||
|
||||
|
||||
def filter_commands(commands, values, attribute="tags"): |
||||
"""Filter commands for a given set of values. |
||||
|
||||
:param commands: The commands to be filtered. |
||||
:type commands: list[BaseType[Command]] |
||||
|
||||
:param values: The values to be compared. |
||||
:type values: list |
||||
|
||||
:param attribute: The name of the command attribute to check. This attribute must be a list or tuple of values of |
||||
the same type given in ``values``. |
||||
:type attribute: str |
||||
|
||||
:rtype: bool |
||||
|
||||
.. code-block:: python |
||||
|
||||
commands = [ |
||||
AddUser("bob"), |
||||
Apt("apache2", tags=["apache", "www"]), |
||||
Reload("postgresql", tags=["database", "pgsql"]), |
||||
Touch("/var/www/index.html", tags=["www"]), |
||||
] |
||||
|
||||
values = ["apache", "www"] |
||||
|
||||
# Outputs the Apt and Touch commands above. |
||||
filtered_commands = filter_commands(command, values) |
||||
print(filtered_commands) |
||||
|
||||
""" |
||||
filtered = list() |
||||
for command in commands: |
||||
try: |
||||
list_b = getattr(command, attribute) |
||||
except AttributeError: |
||||
continue |
||||
|
||||
if not any_list_item(values, list_b): |
||||
continue |
||||
|
||||
filtered.append(command) |
||||
|
||||
return filtered |
||||
|
||||
|
||||
def filter_objects(objects, environments=None, scope=None, tags=None): |
||||
"""Filter the given objects by the given keys. |
||||
|
||||
:param objects: The objects to be filtered. |
||||
:type objects: list |
||||
|
||||
:param environments: The environments to be included. |
||||
:type environments: list[str] |
||||
|
||||
:param scope: The scope by which to filter; deploy, provision, tenant. |
||||
:type scope: str |
||||
|
||||
:param tags: The tags to be included. |
||||
:type tags: list[str] |
||||
|
||||
:rtype: list |
||||
:returns: Returns the objects that match the given keys. |
||||
|
||||
""" |
||||
filtered = list() |
||||
|
||||
# print("object, object environments, environments, any_list_item") |
||||
|
||||
for o in objects: |
||||
|
||||
# print(o, o.environments, environments, any_list_item(environments, o.environments)) |
||||
|
||||
# Apply environment filter. |
||||
if environments is not None: |
||||
if hasattr(o, "environment"): |
||||
if o.environment is not None and o.environment not in environments: |
||||
continue |
||||
elif hasattr(o, "environments"): |
||||
if type(o.environments) in (list, tuple) and not any_list_item(environments, o.environments): |
||||
continue |
||||
else: |
||||
pass |
||||
|
||||
# # Apply scope filter. |
||||
# if scope is not None: |
||||
# if o.scope not in [None, SCOPE_ALL, scope]: |
||||
# continue |
||||
|
||||
# Apply tag filter. |
||||
if tags is not None: |
||||
if not any_list_item(tags, o.tags): |
||||
continue |
||||
|
||||
# The object has passed the tests above. |
||||
filtered.append(o) |
||||
|
||||
return filtered |
||||
|
||||
|
||||
def highlight_code(string, lexer=None): |
||||
"""Highlight (colorize) the given string as Python code. |
||||
|
||||
:param string: The string to be highlighted. |
||||
:type string: str |
||||
|
||||
:rtype: str |
||||
|
||||
""" |
||||
if lexer is None: |
||||
lexer = BashLexer |
||||
|
||||
return highlight(string, lexer, TerminalFormatter) |
||||
|
||||
|
@ -0,0 +1,52 @@ |
||||
# See https://packaging.python.org/en/latest/distributing.html |
||||
# and https://docs.python.org/2/distutils/setupscript.html |
||||
# and https://pypi.python.org/pypi?%3Aaction=list_classifiers |
||||
from setuptools import setup, find_packages |
||||
|
||||
|
||||
def read_file(path): |
||||
with open(path, "r") as f: |
||||
contents = f.read() |
||||
f.close() |
||||
return contents |
||||
|
||||
|
||||
setup( |
||||
name='python-script-tease', |
||||
version=read_file("VERSION.txt"), |
||||
description=read_file("DESCRIPTION.txt"), |
||||
long_description=read_file("README.markdown"), |
||||
author='Shawn Davis', |
||||
author_email='shawn@myninjas.net', |
||||
url='https://bitbucket.com/myninjas/python-script-tease', |
||||
packages=find_packages(), |
||||
include_package_data=True, |
||||
install_requires=[ |
||||
"jinja2", |
||||
"pygments", |
||||
"python-myninjas", |
||||
], |
||||
dependency_links=[ |
||||
"https://bitbucket.com/myninjas/python-myninjas/master.tar.gz#python-myninjas", |
||||
], |
||||
classifiers=[ |
||||
'Development Status :: 2 - Pre-Alpha', |
||||
'Environment :: Console', |
||||
'Intended Audience :: Developers', |
||||
'License :: OSI Approved :: BSD License', |
||||
'Operating System :: OS Independent', |
||||
'Programming Language :: Python', |
||||
'Programming Language :: Python :: 3.6', |
||||
'Programming Language :: Python :: 3.7', |
||||
], |
||||
zip_safe=False, |
||||
tests_require=[ |
||||
"coverage", |
||||
], |
||||
test_suite='runtests.runtests', |
||||
entry_points={ |
||||
'console_scripts': [ |
||||
'tease = script_tease.cli:main_command', |
||||
], |
||||
}, |
||||
) |
@ -0,0 +1,2 @@ |
||||
import os |
||||
import pytest |
@ -0,0 +1,12 @@ |
||||
[install the virtualenv package] |
||||
pip = virtualenv |
||||
|
||||
[create a virtual environment] |
||||
virtualenv = python |
||||
cd = /path/to/project |
||||
|
||||
[install pillow] |
||||
pip = Pillow |
||||
cd = /path/to/project |
||||
upgrade = yes |
||||
venv = python |
@ -0,0 +1,32 @@ |
||||
# Testing |
||||
|
||||
## Set Up for Testing |
||||
|
||||
Install requirements: |
||||
|
||||
``pip install tests/requirements.pip`` |
||||
|
||||
## Running Tests |
||||
|
||||
Run all tests with coverage: |
||||
|
||||
``make tests`` |
||||
|
||||
Run a specific test: |
||||
|
||||
``python -m pytest tests/units/path/to/test.py`` |
||||
|
||||
Example: |
||||
|
||||
``python -m pytest tests/units/shortcuts/test_shortcuts.py`` |
||||
|
||||
To allow output from print statements within a test method, add the ``-s`` switch: |
||||
|
||||
``python -m pytest -s tests/units/path/to/test.py`` |
||||
|
||||
> Tip: Add ``-v`` to list the tests with PASS/FAIL. |
||||
|
||||
## Reference |
||||
|
||||
- [coverage](https://coverage.readthedocs.io/en/v4.5.x/) |
||||
- [pytest](https://pytest.org) |
@ -0,0 +1,2 @@ |
||||
coverage |
||||
pytest |
@ -0,0 +1,86 @@ |
||||
from scripttease.library.commands.base import Command, ItemizedCommand, Sudo |
||||
from scripttease.library.commands.python import Pip |
||||
from scripttease.library.overlays import Overlay |
||||
|
||||
|
||||
class TestCommand(object): |
||||
|
||||
def test_getattr(self): |
||||
c = Command("ls -ls", extra=True) |
||||
assert c.extra is True |
||||
|
||||
def test_get_statement(self): |
||||
c = Command( |
||||
"ls -ls", |
||||
comment="kitchen sink", |
||||
condition="$last_command -eq 0", |
||||
cd="/path/to/project", |
||||
prefix="source python/bin/active", |
||||
register="list_success", |
||||
stop=True, |
||||
sudo="deploy" |
||||
) |
||||
statement = c.get_statement(cd=True) |
||||
assert "( cd" in statement |
||||
assert "sudo" in statement |
||||
assert ")" in statement |
||||
assert "# kitchen sink" in statement |
||||
assert "if [[ $last_command" in statement |
||||
assert "list_success=$?" in statement |
||||
assert "if [[ $list_success" in statement |
||||
|
||||
c = Command( |
||||
"ls -ls", |
||||
stop=True |
||||
) |
||||
statement = c.get_statement() |
||||
assert "if [[ $?" in statement |
||||
|
||||
def test_init(self): |
||||
c = Command("ls -ls", sudo=Sudo(user="deploy")) |
||||
assert isinstance(c.sudo, Sudo) |
||||
assert c.sudo.user == "deploy" |
||||
|
||||
c = Command("ls -ls", sudo="deploy") |
||||
assert isinstance(c.sudo, Sudo) |
||||
assert c.sudo.user == "deploy" |
||||
|
||||
c = Command("ls -ls", sudo=True) |
||||
assert isinstance(c.sudo, Sudo) |
||||
assert c.sudo.user == "root" |
||||
|
||||
c = Command("ls -ls") |
||||
assert isinstance(c.sudo, Sudo) |
||||
assert c.sudo.user == "root" |
||||
assert c.sudo.enabled is False |
||||
|
||||
def test_repr(self): |
||||
c = Command("ls -ls", comment="listing") |
||||
assert repr(c) == "<Command listing>" |
||||
|
||||
c = Command("ls -ls") |
||||
assert repr(c) == "<Command>" |
||||
|
||||
|
||||
class TestItemizedCommand(object): |
||||
|
||||
def test_getattr(self): |
||||
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item", extra=True) |
||||
assert c.extra is True |
||||
|
||||
def test_get_commands(self): |
||||
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item") |
||||
commands = c.get_commands() |
||||
for i in commands: |
||||
assert isinstance(i, Pip) |
||||
|
||||
def test_get_statement(self): |
||||
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item") |
||||
statement = c.get_statement() |
||||
assert "Pillow" in statement |
||||
assert "psycopg2-binary" in statement |
||||
assert "django" in statement |
||||
|
||||
def test_repr(self): |
||||
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item") |
||||
assert repr(c) == "<ItemizedCommand Pip>" |
@ -0,0 +1,20 @@ |
||||
from scripttease.library.commands import command_factory, ItemizedCommand |
||||
from scripttease.library.commands.python import Pip |
||||
from scripttease.library.overlays import Overlay |
||||
|
||||
|
||||
def test_command_factory(): |
||||
overlay = Overlay("ubuntu") |
||||
overlay.load() |
||||
|
||||
command = command_factory("nonexistent", "non existent command", overlay) |
||||
assert command is None |
||||
|
||||
command = command_factory("pip", "install pillow", overlay) |
||||
assert command is None |
||||
|
||||
command = command_factory("pip", "install pillow", overlay, "Pillow") |
||||
assert isinstance(command, Pip) |
||||
|
||||
command = command_factory("pip", "install various", overlay, "$item", items=["Pillow", "pyscopg2-binary", "django"]) |
||||
assert isinstance(command, ItemizedCommand) |
@ -0,0 +1,18 @@ |
||||
from scripttease.library.commands.python import * |
||||
from scripttease.library.overlays import Overlay |
||||
|
||||
|
||||
def test_pip(): |
||||
pip = Pip("Pillow") |
||||
assert "pip install -y Pillow" in pip.get_statement() |
||||
|
||||
overlay = Overlay("ubuntu") |
||||
overlay.load() |
||||
|
||||
pip = Pip("Pillow", op="remove", overlay=overlay, venv="python") |
||||
assert "source python/bin/activate && pip3 uninstall --quiet Pillow" in pip.get_statement() |
||||
|
||||
|
||||
def test_virtualenv(): |
||||
virt = VirtualEnv() |
||||
assert "virtualenv python" in virt.get_statement() |
@ -0,0 +1,19 @@ |
||||
from scripttease.library.overlays import Overlay |
||||
|
||||
|
||||
class TestOverlay(object): |
||||
|
||||
def test_get(self): |
||||
overlay = Overlay("ubuntu") |
||||
overlay.load() |
||||
assert overlay.get("nonexistent", "nonexistent") is None |
||||
|
||||
def test_has(self): |
||||
overlay = Overlay("ubuntu") |
||||
overlay.load() |
||||
assert overlay.has("nonexistent", "nonexistent") is False |
||||
assert overlay.has("python", "nonexistent") is False |
||||
|
||||
def test_load(self): |
||||
overlay = Overlay("nonexistent") |
||||
assert overlay.load() is False |
Loading…
Reference in new issue