Finished unit tests for new release.

development
Shawn Davis 2 years ago
parent 85f7d8f792
commit 7a64eeff09
  1. 1
      .coveragerc
  2. 2
      LICENSE.txt
  3. 1
      PEP440.txt
  4. 1
      RELEASE.txt
  5. 2
      VERSION.txt
  6. 84
      scripttease/factory.py
  7. 60
      scripttease/lib/commands/base.py
  8. 2
      scripttease/lib/commands/centos.py
  9. 6
      scripttease/lib/commands/django.py
  10. 7
      scripttease/lib/commands/mysql.py
  11. 2
      scripttease/lib/commands/posix.py
  12. 4
      scripttease/lib/factories.py
  13. 16
      scripttease/lib/loaders/base.py
  14. 13
      scripttease/lib/loaders/ini.py
  15. 1
      scripttease/lib/mappings.py
  16. 0
      scripttease/library/__init__.py
  17. 3
      scripttease/library/commands/__init__.py
  18. 309
      scripttease/library/commands/base.py
  19. 197
      scripttease/library/commands/templates.py
  20. 0
      scripttease/library/overlays/__init__.py
  21. 281
      scripttease/library/overlays/centos.py
  22. 150
      scripttease/library/overlays/common.py
  23. 188
      scripttease/library/overlays/django.py
  24. 262
      scripttease/library/overlays/mysql.py
  25. 226
      scripttease/library/overlays/pgsql.py
  26. 758
      scripttease/library/overlays/posix.py
  27. 338
      scripttease/library/overlays/ubuntu.py
  28. 69
      scripttease/library/scripts.py
  29. 4
      scripttease/parsers/__init__.py
  30. 80
      scripttease/parsers/base.py
  31. 179
      scripttease/parsers/ini.py
  32. 325
      scripttease/parsers/utils.py
  33. 0
      scripttease/parsers/yaml.py
  34. 10
      scripttease/version.py
  35. 2
      setup.py
  36. 14
      tests/examples/apache_examples.ini
  37. 9
      tests/examples/bad_custom_example.ini
  38. 15
      tests/examples/bad_variables.ini
  39. 9
      tests/examples/custom_example.ini
  40. 6
      tests/examples/python_examples.ini
  41. 3
      tests/examples/templates/settings.py
  42. 2
      tests/examples/variables.ini
  43. 45
      tests/test_factory.py
  44. 416
      tests/test_lib_commands_base.py
  45. 13
      tests/test_lib_commands_centos.py
  46. 14
      tests/test_lib_commands_django.py
  47. 46
      tests/test_lib_commands_messages.py
  48. 33
      tests/test_lib_commands_mysql.py
  49. 33
      tests/test_lib_commands_pgsql.py
  50. 7
      tests/test_lib_commands_php.py
  51. 221
      tests/test_lib_commands_posix.py
  52. 25
      tests/test_lib_commands_python.py
  53. 21
      tests/test_lib_commands_ubuntu.py
  54. 37
      tests/test_lib_contexts.py
  55. 50
      tests/test_lib_factories.py
  56. 83
      tests/test_lib_loaders_base.py
  57. 23
      tests/test_lib_loaders_ini.py
  58. 4
      tests/test_lib_loaders_yaml.py
  59. 130
      tests/test_library_commands_base.py
  60. 113
      tests/test_library_commands_templates.py
  61. 57
      tests/test_library_overlays_common.py
  62. 31
      tests/test_library_scripts.py
  63. 30
      tests/test_parsers_base.py
  64. 48
      tests/test_parsers_ini.py
  65. 133
      tests/test_parsers_utils.py

@ -2,6 +2,7 @@
omit =
docs/*
scripttease/cli/*
scripttease/lib/loaders/yaml.py
scripttease/variables.py
scripttease/version.py
sandbox

@ -10,7 +10,7 @@ are permitted provided that the following conditions are met:
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of Pleasant Tents, LLC nor the names of its contributors
* Neither the name of copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.

@ -1 +0,0 @@
6.8.33

@ -0,0 +1 @@
7.0.0-a

@ -1 +1 @@
6.8.33
7.0.0

@ -1,84 +0,0 @@
# Imports
import logging
from importlib import import_module
from .constants import LOGGER_NAME
from .library.commands import ItemizedCommand
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"Factory",
)
# Classes
class Factory(object):
"""A command factory."""
def __init__(self, overlay):
"""Initialize the factory.
:param overlay: The name of the overlay to use for generating commands.
:type overlay: str
"""
self.is_loaded = False
self.overlay = None
self._overlay = overlay
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._overlay)
def get_command(self, name, *args, **kwargs):
"""Get a command.
:param name: The name of the command.
:type name: str
args and kwargs are passed to the initialize the command.
:rtype: scripttease.library.commands.Command | scripttease.library.commands.ItemizedCommand
:raise: RuntimeError
:raises: ``RuntimeError`` if the factory has not yet been loaded.
"""
if not self.is_loaded:
raise RuntimeError("Factory has not been loaded, so no commands are available. Call load() method first!")
if not self.overlay.command_exists(name):
log.warning("Command does not exist in %s overlay: %s" % (self._overlay, name))
return None
callback = self.overlay.MAPPINGS[name]
try:
items = kwargs.pop("items", None)
if items is not None:
return ItemizedCommand(callback, items, *args, name=name, **kwargs)
command = callback(*args, **kwargs)
command.name = name
return command
except (KeyError, NameError, TypeError, ValueError) as e:
log.critical("Failed to load %s command: %s" % (name, e))
return None
def load(self):
"""Load the factory.
:rtype: bool
"""
try:
self.overlay = import_module("scripttease.library.overlays.%s" % self._overlay)
self.is_loaded = True
except ImportError as e:
log.error("The %s overlay could not be imported: %s" % (self._overlay, str(e)))
pass
return self.is_loaded

@ -126,6 +126,33 @@ class Content(object):
def __init__(self, content_type, caption=None, css=None, heading=None, height=None, image=None, message=None,
width=None, **kwargs):
"""Initialize content.
:param content_type: The type of content; ``explain`` or ``screenshot``.
:type content_type: str
:param caption: A caption for images.
:type caption: str
:param css: The CSS class to be applied.
:type css: str
:param heading: A heading for explanatory content.
:type heading: str
:param height: The height of an image.
:type height: int | str
:param image: The URL or path to the image file.
:type image: str
:param message: The explanation.
:type message: str
:param width: The width of an image.
:type width: int | str
"""
self.caption = caption
self.css = css
self.heading = heading
@ -142,7 +169,7 @@ class Content(object):
return self.kwargs.get(item)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.content_type)
return "<%s %s>" % (self.__class__.__name__, self.type)
@property
def is_itemized(self):
@ -150,12 +177,12 @@ class Content(object):
return False
def get_output(self, output_format):
if self.content_type == "explain":
if self.type == "explain":
return self._get_message_output(output_format)
elif self.content_type == "screenshot":
elif self.type == "screenshot":
return self._get_image_output(output_format)
else:
log.warning("Invalid content type: %s" % self.content_type)
log.warning("Invalid content type: %s" % self.type)
return None
# noinspection PyUnusedLocal
@ -199,8 +226,8 @@ class Content(object):
if self.height is not None:
a.append(indent(":height: %s" % self.height, 8))
if self.height is not None:
a.append(indent(":height: %s" % self.height, 8))
if self.width is not None:
a.append(indent(":width: %s" % self.width, 8))
else:
if self.caption:
a.append("%s: %s" % (self.caption, self.image))
@ -295,7 +322,7 @@ class ItemizedCommand(object):
"""
kwargs = self.kwargs.copy()
kwargs['name'] = self.name
# kwargs['name'] = self.name
a = list()
for item in self.items:
@ -334,6 +361,18 @@ class MultipleCommands(object):
"""An interface for handling command composition where multiple statements must be generated."""
def __init__(self, commands, comment=None, tags=None, **kwargs):
"""Initialize multiple commands.
:param commands: A list of comments.
:type commands: list[BaseType[scripttease.lib.commands.Command]]
:param comment: A comment regarding the commands.
:type comment: str
:param tags: A list of tags.
:type tags: list[str]
"""
self.commands = commands
self.comment = comment or "run multiple commands"
self.tags = tags or list()
@ -576,6 +615,7 @@ class Template(object):
content = read_file(template)
return content % self.context
if self.parser == self.PARSER_JINJA:
try:
return parse_jinja_template(template, self.context)
except TemplateNotFound:
@ -585,6 +625,8 @@ class Template(object):
log.error("Could not parse %s template: %s" % (template, e))
return None
return None
# noinspection PyUnusedLocal
def get_statement(self, cd=True, include_comment=True, include_register=True, include_stop=True):
lines = list()
@ -599,7 +641,7 @@ class Template(object):
# Get the content; e.g. parse the template.
content = self.get_content()
if content is None:
lines.append("# NOT CONTENT AVAILABLE")
lines.append("# NO CONTENT AVAILABLE")
return "\n".join(lines)
# Templates that are bash scripts will fail to write because of the shebang.
@ -645,6 +687,8 @@ class Template(object):
return "python"
elif self.target.endswith(".sh"):
return "bash"
elif self.target.endswith(".sql"):
return "sql"
elif self.target.endswith(".yml"):
return "yaml"
else:

@ -1,7 +1,7 @@
# Imports
from commonkit import split_csv
from .base import Command, Template
from .base import Command
from .django import DJANGO_MAPPINGS
from .messages import MESSAGE_MAPPINGS
from .mysql import MYSQL_MAPPINGS

@ -72,7 +72,7 @@ def django_dump(target, path=None, **kwargs):
kwargs.setdefault("indent", 4)
if path is None:
path = "../deploy/fixtures/%s.%s" % (target, kwargs['format'])
path = "../fixtures/%s.%s" % (target, kwargs['format'])
return django("dumpdata", target, "> %s" % path, **kwargs)
@ -81,7 +81,7 @@ def django_load(target, path=None, **kwargs):
kwargs.setdefault("comment", "load app/model data from %s" % target)
input_format = kwargs.pop("format", "json")
if path is None:
path = "../deploy/fixtures/%s.%s" % (target, input_format)
path = "../fixtures/%s.%s" % (target, input_format)
return django("loaddata", path, **kwargs)
@ -101,7 +101,9 @@ DJANGO_MAPPINGS = {
'django': django,
'django.check': django_check,
'django.dump': django_dump,
'django.dumpdata': django_dump,
'django.load': django_load,
'django.loaddata': django_load,
'django.migrate': django_migrate,
'django.static': django_static,
}

@ -9,8 +9,10 @@ from .base import Command
__all__ = (
"MYSQL_MAPPINGS",
"mysql_create",
"mysql_drop",
"mysql_dump",
"mysql_exists",
"mysql_grant",
"mysql_load",
"mysql_user",
)
@ -80,7 +82,6 @@ def mysql_drop(database, **kwargs):
def mysql_dump(database, path=None, **kwargs):
kwargs.setdefault("comment", "dump mysql database")
kwargs.setdefault("complete_inserts", True)
if path is None:
path = "%s.sql" % database
@ -135,7 +136,7 @@ def mysql_grant(to, database=None, privileges="ALL", **kwargs):
def mysql_load(database, path, **kwargs):
kwargs.setdefault("comment", "load data into a mysql database")
return mysql("psql", database, "< %s" % path, **kwargs)
return mysql("mysql", database, "< %s" % path, **kwargs)
def mysql_user(name, admin_pass=None, admin_user="root", op="create", password=None, **kwargs):
@ -165,7 +166,7 @@ def mysql_user(name, admin_pass=None, admin_user="root", op="create", password=N
return command
elif op == "exists":
kwargs.setdefault("comment", "determine if %s mysql user exists" % name)
kwargs.setdefault("register", "mysql_use_exists")
kwargs.setdefault("register", "mysql_user_exists")
command = mysql("mysql", password=admin_pass, user=admin_user, **kwargs)

@ -103,7 +103,7 @@ def copy(from_path, to_path, overwrite=False, recursive=False, **kwargs):
return Command(" ".join(a), **kwargs)
def directory(path, group=None, mode=None, owner=None, recursive=True, **kwargs):
def directory(path, group=None, mode=None, owner=None, recursive=False, **kwargs):
"""Create a directory.
- path (str): The path to be created.

@ -33,9 +33,9 @@ def command_factory(loader, excluded_kwargs=None, mappings=None, profile=PROFILE
"""
# Identify the command mappings to be used.
if profile == "centos":
if profile == PROFILE.CENTOS:
_mappings = CENTOS_MAPPINGS
elif profile == "ubuntu":
elif profile == PROFILE.UBUNTU:
_mappings = UBUNTU_MAPPINGS
else:
log.error("Unsupported or unrecognized profile: %s" % profile)

@ -2,7 +2,7 @@
from commonkit import any_list_item, parse_jinja_string, parse_jinja_template, pick, read_file, smart_cast, split_csv, \
File
from configparser import ParsingError, RawConfigParser
from configparser import ParsingError, ConfigParser
from jinja2.exceptions import TemplateError, TemplateNotFound
import logging
import os
@ -74,7 +74,7 @@ def load_variables(path, env=None):
log.warning("Variables file does not exist: %s" % path)
return list()
ini = RawConfigParser()
ini = ConfigParser()
try:
ini.read(path)
except ParsingError as e:
@ -82,18 +82,18 @@ def load_variables(path, env=None):
return list()
variables = list()
for variable_name in ini.sections():
if ":" in variable_name:
variable_name, _environment = variable_name.split(":")
for section in ini.sections():
if ":" in section:
variable_name, _environment = section.split(":")
else:
_environment = None
variable_name = variable_name
variable_name = section
kwargs = {
'environment': _environment,
}
_value = None
for key, value in ini.items(variable_name):
for key, value in ini.items(section):
if key == "value":
_value = smart_cast(value)
continue
@ -212,7 +212,7 @@ class BaseLoader(File):
be defined.
- ``groups`` is assumed to be a CSV list of groups if provided as a string.
- ``items`` is assumed to be a CSV list if provided as a string. These are used to create an "itemized" command.
- ``tags`` is assumed to be a CSV list oif provided as a string.
- ``tags`` is assumed to be a CSV list if provided as a string.
All other keys are used as is. Values provided as a CSV list are smart cast to a Python value.

@ -48,8 +48,9 @@ class INILoader(BaseLoader):
kwargs['comment'] = comment
for key, value in ini.items(comment):
if key.startswith("_"):
continue
# Is there a reason for this?
# if key.startswith("_"):
# continue
# The first key/value pair is the command name and arguments.
if count == 0:
@ -63,8 +64,8 @@ class INILoader(BaseLoader):
continue
# Arguments surrounded by quotes are considered to be one argument. All others are split into a
# list to be passed to the parser. It is also possible that this is a call where no arguments are
# present, so the whole thing is wrapped to protect against an index error. A TypeError is raised in
# list to be passed to the command factory. It is also possible that this is a call where no
# arguments are present, so the whole thing is wrapped to protect against an index error. A TypeError is raised in
# cases where a command is provided with no positional arguments; we interpret this as True.
try:
if value[0] == '"':
@ -73,9 +74,9 @@ class INILoader(BaseLoader):
args = value.split(" ")
except IndexError:
pass
except TypeError:
# except TypeError:
# noinspection PyTypeChecker
args.append(True)
# args.append(True)
else:
_key, _value = self._get_key_value(key, value)
kwargs[_key] = _value

@ -1 +0,0 @@
from .commands.posix import POSIX_MAPPINGS

@ -1,3 +0,0 @@
from .base import Command, ItemizedCommand
from .templates import Template
# from .factory import command_factory

@ -1,309 +0,0 @@
# Classes
class Command(object):
"""A command line statement."""
def __init__(self, statement, comment=None, condition=None, cd=None, environments=None, function=None, name=None,
prefix=None, register=None, shell=None, stop=False, sudo=None, tags=None, **kwargs):
"""Initialize a command.
:param statement: The statement to be executed.
:type statement: str
:param comment: A comment regarding the statement.
:type comment: str
:param condition: A (system-specific) condition for the statement to be executed.
:type condition: str
:param cd: The direction from which the statement should be executed.
:type cd: str
:param environments: A list of target environments where the statement should be executed.
:type environments: list[str]
:param function: The name of the function in which the statement is executed.
:type function: str
:param name: The name of the command from the mapping. Not used and not required for programmatic use, but
automatically assigned during factory instantiation.
:type name: str
:param prefix: A statement to execute before the main statement is executed.
:type prefix: str
:param register: A variable name to use for capture the success for failure of the statement's execution.
:type register: str
:param shell: The shell execute through which the statement is executed.
:type shell: str
:param stop: Indicates process should stop if the statement fails to execute.
:type stop: bool | None
:param sudo: Indicates whether sudo should be invoked for the statement. Given as a bool or user name or
:py:class:`scripttease.library.commands.base.Sudo` instance.
:type sudo: bool | str | Sudo
:param tags: A list of tags describing the statement.
:type tags: list[str]
Additional kwargs are available as dynamic attributes of the Command instance.
"""
self.comment = comment
self.condition = condition
self.cd = cd
self.environments = environments or list()
self.function = function
self.name = name
self.prefix = prefix
self.register = register
self.shell = shell
self.statement = statement
self.stop = stop
self.tags = tags or list()
if isinstance(sudo, Sudo):
self.sudo = sudo
elif type(sudo) is str:
self.sudo = Sudo(enabled=True, user=sudo)
elif sudo is True:
self.sudo = Sudo(enabled=True)
else:
self.sudo = Sudo()
self._attributes = kwargs
def __getattr__(self, item):
return self._attributes.get(item)
def __repr__(self):
if self.comment is not None:
return "<%s %s>" % (self.__class__.__name__, self.comment)
return "<%s>" % self.__class__.__name__
def get_statement(self, cd=False, suppress_comment=False):
"""Get the full statement.
:param cd: Include the directory change, if given.
:type cd: bool
:param suppress_comment: Don't include the comment.
:type suppress_comment: bool
:rtype: str
"""
a = list()
if cd and self.cd is not None:
a.append("( cd %s &&" % self.cd)
if self.prefix is not None:
a.append("%s &&" % self.prefix)
if self.sudo:
statement = "%s %s" % (self.sudo, self._get_statement())
else:
statement = self._get_statement()
a.append("%s" % statement)
if cd and self.cd is not None:
a.append(")")
b = list()
if self.comment is not None and not suppress_comment:
b.append("# %s" % self.comment)
if self.condition is not None:
b.append("if [[ %s ]]; then %s; fi;" % (self.condition, " ".join(a)))
else:
b.append(" ".join(a))
if self.register is not None:
b.append("%s=$?;" % self.register)
if self.stop:
b.append("if [[ $%s -gt 0 ]]; exit 1; fi;" % self.register)
elif self.stop:
b.append("if [[ $? -gt 0 ]]; exit 1; fi;")
else:
pass
return "\n".join(b)
def has_attribute(self, name):
"""Indicates whether the command has the named, dynamic attribute.
:param name: The name of the attribute to be checked.
:type name: str
:rtype: bool
"""
return name in self._attributes
@property
def is_itemized(self):
"""Always returns ``False``."""
return False
def set_attribute(self, name, value):
"""Set the value of a dynamic attribute.
:param name: The name of the attribute.
:type name: str
:param value: The value of the attribute.
"""
self._attributes[name] = value
def _get_statement(self):
"""By default, get the statement passed upon command initialization.
:rtype: str
"""
return self.statement
class ItemizedCommand(object):
"""An itemized command represents multiple commands of with the same statement but different parameters."""
def __init__(self, callback, items, *args, name=None, **kwargs):
"""Initialize the command.
:param callback: The function to be used to generate the command.
:param items: The command arguments.
:type items: list[str]
:param name: The name of the command from the mapping. Not used and not required for programmatic use, but
automatically assigned during factory instantiation.
:type name: str
:param args: The itemized arguments. ``$item`` should be included.
Keyword arguments are passed to the command class upon instantiation.
"""
self.args = args
self.callback = callback
self.items = items
self.kwargs = kwargs
self.name = name
# Set defaults for when ItemizedCommand is referenced directly before individual commands are instantiated. For
# example, when command filtering occurs.
self.kwargs.setdefault("environments", list())
self.kwargs.setdefault("tags", list())
def __getattr__(self, item):
return self.kwargs.get(item)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.callback.__name__)
def get_commands(self):
"""Get the commands to be executed.
:rtype: list[BaseType(Command)]
"""
kwargs = self.kwargs.copy()
a = list()
for item in self.items:
args = list()
for arg in self.args:
args.append(arg.replace("$item", item))
command = self.callback(*args, **kwargs)
a.append(command)
return a
def get_statement(self, cd=False, suppress_comment=False):
"""Override to get multiple commands."""
kwargs = self.kwargs.copy()
comment = kwargs.pop("comment", "execute multiple commands")
a = list()
# a.append("# %s" % comment)
commands = self.get_commands()
for c in commands:
a.append(c.get_statement(cd=cd, suppress_comment=suppress_comment))
a.append("")
# for item in self.items:
# args = list()
# for arg in self.args:
# args.append(arg.replace("$item", item))
#
# command = self.command_class(*args, **kwargs)
# a.append(command.preview(cwd=cwd))
# a.append("")
return "\n".join(a)
def has_attribute(self, name):
"""Indicates whether the command has the named, dynamic attribute.
:param name: The name of the attribute to be checked.
:type name: str
:rtype: bool
"""
return name in self.kwargs
@property
def is_itemized(self):
"""Always returns ``True``."""
return True
def set_attribute(self, name, value):
"""Set the value of a dynamic attribute.
:param name: The name of the attribute.
:type name: str
:param value: The value of the attribute.
.. note::
This is applied to all command in the itemized list.
"""
self.kwargs[name] = value
class Sudo(object):
"""Helper class for defining sudo options."""
def __init__(self, enabled=False, user="root"):
"""Initialize the helper.
:param enabled: Indicates sudo is enabled.
:type enabled: bool
:param user: The user to be invoked.
:type user: str
"""
self.enabled = enabled
self.user = user
def __bool__(self):
return self.enabled
def __str__(self):
if self.enabled:
return "sudo -u %s" % self.user
return ""

@ -1,197 +0,0 @@
# Imports
from commonkit import parse_jinja_template, read_file
from jinja2.exceptions import TemplateError, TemplateNotFound
import logging
import os
from ...constants import LOGGER_NAME
from .base import Command
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"Template",
)
# Classes
class Template(Command):
"""Parse a template."""
PARSER_JINJA = "jinja2"
PARSER_SIMPLE = "simple"
def __init__(self, source, target, backup=True, lines=False, parser=PARSER_JINJA, pythonic=False, **kwargs):
"""Initialize the command.
:param source: The template source file.
:type source: str
:param target: The path to the output file.
:type target: str
:param backup: Indicates a copy of an existing file should be madee.
:type backup: bool
:param parser: The parser to use.
:type parser: str
:param pythonic: Use a Python one-liner to write the file. Requires Python installation, obviously. This is
useful when the content of the file cannot be handled with a cat command; for example, shell
script templates.
:type pythonic: bool
"""
# Base parameters need to be captured, because all others are assumed to be switches for the management command.
self._kwargs = {
'comment': kwargs.pop("comment", None),
'cd': kwargs.pop("cd", None),
'environments': kwargs.pop("environments", None),
'function': kwargs.pop("function", None),
# 'local': kwargs.pop("local", False),
'name': "template",
'prefix': kwargs.pop("prefix", None),
'shell': kwargs.pop("shell", "/bin/bash"),
'stop': kwargs.pop("stop", False),
'sudo': kwargs.pop('sudo', False),
'tags': kwargs.pop("tags", None),
}
self.backup_enabled = backup
self.context = kwargs.pop("context", dict())
self.parser = parser or self.PARSER_JINJA
self.pythonic = pythonic
self.line_by_line = lines
self.locations = kwargs.pop("locations", list())
self.source = os.path.expanduser(source)
self.target = target
# Remaining kwargs are added to the context.
# print(_kwargs['comment'], kwargs)
self.context.update(kwargs)
super().__init__("# template: %s" % source, **self._kwargs)
def get_content(self):
"""Parse the template.
:rtype: str | None
"""
template = self.get_template()
if self.parser == self.PARSER_SIMPLE:
content = read_file(template)
for key, value in self.context.items():
replace = "$%s$" % key
content = content.replace(replace, str(value))
return content
try:
return parse_jinja_template(template, self.context)
except TemplateNotFound:
log.error("Template not found: %s" % template)
return None
except TemplateError as e:
log.error("Could not parse %s template: %s" % (template, e))
return None
def get_statement(self, cd=False, suppress_comment=False):
"""Override to get the statement based on the parser."""
if self.parser == self.PARSER_JINJA:
return self._get_jinja2_statement(cd=cd).statement
elif self.parser == self.PARSER_SIMPLE:
return self._get_simple_statement(cd=cd).statement
else:
log.error("Unknown or unsupported template parser: %s" % self.parser)
return None
def get_template(self):
"""Get the template path.
:rtype: str
"""
source = self.source
for location in self.locations:
_source = os.path.join(location, self.source)
if os.path.exists(_source):
return _source
return source
def _get_command(self, content):
"""Get the cat command."""
output = list()
# TODO: Template backup is not system safe, but is specific to bash.
if self.backup_enabled:
output.append('if [[ -f "%s" ]]; then mv %s %s.b; fi;' % (self.target, self.target, self.target))
if content.startswith("#!"):
_content = content.split("\n")
first_line = _content.pop(0)
output.append('echo "%s" > %s' % (first_line, self.target))
output.append("cat >> %s << EOF" % self.target)
output.append("\n".join(_content))
output.append("EOF")
else:
output.append("cat > %s << EOF" % self.target)
output.append(content)
output.append("EOF")
statement = "\n".join(output)
return Command(statement, **self._kwargs)
# # BUG: This still does not seem to work, possibly because a shell script includes EOF? The work around is to use
# # get_content(), self.target, and write the file manually.
# if self.line_by_line:
# a = list()
# a.append('touch %s' % self.target)
# for i in content.split("\n"):
# i = i.replace('"', r'\"')
# a.append('echo "%s" >> %s' % (i, self.target))
#
# output.append("\n".join(a))
# elif self.pythonic:
# target_file = File(self.target)
# script_file = "write_%s_template.py" % target_file.name.replace("-", "_")
#
# a = list()
# a.append('content = """%s' % content)
# a.append('"""')
# a.append("")
# a.append('with open("%s", "w") as f:' % self.target)
# a.append(' f.write(content)')
# a.append(' f.close()')
# a.append('')
# output.append('cat > %s <<EOF' % script_file)
# output.append("\n".join(a))
# output.append('EOF')
# output.append("")
# output.append("rm %s" % script_file)
# else:
# output.append("cat > %s << EOF" % self.target)
# output.append(content)
# output.append("EOF")
#
# statement = "\n".join(output)
# return Command(statement, **self._kwargs)
# noinspection PyUnusedLocal
def _get_jinja2_statement(self, cd=False):
"""Parse a Jinja2 template."""
content = self.get_content()
return self._get_command(content)
# noinspection PyUnusedLocal
def _get_simple_statement(self, cd=False):
"""Parse a "simple" template."""
content = self.get_content()
return self._get_command(content)

@ -1,281 +0,0 @@
# Imports
from commonkit import split_csv
from ..commands import Command, Template
from .common import COMMON_MAPPINGS
from .django import DJANGO_MAPPINGS
from .mysql import MYSQL_MAPPINGS
from .pgsql import PGSQL_MAPPINGS
from .posix import POSIX_MAPPINGS, Function
# Exports
__all__ = (
"MAPPINGS",
"apache",
"apache_reload",
"apache_restart",
"apache_start",
"apache_stop",
"apache_test",
"command_exists",
"service_reload",
"service_restart",
"service_start",
"service_stop",
"system",
"system_install",
"system_reboot",
"system_update",
"system_upgrade",
"system_uninstall",
"template",
"user",
"Function",
)
def command_exists(name):
"""Indicates whether a given command exists in this overlay.
:param name: The name of the command.
:type name: str
:rtype: bool
"""
return name in MAPPINGS
def apache(op, **kwargs):
"""Execute an Apache-related command.
- op (str): The operation to perform; reload, restart, start, stop, test.
"""
if op == "reload":
return apache_reload(**kwargs)
elif op == "restart":
return apache_restart(**kwargs)
elif op == "start":
return apache_start(**kwargs)
elif op == "stop":
return apache_stop(**kwargs)
elif op == "test":
return apache_test(**kwargs)
else:
raise NameError("Unrecognized or unsupported apache operation: %s" % op)
def apache_reload(**kwargs):
kwargs.setdefault("comment", "reload apache")
kwargs.setdefault("register", "apache_reloaded")
return Command("apachectl –k reload", **kwargs)
def apache_restart(**kwargs):
kwargs.setdefault("comment", "restart apache")
kwargs.setdefault("register", "apache_restarted")
return Command("apachectl –k restart", **kwargs)
def apache_start(**kwargs):
kwargs.setdefault("comment", "start apache")
kwargs.setdefault("register", "apache_started")
return Command("apachectl –k start", **kwargs)
def apache_stop(**kwargs):
kwargs.setdefault("comment", "stop apache")
return Command("apachectl –k stop", **kwargs)
def apache_test(**kwargs):
kwargs.setdefault("comment", "check apache configuration")
kwargs.setdefault("register", "apache_checks_out")
return Command("apachectl configtest", **kwargs)
def service_reload(name, **kwargs):
"""Reload a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "reload %s service" % name)
kwargs.setdefault("register", "%s_reloaded" % name)
return Command("systemctl reload %s" % name, **kwargs)
def service_restart(name, **kwargs):
"""Restart a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "restart %s service" % name)
kwargs.setdefault("register", "%s_restarted" % name)
return Command("ssystemctl restart %s" % name, **kwargs)
def service_start(name, **kwargs):
"""Start a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "start %s service" % name)
kwargs.setdefault("register", "%s_started" % name)
return Command("systemctl start %s" % name, **kwargs)
def service_stop(name, **kwargs):
"""Stop a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "stop %s service" % name)
kwargs.setdefault("register", "%s_stopped" % name)
return Command("systemctl stop %s" % name, **kwargs)
def system(op, **kwargs):
"""Perform a system operation.
- op (str): The operation to perform; reboot, update, upgrade.
"""
if op == "reboot":
return system_reboot(**kwargs)
elif op == "update":
return system_update(**kwargs)
elif op == "upgrade":
return system_upgrade(**kwargs)
else:
raise NameError("Unrecognized or unsupported system operation: %s" % op)
def system_install(name, **kwargs):
"""Install a system-level package.
- name (str): The name of the package to install.
"""
kwargs.setdefault("comment", "install system package %s" % name)
return Command("yum install -y %s" % name, **kwargs)
def system_reboot(**kwargs):
kwargs.setdefault("comment", "reboot the system")
return Command("reboot", **kwargs)
def system_uninstall(name, **kwargs):
"""Uninstall a system-level package.
- name (str): The name of the package to uninstall.
"""
kwargs.setdefault("comment", "remove system package %s" % name)
return Command("yum remove -y %s" % name, **kwargs)
def system_update(**kwargs):
kwargs.setdefault("comment", "update system package info")
return Command("yum check-update", **kwargs)
def system_upgrade(**kwargs):
kwargs.setdefault("comment", "upgrade the system")
return Command("yum update -y", **kwargs)
def template(source, target, backup=True, parser=None, **kwargs):
"""Create a file from a template.
- source (str): The path to the template file.
- target (str): The path to where the new file should be created.
- backup (bool): Indicates whether a backup should be made if the target file already exists.
- parser (str): The parser to use ``jinja`` (the default) or ``simple``.
"""
return Template(source, target, backup=backup, parser=parser, **kwargs)
def user(name, groups=None, home=None, op="add", password=None, **kwargs):
"""Create or remove a user.
- name (str): The user name.
- groups (str | list): A list of groups to which the user should belong.
- home (str): The path to the user's home directory.
- op (str); The operation to perform; ``add`` or ``remove``.
- password (str): The user's password. (NOT IMPLEMENTED)
"""
if op == "add":
kwargs.setdefault("comment", "create a user named %s" % name)
commands = list()
a = list()
a.append('adduser %s' % name)
if home is not None:
a.append("--home %s" % home)
commands.append(Command(" ".join(a), **kwargs))
if type(groups) is str:
groups = split_csv(groups, smart=False)
if type(groups) in [list, tuple]:
for group in groups:
commands.append(Command("gpasswd -a %s %s" % (name, group), **kwargs))
a = list()
for c in commands:
a.append(c.get_statement(suppress_comment=True))
return Command("\n".join(a), **kwargs)
elif op == "remove":
kwargs.setdefault("comment", "remove a user named %s" % name)
return Command("userdel -r %s" % name, **kwargs)
else:
raise NameError("Unsupported or unrecognized operation: %s" % op)
MAPPINGS = {
'apache': apache,
'install': system_install,
'reboot': system_reboot,
'reload': service_reload,
'restart': service_restart,
'start': service_start,
'stop': service_stop,
'system': system,
'template': template,
'update': system_update,
'uninstall': system_uninstall,
'upgrade': system_upgrade,
'user': user,
}
MAPPINGS.update(COMMON_MAPPINGS)
MAPPINGS.update(DJANGO_MAPPINGS)
MAPPINGS.update(MYSQL_MAPPINGS)
MAPPINGS.update(PGSQL_MAPPINGS)
MAPPINGS.update(POSIX_MAPPINGS)

@ -1,150 +0,0 @@
# Imports
from ..commands import Command
# Exports
__all__ = (
"COMMON_MAPPINGS",
"python_pip",
"python_virtualenv",
"run",
"slack",
"twist",
"udf",
)
# Functions
def python_pip(name, op="install", upgrade=False, venv=None, version=3, **kwargs):
"""Use pip to install or uninstall a Python package.
- name (str): The name of the package.
- op (str): The operation to perform; install, uninstall
- upgrade (bool): Upgrade an installed package.
- venv (str): The name of the virtual environment to load.
- version (int): The Python version to use, e.g. ``2`` or ``3``.
"""
manager = "pip"
if version == 3:
manager = "pip3"
if upgrade:
statement = "%s install --upgrade %s" % (manager, name)
else:
statement = "%s %s %s" % (manager, op, name)
if venv is not None:
kwargs['prefix'] = "source %s/bin/activate" % venv
kwargs.setdefault("comment", "%s %s" % (op, name))
return Command(statement, **kwargs)
def python_virtualenv(name, **kwargs):
"""Create a Python virtual environment.
- name (str): The name of the environment to create.
"""
kwargs.setdefault("comment", "create %s virtual environment" % name)
return Command("virtualenv %s" % name, **kwargs)
def run(statement, **kwargs):
"""Run any statement.
- statement (str): The statement to be executed.
"""
kwargs.setdefault("comment", "run statement")
return Command(statement, **kwargs)
def slack(message, url=None, **kwargs):
"""Send a message to Slack.
- message (str): The message to be sent.
- url (str): The webhook URL. This is required. See documentation.
"""
if url is None:
raise ValueError("A url is required to use the slack command.")
kwargs.setdefault("comment", "send a message to slack")
a = list()
a.append("curl -X POST -H 'Content-type: application/json' --data")
a.append("'" + '{"text": "%s"}' % message + "'")
a.append(url)
return Command(" ".join(a), **kwargs)
def twist(message, title="Notice", url=None, **kwargs):
"""Send a message to Twist.
- message (str): The message to be sent.
- title (str): The message title.
- url (str): The webhook URL. This is required. See documentation.
"""
if url is None:
raise ValueError("A url is required to use the twist command.")
kwargs.setdefault("comment", "send a message to twist")
a = list()
# curl -X POST -H 'Content-type: application/json' --data '{"content": "This is the message.", "title": "Message Title"}' "https://twist.com/api/v3/integration_incoming/post_data?install_id=116240&install_token=116240_bfb05bde51ecd0f728b4b161bee6fcee"
a.append("curl -X POST -H 'Content-type: application/json' --data")
data = '{"content": "%s", "title": "%s"}' % (message, title)
a.append("'%s'" % data)
a.append('"%s"' % url)
return Command(" ".join(a), **kwargs)
def udf(name, default=None, example=None, label=None, **kwargs):
"""Create a UDF prompt for a StackScript.
- name (str): The name of the variable.
- default: The default value.
- example: An example value, instead of a default.
- label (str): The label for the variable.
"""
kwargs.setdefault("prompt for %s in stackscript" % name)
label = label or name.replace("_", " ").title()
a = ['# <UDF name="%s" label="%s"' % (name, label)]
if default is not None:
a.append('default="%s"' % default)
elif example is not None:
a.append('example="%s"' % example)
else:
pass
a.append("/>")
return Command(" ".join(a), **kwargs)
# Mappings
COMMON_MAPPINGS = {
'pip': python_pip,
'run': run,
'slack': slack,
'twist': twist,
'udf': udf,
'virtualenv': python_virtualenv,
}

@ -1,188 +0,0 @@
# Imports
import os
from ..commands import Command
# Exports
__all__ = (
"DJANGO_MAPPINGS",
"django",
"django_check",
"django_collect_static",
"django_dumpdata",
"django_loaddata",
"django_migrate",
)
# Functions
def _django(name, *args, venv=None, **kwargs):
"""Process a django-based command.
:param name: The name of the management command.
:type name: str
:param venv: The virtual environment to use.
:type venv: str
args and kwargs are used to instantiate the command instance.
This exists because we need ``django()`` to serve as an interface for any management command.
"""
if venv is not None:
kwargs['prefix'] = "source %s/bin/activate" % venv
kwargs.setdefault("comment", "run %s django management command" % name)
# Base parameters need to be captured, because all others are assumed to be switches for the management command.
_kwargs = {
'comment': kwargs.pop("comment", None),
'condition': kwargs.pop("condition", None),
'cd': kwargs.pop("cd", None),
'environments': kwargs.pop("environments", None),
'function': kwargs.pop("function", None),
# 'local': kwargs.pop("local", False),
'prefix': kwargs.pop("prefix", None),
'register': kwargs.pop("register", None),
'shell': kwargs.pop("shell", "/bin/bash"),
'stop': kwargs.pop("stop", False),
'sudo': kwargs.pop('sudo', False),
'tags': kwargs.pop("tags", None),
}
statement = list()
statement.append("./manage.py %s" % name)
# Remaining kwargs are assumed to be switches.
for key, value in kwargs.items():
key = key.replace("_", "-")
if type(value) is bool:
if value is True:
statement.append("--%s" % key)
else:
statement.append("--%s=%s" % (key, value))
if len(args) > 0:
statement.append(" ".join(args))
return Command(" ".join(statement), **_kwargs)
def django(name, *args, venv=None, **kwargs):
"""Run any Django management command.
- name (str): The name of the management command.
- venv (str): The of the virtual environment to use.
args are passed as positional arguments, while kwargs are given as switches.
"""
if name == "check":
return django_check(venv=venv, **kwargs)
elif name in ("collectstatic", "static"):
return django_collect_static(venv=venv, **kwargs)
elif name == "migrate":
return django_migrate(venv=venv, **kwargs)
else:
return _django(name, *args, venv=venv, **kwargs)
def django_check(venv=None, **kwargs):
"""Run the Django check command.
- venv (str): The of the virtual environment to use.
"""
kwargs.setdefault("comment", "run django checks")
kwargs.setdefault("register", "django_checks_out")
return _django("check", venv=venv, **kwargs)
def django_collect_static(venv=None, **kwargs):
"""Collect static files.
- venv (str): The of the virtual environment to use.
"""
kwargs.setdefault("comment", "collect static files")
return _django("collectstatic", venv=venv, **kwargs)
def django_dumpdata(app_name, base_path="local", file_name="initial", indent=4, natural_foreign=False,
natural_primary=False, path=None, venv=None, **kwargs):
"""Dump data from the database.
- app_name (str): The name (app label) of the app. ``app_label.ModelName`` may also be given.
- base_path (str): The path under which apps are located in source.
- file_name (str): The file name to which the data will be dumped.
- indent (int): Indentation of the exported fixtures.
- natural_foreign (bool): Use the natural foreign parameter.
- natural_primary (bool): Use the natural primary parameter.
- path (str): The path to the data file.
- venv (str): The of the virtual environment to use.
"""
kwargs.setdefault("comment", "export fixtures for %s" % app_name)
output_format = kwargs.pop("format", "json")
_path = path or os.path.join(base_path, app_name, "fixtures", "%s.%s" % (file_name, output_format))
return _django(
"dumpdata",
app_name,
"> %s" % _path,
format=output_format,
indent=indent,
natural_foreign=natural_foreign,
natural_primary=natural_primary,
venv=venv,
**kwargs
)
def django_loaddata(app_name, base_path="local", file_name="initial", path=None, venv=None, **kwargs):
"""Load data into the database.
- app_name (str): The name (app label) of the app. ``app_label.ModelName`` may also be given.
- base_path (str): The path under which apps are located in source.
- file_name (str): The file name to which the data will be dumped.
- path (str): The path to the data file.
- venv (str): The of the virtual environment to use.
"""
kwargs.setdefault("comment", "load fixtures for %s" % app_name)
output_format = kwargs.pop("format", "json")
_path = path or os.path.join(base_path, app_name, "fixtures", "%s.%s" % (file_name, output_format))
return _django("loaddata", _path, venv=venv, **kwargs)
def django_migrate(venv=None, **kwargs):
"""Apply database migrations.
- venv (str): The of the virtual environment to use.
"""
kwargs.setdefault("comment", "run django database migrations")
return _django("migrate", venv=venv, **kwargs)
# Mapping
DJANGO_MAPPINGS = {
'django': django,
'django.check': django_check,
'django.collect_static': django_collect_static,
'django.dumpdata': django_dumpdata,
'django.loaddata': django_loaddata,
'django.migrate': django_migrate,
}

@ -1,262 +0,0 @@
# Imports
from ..commands import Command
# Exports
__all__ = (
"MYSQL_MAPPINGS",
"mysql_create",
"mysql_drop",
"mysql_dump",
"mysql_exec",
"mysql_exists",
"mysql_grant",
"mysql_user",
)
# Functions
def _get_mysql_command(host="localhost", name="mysql", password=None, port=3306, user="root"):
a = list()
a.append("%s --user=%s" % (name, user))
a.append("--host=%s" % host)
a.append("--port=%s" % port)
if password:
a.append('--password="%s"' % password)
return a
def mysql_create(database, host="localhost", owner=None, password=None, port=3306, user="root", **kwargs):
"""Create a MySQL database.
- database (str): The database name.
- host (str): The database host name or IP address.
- password (str): The password for the user with sufficient access privileges to execute the command.
- owner (str): The owner (user/role name) of the new database.
- port (int): The TCP port number of the MySQL service running on the host.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs.setdefault("comment", "create the %s mysql database" % database)
# MySQL commands always run without sudo because the --user may be provided.
kwargs['sudo'] = False
# Assemble the command.
a = _get_mysql_command(host=host, name="mysqladmin", password=password, port=port, user=user)
a.append("create %s" % database)
if owner:
grant = mysql_grant(
owner,
database=database,
host=host,
password=password,
port=port,
user=user,
)
a.append("&& %s" % grant.get_statement(suppress_comment=True))
return Command(" ".join(a), **kwargs)
def mysql_drop(database, host="localhost", password=None, port=3306, user="root", **kwargs):
"""Drop (remove) a MySQL database.
- database (str): The database name.
- host (str): The database host name or IP address.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number of the MySQL service running on the host.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs.setdefault("comment", "remove the %s mysql database" % database)
# MySQL commands always run without sudo because the --user may be provided.
kwargs['sudo'] = False
# Assemble the command.
a = _get_mysql_command(host=host, name="mysqladmin", password=password, port=port, user=user)
a.append("drop %s" % database)
return Command(" ".join(a), **kwargs)
def mysql_dump(database, file_name=None, host="localhost", inserts=False, password=None, port=3306, user="root",
**kwargs):
"""Dump (export) a MySQL database.
- database (str): The database name.
- host (str): The database host name or IP address.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number of the MySQL service running on the host.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs.setdefault("comment", "dump the %s mysql database" % database)
# MySQL commands always run without sudo because the --user may be provided.
# kwargs['sudo'] = False
# Assemble the command.
a = _get_mysql_command(host=host, name="mysqldump", password=password, port=port, user=user)
# if data_only:
# a.append("--no-create-info")
# elif schema_only:
# a.append("--no-data")
# else:
# pass
if inserts:
a.append("--complete-inserts")
a.append(database)
_file_name = file_name or "%s.sql" % database
a.append("> %s" % _file_name)
return Command(" ".join(a), **kwargs)
def mysql_exec(sql, database="default", host="localhost", password=None, port=3306, user="root", **kwargs):
"""Execute a MySQL statement.
- sql (str): The SQL to run.
- database (str): The name of the database.
- host (str): The host name.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs['sudo'] = False
kwargs.setdefault("comment", "execute mysql statement")
a = _get_mysql_command(host=host, password=password, port=port, user=user)
a.append('--execute="%s"' % sql)
a.append(database)
return Command(" ".join(a), **kwargs)
def mysql_exists(database, host="localhost", password=None, port=3306, user="root", **kwargs):
"""Determine if a MySQL database exists.
- database (str): The database name.
- host (str): The database host name or IP address.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number of the MySQL service running on the host.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs.setdefault("comment", "determine if the %s mysql database exists" % database)
kwargs.setdefault("register", "mysql_database_exists")
# MySQL commands always run without sudo because the --user may be provided.
kwargs['sudo'] = False
# Assemble the command.
a = _get_mysql_command(host=host, password=password, port=port, user=user)
sql = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '%s'" % database
a.append('--execute="%s"' % sql)
return Command(" ".join(a), **kwargs)
def mysql_grant(to, database=None, host="localhost", password=None, port=3306, privileges="ALL", user="root", **kwargs):
"""Grant privileges to a user.
- to (str): The user name to which privileges are granted.
- database (str): The database name.
- host (str): The database host name or IP address.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number of the MySQL service running on the host.
- privileges (str): The privileges to be granted.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs.setdefault("comment", "grant mysql privileges to %s" % to)
a = _get_mysql_command(host=host, password=password, port=port, user=user)
# See https://dev.mysql.com/doc/refman/5.7/en/grant.html
_database = database or "*"
sql = "GRANT %(privileges)s ON %(database)s.* TO '%(user)s'@'%(host)s'" % {
'database': _database,
'host': host,
'privileges': privileges,
'user': to,
}
a.append('--execute="%s"' % sql)
return Command(" ".join(a))
def mysql_user(name, host="localhost", op="create", passwd=None, password=None, port=3306, user="root", **kwargs):
"""Work with a MySQL user.
- name (str): The user name.
- host (str): The host name.
- op (str): The operation to perform: ``create``, ``drop``, ``exists``.
- passwd (str): The password for a new user.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
kwargs['sudo'] = False
if op == "create":
kwargs.setdefault("comment", "create %s mysql user" % name)
a = _get_mysql_command(host=host, password=password, port=port, user=user)
sql = "CREATE USER IF NOT EXISTS '%(user)s'@'%(host)s'" % {
'host': host,
'user': name,
}
if passwd:
sql += " IDENTIFIED BY PASSWORD('%s')" % passwd
a.append('--execute="%s"' % sql)
return Command(" ".join(a), **kwargs)
elif op == "drop":
kwargs.setdefault("comment", "drop %s mysql user" % name)
a = _get_mysql_command(host=host, password=password, port=port, user=user)
sql = "DROP USER IF EXISTS '%(user)s'@'%(host)s'" % {
'host': host,
'user': name,
}
a.append('--execute="%s"' % sql)
return Command(" ".join(a), **kwargs)
elif op == "exists":
kwargs.setdefault("comment", "determine if %s mysql user exists" % name)
kwargs.setdefault("register", "mysql_user_exists")
a = _get_mysql_command(host=host, password=password, port=port, user=user)
sql = "SELECT EXISTS(SELECT 1 FROM mysql.user WHERE user = '%s')" % name
a.append('--execute="%s"' % sql)
return Command(" ".join(a), **kwargs)
else:
raise NameError("Unrecognized or unsupported MySQL user operation: %s" % op)
MYSQL_MAPPINGS = {
'mysql.create': mysql_create,
'mysql.drop': mysql_drop,
'mysql.dump': mysql_dump,
'mysql.exists': mysql_exists,
'mysql.grant': mysql_grant,
'mysql.sql': mysql_exec,
'mysql.user': mysql_user,
}

@ -1,226 +0,0 @@
# Imports
from ..commands import Command
# Exports
__all__ = (
"PGSQL_MAPPINGS",
"pgsql_create",
"pgsql_drop",
"pgsql_dump",
"pgsql_exec",
"pgsql_exists",
"pgsql_user",
)
# Functions
def _get_pgsql_command(name, host="localhost", password=None, port=5432, user="postgres"):
"""Get a postgres-related command using commonly required parameters.
:param name: The name of the command.
:type name: str
:param host: The host name.
:type host: str
:param password: The password to use.
:type password: str
:param port: The TCP port number.
:type port: int
:param user: The user name that will be used to execute the command.
:rtype: list[str]
"""
a = list()
if password:
a.append('export PGPASSWORD="%s" &&' % password)
a.append(name)
a.append("--host=%s" % host)
a.append("--port=%s" % port)
a.append("--username=%s" % user)
return a
def pgsql_create(database, admin_pass=None, admin_user="postgres", host="localhost", owner=None, port=5432, template=None,
**kwargs):
"""Create a PostgreSQL database.
- database (str): The database name.
- admin_pass (str): The password for the user with sufficient access privileges to execute the command.
- admin_user (str): The name of the user with sufficient access privileges to execute the command.
- host (str): The database host name or IP address.
- owner (str): The owner (user/role name) of the new database.
- port (int): The port number of the Postgres service running on the host.
- template (str): The database template name to use, if any.
"""
_owner = owner or admin_user
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("createdb", host=host, password=admin_pass, port=port)
base.append("--owner=%s" % _owner)
if template is not None:
base.append("--template=%s" % template)
base.append(database)
return Command(" ".join(base), **kwargs)
def pgsql_drop(database, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs):
"""Remove a PostgreSQL database.
- database (str): The database name.
- admin_pass (str): The password for the user with sufficient access privileges to execute the command.
- admin_user (str): The name of the user with sufficient access privileges to execute the command.
- host (str): The database host name or IP address.
- port (int): The port number of the Postgres service running on the host.
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("dropdb", host=host, password=admin_pass, port=port, user=admin_user)
base.append(database)
return Command(" ".join(base), **kwargs)
def pgsql_dump(database, admin_pass=None, admin_user="postgres", file_name=None, host="localhost", port=5432, **kwargs):
"""Export a Postgres database.
- database (str): The database name.
- admin_pass (str): The password for the user with sufficient access privileges to execute the command.
- admin_user (str): The name of the user with sufficient access privileges to execute the command.
- file_name (str): The name/path of the export file. Defaults the database name plus ``.sql``.
- host (str): The database host name or IP address.
- port (int): The port number of the Postgres service running on the host.
"""
_file_name = file_name or "%s.sql" % database
# Postgres commands always run without sudo because the -U may be provided.
# kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("pg_dump", host=host, password=admin_pass, port=port, user=admin_user)
base.append("--column-inserts")
base.append("--file=%s" % _file_name)
base.append(database)
return Command(" ".join(base), **kwargs)
def pgsql_exec(sql, database="template1", host="localhost", password=None, port=5432, user="postgres", **kwargs):
"""Execute a psql command.
- sql (str): The SQL to be executed.
- database (str): The database name.
- admin_pass (str): The password for the user with sufficient access privileges to execute the command.
- admin_user (str): The name of the user with sufficient access privileges to execute the command.
- host (str): The database host name or IP address.
- port (int): The port number of the Postgres service running on the host.
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
# Assemble the command.
base = _get_pgsql_command("psql", host=host, password=password, port=port, user=user)
base.append("--dbname=%s" % database)
base.append('-c "%s"' % sql)
return Command(" ".join(base), **kwargs)
def pgsql_exists(database, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs):
"""Determine if a Postgres database exists.
- database (str): The database name.
- admin_pass (str): The password for the user with sufficient access privileges to execute the command.
- admin_user (str): The name of the user with sufficient access privileges to execute the command.
- host (str): The database host name or IP address.
- owner (str): The owner (user/role name) of the new database.
- port (int): The port number of the Postgres service running on the host.
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
kwargs.setdefault("register", "pgsql_db_exists")
base = _get_pgsql_command("psql", host=host, password=admin_pass, port=port, user=admin_user)
base.append(r"-lqt | cut -d \| -f 1 | grep -qw %s" % database)
return Command(" ".join(base), **kwargs)
def pgsql_user(name, admin_pass=None, admin_user="postgres", host="localhost", op="create", password=None, port=5432, **kwargs):
"""Work with a PostgreSQL user.
- name (str): The user name.
- host (str): The host name.
- op (str): The operation to perform: ``create``, ``drop``, ``exists``.
- passwd (str): The password for a new user.
- password (str): The password for the user with sufficient access privileges to execute the command.
- port (int): The TCP port number.
- user (str): The name of the user with sufficient access privileges to execute the command.
"""
# Postgres commands always run without sudo because the -U may be provided.
kwargs['sudo'] = False
if op == "create":
kwargs.setdefault("comment", "create %s postgres user" % name)
# Assemble the command.
base = _get_pgsql_command("createuser", host=host, password=admin_pass, port=port)
base.append("-DRS")
base.append(name)
if password is not None:
base.append("&& psql -h %s -U %s" % (host, admin_user))
base.append("-c \"ALTER USER %s WITH ENCRYPTED PASSWORD '%s';\"" % (name, password))
return Command(" ".join(base), **kwargs)
elif op == "drop":
kwargs.setdefault("comment", "drop %s postgres user" % name)
base = _get_pgsql_command("dropuser", host=host, password=admin_pass, port=port, user=admin_user)
base.append(name)
return Command(" ".join(base), **kwargs)
elif op == "exists":
kwargs.setdefault("comment", "determine if %s postgres user exits" % name)
kwargs.setdefault("register", "pgsql_use_exists")
base = _get_pgsql_command("psql", host=host, password=admin_pass, port=port, user=admin_user)
sql = "SELECT 1 FROM pgsql_roles WHERE rolname='%s'" % name
base.append('-c "%s"' % sql)
return Command(" ".join(base), **kwargs)
else:
raise NameError("Unrecognized or unsupported Postgres user operation: %s" % op)
PGSQL_MAPPINGS = {
'pgsql.create': pgsql_create,
'pgsql.drop': pgsql_drop,
'pgsql.dump': pgsql_dump,
'pgsql.exists': pgsql_exists,
'pgsql.sql': pgsql_exec,
'pgsql.user': pgsql_user,
}

@ -1,758 +0,0 @@
# Imports
from commonkit import indent, split_csv
import os
from ..commands import Command
# Exports
__all__ = (
"POSIX_MAPPINGS",
"archive",
"certbot",
"dialog",
"echo",
"extract",
"file_append",
"file_copy",
"file_write",
"mkdir",
"move",
"perms",
"prompt",
"remove",
"rename",
"rsync",
"scopy",
"sed",
"symlink",
"touch",
"wait",
"Function",
"Prompt",
)
# Functions
def archive(from_path, absolute=False, exclude=None, file_name="archive.tgz", strip=None, to_path=".", view=False,
**kwargs):
"""Create a file archive.
- from_path (str): The path that should be archived.
- absolute (bool): Set to ``True`` to preserve the leading slash.
- exclude (str): A pattern to be excluded from the archive.
- strip (int): Remove the specified number of leading elements from the path.
- to_path (str): Where the archive should be created. This should *not* include the file name.
- view (bool): View the output of the command as it happens.
"""
tokens = ["tar"]
switches = ["-cz"]
if absolute:
switches.append("P")
if view:
switches.append("v")
tokens.append("".join(switches))
if exclude:
tokens.append("--exclude %s" % exclude)
if strip:
tokens.append("--strip-components %s" % strip)
to_path = os.path.join(to_path, file_name)
tokens.append('-f %s %s' % (to_path, from_path))
name = " ".join(tokens)
return Command(name, **kwargs)
def certbot(domain_name, email=None, webroot=None, **kwargs):
"""Get new SSL certificate from Let's Encrypt.
- domain_name (str): The domain name for which the SSL certificate is requested.
- email (str): The email address of the requester sent to the certificate authority. Required.
- webroot (str): The directory where the challenge file will be created.
"""
_email = email or os.environ.get("SCRIPTTEASE_CERTBOT_EMAIL", None)
_webroot = webroot or os.path.join("/var", "www", "domains", domain_name.replace(".", "_"), "www")
if not _email:
raise ValueError("Email is required for certbot command.")
template = "certbot certonly --agree-tos --email %(email)s -n --webroot -w %(webroot)s -d %(domain_name)s"
name = template % {
'domain_name': domain_name,
'email': _email,
'webroot': _webroot,
}
return Command(name, **kwargs)
def dialog(message, height=15, title="Message", width=100, **kwargs):
"""Display a dialog message.
- message (str): The message to be displayed.
- height (int): The height of the dialog.
- title (str): The title of the dialog.
- width (int): The width of the dialog.
"""
kwargs.setdefault("comment", "display a dialog message")
a = list()
a.append('dialog --clear --backtitle "%s"' % title)
a.append('--msgbox "%s" %s %s; clear;' % (message, height, width))
return Command(" ".join(a), **kwargs)
def echo(message, **kwargs):
"""Echo a message.
- message (str): The message to be printed to screen.
"""
kwargs.setdefault("comment", "print message to screen")
return Command('echo "%s"' % message, **kwargs)
def extract(from_path, absolute=False, exclude=None, strip=None, to_path=None, view=False, **kwargs):
"""Extract a file archive.
- from_path (str): The path that should be archived.
- absolute (bool): Set to ``True`` to preserve the leading slash.
- exclude (str): A pattern to be excluded from the archive.
- strip (int): Remove the specified number of leading elements from the path.
- to_path (str): Where the archive should be extracted. This should *not* include the file name.
- view (bool): View the output of the command as it happens.
"""
_to_path = to_path or "./"
tokens = ["tar"]
switches = ["-xz"]
if absolute:
switches.append("P")
if view:
switches.append("v")
tokens.append("".join(switches))
if exclude:
tokens.append("--exclude %s" % exclude)
if strip:
tokens.append("--strip-components %s" % strip)
tokens.append('-f %s %s' % (from_path, _to_path))
name = " ".join(tokens)
return Command(name, **kwargs)
def file_append(path, content=None, **kwargs):
"""Append content to a file.
- path (str): The path to the file.
- content (str): The content to be appended.
"""
kwargs.setdefault("comment", "append to %s" % path)
statement = 'echo "%s" >> %s' % (content or "", path)
return Command(statement, **kwargs)
def file_copy(from_path, to_path, overwrite=False, recursive=False, **kwargs):
"""Copy a file or directory.
- from_path (str): The file or directory to be copied.
- to_path (str): The location to which the file or directory should be copied.
- overwrite (bool): Indicates files and directories should be overwritten if they exist.
- recursive (bool): Copy sub-directories.
"""
kwargs.setdefault("comment", "copy %s to %s" % (from_path, to_path))
a = list()
a.append("cp")
if not overwrite:
a.append("-n")
if recursive:
a.append("-R")
a.append(from_path)
a.append(to_path)
return Command(" ".join(a), **kwargs)
def file_write(path, content=None, **kwargs):
"""Write to a file.
- path (str): The file to be written.
- content (str): The content to be written. Note: If omitted, this command is equivalent to ``touch``.
"""
_content = content or ""
kwargs.setdefault("comment", "write to %s" % path)
a = list()
if len(_content.split("\n")) > 1:
a.append("cat > %s << EOF" % path)
a.append(_content)
a.append("EOF")
else:
a.append('echo "%s" > %s' % (_content, path))
return Command(" ".join(a), **kwargs)
def mkdir(path, mode=None, recursive=True, **kwargs):
"""Create a directory.
- path (str): The path to be created.
- mode (int | str): The access permissions of the new directory.
- recursive (bool): Create all directories along the path.
"""
kwargs.setdefault("comment", "create directory %s" % path)
statement = ["mkdir"]
if mode is not None:
statement.append("-m %s" % mode)
if recursive:
statement.append("-p")
statement.append(path)
return Command(" ".join(statement), **kwargs)
def move(from_path, to_path, **kwargs):
"""Move a file or directory.
- from_path (str): The current path.
- to_path (str): The new path.
"""
kwargs.setdefault("comment", "move %s to %s" % (from_path, to_path))
statement = "mv %s %s" % (from_path, to_path)
return Command(statement, **kwargs)
def perms(path, group=None, mode=None, owner=None, recursive=False, **kwargs):
"""Set permissions on a file or directory.
- path (str): The path to be changed.
- group (str): The name of the group to be applied.
- mode (int | str): The access permissions of the file or directory.
- owner (str): The name of the user to be applied.
- recursive: Create all directories along the path.
"""
commands = list()
kwargs['comment'] = "set permissions on %s" % path
if group is not None:
statement = ["chgrp"]
if recursive:
statement.append("-R")
statement.append(group)
statement.append(path)
commands.append(Command(" ".join(statement), **kwargs))
if owner is not None:
statement = ["chown"]
if recursive:
statement.append("-R")
statement.append(owner)
statement.append(path)
commands.append(Command(" ".join(statement), **kwargs))
if mode is not None:
statement = ["chmod"]
if recursive:
statement.append("-R")
statement.append(str(mode))
statement.append(path)
commands.append(Command(" ".join(statement), **kwargs))
kwargs.setdefault("comment", "set permissions on %s" % path)
a = list()
for c in commands:
a.append(c.get_statement(suppress_comment=True))
return Command("\n".join(a), **kwargs)
def prompt(name, back_title="Input", choices=None, default=None, fancy=False, help_text=None, label=None, **kwargs):
"""Prompt the user for input.
- name (str): The programmatic name of the input.
- back_title (str): The back title used with the dialog command.
- choices (str | list): A list of valid choices.
- default: The default value.
- fancy (bool): Use a dialog command for the prompt.
- help_text (str): The text to display with the dialog command.
- label (str): The label for the input.
"""
return Prompt(
name,
back_title=back_title,
choices=choices,
default=default,
fancy=fancy,
help_text=help_text,
label=label,
**kwargs
)
def remove(path, force=False, recursive=False, **kwargs):
"""Remove a file or directory.
- path (str): The path to be removed.
- force (bool): Force the removal.
- recursive (bool): Remove all directories along the path.
"""
kwargs.setdefault("comment", "remove %s" % path)
statement = ["rm"]
if force:
statement.append("-f")
if recursive:
statement.append("-r")
statement.append(path)
return Command(" ".join(statement), **kwargs)
def rename(from_name, to_name, **kwargs):
"""Rename a file or directory.
- from_name (str): The name (or path) of the existing file.
- to_name (str): The name (or path) of the new file.
"""
kwargs.setdefault("comment", "rename %s" % from_name)
return move(from_name, to_name, **kwargs)
def rsync(source, target, delete=False, exclude=None, host=None, key_file=None, links=True, port=22,
recursive=True, user=None, **kwargs):
"""Synchronize a directory structure.
- source (str): The source directory.
- target (str): The target directory.
- delete (bool): Indicates target files that exist in source but not in target should be removed.
- exclude (str): The path to an exclude file.
- host (str): The host name or IP address. This causes the command to run over SSH.
- key_file (str): The privacy SSH key (path) for remote connections. User expansion is automatically applied.
- links (bool): Include symlinks in the sync.
- port (int): The SSH port to use for remote connections.
- recursive (bool): Indicates source contents should be recursively synchronized.
- user (str): The user name to use for remote connections.
"""
# - guess: When ``True``, the ``host``, ``key_file``, and ``user`` will be guessed based on the base name of
# the source path.
# :type guess: bool
# if guess:
# host = host or os.path.basename(source).replace("_", ".")
# key_file = key_file or os.path.expanduser(os.path.join("~/.ssh", os.path.basename(source)))
# user = user or os.path.basename(source)
# else:
# host = host
# key_file = key_file
# user = user
kwargs.setdefault("comment", "sync %s with %s" % (source, target))
# rsync -e "ssh -i $(SSH_KEY) -p $(SSH_PORT)" -P -rvzc --delete
# $(OUTPUTH_PATH) $(SSH_USER)@$(SSH_HOST):$(UPLOAD_PATH) --cvs-exclude;
tokens = list()
tokens.append("rsync")
tokens.append("--cvs-exclude")
tokens.append("--checksum")
tokens.append("--compress")
if links:
tokens.append("--copy-links")
if delete:
tokens.append("--delete")
if exclude is not None:
tokens.append("--exclude-from=%s" % exclude)
# --partial and --progress
tokens.append("-P")
if recursive:
tokens.append("--recursive")
tokens.append(source)
conditions = [
host is not None,
key_file is not None,
user is not None,
]
if all(conditions):
tokens.append('-e "ssh -i %s -p %s"' % (key_file, port))
tokens.append("%s@%s:%s" % (user, host, target))
else:
tokens.append(target)
statement = " ".join(tokens)
return Command(statement, **kwargs)
def scopy(from_path, to_path, host=None, key_file=None, port=22, user=None, **kwargs):
"""Copy a file or directory to a remote server.
- from_path (str): The source directory.
- to_path (str): The target directory.
- host (str): The host name or IP address. Required.
- key_file (str): The privacy SSH key (path) for remote connections. User expansion is automatically applied.
- port (int): The SSH port to use for remote connections.
- user (str): The user name to use for remote connections.
"""
kwargs.setdefault("comment", "copy %s to remote %s" % (from_path, to_path))
# TODO: What to do to force local versus remote commands?
# kwargs['local'] = True
kwargs['sudo'] = False
statement = ["scp"]
if key_file is not None:
statement.append("-i %s" % key_file)
statement.append("-P %s" % port)
statement.append(from_path)
if host is not None and user is not None:
statement.append("%s@%s:%s" % (user, host, to_path))
elif host is not None:
statement.append("%s:%s" % (host, to_path))
else:
raise ValueError("Host is a required keyword argument.")
return Command(" ".join(statement), **kwargs)
def sed(path, backup=".b", delimiter="/", find=None, replace=None, **kwargs):
"""Find and replace text in a file.
- path (str): The path to the file to be edited.
- backup (str): The backup file extension to use.
- delimiter (str): The pattern delimiter.
- find (str): The old text. Required.
- replace (str): The new text. Required.
"""
kwargs.setdefault("comment", "find and replace in %s" % path)
context = {
'backup': backup,
'delimiter': delimiter,
'path': path,
'pattern': find,
'replace': replace,
}
template = "sed -i %(backup)s 's%(delimiter)s%(pattern)s%(delimiter)s%(replace)s%(delimiter)sg' %(path)s"
statement = template % context
return Command(statement, **kwargs)
def symlink(source, force=False, target=None, **kwargs):
"""Create a symlink.
- source (str): The source of the link.
- force (bool): Force the creation of the link.
- target (str): The name or path of the target. Defaults to the base name of the source path.
"""
_target = target or os.path.basename(source)
kwargs.setdefault("comment", "link to %s" % source)
statement = ["ln -s"]
if force:
statement.append("-f")
statement.append(source)
statement.append(_target)
return Command(" ".join(statement), **kwargs)
def touch(path, **kwargs):
"""Touch a file or directory.
- path (str): The file or directory to touch.
"""
kwargs.setdefault("comment", "touch %s" % path)
return Command("touch %s" % path, **kwargs)
def wait(seconds, **kwargs):
"""Pause execution for a number of seconds.
- seconds (int): The number of seconds to wait.
"""
kwargs.setdefault("comment", "pause for %s seconds" % seconds)
return Command("sleep %s" % seconds, **kwargs)
# Classes
class Function(object):
"""A function that may be used to organize related commands to be called together."""
def __init__(self, name, commands=None, comment=None):
"""Initialize a function.
:param name: The name of the function.
:type name: str
:param commands: The command instances to be included in the function's output.
:type commands: list
:param comment: A comment regarding the function.
:type comment: str
"""
self.commands = commands or list()
self.comment = comment
self.name = name
def to_string(self):
"""Export the function as a string.
:rtype: str
"""
a = list()
if self.comment is not None:
a.append("# %s" % self.comment)
a.append("function %s()" % self.name)
a.append("{")
for command in self.commands:
a.append(indent(command.get_statement(cd=True)))
a.append("")
a.append("}")
return "\n".join(a)
class Prompt(Command):
"""Prompt the user for input."""
def __init__(self, name, back_title="Input", choices=None, default=None, fancy=False, help_text=None, label=None,
**kwargs):
"""Initialize a prompt for user input.
:param name: The variable name.
:type name: str
:param back_title: The back title of the input. Used only when ``dialog`` is enabled.
:type back_title: str
:param choices: Valid choices for the variable. May be given as a list of strings or a comma separated string.
:type choices: list[str] | str
:param default: The default value of the variable.
:param fancy: Indicates the dialog command should be used.
:type fancy: bool
:param help_text: Additional text to display. Only use when ``fancy`` is ``True``.
:type help_text: str
:param label: The label of the prompt.
"""
self.back_title = back_title
self.default = default
self.dialog_enabled = fancy
self.help_text = help_text
self.label = label or name.replace("_", " ").title()
self.variable_name = name
if type(choices) in (list, tuple):
self.choices = choices
elif type(choices) is str:
self.choices = split_csv(choices, smart=False)
# for i in choices.split(","):
# self.choices.append(i.strip())
else:
self.choices = None
kwargs.setdefault("comment", "prompt user for %s input" % name)
super().__init__(name, **kwargs)
def get_statement(self, cd=False, suppress_comment=False):
"""Get the statement using dialog or read."""
if self.dialog_enabled:
return self._get_dialog_statement()
return self._get_read_statement()
def _get_dialog_statement(self):
"""Get the dialog statement."""
a = list()
a.append('dialog --clear --backtitle "%s" --title "%s"' % (self.back_title, self.label))
if self.choices is not None:
a.append('--menu "%s" 15 40 %s' % (self.help_text or "Select", len(self.choices)))
count = 1
for choice in self.choices:
a.append('"%s" %s' % (choice, count))
count += 1
a.append('2>/tmp/input.txt')
else:
if self.help_text is not None:
a.append('--inputbox "%s"' % self.help_text)
else:
a.append('--inputbox ""')
a.append('8 60 2>/tmp/input.txt')
b = list()
b.append('touch /tmp/input.txt')
b.append(" ".join(a))
b.append('%s=$(</tmp/input.txt)' % self.variable_name)
b.append('clear')
b.append('rm /tmp/input.txt')
if self.default is not None:
b.append('if [[ -z "$%s" ]]; then %s="%s"; fi;' % (self.variable_name, self.variable_name, self.default))
# b.append('echo "$%s"' % self.name)
return "\n".join(b)
def _get_read_statement(self):
"""Get the standard read statement."""
a = list()
if self.choices is not None:
a.append('echo "%s "' % self.label)
options = list()
for choice in self.choices:
options.append('"%s"' % choice)
a.append('options=(%s)' % " ".join(options))
a.append('select opt in "${options[@]}"')
a.append('do')
a.append(' case $opt in')
for choice in self.choices:
a.append(' "%s") %s=$opt; break;;' % (choice, self.variable_name))
# a.append(' %s) %s=$opt;;' % ("|".join(self.choices), self.name))
a.append(' *) echo "invalid choice";;')
a.append(' esac')
a.append('done')
# a.append("read %s" % self.name)
else:
a.append('echo -n "%s "' % self.label)
a.append("read %s" % self.variable_name)
if self.default is not None:
a.append('if [[ -z "$%s" ]]; then %s="%s"; fi;' % (self.variable_name, self.variable_name, self.default))
# a.append('echo "$%s"' % self.name)
return "\n".join(a)
# Mappings
POSIX_MAPPINGS = {
'append': file_append,
'archive': archive,
'certbot': certbot,
'copy': file_copy,
'dialog': dialog,
'echo': echo,
'extract': extract,
'func': Function,
# 'function': Function,
'mkdir': mkdir,
'move': move,
'perms': perms,
'prompt': prompt,
'push': rsync,
'remove': remove,
'rename': rename,
'rsync': rsync,
'scopy': scopy,
'sed': sed,
'ssl': certbot,
'symlink': symlink,
'touch': touch,
'wait': wait,
'write': file_write,
}

@ -1,338 +0,0 @@
# Imports
from commonkit import split_csv
from ..commands import Command, Template
from .common import COMMON_MAPPINGS
from .django import DJANGO_MAPPINGS
from .mysql import MYSQL_MAPPINGS
from .pgsql import PGSQL_MAPPINGS
from .posix import POSIX_MAPPINGS, Function
# Exports
__all__ = (
"MAPPINGS",
"apache",
"apache_disable_module",
"apache_disable_site",
"apache_enable_module",
"apache_enable_site",
"apache_reload",
"apache_restart",
"apache_start",
"apache_stop",
"apache_test",
"command_exists",
"service_reload",
"service_restart",
"service_start",
"service_stop",
"system",
"system_install",
"system_reboot",
"system_update",
"system_upgrade",
"system_uninstall",
"template",
"user",
"Function",
)
def command_exists(name):
"""Indicates whether a given command exists in this overlay.
:param name: The name of the command.
:type name: str
:rtype: bool
"""
return name in MAPPINGS
def apache(op, **kwargs):
"""Execute an Apache-related command.
- op (str): The operation to perform; reload, restart, start, stop, test.
"""
if op == "reload":
return apache_reload(**kwargs)
elif op == "restart":
return apache_restart(**kwargs)
elif op == "start":
return apache_start(**kwargs)
elif op == "stop":
return apache_stop(**kwargs)
elif op == "test":
return apache_test(**kwargs)
else:
raise NameError("Unrecognized or unsupported apache operation: %s" % op)
def apache_disable_module(name, **kwargs):
"""Disable an Apache module.
- name (str): The module name.
"""
kwargs.setdefault("comment", "disable %s apache module" % name)
return Command("a2dismod %s" % name, **kwargs)
def apache_disable_site(name, **kwargs):
"""Disable an Apache site.
- name (str): The domain name.
"""
kwargs.setdefault("comment", "disable %s apache site" % name)
return Command("a2dissite %s" % name, **kwargs)
def apache_enable_module(name, **kwargs):
"""Enable an Apache module.
- name (str): The module name.
"""
kwargs.setdefault("comment", "enable %s apache module" % name)
return Command("a2enmod %s" % name, **kwargs)
def apache_enable_site(name, **kwargs):
"""Enable an Apache site.
"""
kwargs.setdefault("comment", "enable %s apache module" % name)
return Command("a2ensite %s" % name, **kwargs)
def apache_reload(**kwargs):
kwargs.setdefault("comment", "reload apache")
kwargs.setdefault("register", "apache_reloaded")
return Command("service apache2 reload", **kwargs)
def apache_restart(**kwargs):
kwargs.setdefault("comment", "restart apache")
kwargs.setdefault("register", "apache_restarted")
return Command("service apache2 restart", **kwargs)
def apache_start(**kwargs):
kwargs.setdefault("comment", "start apache")
kwargs.setdefault("register", "apache_started")
return Command("service apache2 start", **kwargs)
def apache_stop(**kwargs):
kwargs.setdefault("comment", "stop apache")
return Command("service apache2 stop", **kwargs)
def apache_test(**kwargs):
kwargs.setdefault("comment", "check apache configuration")
kwargs.setdefault("register", "apache_checks_out")
return Command("apachectl configtest", **kwargs)
def service_reload(name, **kwargs):
"""Reload a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "reload %s service" % name)
kwargs.setdefault("register", "%s_reloaded" % name)
return Command("service %s reload" % name, **kwargs)
def service_restart(name, **kwargs):
"""Restart a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "restart %s service" % name)
kwargs.setdefault("register", "%s_restarted" % name)
return Command("service %s restart" % name, **kwargs)
def service_start(name, **kwargs):
"""Start a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "start %s service" % name)
kwargs.setdefault("register", "%s_started" % name)
return Command("service %s start" % name, **kwargs)
def service_stop(name, **kwargs):
"""Stop a service.
- name (str): The service name.
"""
kwargs.setdefault("comment", "stop %s service" % name)
kwargs.setdefault("register", "%s_stopped" % name)
return Command("service %s stop" % name, **kwargs)
def system(op, **kwargs):
"""Perform a system operation.
- op (str): The operation to perform; reboot, update, upgrade.
"""
if op == "reboot":
return system_reboot(**kwargs)
elif op == "update":
return system_update(**kwargs)
elif op == "upgrade":
return system_upgrade(**kwargs)
else:
raise NameError("Unrecognized or unsupported system operation: %s" % op)
def system_install(name, **kwargs):
"""Install a system-level package.
- name (str): The name of the package to install.
"""
kwargs.setdefault("comment", "install system package %s" % name)
return Command("apt-get install -y %s" % name, **kwargs)
def system_reboot(**kwargs):
kwargs.setdefault("comment", "reboot the system")
return Command("reboot", **kwargs)
def system_uninstall(name, **kwargs):
"""Uninstall a system-level package.
- name (str): The name of the package to uninstall.
"""
kwargs.setdefault("comment", "remove system package %s" % name)
return Command("apt-get uninstall -y %s" % name, **kwargs)
def system_update(**kwargs):
kwargs.setdefault("comment", "update system package info")
return Command("apt-get update -y", **kwargs)
def system_upgrade(**kwargs):
kwargs.setdefault("comment", "upgrade the system")
return Command("apt-get upgrade -y", **kwargs)
def template(source, target, backup=True, parser=None, **kwargs):
"""Create a file from a template.
- source (str): The path to the template file.
- target (str): The path to where the new file should be created.
- backup (bool): Indicates whether a backup should be made if the target file already exists.
- parser (str): The parser to use ``jinja`` (the default) or ``simple``.
"""
return Template(source, target, backup=backup, parser=parser, **kwargs)
def user(name, groups=None, home=None, op="add", password=None, **kwargs):
"""Create or remove a user.
- name (str): The user name.
- groups (str | list): A list of groups to which the user should belong.
- home (str): The path to the user's home directory.
- op (str); The operation to perform; ``add`` or ``remove``.
- password (str): The user's password. (NOT IMPLEMENTED)
"""
if op == "add":
kwargs.setdefault("comment", "create a user named %s" % name)
commands = list()
# The gecos switch eliminates the prompts.
a = list()
a.append('adduser %s --disabled-password --gecos ""' % name)
if home is not None:
a.append("--home %s" % home)
commands.append(Command(" ".join(a), **kwargs))
if type(groups) is str:
groups = split_csv(groups, smart=False)
if type(groups) in [list, tuple]:
for group in groups:
commands.append(Command("adduser %s %s" % (name, group), **kwargs))
a = list()
for c in commands:
a.append(c.get_statement(suppress_comment=True))
return Command("\n".join(a), **kwargs)
elif op == "remove":
kwargs.setdefault("comment", "remove a user named %s" % name)
return Command("deluser %s" % name, **kwargs)
else:
raise NameError("Unsupported or unrecognized operation: %s" % op)
MAPPINGS = {
'apache': apache,
'apache.disable_module': apache_disable_module,
'apache.disable_site': apache_disable_site,
'apache.enable_module': apache_enable_module,
'apache.enable_site': apache_enable_site,
'apache.reload': apache_reload,
'apache.restart': apache_restart,
'apache.start': apache_start,
'apache.stop': apache_stop,
'apache.test': apache_test,
'install': system_install,
'reboot': system_reboot,
'reload': service_reload,
'restart': service_restart,
'start': service_start,
'stop': service_stop,
'system': system,
'template': template,
'update': system_update,
'uninstall': system_uninstall,
'upgrade': system_upgrade,
'user': user,
}
MAPPINGS.update(COMMON_MAPPINGS)
MAPPINGS.update(DJANGO_MAPPINGS)
MAPPINGS.update(MYSQL_MAPPINGS)
MAPPINGS.update(PGSQL_MAPPINGS)
MAPPINGS.update(POSIX_MAPPINGS)

@ -1,69 +0,0 @@
# Classes
class Script(object):
"""A script is a collection of commands."""
def __init__(self, name, commands=None, functions=None, shell="bash"):
"""Initialize a script.
:param name: The name of the script. Note: This becomes the file name.
:type name: str
:param commands: The commands to be included.
:type commands: list[scripttease.library.commands.base.Command]
:param functions: The functions to be included.
:type functions: list[Function]
:param shell: The shell to use for the script.
:type shell: str
"""
self.commands = commands or list()
self.functions = functions
self.name = name
self.shell = shell
def __str__(self):
return self.to_string()
def append(self, command):
"""Append a command instance to the script's commands.
:param command: The command instance to be included.
:type command: BaseType[Command] | ItemizedCommand
"""
self.commands.append(command)
def to_string(self, shebang="#! /usr/bin/env %(shell)s"):
"""Export the script as a string.
:param shebang: The shebang to be included. Set to ``None`` to omit the shebang.
:type shebang: str
:rtype: str
"""
a = list()
if shebang is not None:
a.append(shebang % {'shell': self.shell})
a.append("")
if self.functions is not None:
for function in self.functions:
a.append(function.to_string())
a.append("")
# for function in self.functions:
# a.append("%s;" % function.name)
a.append("")
for command in self.commands:
a.append(command.get_statement(cd=True))
a.append("")
return "\n".join(a)

@ -1,4 +0,0 @@
# Imports
from .ini import Config
from .utils import filter_commands, load_commands, load_config

@ -1,80 +0,0 @@
# Imports
from commonkit import File
from ..factory import Factory
from ..library.scripts import Script
# Exports
__all__ = (
"Parser",
)
# Classes
class Parser(File):
"""Base class for implementing a command parser."""
def __init__(self, path, context=None, locations=None, options=None, overlay="ubuntu"):
super().__init__(path)
self.context = context
self.factory = Factory(overlay)
self.is_loaded = False
self.locations = locations or list()
self.options = options or dict()
self.overlay = overlay
self._commands = list()
self._functions = list()
def as_script(self):
"""Convert loaded commands to a script.
:rtype: Script
"""
return Script(
"%s.sh" % self.name,
commands=self.get_commands(),
functions=self.get_functions()
)
def get_commands(self):
"""Get the commands that have been loaded from the file.
:rtype: list[BaseType[scripttease.library.commands.base.Command]]
"""
a = list()
for c in self._commands:
if c.function is not None:
continue
a.append(c)
return a
def get_functions(self):
"""Get the functions that have been loaded from the file.
:rtype: list[scripttease.library.scripts.Function]
"""
a = list()
for f in self._functions:
for c in self._commands:
if c.function is not None and f.name == c.function:
f.commands.append(c)
a.append(f)
return a
def load(self):
"""Load the factory and the configuration file.
:rtype: bool
"""
raise NotImplementedError()

@ -1,179 +0,0 @@
# Imports
from commonkit import parse_jinja_template, read_file, smart_cast, split_csv
from configparser import ConfigParser, ParsingError
import logging
import os
from ..constants import LOGGER_NAME
from ..library.commands import ItemizedCommand
from ..library.commands.templates import Template
from .base import Parser
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"Config",
)
# Classes
class Config(Parser):
"""An INI configuration for loading commands."""
def load(self):
"""Load commands from a INI file."""
if not self.exists:
return False
if not self.factory.load():
return False
ini = self._load_ini()
if ini is None:
return False
success = True
for comment in ini.sections():
args = list()
command_name = None
count = 0
kwargs = self.options.copy()
kwargs['comment'] = comment
for key, value in ini.items(comment):
# The first key/value pair is the command name and arguments.
if count == 0:
command_name = key
# Arguments surrounded by quotes are considered to be one argument. All others are split into a
# list to be passed to the callback. It is also possible that this is a call where no arguments are
# present, so the whole thing is wrapped to protect against an index error.
try:
if value[0] == '"':
args.append(value.replace('"', ""))
else:
args = value.split(" ")
except IndexError:
pass
else:
_key, _value = self._get_key_value(key, value)
kwargs[_key] = _value
count += 1
command = self.factory.get_command(command_name, *args, **kwargs)
if command is not None:
if isinstance(command, self.factory.overlay.Function):
self._functions.append(command)
elif isinstance(command, Template):
self._load_template(command)
self._commands.append(command)
elif isinstance(command, ItemizedCommand):
itemized_template = False
for c in command.get_commands():
if isinstance(c, Template):
itemized_template = True
self._load_template(c)
self._commands.append(c)
if not itemized_template:
self._commands.append(command)
else:
self._commands.append(command)
# if isinstance(command, Function):
# self._functions.append(command)
# elif isinstance(command, Include):
# subcommands = self._load_include(command)
# if subcommands is not None:
# self._commands += subcommands
# elif isinstance(command, Template):
# self._load_template(command)
# self._commands.append(command)
# elif isinstance(command, ItemizedCommand) and issubclass(command.command_class, Template):
# for c in command.get_commands():
# self._load_template(c)
# self._commands.append(c)
# else:
# self._commands.append(command)
else:
success = False
self.is_loaded = success
return self.is_loaded
# noinspection PyMethodMayBeStatic
def _get_key_value(self, key, value):
"""Process a key/value pair from an INI section.
:param key: The key to be processed.
:type key: str
:param value: The value to be processed.
:rtype: tuple
:returns: The key and value, both of which may be modified from the originals.
"""
if key in ("environments", "environs", "envs", "env"):
_key = "environments"
_value = split_csv(value)
elif key in ("func", "function"):
_key = "function"
_value = value
elif key == "items":
_key = "items"
_value = split_csv(value)
elif key == "tags":
_key = "tags"
_value = split_csv(value)
else:
_key = key
_value = smart_cast(value)
return _key, _value
def _load_ini(self):
"""Load the configuration file.
:rtype: ConfigParser | None
"""
ini = ConfigParser()
if self.context is not None:
try:
content = parse_jinja_template(self.path, self.context)
except Exception as e:
log.error("Failed to parse %s as template: %s" % (self.path, e))
return None
else:
content = read_file(self.path)
try:
ini.read_string(content)
return ini
except ParsingError as e:
log.error("Failed to parse %s: %s" % (self.path, e))
return None
def _load_template(self, command):
"""Load additional resources for a template command.
:param command: The template command.
:type command: Template
"""
# This may produce problems if template kwargs are the same as the given context.
if self.context is not None:
command.context.update(self.context)
# Custom locations come before default locations.
command.locations += self.locations
# This allows template files to be specified relative to the configuration file.
command.locations.append(os.path.join(self.directory, "templates"))
command.locations.append(self.directory)

@ -1,325 +0,0 @@
# Imports
from commonkit import any_list_item, smart_cast, split_csv
from configparser import RawConfigParser
import logging
import os
from ..constants import LOGGER_NAME
from .ini import Config
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"filter_commands",
"load_commands",
"load_config",
"load_variables",
"Context",
"Variable",
)
# Functions
def filter_commands(commands, environments=None, tags=None):
"""Filter commands based on the given criteria.
:param commands: The commands to be filtered.
:type commands: list
:param environments: Environment names to be matched.
:type environments: list[str]
:param tags: Tag names to be matched.
:type tags: list[str]
"""
filtered = list()
for command in commands:
if environments is not None and len(command.environments) > 0:
if not any_list_item(environments, command.environments):
continue
if tags is not None:
if not any_list_item(tags, command.tags):
continue
filtered.append(command)
return filtered
def load_commands(path, filters=None, overlay="ubuntu", **kwargs):
"""Load commands from a configuration file.
:param path: The path to the configuration file.
:type path: str
:param filters: Used to filter commands.
:type filters: dict
:param overlay: The name of the command overlay to apply to generated commands.
:type overlay: str
:rtype: list[scriptetease.library.commands.base.Command] | scriptetease.library.commands.base.ItemizedCommand] |
None
:returns: A list of command instances or ``None`` if the configuration could not be loaded.
kwargs are passed to the configuration class for instantiation.
"""
_config = load_config(path, overlay, **kwargs)
if _config is None:
return None
commands = _config.get_commands()
if filters is not None:
criteria = dict()
for attribute, values in filters.items():
criteria[attribute] = values
commands = filter_commands(commands, **criteria)
return commands
def load_config(path, overlay="ubuntu", **kwargs):
"""Load a command configuration.
:param path: The path to the configuration file.
:type path: str
:param overlay: The name of the command overlay to apply to generated commands.
:type overlay: str
:rtype: Config | None
kwargs are passed to the configuration class for instantiation.
"""
if path.endswith(".ini"):
_config = Config(path, overlay=overlay, **kwargs)
# elif path.endswith(".yml"):
# _config = YAML(path, **kwargs)
else:
log.warning("Input file format is not currently supported: %s" % path)
return None
if not _config.load():
log.error("Failed to load config file: %s" % path)
return None
return _config
def load_variables(path, environment=None):
"""Load variables from a file.
:param path: The path to the file.
:type path: str
:param environment: Filter variables by the given environment name.
:type environment: str
:rtype: list[scripttease.parsers.utils.Variable]
"""
if not os.path.exists(path):
log.warning("Path to variables file does not exist: %s" % path)
return list()
if path.endswith(".ini"):
return _load_variables_ini(path, environment=environment)
else:
log.warning("Variable file format is not currently supports: %s" % path)
return list()
def _load_variables_ini(path, environment=None):
"""Load variables from an INI file. See ``load_variables()``."""
ini = RawConfigParser()
ini.read(path)
a = list()
for section in ini.sections():
if ":" in section:
variable_name, _environment = section.split(":")
else:
_environment = None
variable_name = section
_kwargs = {
'environment': _environment,
}
for key, value in ini.items(section):
if key == "tags":
value = split_csv(value)
else:
value = smart_cast(value)
_kwargs[key] = value
a.append(Variable(variable_name, **_kwargs))
if environment is not None:
b = list()
for var in a:
if var.environment and var.environment == environment or var.environment is None:
b.append(var)
return b
return a
# Classes
class Context(object):
"""A collection of variables."""
def __init__(self, **kwargs):
"""Initialize the context.
kwargs are added as variable instances.
"""
self.variables = dict()
for key, value in kwargs.items():
self.add(key, value)
def __getattr__(self, item):
if item in self.variables:
return self.variables[item].value
return None
def __repr__(self):
return "<%s (%s)>" % (self.__class__.__name__, len(self.variables))
def add(self, name, value, environment=None, tags=None):
"""Add a variable to the context.
:param name: The name of the variable.
:type name: str
:param value: The value of the variable in this context.
:param environment: The environment name to which the variable applies. ``None`` applies to all environments.
:type environment: str
:param tags: A list of tags that describe the variable.
:type tags: list[str]
:rtype: scripttease.parsers.utils.Variable
:raise: RuntimeError
:raises: ``RuntimeError`` if the variable already exists.
"""
if name in self.variables:
raise RuntimeError("Variable already exists: %s" % name)
v = Variable(name, value, environment=environment, tags=tags)
self.variables[name] = v
return v
def get(self, name, default=None):
"""Get a the value of the variable from the context.
:param name: The name of the variable.
:type name: str
:param default: The default value to return.
"""
if not self.has(name):
return default
return self.variables[name].value
def has(self, name):
"""Indicates whether the named variable exists in this context, and whether the value is not ``None``.
:rtype: bool
"""
if name not in self.variables:
return False
return self.variables[name].value is not None
def join(self, variables):
"""Join a list of variables to the context.
:param variables: the list of variables to be added.
:type variables: list[scripttease.parsers.utils.Variable]
.. note::
This *replaces* a variable if it already exists.
"""
for v in variables:
self.variables[v.name] = v
def mapping(self):
"""Export the context as a dictionary.
:rtype: dict
"""
values = dict()
for key, var in self.variables.items():
values[key] = var.value or var.default
return values
def merge(self, context):
"""Merge another context with this one.
:param context: The context to be merged.
:type context: scripttease.parser.utils.Context
.. note::
Variables that exist in the current context are *not* replaced with variables from the provided context.
"""
for name, var in context.variables.items():
if not self.has(name):
self.variables[name] = var
class Variable(object):
"""Represents a variable to be used in the context of pre-processing a config file."""
def __init__(self, name, value, **kwargs):
"""Initialize a variable.
:param name: The variable name.
:type name: str
:param value: The value of the variable.
kwargs are added as attributes of the instance.
"""
self.name = name
self.value = value
kwargs.setdefault("tags", list())
self._attributes = kwargs
def __eq__(self, other):
return self.value == other
def __getattr__(self, item):
return self._attributes.get(item)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.name)

@ -1,5 +1,5 @@
DATE = "2022-09-24"
VERSION = "6.8.33"
MAJOR = 6
MINOR = 8
PATCH = 33
DATE = "2023-03-30"
VERSION = "7.0.0"
MAJOR = 7
MINOR = 0
PATCH = 0

@ -13,7 +13,7 @@ def read_file(path):
setup(
name='python-scripttease',
version=read_file("PEP440.txt"),
version=read_file("VERSION.txt"),
description=read_file("DESCRIPTION.txt"),
long_description=read_file("README.markdown"),
long_description_content_type="text/markdown",

@ -1,17 +1,5 @@
[disable the default site]
apache = disable
site = default
[enable SSL]
apache = enable
mod = ssl
[original syntax disable the default site]
apache.disable_site = default
[original syntax enable SSL]
[enable SSL]
apache.enable_module = ssl
[atlernative syntax disable the default site]
apache = site default
state = disabled

@ -0,0 +1,9 @@
[disable the default site]
apache.disable_site = default
[enable SSL]
apache.enable_module = ssl
; see test_lib_factories
[this will raise a type error because bad_custom_command doesn't accept kwargs]
bad_custom = path/to/display

@ -0,0 +1,15 @@
[domain_name
value = example.com
[domain_tld]
value = example_com
[debug_enabled:testing]
value = True
[postgres_version]
value = 11
tags = postgres
[mailgun_domain:live]
value = mg.example.com

@ -0,0 +1,9 @@
[disable the default site]
apache.disable_site = default
[enable SSL]
apache.enable_module = ssl
; see test_lib_factories
[this requires that a mapping be passed to the command factory]
custom = path/to/display

@ -1,3 +1,6 @@
[here's how to install the virtualenv package]
explain = Installing virtualenv is easy.
[install the virtualenv package]
pip = virtualenv
tags = python-support
@ -7,6 +10,9 @@ virtualenv = python
cd = /path/to/project
tags = python-support
[a nice soft pillow]
screenshot = static/images/pillow.jpg
[install pillow]
pip = Pillow
cd = /path/to/project

@ -0,0 +1,3 @@
TESTING = %(testing)s
TOTAL_TIMES = %(times)s

@ -12,4 +12,4 @@ value = 11
tags = postgres
[mailgun_domain:live]
value = "mg.example.com
value = mg.example.com

@ -1,45 +0,0 @@
import pytest
from scripttease.library.commands import Command, ItemizedCommand
from scripttease.factory import Factory
class TestFactory(object):
def test_get_command(self):
f = Factory("ubuntu")
with pytest.raises(RuntimeError):
f.get_command("testing")
f = Factory("ubuntu")
f.load()
# Non-existent command.
c = f.get_command("nonexistent")
assert c is None
# A good command with itemized parameters.
c = f.get_command(
"pip",
"$item",
items=["Pillow", "psycopg2-binary", "django"]
)
assert isinstance(c, ItemizedCommand)
# A good, normal command.
c = f.get_command("pip", "django")
assert isinstance(c, Command)
# Command exists, but given bad arguments.
c = f.get_command("pip")
assert c is None
def test_load(self):
f = Factory("nonexistent")
assert f.load() is False
f = Factory("ubuntu")
assert f.load() is True
def test_repr(self):
f = Factory("centos")
assert repr(f) == "<Factory centos>"

@ -0,0 +1,416 @@
from scripttease.lib.commands.base import Command, Content, ItemizedCommand, MultipleCommands, Prompt, Sudo, Template
from scripttease.lib.commands.python import python_pip
class TestCommand(object):
def test_getattr(self):
c = Command("ls -ls", extra=True)
assert c.extra is True
def test_get_statement(self):
c = Command(
"ls -ls",
comment="kitchen sink",
condition="$last_command -eq 0",
cd="/path/to/project",
prefix="source python/bin/active",
register="list_success",
stop=True,
sudo="deploy"
)
statement = c.get_statement(cd=True)
assert "( cd" in statement
assert "sudo" in statement
assert ")" in statement
assert "# kitchen sink" in statement
assert "if [[ $last_command" in statement
assert "list_success=$?" in statement
assert "if [[ $list_success" in statement
c = Command(
"ls -ls",
stop=True
)
statement = c.get_statement()
assert "if [[ $?" in statement
def test_init(self):
c = Command("ls -ls", sudo=Sudo(user="deploy"))
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "deploy"
c = Command("ls -ls", sudo="deploy")
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "deploy"
c = Command("ls -ls", sudo=True)
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "root"
c = Command("ls -ls")
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "root"
assert c.sudo.enabled is False
def test_is_itemized(self):
c = Command("ls -ls")
assert c.is_itemized is False
def test_repr(self):
c = Command("ls -ls", comment="listing")
assert repr(c) == "<Command listing>"
c = Command("ls -ls")
assert repr(c) == "<Command>"
class TestContent(object):
def test_getattr(self):
c = Content("testing", extra=True)
assert c.extra is True
def test_get_image_output(self):
c = Content("screenshot", caption="this is a test", image="static/images/testing.png")
assert "# this is a test" in c.get_output("bash")
assert "# static/images/testing.png" in c.get_output("bash")
c.css = "img right"
c.height = 100
c.width = 150
assert '<img src="static/images/testing.png"' in c.get_output("html")
assert 'alt="this is a test"' in c.get_output("html")
assert 'class="img right"' in c.get_output("html")
assert 'height="100"' in c.get_output("html")
assert 'width="150"' in c.get_output("html")
assert c.get_output("md") == "![this is a test](static/images/testing.png)\n"
assert ".. figure:: static/images/testing.png" in c.get_output("rst")
assert ":alt: this is a test" in c.get_output("rst")
assert ":height: 100" in c.get_output("rst")
assert ":width: 150" in c.get_output("rst")
assert "this is a test: static/images/testing.png" in c.get_output("plain")
c.caption = None
assert "static/images/testing.png" in c.get_output("plain")
def test_get_message_output(self):
c = Content("explain", message="this is a test")
assert "# this is a test" in c.get_output("bash")
assert "<p>this is a test" in c.get_output("html")
assert "this is a test" in c.get_output("md")
assert "this is a test" in c.get_output("rst")
assert "this is a test" in c.get_output("plain")
c.heading = "Test"
assert "# Test: this is a test" in c.get_output("bash")
assert "<h2>Test</h2>" in c.get_output("html")
assert "## Test" in c.get_output("md")
assert "Test" in c.get_output("rst")
assert "====" in c.get_output("rst")
assert "***** Test *****" in c.get_output("plain")
def test_get_output(self):
c = Content("testing")
assert c.get_output("nonexistent") is None
def test_is_itemized(self):
c = Content("testing")
assert c.is_itemized is False
def test_repr(self):
c = Content("testing")
assert repr(c) == "<Content testing>"
def test_get_statement(self):
c = Content("screenshot", caption="this is a test", image="static/images/testing.png")
assert "# this is a test" in c.get_statement()
assert "# static/images/testing.png" in c.get_statement()
class TestItemizedCommand(object):
def test_getattr(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item", extra=True)
assert c.extra is True
def test_get_commands(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
commands = c.get_commands()
for i in commands:
assert isinstance(i, Command)
def test_get_statement(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
statement = c.get_statement()
assert "Pillow" in statement
assert "psycopg2-binary" in statement
assert "django" in statement
def test_is_itemized(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert c.is_itemized is True
def test_repr(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert repr(c) == "<ItemizedCommand python_pip>"
class TestMultipleCommands(object):
def test_getattr(self):
commands = [
Command("ls -ls", name="run"),
Command("pip install django", name="pip"),
Command("touch /tmp/testing.txt", name="touch")
]
multiple = MultipleCommands(commands, extra=True)
assert multiple.extra is True
def test_iter(self):
commands = [
Command("ls -ls", name="run"),
Command("pip install django", name="pip"),
Command("touch /tmp/testing.txt", name="touch")
]
multiple = MultipleCommands(commands)
count = 0
for c in multiple:
count += 1
assert count == 3
def test_repr(self):
commands = [
Command("ls -ls", name="run"),
Command("pip install django", name="pip"),
Command("touch /tmp/testing.txt", name="touch")
]
multiple = MultipleCommands(commands)
assert repr(multiple) == "<MultipleCommands run multiple commands>"
def test_get_statement(self):
commands = [
Command("ls -ls", name="run"),
Command("pip install django", name="pip"),
Command("touch /tmp/testing.txt", name="touch")
]
multiple = MultipleCommands(commands)
statement = multiple.get_statement()
assert "ls -ls" in statement
assert "pip install django" in statement
assert "touch /tmp/testing.txt" in statement
class TestPrompt(object):
def test_get_dialog_statement(self):
prompt = Prompt("testing", choices=["1", "2", "3"], default="1", dialog=True, help_text="This is a test.", label="Test")
statement = prompt.get_statement()
assert "--menu" in statement
assert "This is a test." in statement
prompt = Prompt("testing", choices="1,2,3", default="1", dialog=True, help_text="This is a test.", label="Test")
statement = prompt.get_statement()
assert "--menu" in statement
prompt = Prompt("testing", default="1", dialog=True, help_text="This is a test.", label="Test")
statement = prompt.get_statement()
assert "--inputbox" in statement
assert "This is a test." in statement
prompt = Prompt("testing", default="1", dialog=True, label="Test")
statement = prompt.get_statement()
assert "--inputbox" in statement
def test_get_read_statement(self):
prompt = Prompt("testing", choices="1,2,3", default="1", help_text="This is a test.", label="Test")
statement = prompt.get_statement()
assert "select opt in" in statement
prompt = Prompt("testing", default="1", help_text="This is a test.", label="Test")
statement = prompt.get_statement()
assert "echo -n" in statement
class TestSudo(object):
def test_bool(self):
s = Sudo()
assert bool(s) is False
s = Sudo(True)
assert bool(s) is True
def test_str(self):
s = Sudo()
assert str(s) == ""
s = Sudo(True)
assert str(s) == "sudo -u root"
class TestTemplate(object):
def test_get_content(self):
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.txt",
"tests/tmp/simple.txt",
backup=False,
context=context,
parser=Template.PARSER_SIMPLE
)
content = t.get_content()
assert "I am testing? yes" in content
assert "How many times? 123" in content
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.sh.txt",
"tests/tmp/simple.sh",
backup=False,
context=context,
parser=Template.PARSER_SIMPLE
)
content = t.get_content()
assert "I am testing? yes" in content
assert "How many times? 123" in content
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/good.j2.txt",
"tests/tmp/good.txt",
backup=False,
context=context
)
content = t.get_content()
assert "I am testing? yes" in content
assert "How many times? 123" in content
t = Template("tests/examples/templates/nonexistent.j2.txt", "test/tmp/nonexistent.txt")
assert t.get_content() is None
t = Template("tests/examples/templates/bad.j2.txt", "test/tmp/nonexistent.txt")
assert t.get_content() is None
context = {
'testing': True,
'times': 3,
}
t = Template("tests/examples/templates/settings.py", "test/tmp/settings.py", context=context, parser=Template.PARSER_PYTHON)
content = t.get_content()
assert "TESTING = True" in content
assert "TOTAL_TIMES = 3" in content
def test_get_statement(self):
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.txt",
"tests/tmp/simple.txt",
context=context,
comment="A simple parser example.",
parser=Template.PARSER_SIMPLE,
register="template_created",
stop=True,
sudo=Sudo(user="root")
)
s = t.get_statement()
assert "I am testing? yes" in s
assert "How many times? 123" in s
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.sh.txt",
"tests/tmp/simple.txt",
context=context,
parser=Template.PARSER_SIMPLE,
stop=True,
sudo="root"
)
s = t.get_statement()
assert "I am testing? yes" in s
assert "How many times? 123" in s
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/good.j2.txt",
"tests/tmp/good.txt",
context=context,
sudo=True
)
s = t.get_statement()
assert "I am testing? yes" in s
assert "How many times? 123" in s
t = Template(
"tests/examples/templates/simple.txt",
"tests/tmp/simple.txt",
parser="nonexistent"
)
assert "# NO CONTENT AVAILABLE" in t.get_statement()
def test_repr(self):
t = Template("/path/to/template.conf", "/path/to/file.conf")
assert repr(t) == "<Template /path/to/template.conf>"
def test_get_target_language(self):
t = Template("/path/to/template.conf", "/path/to/file.conf", lang="c++")
assert t.get_target_language() == "c++"
t = Template("/path/to/template.conf", "/path/to/file.conf")
assert t.get_target_language() == "conf"
t = Template("/path/to/template.ini", "/path/to/file.ini")
assert t.get_target_language() == "ini"
t = Template("/path/to/template.php", "/path/to/file.php")
assert t.get_target_language() == "php"
t = Template("/path/to/template.py", "/path/to/file.py")
assert t.get_target_language() == "python"
t = Template("/path/to/template.sh", "/path/to/file.sh")
assert t.get_target_language() == "bash"
t = Template("/path/to/template.sql", "/path/to/file.sql")
assert t.get_target_language() == "sql"
t = Template("/path/to/template.yml", "/path/to/file.yml")
assert t.get_target_language() == "yaml"
t = Template("/path/to/template.txt", "/path/to/file.txt")
assert t.get_target_language() == "text"
def test_get_template(self):
t = Template(
"simple.txt",
"tests/tmp/simple.txt",
locations=["tests/examples/templates"]
)
assert t.get_template() == "tests/examples/templates/simple.txt"
def test_is_itemized(self):
t = Template("/path/to/template.conf", "/path/to/file.conf")
assert t.is_itemized is False

@ -1,6 +1,5 @@
import pytest
from scripttease.library.commands.templates import Template
from scripttease.library.overlays.centos import *
from scripttease.lib.commands.centos import *
def test_apache():
@ -23,11 +22,6 @@ def test_apache():
apache("nonexistent")
def test_command_exists():
assert command_exists("apache") is True
assert command_exists("nonexistent") is False
def test_service_reload():
c = service_reload("postfix")
assert "systemctl reload postfix" in c.get_statement()
@ -72,11 +66,6 @@ def test_system_uninstall():
assert "yum remove -y lftp" in c.get_statement()
def test_template():
t = template("/path/to/source.txt", "/path/to/target.txt")
assert isinstance(t, Template)
def test_user():
statement = user("deploy", groups="sudo", home="/path/to/deploy/root").get_statement()
assert "adduser deploy" in statement

@ -1,4 +1,4 @@
from scripttease.library.overlays.django import *
from scripttease.lib.commands.django import *
def test_django():
@ -29,28 +29,28 @@ def test_django_check():
def test_django_collect_static():
c = django_collect_static(venv="python")
c = django_static(venv="python")
s = c.get_statement()
assert "./manage.py collectstatic" in s
assert "source python/bin/activate" in s
def test_django_dumpdata():
c = django_dumpdata("projects")
c = django_dump("projects")
s = c.get_statement()
assert "./manage.py dumpdata" in s
assert "projects >" in s
assert "--format=json" in s
assert '--format="json"' in s
assert "--indent=4" in s
assert "local/projects/fixtures/initial.json" in s
assert "../fixtures/projects.json" in s
def test_django_loaddata():
c = django_loaddata("projects")
c = django_load("projects")
s = c.get_statement()
print(s)
assert "./manage.py loaddata" in s
assert "local/projects/fixtures/initial.json" in s
assert "../fixtures/projects.json" in s
def test_django_migrate():

@ -0,0 +1,46 @@
import pytest
from scripttease.exceptions import InvalidInput
from scripttease.lib.commands.messages import *
def test_dialog():
c = dialog("This is a test.", title="Testing")
s = c.get_statement(include_comment=False)
# dialog --clear --backtitle "Testing" --msgbox "This is a test." 15 100; clear;
assert 'dialog --clear --backtitle "Testing"' in s
assert '--msgbox "This is a test." 15 100; clear;'
def test_echo():
c = echo("This is a test.")
s = c.get_statement(include_comment=False)
assert "echo" in s
assert "This is a test." in s
def test_explain():
assert explain("this is a test") is not None
def test_screenshot():
assert screenshot("static/images/testing.png") is not None
def test_slack():
with pytest.raises(InvalidInput):
slack("This is a test.")
c = slack("This is a test.", url="https://example.slack.com/asdf/1234")
s = c.get_statement(include_comment=False)
assert "curl -X POST -H 'Content-type: application/json' --data" in s
assert "This is a test." in s
assert "https://example.slack.com/asdf/1234" in s
def test_twist():
with pytest.raises(InvalidInput):
twist("This is a test.")
c = twist("This is a test.", url="https://example.twist.com/asdf/1234")
s = c.get_statement(include_comment=False)
print(s)

@ -1,5 +1,6 @@
import pytest
from scripttease.library.overlays.mysql import *
from scripttease.exceptions import InvalidInput
from scripttease.lib.commands.mysql import *
def test_mysql_create():
@ -9,7 +10,6 @@ def test_mysql_create():
assert '--password="P455W0rD"' in s
assert "--host=" in s
assert "--port=" in s
assert "create testing" in s
assert '--execute="GRANT ALL ON testing.* TO \'bob\'@\'localhost\'"' in s
@ -21,26 +21,26 @@ def test_mysql_drop():
def test_mysql_dump():
c = mysql_dump("testing", inserts=True)
c = mysql_dump("testing", complete_inserts=True)
s = c.get_statement()
assert "mysqldump" in s
assert "--complete-inserts" in s
assert "> testing.sql" in s
def test_mysql_exec():
c = mysql_exec("SELECT * FROM projects", database="testing")
s = c.get_statement()
assert "mysql" in s
assert "--execute=" in s
assert '"SELECT * FROM projects"' in s
# def test_mysql_exec():
# c = mysql("SELECT * FROM projects", database="testing")
# s = c.get_statement()
# assert "mysql" in s
# assert "--execute=" in s
# assert '"SELECT * FROM projects"' in s
def test_mysql_exists():
c = mysql_exists("testing")
s = c.get_statement()
assert "mysql" in s
assert "mysql_database_exists" in s
assert "testing_exists" in s
def test_mysql_grant():
@ -52,8 +52,17 @@ def test_mysql_grant():
assert "'bob'@'localhost'" in s
def test_mysql_load():
c = mysql_load("testing", "path/to/file.sql", bogus=1)
s = c.get_statement()
assert "mysql" in s
assert "testing" in s
assert "< path/to/file.sql" in s
assert "--bogus=1" in s # to test passing any key
def test_mysql_user():
c = mysql_user("bob", passwd="secret")
c = mysql_user("bob", password="secret")
s = c.get_statement()
assert "mysql" in s
assert "CREATE USER IF NOT EXISTS" in s
@ -72,5 +81,5 @@ def test_mysql_user():
assert "mysql_user_exists" in s
assert "SELECT EXISTS(SELECT 1 FROM mysql.user WHERE user = 'bob')" in s
with pytest.raises(NameError):
with pytest.raises(InvalidInput):
mysql_user("bob", op="nonexistent")

@ -1,17 +1,18 @@
import pytest
from scripttease.library.overlays.pgsql import *
from scripttease.exceptions import InvalidInput
from scripttease.lib.commands.pgsql import *
def test_pgsql_create():
c = pgsql_create("testing", admin_pass="secret", template="mytemplate")
c = pgsql_create("testing", password="secret", owner="testing", template="mytemplate")
s = c.get_statement()
assert "createdb" in s
assert "export PGPASSWORD=" in s
assert "--host=" in s
assert "--port=" in s
assert "--username=" in s
assert "-U" in s
assert "--owner=" in s
assert "--template=mytemplate" in s
assert '--template="mytemplate"' in s
assert "testing" in s
@ -19,7 +20,7 @@ def test_pgsql_exists():
c = pgsql_exists("testing")
s = c.get_statement()
assert "psql" in s
assert "pgsql_db_exists" in s
assert "testing_exists" in s
def test_pgsql_drop():
@ -34,15 +35,23 @@ def test_pgsql_dump():
s = c.get_statement()
assert "pg_dump" in s
assert "--column-inserts" in s
assert "--file=testing.sql" in s
assert '--file="testing.sql"' in s
def test_pgsql_exec():
c = pgsql_exec("SELECT * FROM projects", database="testing")
# def test_pgsql_exec():
# c = pgsql_exec("SELECT * FROM projects", database="testing")
# s = c.get_statement()
# assert "psql" in s
# assert "--dbname=testing" in s
# assert '-c "SELECT * FROM projects"' in s
def test_pgsql_load():
c = pgsql_load("testing", "path/to/file.sql", bogus=1)
s = c.get_statement()
assert "psql" in s
assert "--dbname=testing" in s
assert '-c "SELECT * FROM projects"' in s
assert '--dbname="testing"' in s
assert '--file="path/to/file.sql"' in s
assert '--bogus=1' in s # to test passing any key
def test_pgsql_user():
@ -62,5 +71,5 @@ def test_pgsql_user():
s = c.get_statement()
assert "SELECT 1 FROM pgsql_roles" in s
with pytest.raises(NameError):
with pytest.raises(InvalidInput):
pgsql_user("testing", op="nonexistent")

@ -0,0 +1,7 @@
from scripttease.lib.commands.php import *
def test_php_module():
c = php_module("testing")
s = c.get_statement()
assert "phpenmod testing" in s

@ -1,5 +1,10 @@
import pytest
from scripttease.library.overlays.posix import *
from scripttease.lib.commands.posix import *
def test_append():
c = append("/path/to/file.txt", content="testing = yes")
assert 'echo "testing = yes" >> /path/to/file.txt' in c.get_statement()
def test_archive():
@ -27,19 +32,47 @@ def test_certbot():
assert "--webroot -w /var/www/domains/example_com/www -d example.com" in s
def test_dialog():
c = dialog("This is a test.", title="Testing")
s = c.get_statement(suppress_comment=True)
# dialog --clear --backtitle "Testing" --msgbox "This is a test." 15 100; clear;
assert 'dialog --clear --backtitle "Testing"' in s
assert '--msgbox "This is a test." 15 100; clear;'
def test_copy():
c = copy("/path/to/file.txt", "/path/to/new-file.txt")
s = c.get_statement()
assert "cp" in s
assert "-n" in s
assert "/path/to/file.txt /path/to/new-file.txt" in s
c = copy("/path/to/dir", "/path/to/newdir", recursive=True)
s = c.get_statement()
assert "cp" in s
assert "-R" in s
assert "/path/to/dir /path/to/newdir" in s
def test_echo():
c = echo("This is a test.")
s = c.get_statement(suppress_comment=True)
assert "echo" in s
assert "This is a test." in s
def test_directory():
c = directory("/path/to/dir", group="www-data", mode=755, owner="deploy", recursive=True)
s = c.get_statement()
assert "mkdir" in s
assert "-m 755" in s
assert "-p" in s
assert "/path/to/dir" in s
c = directory("/path/to/dir", mode="755")
s = c.get_statement()
assert "-m 755" in s
c = directory("/path/to/dir", group="www-data", recursive=True)
s = c.get_statement()
assert "chgrp -R www-data" in s
c = directory("/path/to/dir", group="www-data")
s = c.get_statement()
assert "chgrp www-data" in s
c = directory("/path/to/dir", owner="deploy", recursive=True)
s = c.get_statement()
assert "chown -R deploy" in s
c = directory("/path/to/dir", owner="deploy")
s = c.get_statement()
assert "chown deploy" in s
def test_extract():
@ -55,49 +88,12 @@ def test_extract():
assert "-f /path/to/archive.tgz ./" in s
def test_file_append():
c = file_append("/path/to/file.txt", content="testing = yes")
assert 'echo "testing = yes" >> /path/to/file.txt' in c.get_statement()
def test_file_copy():
c = file_copy("/path/to/file.txt", "/path/to/new-file.txt")
s = c.get_statement()
assert "cp" in s
assert "-n" in s
assert "/path/to/file.txt /path/to/new-file.txt" in s
c = file_copy("/path/to/dir", "/path/to/newdir", recursive=True)
s = c.get_statement()
assert "cp" in s
assert "-R" in s
assert "/path/to/dir /path/to/newdir" in s
def test_file_write():
c = file_write("/path/to/file.txt", content="testing 123")
assert 'echo "testing 123" > /path/to/file.txt' in c.get_statement()
content = [
"I am testing",
"I am testing",
"I am testing",
"testing 123",
]
c = file_write("/path/to/file.txt", content="\n".join(content))
def test_link():
c = link("/var/www/domains", force=True)
s = c.get_statement()
assert "cat > /path/to/file.txt << EOF" in s
assert "I am testing" in s
assert "testing 123" in s
def test_mkdir():
c = mkdir("/path/to/dir", mode=755, recursive=True)
s = c.get_statement()
assert "mkdir" in s
assert "-m 755" in s
assert "-p" in s
assert "/path/to/dir" in s
assert "ln -s" in s
assert "-f" in s
assert "/var/www/domains" in s
def test_move():
@ -112,6 +108,10 @@ def test_perms():
assert "chown -R deploy /path/to/dir" in s
assert "chmod -R 755 /path/to/dir" in s
c = perms("/path/to/dir", mode=755)
s = c.get_statement()
assert "chmod 755 /path/to/dir" in s
def test_prompt():
# This is just a placeholder to test the prompt() interface. See TestPrompt.
@ -128,11 +128,6 @@ def test_remove():
assert "/path/to/dir" in s
def test_rename():
c = rename("/path/to/file.txt", "/path/to/renamed.txt")
assert "mv /path/to/file.txt /path/to/renamed.txt" in c.get_statement()
def test_rsync():
c = rsync(
"/path/to/local/",
@ -146,7 +141,11 @@ def test_rsync():
user="deploy"
)
s = c.get_statement()
assert "rsync --cvs-exclude --checksum --compress --copy-links --delete" in s
assert "rsync" in s
assert "--checksum" in s
assert "--compress" in s
assert "--copy-links" in s
assert "--delete" in s
assert "--exclude-from=deploy/exclude.txt" in s
assert "-P" in s
assert "--recursive /path/to/local/" in s
@ -162,7 +161,11 @@ def test_rsync():
recursive=True,
)
s = c.get_statement()
assert "rsync --cvs-exclude --checksum --compress --copy-links --delete" in s
assert "rsync" in s
assert "--checksum" in s
assert "--compress" in s
assert "--copy-links" in s
assert "--delete" in s
assert "--exclude-from=deploy/exclude.txt" in s
assert "-P" in s
assert "--recursive" in s
@ -170,6 +173,12 @@ def test_rsync():
assert "/path/to/remote" in s
def test_run():
c = run("ls -ls")
s = c.get_statement()
assert "ls -ls" in s
def test_scopy():
with pytest.raises(ValueError):
c = scopy("/path/to/local/file.txt", "/path/to/remote/file.txt")
@ -199,19 +208,31 @@ def test_scopy():
def test_sed():
c = sed("/path/to/file.txt", find="testing", replace="123")
c = replace("/path/to/file.txt", find="testing", replace="123")
s = c.get_statement()
assert "sed -i .b" in s
assert "s/testing/123/g" in s
assert "/path/to/file.txt" in s
def test_symlink():
c = symlink("/var/www/domains", force=True)
def test_sync():
c = sync(
"/path/to/local/",
"/path/to/remote",
links=True,
delete=True,
exclude="~/exclude.txt",
recursive=True,
)
s = c.get_statement()
assert "ln -s" in s
assert "-f" in s
assert "/var/www/domains" in s
assert "rsync" in s
assert "--checksum" in s
assert "--compress" in s
assert "--copy-links" in s
assert "--delete" in s
assert "--exclude-from=~/exclude.txt" in s
assert "-P" in s
assert "--recursive /path/to/local/" in s
def test_touch():
@ -225,58 +246,18 @@ def test_wait():
assert 'sleep 5' in s
class TestFunction(object):
def test_to_string(self):
f = Function("testing", comment="A test function.")
f.commands.append(touch("/path/to/file.txt"))
s = f.to_string()
assert "# A test function." in s
assert "function testing()" in s
assert "touch /path/to/file.txt" in s
class TestPrompt(object):
def test_init(self):
c = Prompt("testing", choices=["yes", "no", "maybe"])
assert type(c.choices) is list
c = Prompt("testing", choices="yes, no, maybe")
assert type(c.choices) is list
c = Prompt("testing")
assert c.choices is None
def test_get_dialog_statement(self):
c = Prompt("testing", choices=["yes", "no", "maybe"], fancy=True)
s = c.get_statement()
assert 'dialog --clear --backtitle "Input" --title "Testing"' in s
assert '--menu "Select" 15 40 3 "yes" 1 "no" 2 "maybe" 3 2>/tmp/input.txt' in s
assert 'testing=$(</tmp/input.txt)' in s
assert 'clear' in s
assert 'rm /tmp/input.txt' in s
c = Prompt("testing", fancy=True, help_text="This is a test.")
s = c.get_statement()
assert 'dialog --clear --backtitle "Input" --title "Testing"' in s
assert '--inputbox "This is a test." 8 60 2>/tmp/input.txt' in s
assert 'testing=$(</tmp/input.txt)' in s
assert 'clear' in s
assert 'rm /tmp/input.txt' in s
c = Prompt("testing", default="it is", fancy=True)
s = c.get_statement()
assert 'if [[ -z "$testing" ]]; then testing="it is"; fi;' in s
def test_get_read_statement(self):
c = Prompt("testing", choices=["yes", "no", "maybe"])
s = c.get_statement()
assert 'options=("yes" "no" "maybe")' in s
assert 'select opt in "${options[@]}"' in s
def test_write():
c = write("/path/to/file.txt", content="testing 123")
assert 'echo "testing 123" > /path/to/file.txt' in c.get_statement()
c = Prompt("testing", default="yes")
content = [
"I am testing",
"I am testing",
"I am testing",
"testing 123",
]
c = write("/path/to/file.txt", content="\n".join(content))
s = c.get_statement()
assert 'echo -n "Testing "' in s
assert "read testing" in s
assert 'then testing="yes"' in s
assert "cat > /path/to/file.txt << EOF" in s
assert "I am testing" in s
assert "testing 123" in s

@ -0,0 +1,25 @@
import pytest
from scripttease.lib.commands.python import *
def test_python_pip():
c = python_pip("Pillow")
assert "pip3 install Pillow" in c.get_statement()
c = python_pip("Pillow", upgrade=True)
assert "--upgrade" in c.get_statement()
c = python_pip("Pillow", venv="python")
assert "source python/bin/activate" in c.get_statement()
def test_python_pip_file():
c = python_pip_file("deploy/packages/testing.pip", venv="python")
s = c.get_statement()
assert "pip3 install -r deploy/packages/testing.pip" in s
assert "source python/bin/activate" in s
def test_python_virtual_env():
c = python_virtualenv("python")
assert "virtualenv python" in c.get_statement()

@ -1,6 +1,5 @@
import pytest
from scripttease.library.commands.templates import Template
from scripttease.library.overlays.ubuntu import *
from scripttease.lib.commands.ubuntu import *
def test_apache():
@ -43,11 +42,6 @@ def test_apache_enable_site():
assert "a2ensite example.com" in c.get_statement()
def test_command_exists():
assert command_exists("apache") is True
assert command_exists("nonexistent") is False
def test_service_reload():
c = service_reload("postfix")
assert "service postfix reload" in c.get_statement()
@ -73,10 +67,10 @@ def test_system():
assert "reboot" in c.get_statement()
c = system("update")
assert "apt-get update -y" in c.get_statement()
assert "apt update -y" in c.get_statement()
c = system("upgrade")
assert "apt-get upgrade -y" in c.get_statement()
assert "apt upgrade -y" in c.get_statement()
with pytest.raises(NameError):
system("nonexistent")
@ -84,17 +78,12 @@ def test_system():
def test_system_install():
c = system_install("vim")
assert "apt-get install -y vim" in c.get_statement()
assert "apt install -y vim" in c.get_statement()
def test_system_uninstall():
c = system_uninstall("lftp")
assert "apt-get uninstall -y lftp" in c.get_statement()
def test_template():
t = template("/path/to/source.txt", "/path/to/target.txt")
assert isinstance(t, Template)
assert "apt uninstall -y lftp" in c.get_statement()
def test_user():

@ -0,0 +1,37 @@
from scripttease.lib.contexts import *
class TestContext(object):
def test_add(self):
c = Context()
v = c.add("testing", True)
assert isinstance(v, Variable)
def test_append(self):
c = Context()
v = Variable("testing", True)
c.append(v)
assert len(c.variables.keys()) == 1
def test_getattr(self):
c = Context()
c.add("testing", True)
assert c.testing.value is True
def test_mapping(self):
c = Context()
c.add("testing", True)
assert 'testing' in c.mapping()
class TestVariable(object):
def test_getattr(self):
tags = ["this", "is", "a", "test"]
v = Variable("testing", True, tags=tags)
assert v.tags == tags
def test_str(self):
v = Variable("testing", True)
assert str(v) == "True"

@ -0,0 +1,50 @@
import pytest
from scripttease.exceptions import InvalidInput
from scripttease.lib.commands.base import Command
from scripttease.lib.loaders import INILoader
from scripttease.lib.factories import *
def bad_custom_command(path):
# this will fail because the function doesn't except kwargs.
pass
def custom_command(path, **kwargs):
return Command("ls -ls %s" % path, name="custom", **kwargs)
def test_command_factory():
ini = INILoader("tests/examples/kitchen_sink.ini")
assert command_factory(ini, profile="nonexistent") is None
ini = INILoader("tests/examples/kitchen_sink.ini")
ini.load()
commands = command_factory(ini)
assert len(commands) == 48
ini = INILoader("tests/examples/bad_command.ini")
ini.load()
commands = command_factory(ini)
assert len(commands) == 0
ini = INILoader("tests/examples/apache_examples.ini")
ini.load()
commands = command_factory(ini)
assert len(commands) == 2
# This should result in no commands because CentOS doesn't have enable/disable site or module.
ini = INILoader("tests/examples/apache_examples.ini")
ini.load()
commands = command_factory(ini, profile="centos")
assert len(commands) == 0
ini = INILoader("tests/examples/custom_example.ini")
ini.load()
commands = command_factory(ini, mappings={'custom': custom_command})
assert len(commands) == 3
ini = INILoader("tests/examples/bad_custom_example.ini")
ini.load()
with pytest.raises(InvalidInput):
command_factory(ini, mappings={'bad_custom': bad_custom_command})

@ -0,0 +1,83 @@
import pytest
from scripttease.exceptions import InvalidInput
from scripttease.lib.contexts import Context
from scripttease.lib.loaders.base import *
def test_load_variables():
vars = load_variables("nonexistent.ini")
assert len(vars) == 0
vars = load_variables("tests/examples/bad_variables.ini")
assert len(vars) == 0
vars = load_variables("tests/examples/variables.ini")
assert len(vars) == 5
vars = load_variables("tests/examples/variables.ini", env="testing")
assert len(vars) == 4
vars = load_variables("tests/examples/variables.ini", env="live")
assert len(vars) == 4
class TestBaseLoader(object):
def test_get_context(self):
c = Context()
c.add("testing", True)
o = BaseLoader("path/does/not/matter.txt", context=c)
assert 'testing' in o.get_context()
def test_get_key_value(self):
o = BaseLoader("path/does/not/matter.txt")
key, value = o._get_key_value("env", ["development", "testing"])
assert value == ["development", "testing"]
key, value = o._get_key_value("env", "testing")
assert value == ["testing"]
key, value = o._get_key_value("env", "development, testing")
assert value == ["development", "testing"]
key, value = o._get_key_value("func", "test_function")
assert key == "function"
assert value == "test_function"
key, value = o._get_key_value("groups", ["one", "two", "three"])
assert value == ["one", "two", "three"]
key, value = o._get_key_value("groups", "one, two, three")
assert value == ["one", "two", "three"]
key, value = o._get_key_value("items", ["one", "two", "three"])
assert value == ["one", "two", "three"]
key, value = o._get_key_value("items", "one, two, three")
assert value == ["one", "two", "three"]
key, value = o._get_key_value("tags", ["one", "two", "three"])
assert value == ["one", "two", "three"]
key, value = o._get_key_value("tags", "one, two, three")
assert value == ["one", "two", "three"]
key, value = o._get_key_value("testing", "1")
assert value == 1
def test_load(self):
o = BaseLoader("path/does/not/matter.txt")
with pytest.raises(NotImplementedError):
o.load()
def test_read_file(self):
c = Context()
c.add("domain_tld", "example_app")
o = BaseLoader("tests/examples/template_example.ini", context=c)
assert "example_app" in o.read_file()
c = Context()
c.add("domain_tld", "example_app")
o = BaseLoader("tests/examples/bad_template_example.ini", context=c)
assert o.read_file() is None

@ -0,0 +1,23 @@
import pytest
from scripttease.exceptions import InvalidInput
from scripttease.lib.contexts import Context
from scripttease.lib.loaders import INILoader
class TestINILoader(object):
def test_load(self):
o = INILoader("nonexistent.ini")
assert o.load() is False
# ConfigParser raises InterpolationSyntaxError if context is not provided.
c = Context()
c.add("testing", True)
o = INILoader("tests/examples/bad_template_example.ini", context=c)
assert o.load() is False
o = INILoader("tests/examples/python_examples.ini")
assert o.load() is True
o = INILoader('tests/examples/bad_examples.ini')
assert o.load() is False

@ -0,0 +1,4 @@
import pytest
from scripttease.exceptions import InvalidInput
from scripttease.lib.loaders import YMLLoader

@ -1,130 +0,0 @@
from scripttease.library.commands.base import Command, ItemizedCommand, Sudo
from scripttease.library.overlays.common import python_pip
class TestCommand(object):
def test_getattr(self):
c = Command("ls -ls", extra=True)
assert c.extra is True
def test_get_statement(self):
c = Command(
"ls -ls",
comment="kitchen sink",
condition="$last_command -eq 0",
cd="/path/to/project",
prefix="source python/bin/active",
register="list_success",
stop=True,
sudo="deploy"
)
statement = c.get_statement(cd=True)
assert "( cd" in statement
assert "sudo" in statement
assert ")" in statement
assert "# kitchen sink" in statement
assert "if [[ $last_command" in statement
assert "list_success=$?" in statement
assert "if [[ $list_success" in statement
c = Command(
"ls -ls",
stop=True
)
statement = c.get_statement()
assert "if [[ $?" in statement
def test_has_attribute(self):
c = Command("ls -ls")
assert c.has_attribute("testing") is False
def test_init(self):
c = Command("ls -ls", sudo=Sudo(user="deploy"))
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "deploy"
c = Command("ls -ls", sudo="deploy")
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "deploy"
c = Command("ls -ls", sudo=True)
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "root"
c = Command("ls -ls")
assert isinstance(c.sudo, Sudo)
assert c.sudo.user == "root"
assert c.sudo.enabled is False
def test_is_itemized(self):
c = Command("ls -ls")
assert c.is_itemized is False
def test_repr(self):
c = Command("ls -ls", comment="listing")
assert repr(c) == "<Command listing>"
c = Command("ls -ls")
assert repr(c) == "<Command>"
def test_set_attribute(self):
c = Command("ls -ls")
assert c.testing is None
c.set_attribute("testing", True)
assert c.testing is True
class TestItemizedCommand(object):
def test_getattr(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item", extra=True)
assert c.extra is True
def test_get_commands(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
commands = c.get_commands()
for i in commands:
assert isinstance(i, Command)
def test_get_statement(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
statement = c.get_statement()
assert "Pillow" in statement
assert "psycopg2-binary" in statement
assert "django" in statement
def test_has_attribute(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert c.has_attribute("testing") is False
def test_is_itemized(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert c.is_itemized is True
def test_repr(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert repr(c) == "<ItemizedCommand python_pip>"
def test_set_attribute(self):
c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item")
assert c.testing is None
c.set_attribute("testing", True)
assert c.testing is True
class TestSudo(object):
def test_bool(self):
s = Sudo()
assert bool(s) is False
s = Sudo(True)
assert bool(s) is True
def test_str(self):
s = Sudo()
assert str(s) == ""
s = Sudo(True)
assert str(s) == "sudo -u root"

@ -1,113 +0,0 @@
from scripttease.library.commands.base import Command, ItemizedCommand, Sudo
from scripttease.library.commands.templates import Template
class TestTemplate(object):
def test_get_content(self):
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.txt",
"tests/tmp/simple.txt",
backup=False,
context=context,
parser=Template.PARSER_SIMPLE
)
content = t.get_content()
assert "I am testing? yes" in content
assert "How many times? 123" in content
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.sh.txt",
"tests/tmp/simple.sh",
backup=False,
context=context,
parser=Template.PARSER_SIMPLE
)
content = t.get_content()
assert "I am testing? yes" in content
assert "How many times? 123" in content
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/good.j2.txt",
"tests/tmp/good.txt",
backup=False,
context=context
)
content = t.get_content()
assert "I am testing? yes" in content
assert "How many times? 123" in content
t = Template("tests/examples/templates/nonexistent.j2.txt", "test/tmp/nonexistent.txt")
assert t.get_content() is None
t = Template("tests/examples/templates/bad.j2.txt", "test/tmp/nonexistent.txt")
assert t.get_content() is None
def test_get_statement(self):
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.txt",
"tests/tmp/simple.txt",
context=context,
parser=Template.PARSER_SIMPLE
)
s = t.get_statement()
assert "I am testing? yes" in s
assert "How many times? 123" in s
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/simple.sh.txt",
"tests/tmp/simple.txt",
context=context,
parser=Template.PARSER_SIMPLE
)
s = t.get_statement()
assert "I am testing? yes" in s
assert "How many times? 123" in s
context = {
'testing': "yes",
'times': 123,
}
t = Template(
"tests/examples/templates/good.j2.txt",
"tests/tmp/good.txt",
context=context
)
s = t.get_statement()
assert "I am testing? yes" in s
assert "How many times? 123" in s
t = Template(
"tests/examples/templates/simple.txt",
"tests/tmp/simple.txt",
parser="nonexistent"
)
assert t.get_statement() is None
def test_get_template(self):
t = Template(
"simple.txt",
"tests/tmp/simple.txt",
locations=["tests/examples/templates"]
)
assert t.get_template() == "tests/examples/templates/simple.txt"

@ -1,57 +0,0 @@
import pytest
from scripttease.library.overlays.common import *
def test_python_pip():
c = python_pip("Pillow")
assert "pip3 install Pillow" in c.get_statement()
c = python_pip("Pillow", upgrade=True)
assert "--upgrade" in c.get_statement()
c = python_pip("Pillow", venv="python")
assert "source python/bin/activate" in c.get_statement()
def test_python_virtual_env():
c = python_virtualenv("python")
assert "virtualenv python" in c.get_statement()
def test_run():
c = run("ls -ls")
assert "ls -ls" in c.get_statement()
def test_slack():
with pytest.raises(ValueError):
slack("This is a test.")
c = slack("This is a test.", url="https://example.slack.com/asdf/1234")
s = c.get_statement(suppress_comment=True)
assert "curl -X POST -H 'Content-type: application/json' --data" in s
assert "This is a test." in s
assert "https://example.slack.com/asdf/1234" in s
def test_twist():
with pytest.raises(ValueError):
twist("This is a test.")
c = twist("This is a test.", url="https://example.twist.com/asdf/1234")
s = c.get_statement(suppress_comment=True)
print(s)
def test_udf():
c = udf("testing")
s = c.get_statement()
assert s == '# <UDF name="testing" label="Testing" />'
c = udf("testing", default="yes")
s = c.get_statement()
assert s == '# <UDF name="testing" label="Testing" default="yes" />'
c = udf("testing", example="example.com")
s = c.get_statement()
assert s == '# <UDF name="testing" label="Testing" example="example.com" />'

@ -1,31 +0,0 @@
from scripttease.library.commands import Command, ItemizedCommand
from scripttease.library.overlays.posix import Function
from scripttease.library.scripts import Script
class TestScript(object):
def test_append(self):
s = Script("testing")
s.append(Command("ls -ls", comment="list some stuff"))
s.append(Command("touch /path/to/file.txt", comment="touch a file"))
s.append(Command("ln -s /path/to/file.txt", comment="link to a file"))
assert len(s.commands) == 3
def test_to_string(self):
s = Script("testing")
s.append(Command("ls -ls", comment="list some stuff"))
s.append(Command("touch /path/to/file.txt", comment="touch a file"))
s.append(Command("ln -s /path/to/file.txt", comment="link to a file"))
s.functions = list()
s.functions.append(Function("testing"))
output = s.to_string()
assert output == str(s)
assert "ls -ls" in output
assert "touch /path/to/file.txt" in output
assert "ln -s /path/to/file.txt" in output
assert "function testing()" in output

@ -1,30 +0,0 @@
import pytest
from scripttease.library.overlays.posix import touch, Function
from scripttease.library.scripts import Script
# from scripttease.parsers import filter_commands, load_commands
from scripttease.parsers.base import Parser
class TestParser(object):
def test_as_script(self):
p = Parser("/path/to/nonexistent.txt")
assert isinstance(p.as_script(), Script)
# def test_get_commands(self):
# pass
#
def test_get_functions(self):
parser = Parser("/it/does/not/matter.ini")
command = touch("/path/to/file.txt", function="testing")
function = Function("testing")
parser._commands.append(command)
parser._functions.append(function)
assert len(parser.get_functions()) == 1
def test_load(self):
p = Parser("/path/to/nonexistent.txt")
with pytest.raises(NotImplementedError):
p.load()

@ -1,48 +0,0 @@
import pytest
from scripttease.parsers.ini import Config
class TestConfig(object):
def test_get_commands(self):
c = Config("tests/examples/kitchen_sink.ini")
assert c.load() is True
assert len(c.get_commands()) > 0
def test_get_functions(self):
c = Config("tests/examples/kitchen_sink.ini")
assert c.load() is True
assert len(c.get_functions()) > 0
def test_load(self):
c = Config("nonexistent.ini")
assert c.load() is False
c = Config("tests/examples/python_examples.ini", overlay="nonexistent")
assert c.load() is False
c = Config("tests/examples/bad_examples.ini")
assert c.load() is False
c = Config("tests/examples/kitchen_sink.ini")
assert c.load() is True
c = Config("tests/examples/kitchen_sink.ini", context={'testing': "yes"})
assert c.load() is True
c = Config("tests/examples/bad_command.ini")
assert c.load() is False
context = {
'domain_tld': "example_com",
}
c = Config("tests/examples/template_example.ini", context=context)
assert c.load() is True
context = {
'domain_tld': "example_com",
}
c = Config("tests/examples/bad_template_example.ini", context=context)
assert c.load() is False

@ -1,133 +0,0 @@
import os
import pytest
from scripttease.library.commands import Command, ItemizedCommand
from scripttease.parsers.utils import *
def test_filter_commands():
commands = [
Command("apt-get install apache2 -y", environments=["base"], tags=["web"]),
Command("apt-get install apache-top -y", environments=["live"], tags=["web"]),
Command("pip install django-debug-toolbar", environments=["development"], tags=["django"]),
Command("pip install django", environments=["base"], tags=["django"]),
]
f1 = filter_commands(commands, environments=["base", "live"])
assert len(f1) == 3
f2 = filter_commands(commands, tags=["django"])
assert len(f2) == 2
f3 = filter_commands(commands, environments=["base", "development"])
assert len(f3) == 3
f4 = filter_commands(commands, environments=["base"], tags=["web"])
assert len(f4) == 1
def test_load_commands():
commands = load_commands("nonexistent.xml")
assert commands is None
commands = load_commands("nonexistent.ini")
assert commands is None
commands = load_commands("tests/examples/bad_examples.ini")
assert commands is None
commands = load_commands(
"tests/examples/python_examples.ini",
filters={
'tags': ["python-support"],
}
)
assert len(commands) == 2
def test_load_variables():
assert len(load_variables("nonexistent.ini")) == 0
assert len(load_variables(os.path.join("tests", "examples", "templates", "simple.txt"))) == 0
variables = load_variables(os.path.join("tests", "examples", "variables.ini"))
assert len(variables) == 5
variables = load_variables(os.path.join("tests", "examples", "variables.ini"), environment="testing")
assert len(variables) == 4
class TestContext(object):
def test_add(self):
c = Context()
c.add("testing", True)
assert len(c.variables) == 1
c.add("also_testing", False)
assert len(c.variables) == 2
assert isinstance(c.add("still_testing", True), Variable)
with pytest.raises(RuntimeError):
c.add("testing", True)
def test_get(self):
c = Context(testing=True)
assert c.get("testing") is True
assert c.get("nonexistent") is None
assert c.get("nonexistent", default=True) is True
def test_getattr(self):
c = Context(testing=True)
assert c.testing is True
assert c.nonexistent is None
def test_has(self):
c = Context(testing=True)
assert c.has("testing") is True
assert c.has("nonexistent") is False
def test_init(self):
c = Context(testing=True, also_testing=123)
assert len(c.variables) == 2
def test_join(self):
c = Context(testing=True)
variables = [
Variable("testing", True),
Variable("also_testing", True),
]
c.join(variables)
assert len(c.variables) == 2
def test_mapping(self):
c = Context(testing=True, also_testing=False, still_testing=True)
assert type(c.mapping()) is dict
assert len(c.mapping()) == 3
def test_merge(self):
c1 = Context(testing=True, also_testing=False)
c2 = Context(still_testing=True)
c1.merge(c2)
assert len(c1.variables) == 3
def test_repr(self):
c = Context(testing=True, also_testing=False, still_testing=True)
assert repr(c) == "<Context (3)>"
class TestVariable(object):
def test_eq(self):
var = Variable("testing", True)
assert var == True
def test_getattr(self):
var = Variable("testing", True, one="a", two="b")
assert var.one == "a"
assert var.two == "b"
assert var.three is None
def test_repr(self):
var = Variable("testing", True)
assert repr(var) == "<Variable testing>"
Loading…
Cancel
Save