Finished initial development and testing.

development
Shawn Davis 4 years ago
parent 6fec406d04
commit b6a7b9dbbc
  1. 3
      .coveragerc
  2. 2
      README.markdown
  3. 2
      VERSION.txt
  4. 204
      scripttease/cli/__init__.py
  5. 58
      scripttease/cli/initialize.py
  6. 76
      scripttease/cli/subcommands.py
  7. 47
      scripttease/data/overlays/ubuntu.ini
  8. 24
      scripttease/factory.py
  9. 224
      scripttease/library/_overlays.py
  10. 2
      scripttease/library/commands/__init__.py
  11. 134
      scripttease/library/commands/apache.py
  12. 62
      scripttease/library/commands/base.py
  13. 119
      scripttease/library/commands/factory.py
  14. 4
      scripttease/library/commands/mappings.py
  15. 23
      scripttease/library/commands/packages.py
  16. 43
      scripttease/library/commands/python.py
  17. 19
      scripttease/library/overlays/common.py
  18. 164
      scripttease/library/overlays/django.py
  19. 292
      scripttease/library/overlays/pgsql.py
  20. 475
      scripttease/library/overlays/posix.py
  21. 83
      scripttease/library/overlays/ubuntu.py
  22. 12
      scripttease/library/scripts.py
  23. 83
      scripttease/parsers/__init__.py
  24. 16
      scripttease/parsers/base.py
  25. 50
      scripttease/parsers/ini.py
  26. 110
      scripttease/parsers/utils.py
  27. 159
      scripttease/utils.py
  28. 10
      setup.py
  29. 12
      tests/examples/apache_examples.ini
  30. 2
      tests/examples/bad_command.ini
  31. 2
      tests/examples/bad_examples.ini
  32. 6
      tests/examples/bad_template_example.ini
  33. 25
      tests/examples/function_examples.ini
  34. 192
      tests/examples/kitchen_sink.ini
  35. 3
      tests/examples/python_examples.ini
  36. 2
      tests/examples/template_example.ini
  37. 40
      tests/test_factory.py
  38. 15
      tests/test_library_commands_base.py
  39. 20
      tests/test_library_commands_factory.py
  40. 18
      tests/test_library_commands_python.py
  41. 17
      tests/test_library_overlays_common.py
  42. 61
      tests/test_library_overlays_django.py
  43. 60
      tests/test_library_overlays_pgsql.py
  44. 209
      tests/test_library_overlays_posix.py
  45. 86
      tests/test_library_overlays_ubuntu.py
  46. 31
      tests/test_library_scripts.py
  47. 19
      tests/test_overlays.py
  48. 22
      tests/test_parsers_base.py
  49. 45
      tests/test_parsers_ini.py
  50. 42
      tests/test_parsers_utils.py

@ -1,8 +1,9 @@
[run] [run]
omit = omit =
docs/* docs/*
scripttease/cli/__init__.py scripttease/cli/*
sandbox sandbox
setup.py setup.py
tests/*
tmp/* tmp/*
tmp.* tmp.*

@ -2,6 +2,6 @@
![](https://img.shields.io/badge/status-active-green.svg) ![](https://img.shields.io/badge/status-active-green.svg)
![](https://img.shields.io/badge/stage-development-blue.svg) ![](https://img.shields.io/badge/stage-development-blue.svg)
![](https://img.shields.io/badge/coverage-55%25-yellow.svg) ![](https://img.shields.io/badge/coverage-100%25-green.svg)
A collection of classes and commands for automated command line scripting using Python. A collection of classes and commands for automated command line scripting using Python.

@ -1 +1 @@
5.8.18-d 6.0.0-d

@ -0,0 +1,204 @@
# Imports
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from superpython.logging import LoggingHelper
from ..constants import LOGGER_NAME
from . import initialize
from . import subcommands
DEBUG = 10
logging = LoggingHelper(colorize=True, name=LOGGER_NAME)
log = logging.setup()
# Commands
def main_command():
"""Process script configurations."""
__author__ = "Shawn Davis <shawn@develmaycare.com>"
__date__ = "2020-07-21"
__help__ = """NOTES
This command is used to parse configuration files and output the commands.
"""
__version__ = "0.10.0-d"
# Main argument parser from which sub-commands are created.
parser = ArgumentParser(description=__doc__, epilog=__help__, formatter_class=RawDescriptionHelpFormatter)
parser.add_argument(
"path",
default="commands.ini",
nargs="?",
help="The path to the configuration file."
)
parser.add_argument(
"-c",
"--color",
action="store_true",
dest="color_enabled",
help="Enable code highlighting for terminal output."
)
parser.add_argument(
"-C=",
"--context=",
action="append",
dest="variables",
help="Context variables for use in pre-parsing the config and templates. In the form of: name:value"
)
parser.add_argument(
"-d",
"--docs",
action="store_true",
dest="docs_enabled",
help="Output documentation instead of code."
)
# parser.add_argument(
# "-d=",
# "--docs=",
# choices=["html", "markdown", "plain", "rst"],
# dest="docs_enabled",
# help="Output documentation instead of code."
# )
parser.add_argument(
"-D",
"--debug",
action="store_true",
dest="debug_enabled",
help="Enable debug output."
)
parser.add_argument(
"-f=",
"--filter=",
action="append",
dest="filters",
help="Filter the commands in the form of: attribute:value"
)
parser.add_argument(
"-O=",
"--option=",
action="append",
dest="options",
help="Common command options in the form of: name:value"
)
# parser.add_argument(
# "-O=",
# "--output=",
# # default=os.path.join("prototype", "output"),
# dest="output_path",
# help="Output to the given directory. Defaults to ./prototype/output/"
# )
parser.add_argument(
"-s",
"--script",
action="store_true",
dest="script_enabled",
help="Output commands as a script."
)
parser.add_argument(
"-T=",
"--template-path=",
action="append",
dest="template_locations",
help="The location of template files that may be used with the template command."
)
parser.add_argument(
"-w=",
"--write=",
dest="output_file",
help="Write the output to disk."
)
parser.add_argument(
"-V=",
"--variables-file=",
dest="variables_file",
help="Load variables from a file."
)
# Access to the version number requires special consideration, especially
# when using sub parsers. The Python 3.3 behavior is different. See this
# answer: http://stackoverflow.com/questions/8521612/argparse-optional-subparser-for-version
parser.add_argument(
"-v",
action="version",
help="Show version number and exit.",
version=__version__
)
parser.add_argument(
"--version",
action="version",
help="Show verbose version information and exit.",
version="%(prog)s" + " %s %s by %s" % (__version__, __date__, __author__)
)
# Parse arguments.
args = parser.parse_args()
if args.debug_enabled:
log.setLevel(DEBUG)
log.debug("Namespace: %s" % args)
# Load context.
context = dict()
if args.variables:
context = initialize.context_from_cli(args.variables)
# Handle filters.
filters = None
if args.filters:
filters = initialize.filters_from_cli(args.filters)
# Handle options.
options = None
if args.options:
options = initialize.options_from_cli(args.options)
if args.variables_file:
variables = initialize.variable_from_file(args.variables_file)
if variables:
context.update(variables)
if args.docs_enabled:
exit_code = subcommands.output_docs(
args.path,
context=context,
filters=filters,
locations=args.template_locations,
options=options
)
elif args.script_enabled:
exit_code = subcommands.output_script(
args.path,
color_enabled=args.color_enabled,
context=context,
locations=args.template_locations,
options=options
)
else:
exit_code = subcommands.output_commands(
args.path,
color_enabled=args.color_enabled,
context=context,
filters=filters,
locations=args.template_locations,
options=options
)
exit(exit_code)

@ -0,0 +1,58 @@
# Imports
from configparser import ConfigParser
import logging
import os
from superpython.utils import smart_cast
from ..constants import LOGGER_NAME
log = logging.getLogger(LOGGER_NAME)
# Functions
def context_from_cli(variables):
context = dict()
for i in variables:
key, value = i.split(":")
context[key] = smart_cast(value)
return context
def filters_from_cli(filters):
_filters = dict()
for i in filters:
key, value = i.split(":")
if key not in filters:
_filters[key] = list()
_filters[key].append(value)
return _filters
def options_from_cli(options):
_options = dict()
for i in options:
key, value = i.split(":")
_options[key] = smart_cast(value)
return _options
def variable_from_file(path):
if not os.path.exists(path):
log.warning("Variables file does not exist: %s" % path)
return None
ini = ConfigParser()
ini.read(path)
variables = dict()
for section in ini.sections():
for key, value in ini.items(section):
key = "%s_%s" % (section, key)
variables[key] = smart_cast(vaue)
return variables

@ -0,0 +1,76 @@
# Imports
from superpython.shell import EXIT
from superpython.utils import highlight_code
from ..parsers import load_commands, load_config
# Functions
def output_commands(path, color_enabled=False, context=None, filters=None, locations=None, options=None):
commands = load_commands(
path,
context=context,
filters=filters,
locations=locations,
options=options
)
if commands is None:
return EXIT.ERROR
output = list()
for command in commands:
statement = command.get_statement(cd=True)
if statement is None:
continue
output.append(statement)
output.append("")
if color_enabled:
print(highlight_code("\n".join(output), language="bash"))
else:
print("\n".join(output))
return EXIT.OK
def output_docs(path, context=None, filters=None, locations=None, options=None):
commands = load_commands(
path,
context=context,
filters=filters,
locations=locations,
options=options
)
if commands is None:
return EXIT.ERROR
count = 1
output = list()
for command in commands:
output.append("%s. %s" % (count, command.comment))
count += 1
print("\n".join(output))
return EXIT.OK
def output_script(path, color_enabled=False, context=None, filters=None, locations=None, options=None):
config = load_config(
path,
context=context,
locations=locations,
options=options
)
if config is None:
return EXIT.ERROR
script = config.as_script()
if color_enabled:
print(highlight_code(script.to_string(), language="bash"))
else:
print(script)
return EXIT.OK

@ -1,47 +0,0 @@
[apache]
disable_module = a2dismod {{ module_name }}
disable_site = a2dissite {{ domain_name }}.conf
enable_module = a2enmod {{ module_name }}
enable_site = a2ensite {{ domain_name }}.conf
reload = service apache2 reload
restart = service apache2 restart
start = service apache2 start
stop = service apache2 stop
test = apachectl configtest
[package_install]
system = apt-get install -y {{ package_name }}
pip = pip3 install{% if upgrade %} --upgrade{% endif %} --quiet {{ package_name }}
[package_remove]
system = apt-get uninstall -y {{ package_name }}
pip = pip3 uninstall --quiet {{ package_name }}
[system]
install = apt-get install -y {{ package_name }}
reboot = reboot
remove = apt-get uninstall -y {{ package_name }}
update = apt-get update -y
upgrade = apt-get upgrade -y
[python]
virtualenv = virtualenv {{ name }}
install = pip3 install{% if upgrade %} --upgrade{% endif %} --quiet {{ package_name }}
remove = pip3 uninstall --quiet {{ package_name }}
[files]
append = echo "{{ content }}" >> {{ path }}
chgrp = chgrp{% if recursive %} -R{% endif %} {{ group }} {{ path }}
chmod = chmod{% if recursive %} -R{% endif %} {{ owner }} {{ path }}
chown = chown{% if recursive %} -R{% endif %} {{ mode }} {{ path }}
copy = cp{% if recursive %} -R{% endif %}{% if overwrite %} -n{% endif %} {{ from_path }} {{ to_path }}
mkdir = mkdir{% if mode %} -m {{ mode }}{% endif %}{% if recursive %} -p{% endif %} {{ path }}
move = move {{ from_path }} {{ to_path }}
rename = move {{ from_path }} {{ to_path }}
remove = rm{% if force %} -f{% endif %}{% if recursive %} -r{% endif %} {{ path }}
;rsync = ?
;scopy = ?
;sed = ?
symlink = ln -s{% if force %} -f{% endif %} {{ source }} {{ target }}
touch = touch {{ path }}
;write = ?

@ -1,6 +1,11 @@
# Imports # Imports
import logging
from importlib import import_module from importlib import import_module
from .constants import LOGGER_NAME
from .library.commands import ItemizedCommand
log = logging.getLogger(LOGGER_NAME)
# Exports # Exports
@ -25,6 +30,9 @@ class Factory(object):
self.overlay = None self.overlay = None
self._overlay = overlay self._overlay = overlay
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._overlay)
def get_command(self, name, *args, **kwargs): def get_command(self, name, *args, **kwargs):
"""Get a command. """Get a command.
@ -37,19 +45,19 @@ class Factory(object):
""" """
if not self.overlay.command_exists(name): if not self.overlay.command_exists(name):
# log.warning("Command does not exist in %s overlay: %s" % (overlay.name, name)) log.warning("Command does not exist in %s overlay: %s" % (self._overlay, name))
return None return None
callback = self.overlay.MAPPINGS[name] callback = self.overlay.MAPPINGS[name]
try: try:
# items = kwargs.pop("items", None) items = kwargs.pop("items", None)
# if items is not None: if items is not None:
# return ItemizedCommand(callback, items, *args, **kwargs) return ItemizedCommand(callback, items, *args, **kwargs)
return callback(*args, **kwargs) return callback(*args, **kwargs)
except (KeyError, TypeError, ValueError) as e: except (KeyError, NameError, TypeError, ValueError) as e:
# log.critical("Failed to load %s command: %s" % (name, e)) log.critical("Failed to load %s command: %s" % (name, e))
return None return None
def load(self): def load(self):
@ -62,7 +70,7 @@ class Factory(object):
self.overlay = import_module("scripttease.library.overlays.%s" % self._overlay) self.overlay = import_module("scripttease.library.overlays.%s" % self._overlay)
self.is_loaded = True self.is_loaded = True
except ImportError as e: except ImportError as e:
# log.error("The %s overlay could not be imported: %s" % (overlay, str(e))) log.error("The %s overlay could not be imported: %s" % (self._overlay, str(e)))
pass pass
return self.is_loaded return self.is_loaded

@ -1,224 +0,0 @@
# Imports
from configparser import RawConfigParser
import os
from superpython.utils import parse_jinja_string
from ..constants import PATH_TO_SCRIPT_TEASE
# Exports
__all__ = (
"Overlay",
)
# Classes
class Overlay(object):
"""An overlay applies commands specific to a given operating system or platform."""
def __init__(self, name):
self.is_loaded = False
self._name = name
self._path = os.path.join(PATH_TO_SCRIPT_TEASE, "data", "overlays", "%s.ini" % name)
self._sections = dict()
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._name)
@property
def exists(self):
"""Indicates whether the overlay file exists.
:rtype: bool
"""
return os.path.exists(self._path)
def get(self, section, key, **kwargs):
"""Get the command statement for the given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
kwargs are used to parse the value of the key within the section.
:rtype: str | None
"""
if not self.has(section, key):
return None
template = self._sections[section][key]
return parse_jinja_string(template, kwargs)
def has(self, section, key):
"""Determine whether the overlay contains a given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
:rtype: bool
"""
if section not in self._sections:
return False
if key not in self._sections[section]:
return False
return True
def load(self):
"""Load the overlay.
:rtype: bool
"""
if not self.exists:
return False
ini = RawConfigParser()
ini.read(self._path)
for section in ini.sections():
self._sections[section] = dict()
for key, value in ini.items(section):
self._sections[section][key] = value
self.is_loaded = True
return True
def to_mapping(self):
"""Export the overlay as a dictionary with command names as values.
:rtype: dict
"""
d = dict()
for section in self._sections:
d[section] = list()
for command_name, statement in self._sections[section].items():
d[section].append(command_name)
class Overlay2(object):
"""An overlay applies commands specific to a given operating system or platform."""
def __init__(self, name):
self.is_loaded = False
self._name = name
self._path = os.path.join(PATH_TO_SCRIPT_TEASE, "data", "overlays", "%s.ini" % name)
self._sections = dict()
self.exists = os.path.exists(self._path)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._name)
def command_exists(self, name):
"""Determine whether a given command exists.
:param name: The name of the command to check.
:type name: str
:rtype: bool
"""
section = None
if "." in name:
section, name = name.split(".")
if section is not None:
if section in self._sections:
return name in self._sections[section]
for section in self._sections.keys():
if name in self._sections[section]:
return True
return False
def get_statement(self, name, *args, **kwargs):
pass
def get(self, section, key, **kwargs):
"""Get the command statement for the given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
kwargs are used to parse the value of the key within the section.
:rtype: str | None
"""
if not self.has(section, key):
return None
template = self._sections[section][key]
return parse_jinja_string(template, kwargs)
def has(self, section, key):
"""Determine whether the overlay contains a given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
:rtype: bool
"""
if section not in self._sections:
return False
if key not in self._sections[section]:
return False
return True
def load(self):
"""Load the overlay.
:rtype: bool
"""
if not self.exists:
return False
ini = RawConfigParser()
ini.read(self._path)
for section in ini.sections():
self._sections[section] = dict()
for command_name, statement_template in ini.items(section):
self._sections[section][command_name] = statement_template
self.is_loaded = True
return True
def to_mapping(self):
"""Export the overlay as a dictionary with command names as values.
:rtype: dict
"""
d = dict()
for section in self._sections:
d[section] = list()
for command_name, statement in self._sections[section].items():
d[section].append(command_name)

@ -1,2 +1,2 @@
from .base import Command, ItemizedCommand from .base import Command, ItemizedCommand
from .factory import command_factory # from .factory import command_factory

@ -1,134 +0,0 @@
# Imports
import logging
from ...constants import LOGGER_NAME
from .base import Command
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"MAPPING",
"ConfigTest",
"DisableModule",
"DisableSite",
"EnableModule",
"EnableSite",
"Reload",
"Restart",
"Start",
"Stop",
)
# Classes
class ConfigTest(Command):
"""Run an apache config test."""
def __init__(self, overlay=None, **kwargs):
"""There is no argument."""
if overlay is not None:
statement = overlay.get("apache", "test")
else:
statement = "apachectl configtest"
kwargs.setdefault('register', "apache_checks_out")
super().__init__(statement, **kwargs)
class DisableModule(Command):
"""Disable an Apache module."""
def __init__(self, module_name, overlay=None, **kwargs):
"""Initialize the command.
:param module_name: The module name.
:type module_name: str
"""
if overlay is not None:
statement = overlay.get("apache", "disable_module", module_name=module_name)
statement = "a2dismod %s" % module_name
super().__init__(statement, **kwargs)
class DisableSite(Command):
"""Disable a virtual host."""
def __init__(self, domain_name, **kwargs):
"""Initialize the command.
:param domain_name: The domain name.
:type domain_name: str
"""
statement = "a2dissite %s.conf" % domain_name
super().__init__(statement, **kwargs)
class Enable(Command):
def __init__(self, what, name, **kwargs):
if what in ("mod", "module"):
statement = EnableModule(name, **kwargs).statement
elif what == "site":
statement = EnableSite(name, **kwargs).statement
else:
raise ValueError("Invalid Apache item to be enabled: %s" % what)
super().__init__(statement, **kwargs)
class EnableModule(Command):
"""Enable an Apache module."""
def __init__(self, module_name, **kwargs):
"""Initialize the command.
:param module_name: The module name.
:type module_name: str
"""
statement = "a2enmod %s" % module_name
super().__init__(statement, **kwargs)
class EnableSite(Command):
"""Enable a virtual host."""
def __init__(self, domain_name, **kwargs):
"""Initialize the command.
:param domain_name: The domain name.
:type domain_name: str
"""
statement = "a2ensite %s.conf" % domain_name
super().__init__(statement, **kwargs)
MAPPING = {
# 'apache': Apache,
'apache.check': ConfigTest,
'apache.config': ConfigTest,
'apache.configtest': ConfigTest,
'apache.disable': Disable,
'apache.disable_mod': DisableModule,
'apache.disable_module': DisableModule,
'apache.disable_site': DisableSite,
'apache.enable': Enable,
'apache.enable_mod': EnableModule,
'apache.enable_module': EnableModule,
'apache.enable_site': EnableSite,
'apache.mod': EnableModule,
'apache.module': EnableModule,
'apache.test': ConfigTest,
}

@ -2,9 +2,52 @@
class Command(object): class Command(object):
"""A command line statement."""
def __init__(self, statement, comment=None, condition=None, cd=None, environments=None, function=None, prefix=None, def __init__(self, statement, comment=None, condition=None, cd=None, environments=None, function=None, prefix=None,
register=None, shell=None, stop=False, sudo=None, tags=None, **kwargs): register=None, shell=None, stop=False, sudo=None, tags=None, **kwargs):
"""Initialize a command.
:param statement: The statement to be executed.
:type statement: str
:param comment: A comment regarding the statement.
:type comment: str
:param condition: A (system-specific) condition for the statement to be executed.
:type condition: str
:param cd: The direction from which the statement should be executed.
:type cd: str
:param environments: A list of target environments where the statement should be executed.
:type environments: list[str]
:param function: The name of the function in which the statement is executed.
:type function: str
:param prefix: A statement to execute before the main statement is executed.
:type prefix: str
:param register: A variable name to use for capture the success for failure of the statement's execution.
:type register: str
:param shell: The shell execute through which the statement is executed.
:type shell: str
:param stop: Indicates process should stop if the statement fails to execute.
:type stop: bool | None
:param sudo: Indicates whether sudo should be invoked for the statement. Given as a bool or user name or
:py:class:`scripttease.library.commands.base.Sudo` instance.
:type sudo: bool | str | Sudo
:param tags: A list of tags describing the statement.
:type tags: list[str]
Additional kwargs are available as dynamic attributes of the Command instance.
"""
self.comment = comment self.comment = comment
self.condition = condition self.condition = condition
self.cd = cd self.cd = cd
@ -37,12 +80,15 @@ class Command(object):
return "<%s>" % self.__class__.__name__ return "<%s>" % self.__class__.__name__
def get_statement(self, cd=False): def get_statement(self, cd=False, suppress_comment=False):
"""Get the full statement. """Get the full statement.
:param cd: Include the directory change, if given. :param cd: Include the directory change, if given.
:type cd: bool :type cd: bool
:param suppress_comment: Don't include the comment.
:type suppress_comment: bool
:rtype: str :rtype: str
""" """
@ -65,7 +111,7 @@ class Command(object):
a.append(")") a.append(")")
b = list() b = list()
if self.comment is not None: if self.comment is not None and not suppress_comment:
b.append("# %s" % self.comment) b.append("# %s" % self.comment)
if self.condition is not None: if self.condition is not None:
@ -95,12 +141,12 @@ class Command(object):
class ItemizedCommand(object): class ItemizedCommand(object):
"""An itemized command represents multiple commands of with the same statement but different parameters."""
def __init__(self, command_class, items, *args, **kwargs): def __init__(self, callback, items, *args, **kwargs):
"""Initialize the command. """Initialize the command.
:param command_class: The command class to be used. :param callback: The function to be used to generate the command.
:type command_class: class
:param items: The command arguments. :param items: The command arguments.
:type items: list[str] :type items: list[str]
@ -111,7 +157,7 @@ class ItemizedCommand(object):
""" """
self.args = args self.args = args
self.command_class = command_class self.callback = callback
self.items = items self.items = items
self.kwargs = kwargs self.kwargs = kwargs
@ -119,7 +165,7 @@ class ItemizedCommand(object):
return self.kwargs.get(item) return self.kwargs.get(item)
def __repr__(self): def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.command_class.__name__) return "<%s %s>" % (self.__class__.__name__, self.callback.__name__)
def get_commands(self): def get_commands(self):
"""Get the commands to be executed. """Get the commands to be executed.
@ -135,7 +181,7 @@ class ItemizedCommand(object):
for arg in self.args: for arg in self.args:
args.append(arg.replace("$item", item)) args.append(arg.replace("$item", item))
command = self.command_class(*args, **kwargs) command = self.callback(*args, **kwargs)
a.append(command) a.append(command)
return a return a

@ -1,119 +0,0 @@
# Imports
from importlib import import_module
import logging
# from ..scripts import Function
from ...constants import LOGGER_NAME
# from .base import ItemizedCommand
# from .mappings import MAPPING
log = logging.getLogger(LOGGER_NAME)
# Functions
def command_factory(name, comment, overlay, *args, **kwargs):
# try:
# _overlay = import_module("scripttease.library.overlays.%s" % overlay)
# except ImportError as e:
# log.error("The %s overlay could not be imported: %s" % (overlay, str(e)))
# return None
if not overlay.command_exists(name):
log.warning("Command does not exist in %s overlay: %s" % (overlay.name, name))
return None
kwargs['comment'] = comment
callback = overlay.MAPPINGS[name]
return callback(*args, **kwargs)
'''
def command_exists(name):
"""Indicates whether the named command exists.
:param name: The name of the command to be checked.
:type name: str
:rtype: bool
"""
return name in MAPPING
def command_factory(name, comment, overlay, *args, **kwargs):
# if name in ("func", "function"):
# kwargs['comment'] = comment
# return Function(*args, **kwargs)
if not command_exists(name):
log.warning("No mapping for command: %s" % name)
return None
_args = list(args)
kwargs['comment'] = comment
kwargs['overlay'] = overlay
log.debug("%s: %s" % (comment, kwargs))
command_class = MAPPING[name]
try:
items = kwargs.pop("items", None)
if items is not None:
return ItemizedCommand(command_class, items, *_args, **kwargs)
return command_class(*_args, **kwargs)
except (KeyError, TypeError, ValueError) as e:
log.critical("Failed to load %s command: %s" % (name, e))
return None
'''
#
#
#
# MAPPINGS = {
# 'apache.disable_module': apache_disable_module,
# 'apache.disable_site': apache_disable_site,
# 'apache.enable_module': apache_enable_module,
# 'apache.enable_site': apache_enable_site,
# 'apache.reload': apache_reload,
# 'apache.restart': apache_restart,
# 'apache.start': apache_start,
# 'apache.stop': apache_stop,
# 'apache.test': apache_test,
# 'copy': file_copy,
# 'pip': python_pip,
# 'virtualenv': python_virtualenv,
# # 'python': ("pip", "virtualenv"),
# # 'apache': ("disable_module", "disable_site", "enable_module", "enable_site", "test"),
# }
def nother_command_exists(name):
return name in MAPPINGS
def other_command_exists(name, section=None):
if section is not None:
if section not in MAPPINGS:
return False
return name in MAPPINGS[section]
for _section, commands in MAPPINGS.items():
if name in commands:
return True
return False
def other_command_factory(name, comment, overlay, *args, **kwargs):
if not overlay.command_exists(name):
log.warning("The %s overlay does not have a mapping for command: %s" % (overlay, name))
return None
items = kwargs.pop("items", None)
if items is not None:
return ItemizedCommand

@ -1,4 +0,0 @@
from .python import MAPPING as PYTHON_MAPPING
MAPPING = dict()
MAPPING.update(PYTHON_MAPPING)

@ -1,23 +0,0 @@
# Classes
class Install(object):
def __init__(self, name, manager="pip", overlay=None, upgrade=False, **kwargs):
if overlay is not None:
statement = overlay.get("package_install", manager, package_name=name, upgrade=upgrade)
else:
statement = "%s install %s" % (manager, name)
self.statement = statement
class Remove(object):
def __init__(self, name, manager="pip", overlay=None):
if overlay is not None:
statement = overlay.get("package_remove", manager, package_name=name)
else:
statement = "%s uninstall %s" % (manager, name)

@ -1,43 +0,0 @@
# Imports
from .base import Command
# Exports
__all__ = (
"Pip",
"VirtualEnv",
)
# Classes
class Pip(Command):
def __init__(self, name, op="install", overlay=None, upgrade=False, venv=None, **kwargs):
if overlay is not None:
statement = overlay.get("python", op, package_name=name, upgrade=upgrade)
else:
statement = "pip %s -y %s" % (op, name)
if venv is not None:
kwargs['prefix'] = "source %s/bin/activate" % venv
kwargs.setdefault("comment", "%s %s" % (op, name))
super().__init__(statement, **kwargs)
class VirtualEnv(Command):
def __init__(self, name="python", overlay=None, **kwargs):
kwargs.setdefault("comment", "create %s virtual environment" % name)
statement = "virtualenv %s" % name
super().__init__(statement, **kwargs)
MAPPING = {
'pip': Pip,
'virtualenv': VirtualEnv,
}

@ -1,5 +1,17 @@
# Imports
from ..commands import Command from ..commands import Command
# Exports
__all__ = (
"COMMON_MAPPINGS",
"python_pip",
"python_virtualenv",
)
# Functions
def python_pip(name, op="install", upgrade=False, venv=None, **kwargs): def python_pip(name, op="install", upgrade=False, venv=None, **kwargs):
if upgrade: if upgrade:
@ -19,3 +31,10 @@ def python_virtualenv(name="python", **kwargs):
kwargs.setdefault("comment", "create %s virtual environment" % name) kwargs.setdefault("comment", "create %s virtual environment" % name)
return Command("virtualenv %s" % name, **kwargs) return Command("virtualenv %s" % name, **kwargs)
# Mappings
COMMON_MAPPINGS = {
'pip': python_pip,
'virtualenv': python_virtualenv,
}

@ -0,0 +1,164 @@
# Imports
import os
from ..commands import Command
# Exports
__all__ = (
"DJANGO_MAPPINGS",
"django",
"django_check",
"django_collect_static",
"django_dumpdata",
"django_loaddata",
"django_migrate",
)
# Functions
def _django(name, *args, venv=None, **kwargs):
if venv is not None:
kwargs['prefix'] = "source %s/bin/activate" % venv
kwargs.setdefault("comment", "run %s django management command" % name)
# Base parameters need to be captured, because all others are assumed to be switches for the management command.
_kwargs = {
'comment': kwargs.pop("comment", None),
'condition': kwargs.pop("condition", None),
'cd': kwargs.pop("cd", None),
'environments': kwargs.pop("environments", None),
'function': kwargs.pop("function", None),
# 'local': kwargs.pop("local", False),
'prefix': kwargs.pop("prefix", None),
'register': kwargs.pop("register", None),
'shell': kwargs.pop("shell", "/bin/bash"),
'stop': kwargs.pop("stop", False),
'sudo': kwargs.pop('sudo', False),
'tags': kwargs.pop("tags", None),
}
statement = list()
statement.append("./manage.py %s" % name)
# Remaining kwargs are assumed to be switches.
for key, value in kwargs.items():
key = key.replace("_", "-")
if type(value) is bool:
if value is True:
statement.append("--%s" % key)
else:
statement.append("--%s=%s" % (key, value))
if len(args) > 0:
statement.append(" ".join(args))
return Command(" ".join(statement), **_kwargs)
def django(name, *args, venv=None, **kwargs):
if name == "check":
return django_check(venv=venv, **kwargs)
elif name in ("collectstatic", "static"):
return django_collect_static(venv=venv, **kwargs)
elif name == "migrate":
return django_migrate(venv=venv, **kwargs)
else:
return _django(name, *args, venv=venv, **kwargs)
def django_check(venv=None, **kwargs):
kwargs.setdefault("comment", "run django checks")
kwargs.setdefault("register", "django_checks_out")
return _django("check", venv=venv, **kwargs)
def django_collect_static(venv=None, **kwargs):
kwargs.setdefault("comment", "collect static files")
return _django("collectstatic", venv=venv, **kwargs)
def django_dumpdata(app_name, base_path="local", file_name="initial", indent=4, natural_foreign=False,
natural_primary=False, path=None, venv=None, **kwargs):
"""Initialize the command.
:param app_name: The name (app label) of the app. ``app_label.ModelName`` may also be given.
:type app_name: str
:param file_name: The file name to which the data will be dumped.
:type file_name: str
:param indent: Indentation of the exported fixtures.
:type indent: int
:param natural_foreign: Use the natural foreign parameter.
:type natural_foreign: bool
:param natural_primary: Use the natural primary parameter.
:type natural_primary: bool
:param path: The path to the data file.
:type path: str
"""
kwargs.setdefault("comment", "export fixtures for %s" % app_name)
output_format = kwargs.pop("format", "json")
_path = path or os.path.join(base_path, app_name, "fixtures", "%s.%s" % (file_name, output_format))
return _django(
"dumpdata",
app_name,
"> %s" % _path,
format=output_format,
indent=indent,
natural_foreign=natural_foreign,
natural_primary=natural_primary,
venv=venv,
**kwargs
)
def django_loaddata(app_name, base_path="local", file_name="initial", path=None, venv=None, **kwargs):
"""Initialize the command.
:param app_name: The name (app label) of the app.
:type app_name: str
:param file_name: The file name to which the data will be dumped.
:type file_name: str
:param path: The path to the data file.
:type path: str
"""
kwargs.setdefault("comment", "load fixtures for %s" % app_name)
output_format = kwargs.pop("format", "json")
_path = path or os.path.join(base_path, app_name, "fixtures", "%s.%s" % (file_name, output_format))
return _django("loaddata", _path, venv=venv, **kwargs)
def django_migrate(venv=None, **kwargs):
kwargs.setdefault("comment", "run django database migrations")
return _django("migrate", venv=venv, **kwargs)
# Mapping
DJANGO_MAPPINGS = {
'django': django,
'django.check': django_check,
'django.collect_static': django_collect_static,
'django.dumpdata': django_dumpdata,
'django.loaddata': django_loaddata,
'django.migrate': django_migrate,
}

@ -0,0 +1,292 @@
# Imports
from ..commands import Command
# Exports
__all__ = (
"PGSQL_MAPPINGS",
"pg_create_database",
"pg_create_user",
"pg_database_exists",
"pg_drop_database",
"pg_drop_user",
"pg_dump_database",
"psql",
)
# Functions
def _get_pgsql_command(name, host="localhost", password=None, port=5432, user="postgres"):
a = list()
if password:
a.append('export PGPASSWORD="%s" &&' % password)
a.append(name)
a.append("--host=%s" % host)
a.append("--port=%s" % port)
a.append("--username=%s" % user)
return a
def pg_create_database(name, admin_pass=None, admin_user="postgres", host="localhost", owner=None, port=5432,
template=None, **kwargs):
"""Create a PostgreSQL database.
:param name: The database name.
:type name: str
:param admin_pass: The password for the user with sufficient access privileges to execute the command.
:type admin_pass: str
:param admin_user: The name of the user with sufficient access privileges to execute the command.
:type admin_user: str
:param host: The database host name or IP address.
:type host: str
:param owner: The owner (user/role name) of the new database.
:type owner: str
:param port: The port number of the Postgres service running on the host.
:type port: int
:param template: The database template name to use, if any.
:type template: str
"""
_owner = owner or admin_user
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("createdb", host=host, password=admin_pass, port=port)
base.append("--owner=%s" % _owner)
if template is not None:
base.append("--template=%s" % template)
base.append(name)
return Command(" ".join(base), **kwargs)
def pg_create_user(name, admin_pass=None, admin_user="postgres", host="localhost", password=None, port=5432, **kwargs):
"""Create a PostgreSQL user.
:param name: The user name.
:type name: str
:param admin_pass: The password for the user with sufficient access privileges to execute the command.
:type admin_pass: str
:param admin_user: The name of the user with sufficient access privileges to execute the command.
:type admin_user: str
:param host: The database host name or IP address.
:type host: str
:param password: The password for the new user.
:type password: str
:param port: The port number of the Postgres service running on the host.
:type port: int
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("createuser", host=host, password=admin_pass, port=port)
base.append("-DRS")
base.append(name)
if password is not None:
base.append("&& psql -h %s -U %s" % (host, admin_user))
base.append("-c \"ALTER USER %s WITH ENCRYPTED PASSWORD '%s';\"" % (name, password))
return Command(" ".join(base), **kwargs)
def pg_database_exists(name, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs):
"""Determine if a Postgres database exists.
:param name: The database name.
:type name: str
:param admin_pass: The password for the user with sufficient access privileges to execute the command.
:type admin_pass: str
:param admin_user: The name of the user with sufficient access privileges to execute the command.
:type admin_user: str
:param host: The database host name or IP address.
:type host: str
:param port: The port number of the Postgres service running on the host.
:type port: int
"""
# Postgres commands always run without sudo because the -U may be provided. However, sudo may be required for
# file writing.
# kwargs['sudo'] = False
kwargs.setdefault("register", "%s_db_exists" % name)
base = _get_pgsql_command("psql", host=host, password=admin_pass, port=port, user=admin_user)
base.append(r"-lqt | cut -d \| -f 1 | grep -qw %s" % name)
return Command(" ".join(base), **kwargs)
def pg_drop_database(name, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs):
"""Remove a PostgreSQL database.
:param name: The database name.
:type name: str
:param admin_pass: The password for the user with sufficient access privileges to execute the command.
:type admin_pass: str
:param admin_user: The name of the user with sufficient access privileges to execute the command.
:type admin_user: str
:param host: The database host name or IP address.
:type host: str
:param port: The port number of the Postgres service running on the host.
:type port: int
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("dropdb", host=host, password=admin_pass, port=port, user=admin_user)
base.append(name)
return Command(" ".join(base), **kwargs)
def pg_drop_user(name, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs):
"""Remove a Postgres user.
:param name: The user name.
:type name: str
:param admin_pass: The password for the user with sufficient access privileges to execute the command.
:type admin_pass: str
:param admin_user: The name of the user with sufficient access privileges to execute the command.
:type admin_user: str
:param host: The database host name or IP address.
:type host: str
:param port: The port number of the Postgres service running on the host.
:type port: int
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("dropuser", host=host, password=admin_pass, port=port, user=admin_user)
base.append(name)
return Command(" ".join(base), **kwargs)
def pg_dump_database(name, admin_pass=None, admin_user="postgres", file_name=None, host="localhost", port=5432,
**kwargs):
"""Export a Postgres database.
:param name: The database name.
:type name: str
:param admin_pass: The password for the user with sufficient access privileges to execute the command.
:type admin_pass: str
:param admin_user: The name of the user with sufficient access privileges to execute the command.
:type admin_user: str
:param host: The database host name or IP address.
:type host: str
:param file_name: The name (including the path, if desired) of the export file. Defaults to the
``database_name`` plus ".sql"
:type file_name: str
:param port: The port number of the Postgres service running on the host.
:type port: int
"""
_file_name = file_name or "%s.sql" % name
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("pg_dump", host=host, password=admin_pass, port=port, user=admin_user)
base.append("--column-inserts")
base.append("--file=%s" % _file_name)
base.append(name)
return Command(" ".join(base), **kwargs)
def psql(sql, database="template1", host="localhost", password=None, port=5432, user="postgres", **kwargs):
"""Execute a psql command.
:param sql: The SQL to be executed.
:type sql: str
:param database: The database name.
:type database: str
:param password: The password for the user with sufficient access privileges to execute the command.
:type password: str
:param host: The database host name or IP address.
:type host: str
:param port: The port number of the Postgres service running on the host.
:type port: int
:param user: The name of the user with sufficient access privileges to execute the command.
:type user: str
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("psql", host=host, password=password, port=port, user=user)
base.append("--dbname=%s" % database)
base.append('-c "%s"' % sql)
return Command(" ".join(base), **kwargs)
PGSQL_MAPPINGS = {
'pg.client': psql,
'pg.createdatabase': pg_create_database,
'pg.createdb': pg_create_database,
'pg.createuser': pg_create_user,
'pg.database': pg_create_database,
'pg.database_exists': pg_database_exists,
'pg.db': pg_create_database,
'pg.dropdatabase': pg_drop_database,
'pg.dropdb': pg_drop_database,
'pg.dropuser': pg_drop_user,
'pg.dump': pg_dump_database,
'pg.dumpdb': pg_dump_database,
'pg.exists': pg_database_exists,
'pg.user': pg_create_user,
'psql': psql,
}

@ -1,5 +1,172 @@
# Imports
import os
from superpython.utils import indent
from ..commands import Command from ..commands import Command
# Exports
__all__ = (
"POSIX_MAPPINGS",
"archive",
"certbot",
"extract",
"file_append",
"file_copy",
"file_write",
"mkdir",
"move",
"perms",
"remove",
"rsync",
"run",
"scopy",
"sed",
"symlink",
"touch",
"Function",
)
# Functions
def archive(from_path, absolute=False, exclude=None, file_name="archive.tgz", strip=None, to_path=".", view=False,
**kwargs):
"""Create a file archive.
:param from_path: The path that should be archived.
:type from_path: str
:param absolute: By default, the leading slash is stripped from each path. Set to ``True`` to preserve the
absolute path.
:type absolute: bool
:param bzip2: Compress using bzip2.
:type bzip2: bool
:param exclude: A pattern to be excluded from the archive.
:type exclude: str
:param format: The command to use for the operation.
:type format: str
:param gzip: Compress using gzip.
:type gzip: bool
:param strip: Remove the specified number of leading elements from the path. Paths with fewer elements will be
silently skipped.
:type strip: int
:param to_path: Where the archive should be created. This should *not* include the file name.
:type to_path: str
:param view: View the output of the command as it happens.
:type view: bool
"""
tokens = ["tar"]
switches = ["-cz"]
if absolute:
switches.append("P")
if view:
switches.append("v")
tokens.append("".join(switches))
if exclude:
tokens.append("--exclude %s" % exclude)
if strip:
tokens.append("--strip-components %s" % strip)
to_path = os.path.join(to_path, file_name)
tokens.append('-f %s %s' % (to_path, from_path))
name = " ".join(tokens)
return Command(name, **kwargs)
def certbot(domain_name, email=None, webroot=None, **kwargs):
"""Get new SSL certificate from Let's Encrypt.
:param domain_name: The domain name for which the SSL certificate is requested.
:type domain_name: str
:param email: The email address of the requester sent to the certificate authority. Required.
:type email: str
:param webroot: The directory where the challenge file will be created.
:type webroot: str
"""
_email = email or os.environ.get("SCRIPTTEASE_CERTBOT_EMAIL", None)
_webroot = webroot or os.path.join("/var", "www", "domains", domain_name.replace(".", "_"), "www")
if not _email:
raise ValueError("Email is required for certbot command.")
template = "certbot certonly --agree-tos --email %(email)s -n --webroot -w %(webroot)s -d %(domain_name)s"
name = template % {
'domain_name': domain_name,
'email': _email,
'webroot': _webroot,
}
return Command(name, **kwargs)
def extract(from_path, absolute=False, exclude=None, strip=None, to_path=None, view=False, **kwargs):
"""Extract a file archive.
:param from_path: The path to the archive file.
:type from_path: str
:param absolute: By default, the leading slash is stripped from each path. Set to ``True`` to preserve the
absolute path.
:type absolute: bool
:param exclude: A pattern to be excluded from the archive.
:type exclude: str
:param strip: Remove the specified number of leading elements from the path. Paths with fewer elements will be
silently skipped.
:type strip: int
:param to_path: Where the archive should be extracted.
:type to_path: str
:param view: View the output of the command as it happens.
:type view: bool
"""
_to_path = to_path or "./"
tokens = ["tar"]
switches = ["-xz"]
if absolute:
switches.append("P")
if view:
switches.append("v")
tokens.append("".join(switches))
if exclude:
tokens.append("--exclude %s" % exclude)
if strip:
tokens.append("--strip-components %s" % strip)
tokens.append('-f %s %s' % (from_path, _to_path))
name = " ".join(tokens)
return Command(name, **kwargs)
def file_append(path, content=None, **kwargs): def file_append(path, content=None, **kwargs):
"""Append content to a file. """Append content to a file.
@ -11,7 +178,6 @@ def file_append(path, content=None, **kwargs):
:type content: str :type content: str
""" """
print("HERE")
kwargs.setdefault("comment", "append to %s" % path) kwargs.setdefault("comment", "append to %s" % path)
statement = 'echo "%s" >> %s' % (content or "", path) statement = 'echo "%s" >> %s' % (content or "", path)
@ -52,6 +218,32 @@ def file_copy(from_path, to_path, overwrite=False, recursive=False, **kwargs):
return Command(" ".join(a), **kwargs) return Command(" ".join(a), **kwargs)
def file_write(path, content=None, **kwargs):
"""Initialize the command.
:param path: The file to be written.
:type path: str
:param content: The content to be written. Note: If omitted, this command is equivalent to :py:class:`Touch`.
:type content: str
"""
_content = content or ""
kwargs.setdefault("comment", "write to %s" % path)
a = list()
if len(_content.split("\n")) > 1:
a.append("cat > %s << EOF" % path)
a.append(_content)
a.append("EOF")
else:
a.append('echo "%s" > %s' % (_content, path))
return Command(" ".join(a), **kwargs)
def mkdir(path, mode=None, recursive=True, **kwargs): def mkdir(path, mode=None, recursive=True, **kwargs):
"""Initialize the command. """Initialize the command.
@ -59,7 +251,7 @@ def mkdir(path, mode=None, recursive=True, **kwargs):
:type path: str :type path: str
:param mode: The access permissions of the new directory. :param mode: The access permissions of the new directory.
:type mode: str :type mode: int | str
:param recursive: Create all directories along the path. :param recursive: Create all directories along the path.
:type recursive: bool :type recursive: bool
@ -96,7 +288,7 @@ def perms(path, group=None, mode=None, owner=None, recursive=False, **kwargs):
:type group: str :type group: str
:param mode: The access permissions of the file or directory. :param mode: The access permissions of the file or directory.
:type mode: str :type mode: int | str
:param owner: The name of the user to be applied. :param owner: The name of the user to be applied.
:type owner: str :type owner: str
@ -107,6 +299,8 @@ def perms(path, group=None, mode=None, owner=None, recursive=False, **kwargs):
""" """
commands = list() commands = list()
kwargs['comment'] = "set permissions on %s" % path
if group is not None: if group is not None:
statement = ["chgrp"] statement = ["chgrp"]
@ -144,7 +338,7 @@ def perms(path, group=None, mode=None, owner=None, recursive=False, **kwargs):
a = list() a = list()
for c in commands: for c in commands:
a.append(c.get_statement()) a.append(c.get_statement(suppress_comment=True))
return Command("\n".join(a), **kwargs) return Command("\n".join(a), **kwargs)
@ -175,3 +369,276 @@ def remove(path, force=False, recursive=False, **kwargs):
statement.append(path) statement.append(path)
return Command(" ".join(statement), **kwargs) return Command(" ".join(statement), **kwargs)
def rsync(source, target, delete=False, exclude=None, host=None, key_file=None, links=True, port=22,
recursive=True, user=None, **kwargs):
"""Initialize the command.
:param source: The source directory.
:type source: str
:param target: The target directory.
:type target: str
:param delete: Indicates target files that exist in source but not in target should be removed.
:type delete: bool
:param exclude: The path to an exclude file.
:type exclude: str
:param host: The host name or IP address. This causes the command to run over SSH and may require a
``key_file``, ``port``, and ``user``.
:type host: str
:param key_file: The path to the private SSH key to use for remove connections. User expansion is
automatically applied.
:type key_file: str
:param links: Include symlinks in the sync.
:type links: bool
:param port: The SSH port to use for remote connections.
:type port: int
:param recursive: Indicates source contents should be recursively synchronized.
:type recursive: bool
:param user: The user name to use for remote connections.
"""
# :param guess: When ``True``, the ``host``, ``key_file``, and ``user`` will be guessed based on the base name of
# the source path.
# :type guess: bool
# if guess:
# host = host or os.path.basename(source).replace("_", ".")
# key_file = key_file or os.path.expanduser(os.path.join("~/.ssh", os.path.basename(source)))
# user = user or os.path.basename(source)
# else:
# host = host
# key_file = key_file
# user = user
kwargs.setdefault("comment", "copy %s to remote %s" % (source, target))
# rsync -e "ssh -i $(SSH_KEY) -p $(SSH_PORT)" -P -rvzc --delete
# $(OUTPUTH_PATH) $(SSH_USER)@$(SSH_HOST):$(UPLOAD_PATH) --cvs-exclude;
tokens = list()
tokens.append('rsync')
tokens.append("--cvs-exclude")
tokens.append("--checksum")
tokens.append("--compress")
if links:
tokens.append("--copy-links")
if delete:
tokens.append("--delete")
if exclude is not None:
tokens.append("--exclude-from=%s" % exclude)
# --partial and --progress
tokens.append("-P")
if recursive:
tokens.append("--recursive")
tokens.append(source)
conditions = [
host is not None,
key_file is not None,
user is not None,
]
if all(conditions):
tokens.append('-e "ssh -i %s -p %s"' % (key_file, port))
tokens.append("%s@%s:%s" % (user, host, target))
else:
tokens.append(target)
statement = " ".join(tokens)
return Command(statement, **kwargs)
def run(statement, **kwargs):
"""Run any statement."""
kwargs.setdefault("comment", "run statement")
return Command(statement, **kwargs)
def scopy(from_path, to_path, host=None, key_file=None, port=22, user=None, **kwargs):
"""Initialize the command.
:param from_path: The source directory.
:type from_path: str
:param to_path: The target directory.
:type to_path: str
:param host: The host name or IP address. Required.
:type host: str
:param key_file: The path to the private SSH key to use for remove connections. User expansion is
automatically applied.
:type key_file: str
:param port: The SSH port to use for remote connections.
:type port: int
:param user: The user name to use for remote connections.
"""
kwargs.setdefault("comment", "copy %s to remote %s" % (from_path, to_path))
# TODO: What to do to force local versus remote commands?
# kwargs['local'] = True
kwargs['sudo'] = False
statement = ["scp"]
if key_file is not None:
statement.append("-i %s" % key_file)
statement.append("-P %s" % port)
statement.append(from_path)
if host is not None and user is not None:
statement.append("%s@%s:%s" % (user, host, to_path))
elif host is not None:
statement.append("%s:%s" % (host, to_path))
else:
raise ValueError("Host is a required keyword argument.")
return Command(" ".join(statement), **kwargs)
def sed(path, backup=".b", delimiter="/", find=None, replace=None, **kwargs):
"""Find and replace text in a file.
:param path: The path to the file to be edited.
:type path: str
:param backup: The backup file extension to use.
:type backup: str
:param delimiter: The pattern delimiter.
:param find: The old text. Required.
:type find: str
:param replace: The new text. Required.
:type replace: str
"""
kwargs.setdefault("comment", "find and replace in %s" % path)
context = {
'backup': backup,
'delimiter': delimiter,
'path': path,
'pattern': find,
'replace': replace,
}
template = "sed -i %(backup)s 's%(delimiter)s%(pattern)s%(delimiter)s%(replace)s%(delimiter)sg' %(path)s"
statement = template % context
return Command(statement, **kwargs)
def symlink(source, force=False, target=None, **kwargs):
"""Initialize the command.
:param source: The source of the link.
:type source: str
:param force: Force the creation of the link.
:type force: bool
:param target: The name or path of the target. Defaults to the base name of the source path.
:type target: str
"""
_target = target or os.path.basename(source)
kwargs.setdefault("comment", "link to %s" % source)
statement = ["ln -s"]
if force:
statement.append("-f")
statement.append(source)
statement.append(_target)
return Command(" ".join(statement), **kwargs)
def touch(path, **kwargs):
"""Initialize the command.
:param path: The file or directory to touch.
:type path: str
"""
kwargs.setdefault("comment", "touch %s" % path)
return Command("touch %s" % path, **kwargs)
# Classes
class Function(object):
"""A function that may be used to organize related commands to be called together."""
def __init__(self, name, commands=None, comment=None):
self.commands = commands or list()
self.comment = comment
self.name = name
def to_string(self):
a = list()
if self.comment is not None:
a.append("# %s" % self.comment)
a.append("function %s()" % self.name)
a.append("{")
for command in self.commands:
a.append(indent(command.get_statement(cd=True)))
a.append("")
a.append("}")
return "\n".join(a)
# Mappings
POSIX_MAPPINGS = {
'append': file_append,
'archive': archive,
'certbot': certbot,
'copy': file_copy,
'extract': extract,
'func': Function,
'function': Function,
'mkdir': mkdir,
'move': move,
'perms': perms,
'remove': remove,
'rsync': rsync,
'run': run,
'scopy': scopy,
'sed': sed,
'ssl': certbot,
'symlink': symlink,
'touch': touch,
'write': file_write,
}

@ -1,14 +1,59 @@
from ..commands import Command # Imports
from .common import python_pip, python_virtualenv
from .posix import file_append, file_copy, mkdir, move, perms, remove
name = "ubuntu" from ..commands import Command
from .common import COMMON_MAPPINGS
from .django import DJANGO_MAPPINGS
from .pgsql import PGSQL_MAPPINGS
from .posix import POSIX_MAPPINGS, Function
# Exports
__all__ = (
"MAPPINGS",
"apache",
"apache_disable_module",
"apache_disable_site",
"apache_enable_module",
"apache_enable_site",
"apache_reload",
"apache_restart",
"apache_start",
"apache_stop",
"apache_test",
"command_exists",
"service_reload",
"service_restart",
"service_start",
"service_stop",
"system",
"system_install",
"system_reboot",
"system_update",
"system_upgrade",
"system_uninstall",
"Function",
)
def command_exists(name): def command_exists(name):
return name in MAPPINGS return name in MAPPINGS
def apache(op, **kwargs):
if op == "reload":
return apache_reload(**kwargs)
elif op == "restart":
return apache_restart(**kwargs)
elif op == "start":
return apache_start(**kwargs)
elif op == "stop":
return apache_stop(**kwargs)
elif op == "test":
return apache_test(**kwargs)
else:
raise NameError("Unrecognized or unsupported apache operation: %s" % op)
def apache_disable_module(name, **kwargs): def apache_disable_module(name, **kwargs):
kwargs.setdefault("comment", "disable %s apache module" % name) kwargs.setdefault("comment", "disable %s apache module" % name)
@ -30,7 +75,7 @@ def apache_enable_module(name, **kwargs):
def apache_enable_site(name, **kwargs): def apache_enable_site(name, **kwargs):
kwargs.setdefault("comment", "enable %s apache module" % name) kwargs.setdefault("comment", "enable %s apache module" % name)
return Command("a2densite %s" % name, **kwargs) return Command("a2ensite %s" % name, **kwargs)
def apache_reload(**kwargs): def apache_reload(**kwargs):
@ -78,7 +123,7 @@ def service_restart(name, **kwargs):
kwargs.setdefault("comment", "restart %s service" % name) kwargs.setdefault("comment", "restart %s service" % name)
kwargs.setdefault("register", "%s_restarted" % name) kwargs.setdefault("register", "%s_restarted" % name)
return Command("service %s reload" % name, **kwargs) return Command("service %s restart" % name, **kwargs)
def service_start(name, **kwargs): def service_start(name, **kwargs):
@ -95,6 +140,17 @@ def service_stop(name, **kwargs):
return Command("service %s stop" % name, **kwargs) return Command("service %s stop" % name, **kwargs)
def system(op, **kwargs):
if op == "reboot":
return system_reboot(**kwargs)
elif op == "update":
return system_update(**kwargs)
elif op == "upgrade":
return system_upgrade(**kwargs)
else:
raise NameError("Unrecognized or unsupported system operation: %s" % op)
def system_install(name, **kwargs): def system_install(name, **kwargs):
kwargs.setdefault("comment", "install system package %s" % name) kwargs.setdefault("comment", "install system package %s" % name)
@ -126,6 +182,7 @@ def system_upgrade(**kwargs):
MAPPINGS = { MAPPINGS = {
'apache': apache,
'apache.disable_module': apache_disable_module, 'apache.disable_module': apache_disable_module,
'apache.disable_site': apache_disable_site, 'apache.disable_site': apache_disable_site,
'apache.enable_module': apache_enable_module, 'apache.enable_module': apache_enable_module,
@ -135,21 +192,19 @@ MAPPINGS = {
'apache.start': apache_start, 'apache.start': apache_start,
'apache.stop': apache_stop, 'apache.stop': apache_stop,
'apache.test': apache_test, 'apache.test': apache_test,
'append': file_append,
'copy': file_copy,
'install': system_install, 'install': system_install,
'mkdir': mkdir,
'move': move,
'perms': perms,
'pip': python_pip,
'reboot': system_reboot, 'reboot': system_reboot,
'reload': service_reload, 'reload': service_reload,
'remove': remove,
'restart': service_restart, 'restart': service_restart,
'start': service_start, 'start': service_start,
'stop': service_stop, 'stop': service_stop,
'system': system,
'update': system_update, 'update': system_update,
'uninstall': system_uninstall, 'uninstall': system_uninstall,
'upgrade': system_upgrade, 'upgrade': system_upgrade,
'virtualenv': python_virtualenv,
} }
MAPPINGS.update(COMMON_MAPPINGS)
MAPPINGS.update(DJANGO_MAPPINGS)
MAPPINGS.update(PGSQL_MAPPINGS)
MAPPINGS.update(POSIX_MAPPINGS)

@ -11,7 +11,7 @@ class Script(object):
:type name: str :type name: str
:param commands: The commands to be included. :param commands: The commands to be included.
:type commands: list[BaseType[Command]] :type commands: list[scripttease.library.commands.base.Command]
:param functions: The functions to be included. :param functions: The functions to be included.
:type functions: list[Function] :type functions: list[Function]
@ -49,21 +49,21 @@ class Script(object):
a = list() a = list()
if shebang is not None: if shebang is not None:
a.append("%s" % {'shell': self.shell}) a.append(shebang % {'shell': self.shell})
a.append("") a.append("")
if self.functions is not None: if self.functions is not None:
for function in self.functions: for function in self.functions:
a.append(function.preview()) a.append(function.to_string())
a.append("") a.append("")
for function in self.functions: # for function in self.functions:
a.append("%s;" % function.name) # a.append("%s;" % function.name)
a.append("") a.append("")
for command in self.commands: for command in self.commands:
a.append(command.preview(cwd=True)) a.append(command.get_statement(cd=True))
a.append("") a.append("")
return "\n".join(a) return "\n".join(a)

@ -1,85 +1,4 @@
# Imports # Imports
import logging
from superpython.utils import any_list_item
from ..constants import LOGGER_NAME
from .ini import Config from .ini import Config
from .utils import filter_commands, load_commands, load_config
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"filter_commands",
"load_commands",
)
# Functions
def filter_commands(commands, environments=None, tags=None):
"""Filter commands based on the given criteria.
:param commands: The commands to be filtered.
:type commands: list
:param environments: Environment names to be matched.
:type environments: list[str]
:param tags: Tag names to be matched.
:type tags
:return:
"""
filtered = list()
for command in commands:
if environments is not None:
if not any_list_item(environments, command.environments):
continue
if tags is not None:
if not any_list_item(tags, command.tags):
continue
filtered.append(command)
return filtered
def load_commands(path, filters=None, overlay=None, **kwargs):
"""Load commands from a configuration file.
:param path: The path to the configuration file.
:type path: str
:param filters: Used to filter commands.
:type filters: dict
:rtype: list[BaseType[Command] | ItemizedCommand] | None
:returns: A list of command instances or ``None`` if the configuration could not be loaded.
kwargs are passed to the configuration class for instantiation.
"""
if path.endswith(".ini"):
_config = Config(path, overlay=overlay, **kwargs)
# elif path.endswith(".yml"):
# _config = YAML(path, **kwargs)
else:
log.warning("Input file format is not currently supported: %s" % path)
return None
if _config.load():
commands = _config.get_commands()
if filters is not None:
criteria = dict()
for attribute, values in filters.items():
criteria[attribute] = values
commands = filter_commands(commands, **criteria)
return commands
log.error("Failed to load config file: %s" % path)
return None

@ -1,13 +1,13 @@
# Imports # Imports
from superpython.utils import File from superpython.utils import File
from ..library.overlays import Overlay from ..factory import Factory
from ..library.scripts import Script from ..library.scripts import Script
# Exports # Exports
__all__ = ( __all__ = (
"Parser" "Parser",
) )
# Classes # Classes
@ -16,19 +16,18 @@ __all__ = (
class Parser(File): class Parser(File):
"""Base class for implementing a command parser.""" """Base class for implementing a command parser."""
def __init__(self, path, context=None, locations=None, options=None, overlay=None): def __init__(self, path, context=None, locations=None, options=None, overlay="ubuntu"):
super().__init__(path) super().__init__(path)
self.context = context self.context = context
self.factory = Factory(overlay)
self.is_loaded = False self.is_loaded = False
self.locations = locations or list() self.locations = locations or list()
self.options = options or dict() self.options = options or dict()
self.overlay = overlay or Overlay("ubuntu") self.overlay = overlay
self._commands = list() self._commands = list()
self._functions = list() self._functions = list()
self.overlay.load()
def as_script(self): def as_script(self):
"""Convert loaded commands to a script. """Convert loaded commands to a script.
@ -73,4 +72,9 @@ class Parser(File):
return a return a
def load(self): def load(self):
"""Load the factory and the configuration file.
:rtype: bool
"""
raise NotImplementedError() raise NotImplementedError()

@ -4,7 +4,6 @@ from configparser import ConfigParser, ParsingError
import logging import logging
from superpython.utils import parse_jinja_template, read_file, smart_cast, split_csv from superpython.utils import parse_jinja_template, read_file, smart_cast, split_csv
import os import os
from ..library.commands import command_factory
from ..constants import LOGGER_NAME from ..constants import LOGGER_NAME
from .base import Parser from .base import Parser
@ -23,9 +22,13 @@ class Config(Parser):
"""An INI configuration for loading commands.""" """An INI configuration for loading commands."""
def load(self): def load(self):
"""Load commands from a INI file."""
if not self.exists: if not self.exists:
return False return False
if not self.factory.load():
return False
ini = self._load_ini() ini = self._load_ini()
if ini is None: if ini is None:
return False return False
@ -36,12 +39,15 @@ class Config(Parser):
command_name = None command_name = None
count = 0 count = 0
kwargs = self.options.copy() kwargs = self.options.copy()
kwargs['comment'] = comment
for key, value in ini.items(comment): for key, value in ini.items(comment):
# The first key/value pair is the command name and arguments. # The first key/value pair is the command name and arguments.
if count == 0: if count == 0:
command_name = key command_name = key
# Arguments surrounded by quotes are considered to be one argument. All others are split into a
# list to be passed to the callback.
if value[0] == '"': if value[0] == '"':
args.append(value.replace('"', "")) args.append(value.replace('"', ""))
else: else:
@ -53,8 +59,13 @@ class Config(Parser):
count += 1 count += 1
command = command_factory(command_name, comment, self.overlay, *args, **kwargs) command = self.factory.get_command(command_name, *args, **kwargs)
if command is not None: if command is not None:
if isinstance(command, self.factory.overlay.Function):
self._functions.append(command)
else:
self._commands.append(command)
# if isinstance(command, Function): # if isinstance(command, Function):
# self._functions.append(command) # self._functions.append(command)
# elif isinstance(command, Include): # elif isinstance(command, Include):
@ -70,7 +81,6 @@ class Config(Parser):
# self._commands.append(c) # self._commands.append(c)
# else: # else:
# self._commands.append(command) # self._commands.append(command)
self._commands.append(command)
else: else:
success = False success = False
@ -131,20 +141,20 @@ class Config(Parser):
log.error("Failed to parse %s: %s" % (self.path, e)) log.error("Failed to parse %s: %s" % (self.path, e))
return None return None
def _load_template(self, command): # def _load_template(self, command):
"""Load additional resources for a template command. # """Load additional resources for a template command.
#
:param command: The template command. # :param command: The template command.
:type command: Template # :type command: Template
#
""" # """
# This may produce problems if template kwargs are the same as the given context. # # This may produce problems if template kwargs are the same as the given context.
if self.context is not None: # if self.context is not None:
command.context.update(self.context) # command.context.update(self.context)
#
# Custom locations come before default locations. # # Custom locations come before default locations.
command.locations += self.locations # command.locations += self.locations
#
# This allows template files to be specified relative to the configuration file. # # This allows template files to be specified relative to the configuration file.
command.locations.append(os.path.join(self.directory, "templates")) # command.locations.append(os.path.join(self.directory, "templates"))
command.locations.append(self.directory) # command.locations.append(self.directory)

@ -0,0 +1,110 @@
# Imports
import logging
from superpython.utils import any_list_item
from ..constants import LOGGER_NAME
from .ini import Config
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"filter_commands",
"load_commands",
)
# Functions
def filter_commands(commands, environments=None, tags=None):
"""Filter commands based on the given criteria.
:param commands: The commands to be filtered.
:type commands: list
:param environments: Environment names to be matched.
:type environments: list[str]
:param tags: Tag names to be matched.
:type tags
:return:
"""
filtered = list()
for command in commands:
if environments is not None:
if not any_list_item(environments, command.environments):
continue
if tags is not None:
if not any_list_item(tags, command.tags):
continue
filtered.append(command)
return filtered
def load_commands(path, filters=None, overlay="ubuntu", **kwargs):
"""Load commands from a configuration file.
:param path: The path to the configuration file.
:type path: str
:param filters: Used to filter commands.
:type filters: dict
:param overlay: The name of the command overlay to apply to generated commands.
:type overlay: str
:rtype: list[scriptetease.library.commands.base.Command] | scriptetease.library.commands.base.ItemizedCommand] |
None
:returns: A list of command instances or ``None`` if the configuration could not be loaded.
kwargs are passed to the configuration class for instantiation.
"""
_config = load_config(path, overlay, **kwargs)
if _config is None:
return None
commands = _config.get_commands()
if filters is not None:
criteria = dict()
for attribute, values in filters.items():
criteria[attribute] = values
commands = filter_commands(commands, **criteria)
return commands
def load_config(path, overlay="ubuntu", **kwargs):
"""Load a command configuration.
:param path: The path to the configuration file.
:type path: str
:param overlay: The name of the command overlay to apply to generated commands.
:type overlay: str
:rtype: Config | None
kwargs are passed to the configuration class for instantiation.
"""
if path.endswith(".ini"):
_config = Config(path, overlay=overlay, **kwargs)
# elif path.endswith(".yml"):
# _config = YAML(path, **kwargs)
else:
log.warning("Input file format is not currently supported: %s" % path)
return None
if not _config.load():
log.error("Failed to load config file: %s" % path)
return None
return _config

@ -1,159 +0,0 @@
# Imports
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import get_formatter_by_name
BashLexer = get_lexer_by_name("bash")
JSONLexer = get_lexer_by_name("json")
PythonLexer = get_lexer_by_name("python")
TerminalFormatter = get_formatter_by_name("terminal", linenos=True)
# Exports
__all__ = (
"any_list_item",
"filter_commands",
"filter_objects",
"highlight_code",
"split_csv",
)
# Functions
def any_list_item(a, b):
"""Determine whether any item in ``a`` also exists in ``b``.
:param a: The first list to be compared.
:type a: list
:param b: The second list to be compared.
:type b: list
:rtype: bool
"""
for i in a:
for j in b:
if i == j:
return True
return False
def filter_commands(commands, values, attribute="tags"):
"""Filter commands for a given set of values.
:param commands: The commands to be filtered.
:type commands: list[BaseType[Command]]
:param values: The values to be compared.
:type values: list
:param attribute: The name of the command attribute to check. This attribute must be a list or tuple of values of
the same type given in ``values``.
:type attribute: str
:rtype: bool
.. code-block:: python
commands = [
AddUser("bob"),
Apt("apache2", tags=["apache", "www"]),
Reload("postgresql", tags=["database", "pgsql"]),
Touch("/var/www/index.html", tags=["www"]),
]
values = ["apache", "www"]
# Outputs the Apt and Touch commands above.
filtered_commands = filter_commands(command, values)
print(filtered_commands)
"""
filtered = list()
for command in commands:
try:
list_b = getattr(command, attribute)
except AttributeError:
continue
if not any_list_item(values, list_b):
continue
filtered.append(command)
return filtered
def filter_objects(objects, environments=None, scope=None, tags=None):
"""Filter the given objects by the given keys.
:param objects: The objects to be filtered.
:type objects: list
:param environments: The environments to be included.
:type environments: list[str]
:param scope: The scope by which to filter; deploy, provision, tenant.
:type scope: str
:param tags: The tags to be included.
:type tags: list[str]
:rtype: list
:returns: Returns the objects that match the given keys.
"""
filtered = list()
# print("object, object environments, environments, any_list_item")
for o in objects:
# print(o, o.environments, environments, any_list_item(environments, o.environments))
# Apply environment filter.
if environments is not None:
if hasattr(o, "environment"):
if o.environment is not None and o.environment not in environments:
continue
elif hasattr(o, "environments"):
if type(o.environments) in (list, tuple) and not any_list_item(environments, o.environments):
continue
else:
pass
# # Apply scope filter.
# if scope is not None:
# if o.scope not in [None, SCOPE_ALL, scope]:
# continue
# Apply tag filter.
if tags is not None:
if not any_list_item(tags, o.tags):
continue
# The object has passed the tests above.
filtered.append(o)
return filtered
def highlight_code(string, lexer=None):
"""Highlight (colorize) the given string as Python code.
:param string: The string to be highlighted.
:type string: str
:rtype: str
"""
if lexer is None:
lexer = BashLexer
return highlight(string, lexer, TerminalFormatter)

@ -12,22 +12,22 @@ def read_file(path):
setup( setup(
name='python-script-tease', name='python-scripttease',
version=read_file("VERSION.txt"), version=read_file("VERSION.txt"),
description=read_file("DESCRIPTION.txt"), description=read_file("DESCRIPTION.txt"),
long_description=read_file("README.markdown"), long_description=read_file("README.markdown"),
author='Shawn Davis', author='Shawn Davis',
author_email='shawn@myninjas.net', author_email='shawn@develmaycare.com',
url='https://bitbucket.com/myninjas/python-script-tease', url='https://github.com/develmaycare/python-scripttease',
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
install_requires=[ install_requires=[
"jinja2", "jinja2",
"pygments", "pygments",
"python-myninjas", "superpython",
], ],
dependency_links=[ dependency_links=[
"https://bitbucket.com/myninjas/python-myninjas/master.tar.gz#python-myninjas", "https://github.com/develmaycare/superpython",
], ],
classifiers=[ classifiers=[
'Development Status :: 2 - Pre-Alpha', 'Development Status :: 2 - Pre-Alpha',

@ -15,15 +15,3 @@ apache.enable_module = ssl
[atlernative syntax disable the default site] [atlernative syntax disable the default site]
apache = site default apache = site default
state = disabled state = disabled
[apache]
disable_module = a2dismod {{ module_name }}
disable_site = a2dissite {{ domain_name }}.conf
enable_module = a2enmod {{ module_name }}
enable_site = a2ensite {{ domain_name }}.conf
reload = service apache2 reload
restart = service apache2 restart
start = service apache2 start
stop = service apache2 stop
test = apachectl configtest

@ -0,0 +1,2 @@
[this command will fail to load]
nonexistent = testing

@ -0,0 +1,2 @@
[this command will fail to load]
run - testing

@ -0,0 +1,6 @@
[create the site directory]
mkdir: /var/www/domains/{{ domain_tld }}
[this will cause template parsing to fail]
touch: /path/to/{% if this.will.break %}testing

@ -0,0 +1,25 @@
[function for setting up apache]
func = apache_setup
[install apache]
install: apache2
func: apache_setup
[install wsgi]
install: mod_wsgi
func: apache_setup
[enable wsgi]
apache.enable_module: mod_wsgi
func: apache_setup
[disable the default site]
apache.disable_site = default
func: apache_setup
[restart apache]
apache: restart
func: apache_setup
[call apache setup]
run: apache_setup

@ -0,0 +1,192 @@
[update system repos]
system: update
[upgrade the system]
system: upgrade
[reboot the system]
system: reboot
[function for setting up apache]
func = apache_setup
[install apache]
install: apache2
func: apache_setup
[install wsgi]
install: mod_wsgi
func: apache_setup
[enable wsgi]
apache.enable_module: mod_wsgi
func: apache_setup
[disable the default site in function]
apache.disable_site = default
func: apache_setup
[restart apache]
apache: restart
func: apache_setup
[call apache setup]
run: apache_setup
[disable the default site]
apache.disable_site = default
[enable mod SSL]
apache.enable_module = mod_ssl
[enable more than one apache module at once]
apache.enable_module = $item
items = mod_wsgi, mod_rewrite
[make sure apache can be reloaded]
apache: test
[restart apache outside of function]
apache: restart
condition: $apache_checks_out -eq 0
[install the virtualenv package]
pip = virtualenv
[install django debug toolbar]
pip: django-debug-toolbar
env: development
tags: python, project
[create a virtual environment]
virtualenv = python
cd = /path/to/project
tags = python, project
[install pillow]
pip = Pillow
cd = /path/to/project
upgrade = yes
venv = python
tags = python, project
[apply database migrations]
django: migrate
cd: /path/to/project
venv: python
[run a custom django command]
django = custom_command arg1 arg2
cd = /path/to/project
venv = python
settings = tenants.example_app.settings
quiet = yes
[collect the project's static files]
django: collectstatic
cd: /path/to/project
venv: python
[load data fixtures]
django.loaddata: categories
cd: /path/to/project
venv: python
[dump data fixtures]
django.dumpdata: projects
cd: /path/to/project
venv: python
[reload a service]
reload: postfix
[restart a service]
restart: postfix
[stop a service]
stop: postfix
[start a service]
start: postfix
[install a package]
install: python3
[remove a package]
uninstall: apache-top
[add to a file]
append: /path/to/file.txt
content: this is a test
[copy a file]
copy: /path/to/file.txt /new/path/to/file.txt
[write (overwrite) a file]
write: /path/to/file.txt
content: this replaces all text in the file
[create a directory]
mkdir: /path/to/dir
mode: 755
[move a file]
move: /path/to/file.txt /path/to/file.txt.b
[set permissions on a file]
perms: /path/to/file.txt
group: www-data
mode: 755
owner: deploy
recursive: yes
[remove a file]
remove: /path/to/file.txt
[sync a directory]
rsync: /path/to/source /path/to/target
[copy a file to remote server]
scopy: /path/to/file.txt /path/to/server/file.txt
host: example.com
[replace text in a file]
sed: /path/to/file.txt
find: logging = no
replace: logging = yes
[create a symlink]
symlink: /var/www/domains
[touch a file]
touch: /path/to/file.txt
[create a postgres user/role]
pg.user: example_app
[create a postgres database]
pg.db: example_app
owner: example_app
[determine whether a postgres database exists]
pg.database_exists: example_app
[export a postgres database]
pg.dump: testing
[drop a postgres user/role]
pg.dropuser: testing
[drop a postgres database]
pg.dropdb: testing
[run an SQL command on a postgres database]
psql: "SELECT * FROM projects WHERE category = 'testing'"
database: example_app
owner: example_app
[create a file archive]
archive: /var/www/domains/example_com
[extract a file archive]
extract: /var/www/domains/example_com.tgz

@ -1,12 +1,15 @@
[install the virtualenv package] [install the virtualenv package]
pip = virtualenv pip = virtualenv
tags = python-support
[create a virtual environment] [create a virtual environment]
virtualenv = python virtualenv = python
cd = /path/to/project cd = /path/to/project
tags = python-support
[install pillow] [install pillow]
pip = Pillow pip = Pillow
cd = /path/to/project cd = /path/to/project
upgrade = yes upgrade = yes
venv = python venv = python
tags = depends

@ -0,0 +1,2 @@
[create the site directory]
mkdir: /var/www/domains/{{ domain_tld }}

@ -0,0 +1,40 @@
from scripttease.library.commands import Command, ItemizedCommand
from scripttease.factory import Factory
class TestFactory(object):
def test_get_command(self):
f = Factory("ubuntu")
f.load()
# Non-existent command.
c = f.get_command("nonexistent")
assert c is None
# A good command with itemized parameters.
c = f.get_command(
"pip",
"$item",
items=["Pillow", "psycopg2-binary", "django"]
)
assert isinstance(c, ItemizedCommand)
# A good, normal command.
c = f.get_command("pip", "django")
assert isinstance(c, Command)
# Command exists, but given bad arguments.
c = f.get_command("pip")
assert c is None
def test_load(self):
f = Factory("nonexistent")
assert f.load() is False
f = Factory("ubuntu")
assert f.load() is True
def test_repr(self):
f = Factory("centos")
assert repr(f) == "<Factory centos>"

@ -1,6 +1,5 @@
from scripttease.library.commands.base import Command, ItemizedCommand, Sudo from scripttease.library.commands.base import Command, ItemizedCommand, Sudo
from scripttease.library.commands.python import Pip from scripttease.library.overlays.common import python_pip
from scripttease.library.overlays import Overlay
class TestCommand(object): class TestCommand(object):
@ -65,22 +64,22 @@ class TestCommand(object):
class TestItemizedCommand(object): class TestItemizedCommand(object):
def test_getattr(self): def test_getattr(self):
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item", extra=True) c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item", extra=True)
assert c.extra is True assert c.extra is True
def test_get_commands(self): def test_get_commands(self):
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item") c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
commands = c.get_commands() commands = c.get_commands()
for i in commands: for i in commands:
assert isinstance(i, Pip) assert isinstance(i, Command)
def test_get_statement(self): def test_get_statement(self):
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item") c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
statement = c.get_statement() statement = c.get_statement()
assert "Pillow" in statement assert "Pillow" in statement
assert "psycopg2-binary" in statement assert "psycopg2-binary" in statement
assert "django" in statement assert "django" in statement
def test_repr(self): def test_repr(self):
c = ItemizedCommand(Pip, ["Pillow", "psycopg2-binary", "django"], "$item") c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert repr(c) == "<ItemizedCommand Pip>" assert repr(c) == "<ItemizedCommand python_pip>"

@ -1,20 +0,0 @@
from scripttease.library.commands import command_factory, ItemizedCommand
from scripttease.library.commands.python import Pip
from scripttease.library.overlays import Overlay
def test_command_factory():
overlay = Overlay("ubuntu")
overlay.load()
command = command_factory("nonexistent", "non existent command", overlay)
assert command is None
command = command_factory("pip", "install pillow", overlay)
assert command is None
command = command_factory("pip", "install pillow", overlay, "Pillow")
assert isinstance(command, Pip)
command = command_factory("pip", "install various", overlay, "$item", items=["Pillow", "pyscopg2-binary", "django"])
assert isinstance(command, ItemizedCommand)

@ -1,18 +0,0 @@
from scripttease.library.commands.python import *
from scripttease.library.overlays import Overlay
def test_pip():
pip = Pip("Pillow")
assert "pip install -y Pillow" in pip.get_statement()
overlay = Overlay("ubuntu")
overlay.load()
pip = Pip("Pillow", op="remove", overlay=overlay, venv="python")
assert "source python/bin/activate && pip3 uninstall --quiet Pillow" in pip.get_statement()
def test_virtualenv():
virt = VirtualEnv()
assert "virtualenv python" in virt.get_statement()

@ -0,0 +1,17 @@
from scripttease.library.overlays.common import *
def test_python_pip():
c = python_pip("Pillow")
assert "pip install -y Pillow" in c.get_statement()
c = python_pip("Pillow", upgrade=True)
assert "--upgrade" in c.get_statement()
c = python_pip("Pillow", venv="python")
assert "source python/bin/activate" in c.get_statement()
def test_python_virtual_env():
c = python_virtualenv()
assert "virtualenv python" in c.get_statement()

@ -0,0 +1,61 @@
from scripttease.library.overlays.django import *
def test_django():
c = django("check")
assert "./manage.py check" in c.get_statement()
c = django("collectstatic")
assert "./manage.py collectstatic" in c.get_statement()
c = django("migrate")
assert "./manage.py migrate" in c.get_statement()
c = django("custom", "arg1", "arg2", venv="python", settings="tenants.example.settings", quiet=True)
s = c.get_statement()
assert "./manage.py custom" in s
assert "arg1" in s
assert "arg2" in s
assert "--settings=" in s
assert "source python/bin/activate" in s
assert "--quiet" in s
def test_django_check():
c = django_check(venv="python")
s = c.get_statement()
assert "./manage.py check" in s
assert "source python/bin/activate" in s
def test_django_collect_static():
c = django_collect_static(venv="python")
s = c.get_statement()
assert "./manage.py collectstatic" in s
assert "source python/bin/activate" in s
def test_django_dumpdata():
c = django_dumpdata("projects")
s = c.get_statement()
assert "./manage.py dumpdata" in s
assert "projects >" in s
assert "--format=json" in s
assert "--indent=4" in s
assert "local/projects/fixtures/initial.json" in s
def test_django_loaddata():
c = django_loaddata("projects")
s = c.get_statement()
print(s)
assert "./manage.py loaddata" in s
assert "local/projects/fixtures/initial.json" in s
def test_django_migrate():
c = django_migrate(cd="/path/to/project/", venv="python")
s = c.get_statement(cd=True)
assert "./manage.py migrate" in s
assert "source python/bin/activate" in s
assert "cd /path/to/project/" in s

@ -0,0 +1,60 @@
from scripttease.library.overlays.pgsql import *
def test_pg_create_database():
c = pg_create_database("testing", admin_pass="secret", template="mytemplate")
s = c.get_statement()
assert "createdb" in s
assert "export PGPASSWORD=" in s
assert "--host=" in s
assert "--port=" in s
assert "--username=" in s
assert "--owner=" in s
assert "--template=mytemplate" in s
assert "testing" in s
def test_pg_create_user():
c = pg_create_user("testing", password="secret")
s = c.get_statement()
assert "createuser" in s
assert "-DRS" in s
assert "testing" in s
assert "ALTER USER testing" in s
def test_pg_database_exists():
c = pg_database_exists("testing")
s = c.get_statement()
assert "psql" in s
assert "testing_db_exists" in s
def test_pg_drop_database():
c = pg_drop_database("testing")
s = c.get_statement()
assert "dropdb" in s
assert "testing" in s
def test_pg_drop_user():
c = pg_drop_user("testing")
s = c.get_statement()
assert "dropuser" in s
assert "testing" in s
def test_pg_dump_database():
c = pg_dump_database("testing")
s = c.get_statement()
assert "pg_dump" in s
assert "--column-inserts" in s
assert "--file=testing.sql" in s
def test_psql():
c = psql("SELECT * FROM projects", database="testing")
s = c.get_statement()
assert "psql" in s
assert "--dbname=testing" in s
assert '-c "SELECT * FROM projects"' in s

@ -0,0 +1,209 @@
import pytest
from scripttease.library.overlays.posix import *
def test_archive():
c = archive(
"/path/to/target",
absolute=True,
exclude="*.log",
strip=1,
view=True
)
s = c.get_statement()
print(s)
# tar -czPv --exclude *.log --strip-components 1 -f ./archive.tgz /path/to/target
assert "tar -czPv --exclude *.log --strip-components 1" in s
assert "-f ./archive.tgz /path/to/target" in s
def test_certbot():
with pytest.raises(ValueError):
c = certbot("example.com")
c = certbot("example.com", email="webmaster@example.com")
s = c.get_statement()
assert "certbot certonly --agree-tos --email webmaster@example.com -n" in s
assert "--webroot -w /var/www/domains/example_com/www -d example.com" in s
def test_extract():
c = extract(
"/path/to/archive.tgz",
absolute=True,
exclude="*.log",
strip=1,
view=True
)
s = c.get_statement()
assert "tar -xzPv --exclude *.log --strip-components 1" in s
assert "-f /path/to/archive.tgz ./" in s
def test_file_append():
c = file_append("/path/to/file.txt", content="testing = yes")
assert 'echo "testing = yes" >> /path/to/file.txt' in c.get_statement()
def test_file_copy():
c = file_copy("/path/to/file.txt", "/path/to/new-file.txt")
s = c.get_statement()
assert "cp" in s
assert "-n" in s
assert "/path/to/file.txt /path/to/new-file.txt" in s
c = file_copy("/path/to/dir", "/path/to/newdir", recursive=True)
s = c.get_statement()
assert "cp" in s
assert "-R" in s
assert "/path/to/dir /path/to/newdir" in s
def test_file_write():
c = file_write("/path/to/file.txt", content="testing 123")
assert 'echo "testing 123" > /path/to/file.txt' in c.get_statement()
content = [
"I am testing",
"I am testing",
"I am testing",
"testing 123",
]
c = file_write("/path/to/file.txt", content="\n".join(content))
s = c.get_statement()
assert "cat > /path/to/file.txt << EOF" in s
assert "I am testing" in s
assert "testing 123" in s
def test_mkdir():
c = mkdir("/path/to/dir", mode=755, recursive=True)
s = c.get_statement()
assert "mkdir" in s
assert "-m 755" in s
assert "-p" in s
assert "/path/to/dir" in s
def test_move():
c = move("/path/to/file.txt", "/path/to/file.txt.b")
assert "mv /path/to/file.txt /path/to/file.txt.b" in c.get_statement()
def test_perms():
c = perms("/path/to/dir", group="www-data", mode=755, owner="deploy", recursive=True)
s = c.get_statement()
assert "chgrp -R www-data /path/to/dir" in s
assert "chown -R deploy /path/to/dir" in s
assert "chmod -R 755 /path/to/dir" in s
def test_remove():
c = remove("/path/to/dir", force=True, recursive=True)
s = c.get_statement()
assert "rm" in s
assert "-f" in s
assert "-r" in s
assert "/path/to/dir" in s
def test_rsync():
c = rsync(
"/path/to/local/",
"/path/to/remote",
links=True,
delete=True,
exclude="deploy/exclude.txt",
recursive=True,
host="example.com",
key_file="~/.ssh/deploy",
user="deploy"
)
s = c.get_statement()
assert "rsync --cvs-exclude --checksum --compress --copy-links --delete" in s
assert "--exclude-from=deploy/exclude.txt" in s
assert "-P" in s
assert "--recursive /path/to/local/" in s
assert '-e "ssh -i ~/.ssh/deploy -p 22"' in s
assert "deploy@example.com:/path/to/remote" in s
c = rsync(
"/path/to/local/",
"/path/to/remote",
links=True,
delete=True,
exclude="deploy/exclude.txt",
recursive=True,
)
s = c.get_statement()
assert "rsync --cvs-exclude --checksum --compress --copy-links --delete" in s
assert "--exclude-from=deploy/exclude.txt" in s
assert "-P" in s
assert "--recursive" in s
assert "/path/to/local/" in s
assert "/path/to/remote" in s
def test_run():
c = run("ls -ls")
assert "ls -ls" in c.get_statement()
def test_scopy():
with pytest.raises(ValueError):
c = scopy("/path/to/local/file.txt", "/path/to/remote/file.txt")
c = scopy(
"/path/to/local/file.txt",
"/path/to/remote/file.txt",
key_file="~/.ssh/deploy",
host="example.com",
user="deploy"
)
s = c.get_statement()
assert "scp -i ~/.ssh/deploy" in s
assert "-P 22" in s
assert "/path/to/local/file.txt" in s
assert "deploy@example.com:/path/to/remote/file.txt" in s
c = scopy(
"/path/to/local/file.txt",
"/path/to/remote/file.txt",
host="example.com",
)
s = c.get_statement()
assert "scp -P 22" in s
assert "/path/to/local/file.txt" in s
assert "example.com:/path/to/remote/file.txt" in s
def test_sed():
c = sed("/path/to/file.txt", find="testing", replace="123")
s = c.get_statement()
assert "sed -i .b" in s
assert "s/testing/123/g" in s
assert "/path/to/file.txt" in s
def test_symlink():
c = symlink("/var/www/domains", force=True)
s = c.get_statement()
assert "ln -s" in s
assert "-f" in s
assert "/var/www/domains" in s
def test_touch():
c = touch("/path/to/file.txt")
assert "touch /path/to/file.txt" in c.get_statement()
class TestFunction(object):
def test_to_string(self):
f = Function("testing", comment="A test function.")
f.commands.append(touch("/path/to/file.txt"))
s = f.to_string()
assert "# A test function." in s
assert "function testing()" in s
assert "touch /path/to/file.txt" in s

@ -0,0 +1,86 @@
import pytest
from scripttease.library.overlays.ubuntu import *
def test_apache():
c = apache("reload")
assert "service apache2 reload" in c.get_statement()
c = apache("restart")
assert "service apache2 restart" in c.get_statement()
c = apache("start")
assert "service apache2 start" in c.get_statement()
c = apache("stop")
assert "service apache2 stop" in c.get_statement()
c = apache("test")
assert "apachectl configtest" in c.get_statement()
with pytest.raises(NameError):
apache("nonexistent")
def test_apache_disable_module():
c = apache_disable_module("mod_ssl")
assert "a2dismod mod_ssl" in c.get_statement()
def test_apache_disable_site():
c = apache_disable_site("default")
assert "a2dissite default" in c.get_statement()
def test_apache_enable_module():
c = apache_enable_module("mod_wsgi")
assert "a2enmod mod_wsgi" in c.get_statement()
def test_apache_enable_site():
c = apache_enable_site("example.com")
assert "a2ensite example.com" in c.get_statement()
def test_service_reload():
c = service_reload("postfix")
assert "service postfix reload" in c.get_statement()
def test_service_restart():
c = service_restart("postfix")
assert "service postfix restart" in c.get_statement()
def test_service_start():
c = service_start("postfix")
assert "service postfix start" in c.get_statement()
def test_service_stop():
c = service_stop("postfix")
assert "service postfix stop" in c.get_statement()
def test_system():
c = system("reboot")
assert "reboot" in c.get_statement()
c = system("update")
assert "apt-get update -y" in c.get_statement()
c = system("upgrade")
assert "apt-get upgrade -y" in c.get_statement()
with pytest.raises(NameError):
system("nonexistent")
def test_system_install():
c = system_install("vim")
assert "apt-get install -y vim" in c.get_statement()
def test_system_uninstall():
c = system_uninstall("lftp")
assert "apt-get uninstall -y lftp" in c.get_statement()

@ -0,0 +1,31 @@
from scripttease.library.commands import Command, ItemizedCommand
from scripttease.library.overlays.posix import Function
from scripttease.library.scripts import Script
class TestScript(object):
def test_append(self):
s = Script("testing")
s.append(Command("ls -ls", comment="list some stuff"))
s.append(Command("touch /path/to/file.txt", comment="touch a file"))
s.append(Command("ln -s /path/to/file.txt", comment="link to a file"))
assert len(s.commands) == 3
def test_to_string(self):
s = Script("testing")
s.append(Command("ls -ls", comment="list some stuff"))
s.append(Command("touch /path/to/file.txt", comment="touch a file"))
s.append(Command("ln -s /path/to/file.txt", comment="link to a file"))
s.functions = list()
s.functions.append(Function("testing"))
output = s.to_string()
assert output == str(s)
assert "ls -ls" in output
assert "touch /path/to/file.txt" in output
assert "ln -s /path/to/file.txt" in output
assert "function testing()" in output

@ -1,19 +0,0 @@
from scripttease.library.overlays import Overlay
class TestOverlay(object):
def test_get(self):
overlay = Overlay("ubuntu")
overlay.load()
assert overlay.get("nonexistent", "nonexistent") is None
def test_has(self):
overlay = Overlay("ubuntu")
overlay.load()
assert overlay.has("nonexistent", "nonexistent") is False
assert overlay.has("python", "nonexistent") is False
def test_load(self):
overlay = Overlay("nonexistent")
assert overlay.load() is False

@ -0,0 +1,22 @@
import pytest
from scripttease.library.scripts import Script
# from scripttease.parsers import filter_commands, load_commands
from scripttease.parsers.base import Parser
class TestParser(object):
def test_as_script(self):
p = Parser("/path/to/nonexistent.txt")
assert isinstance(p.as_script(), Script)
# def test_get_commands(self):
# pass
#
# def test_get_functions(self):
# pass
def test_load(self):
p = Parser("/path/to/nonexistent.txt")
with pytest.raises(NotImplementedError):
p.load()

@ -0,0 +1,45 @@
import pytest
from scripttease.parsers.ini import Config
class TestConfig(object):
def test_get_commands(self):
c = Config("tests/examples/kitchen_sink.ini")
assert c.load() is True
assert len(c.get_commands()) > 0
def test_get_functions(self):
c = Config("tests/examples/kitchen_sink.ini")
assert c.load() is True
assert len(c.get_functions()) > 0
def test_load(self):
c = Config("nonexistent.ini")
assert c.load() is False
c = Config("tests/examples/python_examples.ini", overlay="nonexistent")
assert c.load() is False
c = Config("tests/examples/bad_examples.ini")
assert c.load() is False
c = Config("tests/examples/kitchen_sink.ini")
assert c.load() is True
c = Config("tests/examples/bad_command.ini")
assert c.load() is False
context = {
'domain_tld': "example_com",
}
c = Config("tests/examples/template_example.ini", context=context)
assert c.load() is True
context = {
'domain_tld': "example_com",
}
c = Config("tests/examples/bad_template_example.ini", context=context)
assert c.load() is False

@ -0,0 +1,42 @@
import pytest
from scripttease.library.commands import Command, ItemizedCommand
from scripttease.parsers import filter_commands, load_commands
def test_filter_commands():
commands = [
Command("apt-get install apache2 -y", environments=["base"], tags=["web"]),
Command("apt-get install apache-top -y", environments=["live"], tags=["web"]),
Command("pip install django-debug-toolbar", environments=["development"], tags=["django"]),
Command("pip install django", environments=["base"], tags=["django"]),
]
f1 = filter_commands(commands, environments=["base", "live"])
assert len(f1) == 3
f2 = filter_commands(commands, tags=["django"])
assert len(f2) == 2
f3 = filter_commands(commands, environments=["base", "development"])
assert len(f3) == 3
f4 = filter_commands(commands, environments=["base"], tags=["web"])
assert len(f4) == 1
def test_load_commands():
commands = load_commands("nonexistent.xml")
assert commands is None
commands = load_commands("nonexistent.ini")
assert commands is None
commands = load_commands("tests/examples/bad_examples.ini")
assert commands is None
commands = load_commands(
"tests/examples/python_examples.ini",
filters={
'tags': ["python-support"],
}
)
assert len(commands) == 2
Loading…
Cancel
Save