From 7a64eeff094eb9dee5cda49f7d1fe57aa7c78ef9 Mon Sep 17 00:00:00 2001 From: Shawn Davis Date: Tue, 4 Apr 2023 09:49:23 -0500 Subject: [PATCH] Finished unit tests for new release. --- .coveragerc | 1 + LICENSE.txt | 2 +- PEP440.txt | 1 - RELEASE.txt | 1 + VERSION.txt | 2 +- scripttease/factory.py | 84 -- scripttease/lib/commands/base.py | 76 +- scripttease/lib/commands/centos.py | 2 +- scripttease/lib/commands/django.py | 6 +- scripttease/lib/commands/mysql.py | 7 +- scripttease/lib/commands/posix.py | 2 +- scripttease/lib/factories.py | 4 +- scripttease/lib/loaders/base.py | 16 +- scripttease/lib/loaders/ini.py | 15 +- scripttease/lib/mappings.py | 1 - scripttease/library/__init__.py | 0 scripttease/library/commands/__init__.py | 3 - scripttease/library/commands/base.py | 309 ------- scripttease/library/commands/templates.py | 197 ----- scripttease/library/overlays/__init__.py | 0 scripttease/library/overlays/centos.py | 281 ------- scripttease/library/overlays/common.py | 150 ---- scripttease/library/overlays/django.py | 188 ----- scripttease/library/overlays/mysql.py | 262 ------ scripttease/library/overlays/pgsql.py | 226 ------ scripttease/library/overlays/posix.py | 758 ------------------ scripttease/library/overlays/ubuntu.py | 338 -------- scripttease/library/scripts.py | 69 -- scripttease/parsers/__init__.py | 4 - scripttease/parsers/base.py | 80 -- scripttease/parsers/ini.py | 179 ----- scripttease/parsers/utils.py | 325 -------- scripttease/parsers/yaml.py | 0 scripttease/version.py | 10 +- setup.py | 2 +- tests/examples/apache_examples.ini | 14 +- tests/examples/bad_custom_example.ini | 9 + tests/examples/bad_variables.ini | 15 + tests/examples/custom_example.ini | 9 + tests/examples/python_examples.ini | 6 + tests/examples/templates/settings.py | 3 + tests/examples/variables.ini | 2 +- tests/test_factory.py | 45 -- tests/test_lib_commands_base.py | 416 ++++++++++ ..._centos.py => test_lib_commands_centos.py} | 13 +- ..._django.py => test_lib_commands_django.py} | 14 +- tests/test_lib_commands_messages.py | 46 ++ ...ys_mysql.py => test_lib_commands_mysql.py} | 33 +- ...ys_pgsql.py => test_lib_commands_pgsql.py} | 33 +- tests/test_lib_commands_php.py | 7 + ...ys_posix.py => test_lib_commands_posix.py} | 225 +++--- tests/test_lib_commands_python.py | 25 + ..._ubuntu.py => test_lib_commands_ubuntu.py} | 21 +- tests/test_lib_contexts.py | 37 + tests/test_lib_factories.py | 50 ++ tests/test_lib_loaders_base.py | 83 ++ tests/test_lib_loaders_ini.py | 23 + tests/test_lib_loaders_yaml.py | 4 + tests/test_library_commands_base.py | 130 --- tests/test_library_commands_templates.py | 113 --- tests/test_library_overlays_common.py | 57 -- tests/test_library_scripts.py | 31 - tests/test_parsers_base.py | 30 - tests/test_parsers_ini.py | 48 -- tests/test_parsers_utils.py | 133 --- 65 files changed, 991 insertions(+), 4285 deletions(-) delete mode 100644 PEP440.txt create mode 100644 RELEASE.txt delete mode 100644 scripttease/factory.py delete mode 100644 scripttease/lib/mappings.py delete mode 100644 scripttease/library/__init__.py delete mode 100644 scripttease/library/commands/__init__.py delete mode 100644 scripttease/library/commands/base.py delete mode 100644 scripttease/library/commands/templates.py delete mode 100644 scripttease/library/overlays/__init__.py delete mode 100644 scripttease/library/overlays/centos.py delete mode 100644 scripttease/library/overlays/common.py delete mode 100644 scripttease/library/overlays/django.py delete mode 100644 scripttease/library/overlays/mysql.py delete mode 100644 scripttease/library/overlays/pgsql.py delete mode 100644 scripttease/library/overlays/posix.py delete mode 100644 scripttease/library/overlays/ubuntu.py delete mode 100644 scripttease/library/scripts.py delete mode 100644 scripttease/parsers/__init__.py delete mode 100644 scripttease/parsers/base.py delete mode 100644 scripttease/parsers/ini.py delete mode 100644 scripttease/parsers/utils.py delete mode 100644 scripttease/parsers/yaml.py create mode 100644 tests/examples/bad_custom_example.ini create mode 100644 tests/examples/bad_variables.ini create mode 100644 tests/examples/custom_example.ini create mode 100644 tests/examples/templates/settings.py delete mode 100644 tests/test_factory.py create mode 100644 tests/test_lib_commands_base.py rename tests/{test_library_overlays_centos.py => test_lib_commands_centos.py} (84%) rename tests/{test_library_overlays_django.py => test_lib_commands_django.py} (82%) create mode 100644 tests/test_lib_commands_messages.py rename tests/{test_library_overlays_mysql.py => test_lib_commands_mysql.py} (70%) rename tests/{test_library_overlays_pgsql.py => test_lib_commands_pgsql.py} (58%) create mode 100644 tests/test_lib_commands_php.py rename tests/{test_library_overlays_posix.py => test_lib_commands_posix.py} (57%) create mode 100644 tests/test_lib_commands_python.py rename tests/{test_library_overlays_ubuntu.py => test_lib_commands_ubuntu.py} (79%) create mode 100644 tests/test_lib_contexts.py create mode 100644 tests/test_lib_factories.py create mode 100644 tests/test_lib_loaders_base.py create mode 100644 tests/test_lib_loaders_ini.py create mode 100644 tests/test_lib_loaders_yaml.py delete mode 100644 tests/test_library_commands_base.py delete mode 100644 tests/test_library_commands_templates.py delete mode 100644 tests/test_library_overlays_common.py delete mode 100644 tests/test_library_scripts.py delete mode 100644 tests/test_parsers_base.py delete mode 100644 tests/test_parsers_ini.py delete mode 100644 tests/test_parsers_utils.py diff --git a/.coveragerc b/.coveragerc index 1bb41c7..7962c69 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,6 +2,7 @@ omit = docs/* scripttease/cli/* + scripttease/lib/loaders/yaml.py scripttease/variables.py scripttease/version.py sandbox diff --git a/LICENSE.txt b/LICENSE.txt index 2047a54..8071af9 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -10,7 +10,7 @@ are permitted provided that the following conditions are met: * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Pleasant Tents, LLC nor the names of its contributors + * Neither the name of copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. diff --git a/PEP440.txt b/PEP440.txt deleted file mode 100644 index 74030cb..0000000 --- a/PEP440.txt +++ /dev/null @@ -1 +0,0 @@ -6.8.33 \ No newline at end of file diff --git a/RELEASE.txt b/RELEASE.txt new file mode 100644 index 0000000..c41eb10 --- /dev/null +++ b/RELEASE.txt @@ -0,0 +1 @@ +7.0.0-a diff --git a/VERSION.txt b/VERSION.txt index 74030cb..4122521 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -6.8.33 \ No newline at end of file +7.0.0 \ No newline at end of file diff --git a/scripttease/factory.py b/scripttease/factory.py deleted file mode 100644 index e8dc498..0000000 --- a/scripttease/factory.py +++ /dev/null @@ -1,84 +0,0 @@ -# Imports - -import logging -from importlib import import_module -from .constants import LOGGER_NAME -from .library.commands import ItemizedCommand - -log = logging.getLogger(LOGGER_NAME) - -# Exports - -__all__ = ( - "Factory", -) - -# Classes - - -class Factory(object): - """A command factory.""" - - def __init__(self, overlay): - """Initialize the factory. - - :param overlay: The name of the overlay to use for generating commands. - :type overlay: str - - """ - self.is_loaded = False - self.overlay = None - self._overlay = overlay - - def __repr__(self): - return "<%s %s>" % (self.__class__.__name__, self._overlay) - - def get_command(self, name, *args, **kwargs): - """Get a command. - - :param name: The name of the command. - :type name: str - - args and kwargs are passed to the initialize the command. - - :rtype: scripttease.library.commands.Command | scripttease.library.commands.ItemizedCommand - - :raise: RuntimeError - :raises: ``RuntimeError`` if the factory has not yet been loaded. - - """ - if not self.is_loaded: - raise RuntimeError("Factory has not been loaded, so no commands are available. Call load() method first!") - - if not self.overlay.command_exists(name): - log.warning("Command does not exist in %s overlay: %s" % (self._overlay, name)) - return None - - callback = self.overlay.MAPPINGS[name] - - try: - items = kwargs.pop("items", None) - if items is not None: - return ItemizedCommand(callback, items, *args, name=name, **kwargs) - - command = callback(*args, **kwargs) - command.name = name - return command - except (KeyError, NameError, TypeError, ValueError) as e: - log.critical("Failed to load %s command: %s" % (name, e)) - return None - - def load(self): - """Load the factory. - - :rtype: bool - - """ - try: - self.overlay = import_module("scripttease.library.overlays.%s" % self._overlay) - self.is_loaded = True - except ImportError as e: - log.error("The %s overlay could not be imported: %s" % (self._overlay, str(e))) - pass - - return self.is_loaded diff --git a/scripttease/lib/commands/base.py b/scripttease/lib/commands/base.py index b593b2b..d73379f 100644 --- a/scripttease/lib/commands/base.py +++ b/scripttease/lib/commands/base.py @@ -126,6 +126,33 @@ class Content(object): def __init__(self, content_type, caption=None, css=None, heading=None, height=None, image=None, message=None, width=None, **kwargs): + """Initialize content. + + :param content_type: The type of content; ``explain`` or ``screenshot``. + :type content_type: str + + :param caption: A caption for images. + :type caption: str + + :param css: The CSS class to be applied. + :type css: str + + :param heading: A heading for explanatory content. + :type heading: str + + :param height: The height of an image. + :type height: int | str + + :param image: The URL or path to the image file. + :type image: str + + :param message: The explanation. + :type message: str + + :param width: The width of an image. + :type width: int | str + + """ self.caption = caption self.css = css self.heading = heading @@ -142,7 +169,7 @@ class Content(object): return self.kwargs.get(item) def __repr__(self): - return "<%s %s>" % (self.__class__.__name__, self.content_type) + return "<%s %s>" % (self.__class__.__name__, self.type) @property def is_itemized(self): @@ -150,12 +177,12 @@ class Content(object): return False def get_output(self, output_format): - if self.content_type == "explain": + if self.type == "explain": return self._get_message_output(output_format) - elif self.content_type == "screenshot": + elif self.type == "screenshot": return self._get_image_output(output_format) else: - log.warning("Invalid content type: %s" % self.content_type) + log.warning("Invalid content type: %s" % self.type) return None # noinspection PyUnusedLocal @@ -199,8 +226,8 @@ class Content(object): if self.height is not None: a.append(indent(":height: %s" % self.height, 8)) - if self.height is not None: - a.append(indent(":height: %s" % self.height, 8)) + if self.width is not None: + a.append(indent(":width: %s" % self.width, 8)) else: if self.caption: a.append("%s: %s" % (self.caption, self.image)) @@ -295,7 +322,7 @@ class ItemizedCommand(object): """ kwargs = self.kwargs.copy() - kwargs['name'] = self.name + # kwargs['name'] = self.name a = list() for item in self.items: @@ -334,6 +361,18 @@ class MultipleCommands(object): """An interface for handling command composition where multiple statements must be generated.""" def __init__(self, commands, comment=None, tags=None, **kwargs): + """Initialize multiple commands. + + :param commands: A list of comments. + :type commands: list[BaseType[scripttease.lib.commands.Command]] + + :param comment: A comment regarding the commands. + :type comment: str + + :param tags: A list of tags. + :type tags: list[str] + + """ self.commands = commands self.comment = comment or "run multiple commands" self.tags = tags or list() @@ -576,14 +615,17 @@ class Template(object): content = read_file(template) return content % self.context - try: - return parse_jinja_template(template, self.context) - except TemplateNotFound: - log.error("Template not found: %s" % template) - return None - except TemplateError as e: - log.error("Could not parse %s template: %s" % (template, e)) - return None + if self.parser == self.PARSER_JINJA: + try: + return parse_jinja_template(template, self.context) + except TemplateNotFound: + log.error("Template not found: %s" % template) + return None + except TemplateError as e: + log.error("Could not parse %s template: %s" % (template, e)) + return None + + return None # noinspection PyUnusedLocal def get_statement(self, cd=True, include_comment=True, include_register=True, include_stop=True): @@ -599,7 +641,7 @@ class Template(object): # Get the content; e.g. parse the template. content = self.get_content() if content is None: - lines.append("# NOT CONTENT AVAILABLE") + lines.append("# NO CONTENT AVAILABLE") return "\n".join(lines) # Templates that are bash scripts will fail to write because of the shebang. @@ -645,6 +687,8 @@ class Template(object): return "python" elif self.target.endswith(".sh"): return "bash" + elif self.target.endswith(".sql"): + return "sql" elif self.target.endswith(".yml"): return "yaml" else: diff --git a/scripttease/lib/commands/centos.py b/scripttease/lib/commands/centos.py index 89e4d05..bb3a384 100644 --- a/scripttease/lib/commands/centos.py +++ b/scripttease/lib/commands/centos.py @@ -1,7 +1,7 @@ # Imports from commonkit import split_csv -from .base import Command, Template +from .base import Command from .django import DJANGO_MAPPINGS from .messages import MESSAGE_MAPPINGS from .mysql import MYSQL_MAPPINGS diff --git a/scripttease/lib/commands/django.py b/scripttease/lib/commands/django.py index 80106a3..c909def 100644 --- a/scripttease/lib/commands/django.py +++ b/scripttease/lib/commands/django.py @@ -72,7 +72,7 @@ def django_dump(target, path=None, **kwargs): kwargs.setdefault("indent", 4) if path is None: - path = "../deploy/fixtures/%s.%s" % (target, kwargs['format']) + path = "../fixtures/%s.%s" % (target, kwargs['format']) return django("dumpdata", target, "> %s" % path, **kwargs) @@ -81,7 +81,7 @@ def django_load(target, path=None, **kwargs): kwargs.setdefault("comment", "load app/model data from %s" % target) input_format = kwargs.pop("format", "json") if path is None: - path = "../deploy/fixtures/%s.%s" % (target, input_format) + path = "../fixtures/%s.%s" % (target, input_format) return django("loaddata", path, **kwargs) @@ -101,7 +101,9 @@ DJANGO_MAPPINGS = { 'django': django, 'django.check': django_check, 'django.dump': django_dump, + 'django.dumpdata': django_dump, 'django.load': django_load, + 'django.loaddata': django_load, 'django.migrate': django_migrate, 'django.static': django_static, } diff --git a/scripttease/lib/commands/mysql.py b/scripttease/lib/commands/mysql.py index 0668131..8f2fe5e 100644 --- a/scripttease/lib/commands/mysql.py +++ b/scripttease/lib/commands/mysql.py @@ -9,8 +9,10 @@ from .base import Command __all__ = ( "MYSQL_MAPPINGS", "mysql_create", + "mysql_drop", "mysql_dump", "mysql_exists", + "mysql_grant", "mysql_load", "mysql_user", ) @@ -80,7 +82,6 @@ def mysql_drop(database, **kwargs): def mysql_dump(database, path=None, **kwargs): kwargs.setdefault("comment", "dump mysql database") - kwargs.setdefault("complete_inserts", True) if path is None: path = "%s.sql" % database @@ -135,7 +136,7 @@ def mysql_grant(to, database=None, privileges="ALL", **kwargs): def mysql_load(database, path, **kwargs): kwargs.setdefault("comment", "load data into a mysql database") - return mysql("psql", database, "< %s" % path, **kwargs) + return mysql("mysql", database, "< %s" % path, **kwargs) def mysql_user(name, admin_pass=None, admin_user="root", op="create", password=None, **kwargs): @@ -165,7 +166,7 @@ def mysql_user(name, admin_pass=None, admin_user="root", op="create", password=N return command elif op == "exists": kwargs.setdefault("comment", "determine if %s mysql user exists" % name) - kwargs.setdefault("register", "mysql_use_exists") + kwargs.setdefault("register", "mysql_user_exists") command = mysql("mysql", password=admin_pass, user=admin_user, **kwargs) diff --git a/scripttease/lib/commands/posix.py b/scripttease/lib/commands/posix.py index 84b1bf7..3182bd1 100644 --- a/scripttease/lib/commands/posix.py +++ b/scripttease/lib/commands/posix.py @@ -103,7 +103,7 @@ def copy(from_path, to_path, overwrite=False, recursive=False, **kwargs): return Command(" ".join(a), **kwargs) -def directory(path, group=None, mode=None, owner=None, recursive=True, **kwargs): +def directory(path, group=None, mode=None, owner=None, recursive=False, **kwargs): """Create a directory. - path (str): The path to be created. diff --git a/scripttease/lib/factories.py b/scripttease/lib/factories.py index 956d331..50894fb 100644 --- a/scripttease/lib/factories.py +++ b/scripttease/lib/factories.py @@ -33,9 +33,9 @@ def command_factory(loader, excluded_kwargs=None, mappings=None, profile=PROFILE """ # Identify the command mappings to be used. - if profile == "centos": + if profile == PROFILE.CENTOS: _mappings = CENTOS_MAPPINGS - elif profile == "ubuntu": + elif profile == PROFILE.UBUNTU: _mappings = UBUNTU_MAPPINGS else: log.error("Unsupported or unrecognized profile: %s" % profile) diff --git a/scripttease/lib/loaders/base.py b/scripttease/lib/loaders/base.py index 63f639a..f8e54ea 100644 --- a/scripttease/lib/loaders/base.py +++ b/scripttease/lib/loaders/base.py @@ -2,7 +2,7 @@ from commonkit import any_list_item, parse_jinja_string, parse_jinja_template, pick, read_file, smart_cast, split_csv, \ File -from configparser import ParsingError, RawConfigParser +from configparser import ParsingError, ConfigParser from jinja2.exceptions import TemplateError, TemplateNotFound import logging import os @@ -74,7 +74,7 @@ def load_variables(path, env=None): log.warning("Variables file does not exist: %s" % path) return list() - ini = RawConfigParser() + ini = ConfigParser() try: ini.read(path) except ParsingError as e: @@ -82,18 +82,18 @@ def load_variables(path, env=None): return list() variables = list() - for variable_name in ini.sections(): - if ":" in variable_name: - variable_name, _environment = variable_name.split(":") + for section in ini.sections(): + if ":" in section: + variable_name, _environment = section.split(":") else: _environment = None - variable_name = variable_name + variable_name = section kwargs = { 'environment': _environment, } _value = None - for key, value in ini.items(variable_name): + for key, value in ini.items(section): if key == "value": _value = smart_cast(value) continue @@ -212,7 +212,7 @@ class BaseLoader(File): be defined. - ``groups`` is assumed to be a CSV list of groups if provided as a string. - ``items`` is assumed to be a CSV list if provided as a string. These are used to create an "itemized" command. - - ``tags`` is assumed to be a CSV list oif provided as a string. + - ``tags`` is assumed to be a CSV list if provided as a string. All other keys are used as is. Values provided as a CSV list are smart cast to a Python value. diff --git a/scripttease/lib/loaders/ini.py b/scripttease/lib/loaders/ini.py index 2e0648e..b86d26d 100644 --- a/scripttease/lib/loaders/ini.py +++ b/scripttease/lib/loaders/ini.py @@ -48,8 +48,9 @@ class INILoader(BaseLoader): kwargs['comment'] = comment for key, value in ini.items(comment): - if key.startswith("_"): - continue + # Is there a reason for this? + # if key.startswith("_"): + # continue # The first key/value pair is the command name and arguments. if count == 0: @@ -63,8 +64,8 @@ class INILoader(BaseLoader): continue # Arguments surrounded by quotes are considered to be one argument. All others are split into a - # list to be passed to the parser. It is also possible that this is a call where no arguments are - # present, so the whole thing is wrapped to protect against an index error. A TypeError is raised in + # list to be passed to the command factory. It is also possible that this is a call where no + # arguments are present, so the whole thing is wrapped to protect against an index error. A TypeError is raised in # cases where a command is provided with no positional arguments; we interpret this as True. try: if value[0] == '"': @@ -73,9 +74,9 @@ class INILoader(BaseLoader): args = value.split(" ") except IndexError: pass - except TypeError: - # noinspection PyTypeChecker - args.append(True) + # except TypeError: + # noinspection PyTypeChecker + # args.append(True) else: _key, _value = self._get_key_value(key, value) kwargs[_key] = _value diff --git a/scripttease/lib/mappings.py b/scripttease/lib/mappings.py deleted file mode 100644 index 9025962..0000000 --- a/scripttease/lib/mappings.py +++ /dev/null @@ -1 +0,0 @@ -from .commands.posix import POSIX_MAPPINGS diff --git a/scripttease/library/__init__.py b/scripttease/library/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/scripttease/library/commands/__init__.py b/scripttease/library/commands/__init__.py deleted file mode 100644 index d4107d6..0000000 --- a/scripttease/library/commands/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .base import Command, ItemizedCommand -from .templates import Template -# from .factory import command_factory diff --git a/scripttease/library/commands/base.py b/scripttease/library/commands/base.py deleted file mode 100644 index 0abb978..0000000 --- a/scripttease/library/commands/base.py +++ /dev/null @@ -1,309 +0,0 @@ -# Classes - - -class Command(object): - """A command line statement.""" - - def __init__(self, statement, comment=None, condition=None, cd=None, environments=None, function=None, name=None, - prefix=None, register=None, shell=None, stop=False, sudo=None, tags=None, **kwargs): - """Initialize a command. - - :param statement: The statement to be executed. - :type statement: str - - :param comment: A comment regarding the statement. - :type comment: str - - :param condition: A (system-specific) condition for the statement to be executed. - :type condition: str - - :param cd: The direction from which the statement should be executed. - :type cd: str - - :param environments: A list of target environments where the statement should be executed. - :type environments: list[str] - - :param function: The name of the function in which the statement is executed. - :type function: str - - :param name: The name of the command from the mapping. Not used and not required for programmatic use, but - automatically assigned during factory instantiation. - :type name: str - - :param prefix: A statement to execute before the main statement is executed. - :type prefix: str - - :param register: A variable name to use for capture the success for failure of the statement's execution. - :type register: str - - :param shell: The shell execute through which the statement is executed. - :type shell: str - - :param stop: Indicates process should stop if the statement fails to execute. - :type stop: bool | None - - :param sudo: Indicates whether sudo should be invoked for the statement. Given as a bool or user name or - :py:class:`scripttease.library.commands.base.Sudo` instance. - :type sudo: bool | str | Sudo - - :param tags: A list of tags describing the statement. - :type tags: list[str] - - Additional kwargs are available as dynamic attributes of the Command instance. - - """ - self.comment = comment - self.condition = condition - self.cd = cd - self.environments = environments or list() - self.function = function - self.name = name - self.prefix = prefix - self.register = register - self.shell = shell - self.statement = statement - self.stop = stop - self.tags = tags or list() - - if isinstance(sudo, Sudo): - self.sudo = sudo - elif type(sudo) is str: - self.sudo = Sudo(enabled=True, user=sudo) - elif sudo is True: - self.sudo = Sudo(enabled=True) - else: - self.sudo = Sudo() - - self._attributes = kwargs - - def __getattr__(self, item): - return self._attributes.get(item) - - def __repr__(self): - if self.comment is not None: - return "<%s %s>" % (self.__class__.__name__, self.comment) - - return "<%s>" % self.__class__.__name__ - - def get_statement(self, cd=False, suppress_comment=False): - """Get the full statement. - - :param cd: Include the directory change, if given. - :type cd: bool - - :param suppress_comment: Don't include the comment. - :type suppress_comment: bool - - :rtype: str - - """ - a = list() - - if cd and self.cd is not None: - a.append("( cd %s &&" % self.cd) - - if self.prefix is not None: - a.append("%s &&" % self.prefix) - - if self.sudo: - statement = "%s %s" % (self.sudo, self._get_statement()) - else: - statement = self._get_statement() - - a.append("%s" % statement) - - if cd and self.cd is not None: - a.append(")") - - b = list() - if self.comment is not None and not suppress_comment: - b.append("# %s" % self.comment) - - if self.condition is not None: - b.append("if [[ %s ]]; then %s; fi;" % (self.condition, " ".join(a))) - else: - b.append(" ".join(a)) - - if self.register is not None: - b.append("%s=$?;" % self.register) - - if self.stop: - b.append("if [[ $%s -gt 0 ]]; exit 1; fi;" % self.register) - elif self.stop: - b.append("if [[ $? -gt 0 ]]; exit 1; fi;") - else: - pass - - return "\n".join(b) - - def has_attribute(self, name): - """Indicates whether the command has the named, dynamic attribute. - - :param name: The name of the attribute to be checked. - :type name: str - - :rtype: bool - - """ - return name in self._attributes - - @property - def is_itemized(self): - """Always returns ``False``.""" - return False - - def set_attribute(self, name, value): - """Set the value of a dynamic attribute. - - :param name: The name of the attribute. - :type name: str - - :param value: The value of the attribute. - - """ - self._attributes[name] = value - - def _get_statement(self): - """By default, get the statement passed upon command initialization. - - :rtype: str - - """ - return self.statement - - -class ItemizedCommand(object): - """An itemized command represents multiple commands of with the same statement but different parameters.""" - - def __init__(self, callback, items, *args, name=None, **kwargs): - """Initialize the command. - - :param callback: The function to be used to generate the command. - - :param items: The command arguments. - :type items: list[str] - - :param name: The name of the command from the mapping. Not used and not required for programmatic use, but - automatically assigned during factory instantiation. - :type name: str - - :param args: The itemized arguments. ``$item`` should be included. - - Keyword arguments are passed to the command class upon instantiation. - - """ - self.args = args - self.callback = callback - self.items = items - self.kwargs = kwargs - self.name = name - - # Set defaults for when ItemizedCommand is referenced directly before individual commands are instantiated. For - # example, when command filtering occurs. - self.kwargs.setdefault("environments", list()) - self.kwargs.setdefault("tags", list()) - - def __getattr__(self, item): - return self.kwargs.get(item) - - def __repr__(self): - return "<%s %s>" % (self.__class__.__name__, self.callback.__name__) - - def get_commands(self): - """Get the commands to be executed. - - :rtype: list[BaseType(Command)] - - """ - kwargs = self.kwargs.copy() - - a = list() - for item in self.items: - args = list() - for arg in self.args: - args.append(arg.replace("$item", item)) - - command = self.callback(*args, **kwargs) - a.append(command) - - return a - - def get_statement(self, cd=False, suppress_comment=False): - """Override to get multiple commands.""" - kwargs = self.kwargs.copy() - comment = kwargs.pop("comment", "execute multiple commands") - - a = list() - # a.append("# %s" % comment) - - commands = self.get_commands() - for c in commands: - a.append(c.get_statement(cd=cd, suppress_comment=suppress_comment)) - a.append("") - - # for item in self.items: - # args = list() - # for arg in self.args: - # args.append(arg.replace("$item", item)) - # - # command = self.command_class(*args, **kwargs) - # a.append(command.preview(cwd=cwd)) - # a.append("") - - return "\n".join(a) - - def has_attribute(self, name): - """Indicates whether the command has the named, dynamic attribute. - - :param name: The name of the attribute to be checked. - :type name: str - - :rtype: bool - - """ - return name in self.kwargs - - @property - def is_itemized(self): - """Always returns ``True``.""" - return True - - def set_attribute(self, name, value): - """Set the value of a dynamic attribute. - - :param name: The name of the attribute. - :type name: str - - :param value: The value of the attribute. - - .. note:: - This is applied to all command in the itemized list. - - """ - self.kwargs[name] = value - - -class Sudo(object): - """Helper class for defining sudo options.""" - - def __init__(self, enabled=False, user="root"): - """Initialize the helper. - - :param enabled: Indicates sudo is enabled. - :type enabled: bool - - :param user: The user to be invoked. - :type user: str - - """ - self.enabled = enabled - self.user = user - - def __bool__(self): - return self.enabled - - def __str__(self): - if self.enabled: - return "sudo -u %s" % self.user - - return "" diff --git a/scripttease/library/commands/templates.py b/scripttease/library/commands/templates.py deleted file mode 100644 index 01931ad..0000000 --- a/scripttease/library/commands/templates.py +++ /dev/null @@ -1,197 +0,0 @@ -# Imports - -from commonkit import parse_jinja_template, read_file -from jinja2.exceptions import TemplateError, TemplateNotFound -import logging -import os -from ...constants import LOGGER_NAME -from .base import Command - -log = logging.getLogger(LOGGER_NAME) - -# Exports - -__all__ = ( - "Template", -) - -# Classes - - -class Template(Command): - """Parse a template.""" - - PARSER_JINJA = "jinja2" - PARSER_SIMPLE = "simple" - - def __init__(self, source, target, backup=True, lines=False, parser=PARSER_JINJA, pythonic=False, **kwargs): - """Initialize the command. - - :param source: The template source file. - :type source: str - - :param target: The path to the output file. - :type target: str - - :param backup: Indicates a copy of an existing file should be madee. - :type backup: bool - - :param parser: The parser to use. - :type parser: str - - :param pythonic: Use a Python one-liner to write the file. Requires Python installation, obviously. This is - useful when the content of the file cannot be handled with a cat command; for example, shell - script templates. - :type pythonic: bool - - """ - # Base parameters need to be captured, because all others are assumed to be switches for the management command. - self._kwargs = { - 'comment': kwargs.pop("comment", None), - 'cd': kwargs.pop("cd", None), - 'environments': kwargs.pop("environments", None), - 'function': kwargs.pop("function", None), - # 'local': kwargs.pop("local", False), - 'name': "template", - 'prefix': kwargs.pop("prefix", None), - 'shell': kwargs.pop("shell", "/bin/bash"), - 'stop': kwargs.pop("stop", False), - 'sudo': kwargs.pop('sudo', False), - 'tags': kwargs.pop("tags", None), - } - - self.backup_enabled = backup - self.context = kwargs.pop("context", dict()) - self.parser = parser or self.PARSER_JINJA - self.pythonic = pythonic - self.line_by_line = lines - self.locations = kwargs.pop("locations", list()) - self.source = os.path.expanduser(source) - self.target = target - - # Remaining kwargs are added to the context. - # print(_kwargs['comment'], kwargs) - self.context.update(kwargs) - - super().__init__("# template: %s" % source, **self._kwargs) - - def get_content(self): - """Parse the template. - - :rtype: str | None - - """ - template = self.get_template() - - if self.parser == self.PARSER_SIMPLE: - content = read_file(template) - for key, value in self.context.items(): - replace = "$%s$" % key - content = content.replace(replace, str(value)) - - return content - - try: - return parse_jinja_template(template, self.context) - except TemplateNotFound: - log.error("Template not found: %s" % template) - return None - except TemplateError as e: - log.error("Could not parse %s template: %s" % (template, e)) - return None - - def get_statement(self, cd=False, suppress_comment=False): - """Override to get the statement based on the parser.""" - if self.parser == self.PARSER_JINJA: - return self._get_jinja2_statement(cd=cd).statement - elif self.parser == self.PARSER_SIMPLE: - return self._get_simple_statement(cd=cd).statement - else: - log.error("Unknown or unsupported template parser: %s" % self.parser) - return None - - def get_template(self): - """Get the template path. - - :rtype: str - - """ - source = self.source - for location in self.locations: - _source = os.path.join(location, self.source) - if os.path.exists(_source): - return _source - - return source - - def _get_command(self, content): - """Get the cat command.""" - output = list() - - # TODO: Template backup is not system safe, but is specific to bash. - if self.backup_enabled: - output.append('if [[ -f "%s" ]]; then mv %s %s.b; fi;' % (self.target, self.target, self.target)) - - if content.startswith("#!"): - _content = content.split("\n") - first_line = _content.pop(0) - output.append('echo "%s" > %s' % (first_line, self.target)) - output.append("cat >> %s << EOF" % self.target) - output.append("\n".join(_content)) - output.append("EOF") - else: - output.append("cat > %s << EOF" % self.target) - output.append(content) - output.append("EOF") - - statement = "\n".join(output) - - return Command(statement, **self._kwargs) - - # # BUG: This still does not seem to work, possibly because a shell script includes EOF? The work around is to use - # # get_content(), self.target, and write the file manually. - # if self.line_by_line: - # a = list() - # a.append('touch %s' % self.target) - # for i in content.split("\n"): - # i = i.replace('"', r'\"') - # a.append('echo "%s" >> %s' % (i, self.target)) - # - # output.append("\n".join(a)) - # elif self.pythonic: - # target_file = File(self.target) - # script_file = "write_%s_template.py" % target_file.name.replace("-", "_") - # - # a = list() - # a.append('content = """%s' % content) - # a.append('"""') - # a.append("") - # a.append('with open("%s", "w") as f:' % self.target) - # a.append(' f.write(content)') - # a.append(' f.close()') - # a.append('') - # output.append('cat > %s < %s << EOF" % self.target) - # output.append(content) - # output.append("EOF") - # - # statement = "\n".join(output) - # return Command(statement, **self._kwargs) - - # noinspection PyUnusedLocal - def _get_jinja2_statement(self, cd=False): - """Parse a Jinja2 template.""" - content = self.get_content() - return self._get_command(content) - - # noinspection PyUnusedLocal - def _get_simple_statement(self, cd=False): - """Parse a "simple" template.""" - content = self.get_content() - - return self._get_command(content) diff --git a/scripttease/library/overlays/__init__.py b/scripttease/library/overlays/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/scripttease/library/overlays/centos.py b/scripttease/library/overlays/centos.py deleted file mode 100644 index 50b21bc..0000000 --- a/scripttease/library/overlays/centos.py +++ /dev/null @@ -1,281 +0,0 @@ -# Imports - -from commonkit import split_csv -from ..commands import Command, Template -from .common import COMMON_MAPPINGS -from .django import DJANGO_MAPPINGS -from .mysql import MYSQL_MAPPINGS -from .pgsql import PGSQL_MAPPINGS -from .posix import POSIX_MAPPINGS, Function - -# Exports - -__all__ = ( - "MAPPINGS", - "apache", - "apache_reload", - "apache_restart", - "apache_start", - "apache_stop", - "apache_test", - "command_exists", - "service_reload", - "service_restart", - "service_start", - "service_stop", - "system", - "system_install", - "system_reboot", - "system_update", - "system_upgrade", - "system_uninstall", - "template", - "user", - "Function", -) - - -def command_exists(name): - """Indicates whether a given command exists in this overlay. - - :param name: The name of the command. - :type name: str - - :rtype: bool - - """ - return name in MAPPINGS - - -def apache(op, **kwargs): - """Execute an Apache-related command. - - - op (str): The operation to perform; reload, restart, start, stop, test. - - """ - if op == "reload": - return apache_reload(**kwargs) - elif op == "restart": - return apache_restart(**kwargs) - elif op == "start": - return apache_start(**kwargs) - elif op == "stop": - return apache_stop(**kwargs) - elif op == "test": - return apache_test(**kwargs) - else: - raise NameError("Unrecognized or unsupported apache operation: %s" % op) - - -def apache_reload(**kwargs): - kwargs.setdefault("comment", "reload apache") - kwargs.setdefault("register", "apache_reloaded") - - return Command("apachectl –k reload", **kwargs) - - -def apache_restart(**kwargs): - kwargs.setdefault("comment", "restart apache") - kwargs.setdefault("register", "apache_restarted") - - return Command("apachectl –k restart", **kwargs) - - -def apache_start(**kwargs): - kwargs.setdefault("comment", "start apache") - kwargs.setdefault("register", "apache_started") - - return Command("apachectl –k start", **kwargs) - - -def apache_stop(**kwargs): - kwargs.setdefault("comment", "stop apache") - - return Command("apachectl –k stop", **kwargs) - - -def apache_test(**kwargs): - kwargs.setdefault("comment", "check apache configuration") - kwargs.setdefault("register", "apache_checks_out") - - return Command("apachectl configtest", **kwargs) - - -def service_reload(name, **kwargs): - """Reload a service. - - - name (str): The service name. - - """ - kwargs.setdefault("comment", "reload %s service" % name) - kwargs.setdefault("register", "%s_reloaded" % name) - - return Command("systemctl reload %s" % name, **kwargs) - - -def service_restart(name, **kwargs): - """Restart a service. - - - name (str): The service name. - - """ - kwargs.setdefault("comment", "restart %s service" % name) - kwargs.setdefault("register", "%s_restarted" % name) - - return Command("ssystemctl restart %s" % name, **kwargs) - - -def service_start(name, **kwargs): - """Start a service. - - - name (str): The service name. - - """ - kwargs.setdefault("comment", "start %s service" % name) - kwargs.setdefault("register", "%s_started" % name) - - return Command("systemctl start %s" % name, **kwargs) - - -def service_stop(name, **kwargs): - """Stop a service. - - - name (str): The service name. - - """ - kwargs.setdefault("comment", "stop %s service" % name) - kwargs.setdefault("register", "%s_stopped" % name) - - return Command("systemctl stop %s" % name, **kwargs) - - -def system(op, **kwargs): - """Perform a system operation. - - - op (str): The operation to perform; reboot, update, upgrade. - - """ - if op == "reboot": - return system_reboot(**kwargs) - elif op == "update": - return system_update(**kwargs) - elif op == "upgrade": - return system_upgrade(**kwargs) - else: - raise NameError("Unrecognized or unsupported system operation: %s" % op) - - -def system_install(name, **kwargs): - """Install a system-level package. - - - name (str): The name of the package to install. - - """ - kwargs.setdefault("comment", "install system package %s" % name) - - return Command("yum install -y %s" % name, **kwargs) - - -def system_reboot(**kwargs): - kwargs.setdefault("comment", "reboot the system") - - return Command("reboot", **kwargs) - - -def system_uninstall(name, **kwargs): - """Uninstall a system-level package. - - - name (str): The name of the package to uninstall. - - """ - kwargs.setdefault("comment", "remove system package %s" % name) - - return Command("yum remove -y %s" % name, **kwargs) - - -def system_update(**kwargs): - kwargs.setdefault("comment", "update system package info") - - return Command("yum check-update", **kwargs) - - -def system_upgrade(**kwargs): - kwargs.setdefault("comment", "upgrade the system") - - return Command("yum update -y", **kwargs) - - -def template(source, target, backup=True, parser=None, **kwargs): - """Create a file from a template. - - - source (str): The path to the template file. - - target (str): The path to where the new file should be created. - - backup (bool): Indicates whether a backup should be made if the target file already exists. - - parser (str): The parser to use ``jinja`` (the default) or ``simple``. - - """ - return Template(source, target, backup=backup, parser=parser, **kwargs) - - -def user(name, groups=None, home=None, op="add", password=None, **kwargs): - """Create or remove a user. - - - name (str): The user name. - - groups (str | list): A list of groups to which the user should belong. - - home (str): The path to the user's home directory. - - op (str); The operation to perform; ``add`` or ``remove``. - - password (str): The user's password. (NOT IMPLEMENTED) - - """ - if op == "add": - kwargs.setdefault("comment", "create a user named %s" % name) - - commands = list() - - a = list() - a.append('adduser %s' % name) - if home is not None: - a.append("--home %s" % home) - - commands.append(Command(" ".join(a), **kwargs)) - - if type(groups) is str: - groups = split_csv(groups, smart=False) - - if type(groups) in [list, tuple]: - for group in groups: - commands.append(Command("gpasswd -a %s %s" % (name, group), **kwargs)) - - a = list() - for c in commands: - a.append(c.get_statement(suppress_comment=True)) - - return Command("\n".join(a), **kwargs) - elif op == "remove": - kwargs.setdefault("comment", "remove a user named %s" % name) - return Command("userdel -r %s" % name, **kwargs) - else: - raise NameError("Unsupported or unrecognized operation: %s" % op) - - -MAPPINGS = { - 'apache': apache, - 'install': system_install, - 'reboot': system_reboot, - 'reload': service_reload, - 'restart': service_restart, - 'start': service_start, - 'stop': service_stop, - 'system': system, - 'template': template, - 'update': system_update, - 'uninstall': system_uninstall, - 'upgrade': system_upgrade, - 'user': user, -} - -MAPPINGS.update(COMMON_MAPPINGS) -MAPPINGS.update(DJANGO_MAPPINGS) -MAPPINGS.update(MYSQL_MAPPINGS) -MAPPINGS.update(PGSQL_MAPPINGS) -MAPPINGS.update(POSIX_MAPPINGS) diff --git a/scripttease/library/overlays/common.py b/scripttease/library/overlays/common.py deleted file mode 100644 index 7e3a90d..0000000 --- a/scripttease/library/overlays/common.py +++ /dev/null @@ -1,150 +0,0 @@ -# Imports - -from ..commands import Command - -# Exports - -__all__ = ( - "COMMON_MAPPINGS", - "python_pip", - "python_virtualenv", - "run", - "slack", - "twist", - "udf", -) - -# Functions - - -def python_pip(name, op="install", upgrade=False, venv=None, version=3, **kwargs): - """Use pip to install or uninstall a Python package. - - - name (str): The name of the package. - - op (str): The operation to perform; install, uninstall - - upgrade (bool): Upgrade an installed package. - - venv (str): The name of the virtual environment to load. - - version (int): The Python version to use, e.g. ``2`` or ``3``. - - """ - manager = "pip" - if version == 3: - manager = "pip3" - - if upgrade: - statement = "%s install --upgrade %s" % (manager, name) - else: - statement = "%s %s %s" % (manager, op, name) - - if venv is not None: - kwargs['prefix'] = "source %s/bin/activate" % venv - - kwargs.setdefault("comment", "%s %s" % (op, name)) - - return Command(statement, **kwargs) - - -def python_virtualenv(name, **kwargs): - """Create a Python virtual environment. - - - name (str): The name of the environment to create. - - """ - kwargs.setdefault("comment", "create %s virtual environment" % name) - - return Command("virtualenv %s" % name, **kwargs) - - -def run(statement, **kwargs): - """Run any statement. - - - statement (str): The statement to be executed. - - """ - kwargs.setdefault("comment", "run statement") - return Command(statement, **kwargs) - - -def slack(message, url=None, **kwargs): - """Send a message to Slack. - - - message (str): The message to be sent. - - url (str): The webhook URL. This is required. See documentation. - - """ - if url is None: - raise ValueError("A url is required to use the slack command.") - - kwargs.setdefault("comment", "send a message to slack") - - a = list() - - a.append("curl -X POST -H 'Content-type: application/json' --data") - a.append("'" + '{"text": "%s"}' % message + "'") - a.append(url) - - return Command(" ".join(a), **kwargs) - - -def twist(message, title="Notice", url=None, **kwargs): - """Send a message to Twist. - - - message (str): The message to be sent. - - title (str): The message title. - - url (str): The webhook URL. This is required. See documentation. - - """ - if url is None: - raise ValueError("A url is required to use the twist command.") - - kwargs.setdefault("comment", "send a message to twist") - - a = list() - - # curl -X POST -H 'Content-type: application/json' --data '{"content": "This is the message.", "title": "Message Title"}' "https://twist.com/api/v3/integration_incoming/post_data?install_id=116240&install_token=116240_bfb05bde51ecd0f728b4b161bee6fcee" - a.append("curl -X POST -H 'Content-type: application/json' --data") - - data = '{"content": "%s", "title": "%s"}' % (message, title) - a.append("'%s'" % data) - a.append('"%s"' % url) - - return Command(" ".join(a), **kwargs) - - -def udf(name, default=None, example=None, label=None, **kwargs): - """Create a UDF prompt for a StackScript. - - - name (str): The name of the variable. - - default: The default value. - - example: An example value, instead of a default. - - label (str): The label for the variable. - - """ - kwargs.setdefault("prompt for %s in stackscript" % name) - - label = label or name.replace("_", " ").title() - - a = ['# ") - - return Command(" ".join(a), **kwargs) - - -# Mappings - -COMMON_MAPPINGS = { - 'pip': python_pip, - 'run': run, - 'slack': slack, - 'twist': twist, - 'udf': udf, - 'virtualenv': python_virtualenv, -} diff --git a/scripttease/library/overlays/django.py b/scripttease/library/overlays/django.py deleted file mode 100644 index c45afd5..0000000 --- a/scripttease/library/overlays/django.py +++ /dev/null @@ -1,188 +0,0 @@ -# Imports - -import os -from ..commands import Command - -# Exports - -__all__ = ( - "DJANGO_MAPPINGS", - "django", - "django_check", - "django_collect_static", - "django_dumpdata", - "django_loaddata", - "django_migrate", -) - -# Functions - - -def _django(name, *args, venv=None, **kwargs): - """Process a django-based command. - - :param name: The name of the management command. - :type name: str - - :param venv: The virtual environment to use. - :type venv: str - - args and kwargs are used to instantiate the command instance. - - This exists because we need ``django()`` to serve as an interface for any management command. - - """ - if venv is not None: - kwargs['prefix'] = "source %s/bin/activate" % venv - - kwargs.setdefault("comment", "run %s django management command" % name) - - # Base parameters need to be captured, because all others are assumed to be switches for the management command. - _kwargs = { - 'comment': kwargs.pop("comment", None), - 'condition': kwargs.pop("condition", None), - 'cd': kwargs.pop("cd", None), - 'environments': kwargs.pop("environments", None), - 'function': kwargs.pop("function", None), - # 'local': kwargs.pop("local", False), - 'prefix': kwargs.pop("prefix", None), - 'register': kwargs.pop("register", None), - 'shell': kwargs.pop("shell", "/bin/bash"), - 'stop': kwargs.pop("stop", False), - 'sudo': kwargs.pop('sudo', False), - 'tags': kwargs.pop("tags", None), - } - - statement = list() - statement.append("./manage.py %s" % name) - - # Remaining kwargs are assumed to be switches. - for key, value in kwargs.items(): - key = key.replace("_", "-") - if type(value) is bool: - if value is True: - statement.append("--%s" % key) - else: - statement.append("--%s=%s" % (key, value)) - - if len(args) > 0: - statement.append(" ".join(args)) - - return Command(" ".join(statement), **_kwargs) - - -def django(name, *args, venv=None, **kwargs): - """Run any Django management command. - - - name (str): The name of the management command. - - venv (str): The of the virtual environment to use. - - args are passed as positional arguments, while kwargs are given as switches. - - """ - if name == "check": - return django_check(venv=venv, **kwargs) - elif name in ("collectstatic", "static"): - return django_collect_static(venv=venv, **kwargs) - elif name == "migrate": - return django_migrate(venv=venv, **kwargs) - else: - return _django(name, *args, venv=venv, **kwargs) - - -def django_check(venv=None, **kwargs): - """Run the Django check command. - - - venv (str): The of the virtual environment to use. - - """ - kwargs.setdefault("comment", "run django checks") - kwargs.setdefault("register", "django_checks_out") - - return _django("check", venv=venv, **kwargs) - - -def django_collect_static(venv=None, **kwargs): - """Collect static files. - - - venv (str): The of the virtual environment to use. - - """ - kwargs.setdefault("comment", "collect static files") - - return _django("collectstatic", venv=venv, **kwargs) - - -def django_dumpdata(app_name, base_path="local", file_name="initial", indent=4, natural_foreign=False, - natural_primary=False, path=None, venv=None, **kwargs): - """Dump data from the database. - - - app_name (str): The name (app label) of the app. ``app_label.ModelName`` may also be given. - - base_path (str): The path under which apps are located in source. - - file_name (str): The file name to which the data will be dumped. - - indent (int): Indentation of the exported fixtures. - - natural_foreign (bool): Use the natural foreign parameter. - - natural_primary (bool): Use the natural primary parameter. - - path (str): The path to the data file. - - venv (str): The of the virtual environment to use. - - """ - kwargs.setdefault("comment", "export fixtures for %s" % app_name) - - output_format = kwargs.pop("format", "json") - - _path = path or os.path.join(base_path, app_name, "fixtures", "%s.%s" % (file_name, output_format)) - - return _django( - "dumpdata", - app_name, - "> %s" % _path, - format=output_format, - indent=indent, - natural_foreign=natural_foreign, - natural_primary=natural_primary, - venv=venv, - **kwargs - ) - - -def django_loaddata(app_name, base_path="local", file_name="initial", path=None, venv=None, **kwargs): - """Load data into the database. - - - app_name (str): The name (app label) of the app. ``app_label.ModelName`` may also be given. - - base_path (str): The path under which apps are located in source. - - file_name (str): The file name to which the data will be dumped. - - path (str): The path to the data file. - - venv (str): The of the virtual environment to use. - - """ - kwargs.setdefault("comment", "load fixtures for %s" % app_name) - - output_format = kwargs.pop("format", "json") - - _path = path or os.path.join(base_path, app_name, "fixtures", "%s.%s" % (file_name, output_format)) - - return _django("loaddata", _path, venv=venv, **kwargs) - - -def django_migrate(venv=None, **kwargs): - """Apply database migrations. - - - venv (str): The of the virtual environment to use. - - """ - kwargs.setdefault("comment", "run django database migrations") - - return _django("migrate", venv=venv, **kwargs) - -# Mapping - - -DJANGO_MAPPINGS = { - 'django': django, - 'django.check': django_check, - 'django.collect_static': django_collect_static, - 'django.dumpdata': django_dumpdata, - 'django.loaddata': django_loaddata, - 'django.migrate': django_migrate, -} diff --git a/scripttease/library/overlays/mysql.py b/scripttease/library/overlays/mysql.py deleted file mode 100644 index 8461532..0000000 --- a/scripttease/library/overlays/mysql.py +++ /dev/null @@ -1,262 +0,0 @@ -# Imports - -from ..commands import Command - -# Exports - -__all__ = ( - "MYSQL_MAPPINGS", - "mysql_create", - "mysql_drop", - "mysql_dump", - "mysql_exec", - "mysql_exists", - "mysql_grant", - "mysql_user", -) - -# Functions - - -def _get_mysql_command(host="localhost", name="mysql", password=None, port=3306, user="root"): - a = list() - a.append("%s --user=%s" % (name, user)) - a.append("--host=%s" % host) - a.append("--port=%s" % port) - - if password: - a.append('--password="%s"' % password) - - return a - - -def mysql_create(database, host="localhost", owner=None, password=None, port=3306, user="root", **kwargs): - """Create a MySQL database. - - - database (str): The database name. - - host (str): The database host name or IP address. - - password (str): The password for the user with sufficient access privileges to execute the command. - - owner (str): The owner (user/role name) of the new database. - - port (int): The TCP port number of the MySQL service running on the host. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs.setdefault("comment", "create the %s mysql database" % database) - - # MySQL commands always run without sudo because the --user may be provided. - kwargs['sudo'] = False - - # Assemble the command. - a = _get_mysql_command(host=host, name="mysqladmin", password=password, port=port, user=user) - a.append("create %s" % database) - - if owner: - grant = mysql_grant( - owner, - database=database, - host=host, - password=password, - port=port, - user=user, - ) - a.append("&& %s" % grant.get_statement(suppress_comment=True)) - - return Command(" ".join(a), **kwargs) - - -def mysql_drop(database, host="localhost", password=None, port=3306, user="root", **kwargs): - """Drop (remove) a MySQL database. - - - database (str): The database name. - - host (str): The database host name or IP address. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number of the MySQL service running on the host. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs.setdefault("comment", "remove the %s mysql database" % database) - - # MySQL commands always run without sudo because the --user may be provided. - kwargs['sudo'] = False - - # Assemble the command. - a = _get_mysql_command(host=host, name="mysqladmin", password=password, port=port, user=user) - a.append("drop %s" % database) - - return Command(" ".join(a), **kwargs) - - -def mysql_dump(database, file_name=None, host="localhost", inserts=False, password=None, port=3306, user="root", - **kwargs): - """Dump (export) a MySQL database. - - - database (str): The database name. - - host (str): The database host name or IP address. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number of the MySQL service running on the host. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs.setdefault("comment", "dump the %s mysql database" % database) - - # MySQL commands always run without sudo because the --user may be provided. - # kwargs['sudo'] = False - - # Assemble the command. - a = _get_mysql_command(host=host, name="mysqldump", password=password, port=port, user=user) - - # if data_only: - # a.append("--no-create-info") - # elif schema_only: - # a.append("--no-data") - # else: - # pass - - if inserts: - a.append("--complete-inserts") - - a.append(database) - - _file_name = file_name or "%s.sql" % database - a.append("> %s" % _file_name) - - return Command(" ".join(a), **kwargs) - - -def mysql_exec(sql, database="default", host="localhost", password=None, port=3306, user="root", **kwargs): - """Execute a MySQL statement. - - - sql (str): The SQL to run. - - database (str): The name of the database. - - host (str): The host name. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs['sudo'] = False - kwargs.setdefault("comment", "execute mysql statement") - - a = _get_mysql_command(host=host, password=password, port=port, user=user) - a.append('--execute="%s"' % sql) - a.append(database) - - return Command(" ".join(a), **kwargs) - - -def mysql_exists(database, host="localhost", password=None, port=3306, user="root", **kwargs): - """Determine if a MySQL database exists. - - - database (str): The database name. - - host (str): The database host name or IP address. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number of the MySQL service running on the host. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs.setdefault("comment", "determine if the %s mysql database exists" % database) - kwargs.setdefault("register", "mysql_database_exists") - - # MySQL commands always run without sudo because the --user may be provided. - kwargs['sudo'] = False - - # Assemble the command. - a = _get_mysql_command(host=host, password=password, port=port, user=user) - - sql = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '%s'" % database - a.append('--execute="%s"' % sql) - - return Command(" ".join(a), **kwargs) - - -def mysql_grant(to, database=None, host="localhost", password=None, port=3306, privileges="ALL", user="root", **kwargs): - """Grant privileges to a user. - - - to (str): The user name to which privileges are granted. - - database (str): The database name. - - host (str): The database host name or IP address. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number of the MySQL service running on the host. - - privileges (str): The privileges to be granted. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs.setdefault("comment", "grant mysql privileges to %s" % to) - - a = _get_mysql_command(host=host, password=password, port=port, user=user) - - # See https://dev.mysql.com/doc/refman/5.7/en/grant.html - _database = database or "*" - sql = "GRANT %(privileges)s ON %(database)s.* TO '%(user)s'@'%(host)s'" % { - 'database': _database, - 'host': host, - 'privileges': privileges, - 'user': to, - } - a.append('--execute="%s"' % sql) - - return Command(" ".join(a)) - - -def mysql_user(name, host="localhost", op="create", passwd=None, password=None, port=3306, user="root", **kwargs): - """Work with a MySQL user. - - - name (str): The user name. - - host (str): The host name. - - op (str): The operation to perform: ``create``, ``drop``, ``exists``. - - passwd (str): The password for a new user. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - kwargs['sudo'] = False - if op == "create": - kwargs.setdefault("comment", "create %s mysql user" % name) - - a = _get_mysql_command(host=host, password=password, port=port, user=user) - sql = "CREATE USER IF NOT EXISTS '%(user)s'@'%(host)s'" % { - 'host': host, - 'user': name, - } - if passwd: - sql += " IDENTIFIED BY PASSWORD('%s')" % passwd - - a.append('--execute="%s"' % sql) - - return Command(" ".join(a), **kwargs) - elif op == "drop": - kwargs.setdefault("comment", "drop %s mysql user" % name) - - a = _get_mysql_command(host=host, password=password, port=port, user=user) - - sql = "DROP USER IF EXISTS '%(user)s'@'%(host)s'" % { - 'host': host, - 'user': name, - } - a.append('--execute="%s"' % sql) - - return Command(" ".join(a), **kwargs) - elif op == "exists": - kwargs.setdefault("comment", "determine if %s mysql user exists" % name) - kwargs.setdefault("register", "mysql_user_exists") - - a = _get_mysql_command(host=host, password=password, port=port, user=user) - - sql = "SELECT EXISTS(SELECT 1 FROM mysql.user WHERE user = '%s')" % name - a.append('--execute="%s"' % sql) - - return Command(" ".join(a), **kwargs) - else: - raise NameError("Unrecognized or unsupported MySQL user operation: %s" % op) - - -MYSQL_MAPPINGS = { - 'mysql.create': mysql_create, - 'mysql.drop': mysql_drop, - 'mysql.dump': mysql_dump, - 'mysql.exists': mysql_exists, - 'mysql.grant': mysql_grant, - 'mysql.sql': mysql_exec, - 'mysql.user': mysql_user, -} diff --git a/scripttease/library/overlays/pgsql.py b/scripttease/library/overlays/pgsql.py deleted file mode 100644 index d20d4b9..0000000 --- a/scripttease/library/overlays/pgsql.py +++ /dev/null @@ -1,226 +0,0 @@ -# Imports - -from ..commands import Command - -# Exports - -__all__ = ( - "PGSQL_MAPPINGS", - "pgsql_create", - "pgsql_drop", - "pgsql_dump", - "pgsql_exec", - "pgsql_exists", - "pgsql_user", -) - -# Functions - - -def _get_pgsql_command(name, host="localhost", password=None, port=5432, user="postgres"): - """Get a postgres-related command using commonly required parameters. - - :param name: The name of the command. - :type name: str - - :param host: The host name. - :type host: str - - :param password: The password to use. - :type password: str - - :param port: The TCP port number. - :type port: int - - :param user: The user name that will be used to execute the command. - - :rtype: list[str] - - """ - a = list() - - if password: - a.append('export PGPASSWORD="%s" &&' % password) - - a.append(name) - - a.append("--host=%s" % host) - a.append("--port=%s" % port) - a.append("--username=%s" % user) - - return a - - -def pgsql_create(database, admin_pass=None, admin_user="postgres", host="localhost", owner=None, port=5432, template=None, - **kwargs): - """Create a PostgreSQL database. - - - database (str): The database name. - - admin_pass (str): The password for the user with sufficient access privileges to execute the command. - - admin_user (str): The name of the user with sufficient access privileges to execute the command. - - host (str): The database host name or IP address. - - owner (str): The owner (user/role name) of the new database. - - port (int): The port number of the Postgres service running on the host. - - template (str): The database template name to use, if any. - - """ - _owner = owner or admin_user - - # Postgres commands always run without sudo because the -U may be provided. - kwargs['sudo'] = False - - # Assemble the command. - base = _get_pgsql_command("createdb", host=host, password=admin_pass, port=port) - base.append("--owner=%s" % _owner) - - if template is not None: - base.append("--template=%s" % template) - - base.append(database) - - return Command(" ".join(base), **kwargs) - - -def pgsql_drop(database, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs): - """Remove a PostgreSQL database. - - - database (str): The database name. - - admin_pass (str): The password for the user with sufficient access privileges to execute the command. - - admin_user (str): The name of the user with sufficient access privileges to execute the command. - - host (str): The database host name or IP address. - - port (int): The port number of the Postgres service running on the host. - - """ - # Postgres commands always run without sudo because the -U may be provided. - kwargs['sudo'] = False - - # Assemble the command. - base = _get_pgsql_command("dropdb", host=host, password=admin_pass, port=port, user=admin_user) - base.append(database) - - return Command(" ".join(base), **kwargs) - - -def pgsql_dump(database, admin_pass=None, admin_user="postgres", file_name=None, host="localhost", port=5432, **kwargs): - """Export a Postgres database. - - - database (str): The database name. - - admin_pass (str): The password for the user with sufficient access privileges to execute the command. - - admin_user (str): The name of the user with sufficient access privileges to execute the command. - - file_name (str): The name/path of the export file. Defaults the database name plus ``.sql``. - - host (str): The database host name or IP address. - - port (int): The port number of the Postgres service running on the host. - - """ - _file_name = file_name or "%s.sql" % database - - # Postgres commands always run without sudo because the -U may be provided. - # kwargs['sudo'] = False - - # Assemble the command. - base = _get_pgsql_command("pg_dump", host=host, password=admin_pass, port=port, user=admin_user) - base.append("--column-inserts") - base.append("--file=%s" % _file_name) - base.append(database) - - return Command(" ".join(base), **kwargs) - - -def pgsql_exec(sql, database="template1", host="localhost", password=None, port=5432, user="postgres", **kwargs): - """Execute a psql command. - - - sql (str): The SQL to be executed. - - database (str): The database name. - - admin_pass (str): The password for the user with sufficient access privileges to execute the command. - - admin_user (str): The name of the user with sufficient access privileges to execute the command. - - host (str): The database host name or IP address. - - port (int): The port number of the Postgres service running on the host. - - """ - # Postgres commands always run without sudo because the -U may be provided. - kwargs['sudo'] = False - - # Assemble the command. - base = _get_pgsql_command("psql", host=host, password=password, port=port, user=user) - base.append("--dbname=%s" % database) - base.append('-c "%s"' % sql) - - return Command(" ".join(base), **kwargs) - - -def pgsql_exists(database, admin_pass=None, admin_user="postgres", host="localhost", port=5432, **kwargs): - """Determine if a Postgres database exists. - - - database (str): The database name. - - admin_pass (str): The password for the user with sufficient access privileges to execute the command. - - admin_user (str): The name of the user with sufficient access privileges to execute the command. - - host (str): The database host name or IP address. - - owner (str): The owner (user/role name) of the new database. - - port (int): The port number of the Postgres service running on the host. - - """ - # Postgres commands always run without sudo because the -U may be provided. - kwargs['sudo'] = False - kwargs.setdefault("register", "pgsql_db_exists") - - base = _get_pgsql_command("psql", host=host, password=admin_pass, port=port, user=admin_user) - base.append(r"-lqt | cut -d \| -f 1 | grep -qw %s" % database) - - return Command(" ".join(base), **kwargs) - - -def pgsql_user(name, admin_pass=None, admin_user="postgres", host="localhost", op="create", password=None, port=5432, **kwargs): - """Work with a PostgreSQL user. - - - name (str): The user name. - - host (str): The host name. - - op (str): The operation to perform: ``create``, ``drop``, ``exists``. - - passwd (str): The password for a new user. - - password (str): The password for the user with sufficient access privileges to execute the command. - - port (int): The TCP port number. - - user (str): The name of the user with sufficient access privileges to execute the command. - - """ - # Postgres commands always run without sudo because the -U may be provided. - kwargs['sudo'] = False - - if op == "create": - kwargs.setdefault("comment", "create %s postgres user" % name) - # Assemble the command. - base = _get_pgsql_command("createuser", host=host, password=admin_pass, port=port) - base.append("-DRS") - base.append(name) - - if password is not None: - base.append("&& psql -h %s -U %s" % (host, admin_user)) - base.append("-c \"ALTER USER %s WITH ENCRYPTED PASSWORD '%s';\"" % (name, password)) - - return Command(" ".join(base), **kwargs) - elif op == "drop": - kwargs.setdefault("comment", "drop %s postgres user" % name) - base = _get_pgsql_command("dropuser", host=host, password=admin_pass, port=port, user=admin_user) - base.append(name) - - return Command(" ".join(base), **kwargs) - elif op == "exists": - kwargs.setdefault("comment", "determine if %s postgres user exits" % name) - kwargs.setdefault("register", "pgsql_use_exists") - - base = _get_pgsql_command("psql", host=host, password=admin_pass, port=port, user=admin_user) - - sql = "SELECT 1 FROM pgsql_roles WHERE rolname='%s'" % name - base.append('-c "%s"' % sql) - - return Command(" ".join(base), **kwargs) - else: - raise NameError("Unrecognized or unsupported Postgres user operation: %s" % op) - - -PGSQL_MAPPINGS = { - 'pgsql.create': pgsql_create, - 'pgsql.drop': pgsql_drop, - 'pgsql.dump': pgsql_dump, - 'pgsql.exists': pgsql_exists, - 'pgsql.sql': pgsql_exec, - 'pgsql.user': pgsql_user, -} diff --git a/scripttease/library/overlays/posix.py b/scripttease/library/overlays/posix.py deleted file mode 100644 index dee494e..0000000 --- a/scripttease/library/overlays/posix.py +++ /dev/null @@ -1,758 +0,0 @@ -# Imports - -from commonkit import indent, split_csv -import os -from ..commands import Command - -# Exports - -__all__ = ( - "POSIX_MAPPINGS", - "archive", - "certbot", - "dialog", - "echo", - "extract", - "file_append", - "file_copy", - "file_write", - "mkdir", - "move", - "perms", - "prompt", - "remove", - "rename", - "rsync", - "scopy", - "sed", - "symlink", - "touch", - "wait", - "Function", - "Prompt", -) - -# Functions - - -def archive(from_path, absolute=False, exclude=None, file_name="archive.tgz", strip=None, to_path=".", view=False, - **kwargs): - """Create a file archive. - - - from_path (str): The path that should be archived. - - absolute (bool): Set to ``True`` to preserve the leading slash. - - exclude (str): A pattern to be excluded from the archive. - - strip (int): Remove the specified number of leading elements from the path. - - to_path (str): Where the archive should be created. This should *not* include the file name. - - view (bool): View the output of the command as it happens. - - """ - tokens = ["tar"] - switches = ["-cz"] - - if absolute: - switches.append("P") - - if view: - switches.append("v") - - tokens.append("".join(switches)) - - if exclude: - tokens.append("--exclude %s" % exclude) - - if strip: - tokens.append("--strip-components %s" % strip) - - to_path = os.path.join(to_path, file_name) - tokens.append('-f %s %s' % (to_path, from_path)) - - name = " ".join(tokens) - - return Command(name, **kwargs) - - -def certbot(domain_name, email=None, webroot=None, **kwargs): - """Get new SSL certificate from Let's Encrypt. - - - domain_name (str): The domain name for which the SSL certificate is requested. - - email (str): The email address of the requester sent to the certificate authority. Required. - - webroot (str): The directory where the challenge file will be created. - - """ - _email = email or os.environ.get("SCRIPTTEASE_CERTBOT_EMAIL", None) - _webroot = webroot or os.path.join("/var", "www", "domains", domain_name.replace(".", "_"), "www") - - if not _email: - raise ValueError("Email is required for certbot command.") - - template = "certbot certonly --agree-tos --email %(email)s -n --webroot -w %(webroot)s -d %(domain_name)s" - name = template % { - 'domain_name': domain_name, - 'email': _email, - 'webroot': _webroot, - } - - return Command(name, **kwargs) - - -def dialog(message, height=15, title="Message", width=100, **kwargs): - """Display a dialog message. - - - message (str): The message to be displayed. - - height (int): The height of the dialog. - - title (str): The title of the dialog. - - width (int): The width of the dialog. - - """ - kwargs.setdefault("comment", "display a dialog message") - - a = list() - a.append('dialog --clear --backtitle "%s"' % title) - a.append('--msgbox "%s" %s %s; clear;' % (message, height, width)) - - return Command(" ".join(a), **kwargs) - - -def echo(message, **kwargs): - """Echo a message. - - - message (str): The message to be printed to screen. - - """ - kwargs.setdefault("comment", "print message to screen") - - return Command('echo "%s"' % message, **kwargs) - - -def extract(from_path, absolute=False, exclude=None, strip=None, to_path=None, view=False, **kwargs): - """Extract a file archive. - - - from_path (str): The path that should be archived. - - absolute (bool): Set to ``True`` to preserve the leading slash. - - exclude (str): A pattern to be excluded from the archive. - - strip (int): Remove the specified number of leading elements from the path. - - to_path (str): Where the archive should be extracted. This should *not* include the file name. - - view (bool): View the output of the command as it happens. - - """ - _to_path = to_path or "./" - - tokens = ["tar"] - switches = ["-xz"] - - if absolute: - switches.append("P") - - if view: - switches.append("v") - - tokens.append("".join(switches)) - - if exclude: - tokens.append("--exclude %s" % exclude) - - if strip: - tokens.append("--strip-components %s" % strip) - - tokens.append('-f %s %s' % (from_path, _to_path)) - - name = " ".join(tokens) - - return Command(name, **kwargs) - - -def file_append(path, content=None, **kwargs): - """Append content to a file. - - - path (str): The path to the file. - - content (str): The content to be appended. - - """ - kwargs.setdefault("comment", "append to %s" % path) - - statement = 'echo "%s" >> %s' % (content or "", path) - - return Command(statement, **kwargs) - - -def file_copy(from_path, to_path, overwrite=False, recursive=False, **kwargs): - """Copy a file or directory. - - - from_path (str): The file or directory to be copied. - - to_path (str): The location to which the file or directory should be copied. - - overwrite (bool): Indicates files and directories should be overwritten if they exist. - - recursive (bool): Copy sub-directories. - - """ - kwargs.setdefault("comment", "copy %s to %s" % (from_path, to_path)) - - a = list() - a.append("cp") - - if not overwrite: - a.append("-n") - - if recursive: - a.append("-R") - - a.append(from_path) - a.append(to_path) - - return Command(" ".join(a), **kwargs) - - -def file_write(path, content=None, **kwargs): - """Write to a file. - - - path (str): The file to be written. - - content (str): The content to be written. Note: If omitted, this command is equivalent to ``touch``. - - """ - _content = content or "" - - kwargs.setdefault("comment", "write to %s" % path) - - a = list() - - if len(_content.split("\n")) > 1: - a.append("cat > %s << EOF" % path) - a.append(_content) - a.append("EOF") - else: - a.append('echo "%s" > %s' % (_content, path)) - - return Command(" ".join(a), **kwargs) - - -def mkdir(path, mode=None, recursive=True, **kwargs): - """Create a directory. - - - path (str): The path to be created. - - mode (int | str): The access permissions of the new directory. - - recursive (bool): Create all directories along the path. - - """ - kwargs.setdefault("comment", "create directory %s" % path) - - statement = ["mkdir"] - if mode is not None: - statement.append("-m %s" % mode) - - if recursive: - statement.append("-p") - - statement.append(path) - - return Command(" ".join(statement), **kwargs) - - -def move(from_path, to_path, **kwargs): - """Move a file or directory. - - - from_path (str): The current path. - - to_path (str): The new path. - - """ - kwargs.setdefault("comment", "move %s to %s" % (from_path, to_path)) - statement = "mv %s %s" % (from_path, to_path) - - return Command(statement, **kwargs) - - -def perms(path, group=None, mode=None, owner=None, recursive=False, **kwargs): - """Set permissions on a file or directory. - - - path (str): The path to be changed. - - group (str): The name of the group to be applied. - - mode (int | str): The access permissions of the file or directory. - - owner (str): The name of the user to be applied. - - recursive: Create all directories along the path. - - """ - commands = list() - - kwargs['comment'] = "set permissions on %s" % path - - if group is not None: - statement = ["chgrp"] - - if recursive: - statement.append("-R") - - statement.append(group) - statement.append(path) - - commands.append(Command(" ".join(statement), **kwargs)) - - if owner is not None: - statement = ["chown"] - - if recursive: - statement.append("-R") - - statement.append(owner) - statement.append(path) - - commands.append(Command(" ".join(statement), **kwargs)) - - if mode is not None: - statement = ["chmod"] - - if recursive: - statement.append("-R") - - statement.append(str(mode)) - statement.append(path) - - commands.append(Command(" ".join(statement), **kwargs)) - - kwargs.setdefault("comment", "set permissions on %s" % path) - - a = list() - for c in commands: - a.append(c.get_statement(suppress_comment=True)) - - return Command("\n".join(a), **kwargs) - - -def prompt(name, back_title="Input", choices=None, default=None, fancy=False, help_text=None, label=None, **kwargs): - """Prompt the user for input. - - - name (str): The programmatic name of the input. - - back_title (str): The back title used with the dialog command. - - choices (str | list): A list of valid choices. - - default: The default value. - - fancy (bool): Use a dialog command for the prompt. - - help_text (str): The text to display with the dialog command. - - label (str): The label for the input. - - """ - return Prompt( - name, - back_title=back_title, - choices=choices, - default=default, - fancy=fancy, - help_text=help_text, - label=label, - **kwargs - ) - - -def remove(path, force=False, recursive=False, **kwargs): - """Remove a file or directory. - - - path (str): The path to be removed. - - force (bool): Force the removal. - - recursive (bool): Remove all directories along the path. - - """ - kwargs.setdefault("comment", "remove %s" % path) - - statement = ["rm"] - - if force: - statement.append("-f") - - if recursive: - statement.append("-r") - - statement.append(path) - - return Command(" ".join(statement), **kwargs) - - -def rename(from_name, to_name, **kwargs): - """Rename a file or directory. - - - from_name (str): The name (or path) of the existing file. - - to_name (str): The name (or path) of the new file. - - """ - kwargs.setdefault("comment", "rename %s" % from_name) - return move(from_name, to_name, **kwargs) - - -def rsync(source, target, delete=False, exclude=None, host=None, key_file=None, links=True, port=22, - recursive=True, user=None, **kwargs): - """Synchronize a directory structure. - - - source (str): The source directory. - - target (str): The target directory. - - delete (bool): Indicates target files that exist in source but not in target should be removed. - - exclude (str): The path to an exclude file. - - host (str): The host name or IP address. This causes the command to run over SSH. - - key_file (str): The privacy SSH key (path) for remote connections. User expansion is automatically applied. - - links (bool): Include symlinks in the sync. - - port (int): The SSH port to use for remote connections. - - recursive (bool): Indicates source contents should be recursively synchronized. - - user (str): The user name to use for remote connections. - - """ - # - guess: When ``True``, the ``host``, ``key_file``, and ``user`` will be guessed based on the base name of - # the source path. - # :type guess: bool - # if guess: - # host = host or os.path.basename(source).replace("_", ".") - # key_file = key_file or os.path.expanduser(os.path.join("~/.ssh", os.path.basename(source))) - # user = user or os.path.basename(source) - # else: - # host = host - # key_file = key_file - # user = user - - kwargs.setdefault("comment", "sync %s with %s" % (source, target)) - - # rsync -e "ssh -i $(SSH_KEY) -p $(SSH_PORT)" -P -rvzc --delete - # $(OUTPUTH_PATH) $(SSH_USER)@$(SSH_HOST):$(UPLOAD_PATH) --cvs-exclude; - - tokens = list() - tokens.append("rsync") - tokens.append("--cvs-exclude") - tokens.append("--checksum") - tokens.append("--compress") - - if links: - tokens.append("--copy-links") - - if delete: - tokens.append("--delete") - - if exclude is not None: - tokens.append("--exclude-from=%s" % exclude) - - # --partial and --progress - tokens.append("-P") - - if recursive: - tokens.append("--recursive") - - tokens.append(source) - - conditions = [ - host is not None, - key_file is not None, - user is not None, - ] - if all(conditions): - tokens.append('-e "ssh -i %s -p %s"' % (key_file, port)) - tokens.append("%s@%s:%s" % (user, host, target)) - else: - tokens.append(target) - - statement = " ".join(tokens) - - return Command(statement, **kwargs) - - -def scopy(from_path, to_path, host=None, key_file=None, port=22, user=None, **kwargs): - """Copy a file or directory to a remote server. - - - from_path (str): The source directory. - - to_path (str): The target directory. - - host (str): The host name or IP address. Required. - - key_file (str): The privacy SSH key (path) for remote connections. User expansion is automatically applied. - - port (int): The SSH port to use for remote connections. - - user (str): The user name to use for remote connections. - - """ - kwargs.setdefault("comment", "copy %s to remote %s" % (from_path, to_path)) - - # TODO: What to do to force local versus remote commands? - # kwargs['local'] = True - - kwargs['sudo'] = False - - statement = ["scp"] - - if key_file is not None: - statement.append("-i %s" % key_file) - - statement.append("-P %s" % port) - statement.append(from_path) - - if host is not None and user is not None: - statement.append("%s@%s:%s" % (user, host, to_path)) - elif host is not None: - statement.append("%s:%s" % (host, to_path)) - else: - raise ValueError("Host is a required keyword argument.") - - return Command(" ".join(statement), **kwargs) - - -def sed(path, backup=".b", delimiter="/", find=None, replace=None, **kwargs): - """Find and replace text in a file. - - - path (str): The path to the file to be edited. - - backup (str): The backup file extension to use. - - delimiter (str): The pattern delimiter. - - find (str): The old text. Required. - - replace (str): The new text. Required. - - """ - - kwargs.setdefault("comment", "find and replace in %s" % path) - - context = { - 'backup': backup, - 'delimiter': delimiter, - 'path': path, - 'pattern': find, - 'replace': replace, - } - - template = "sed -i %(backup)s 's%(delimiter)s%(pattern)s%(delimiter)s%(replace)s%(delimiter)sg' %(path)s" - - statement = template % context - - return Command(statement, **kwargs) - - -def symlink(source, force=False, target=None, **kwargs): - """Create a symlink. - - - source (str): The source of the link. - - force (bool): Force the creation of the link. - - target (str): The name or path of the target. Defaults to the base name of the source path. - - """ - _target = target or os.path.basename(source) - - kwargs.setdefault("comment", "link to %s" % source) - - statement = ["ln -s"] - - if force: - statement.append("-f") - - statement.append(source) - statement.append(_target) - - return Command(" ".join(statement), **kwargs) - - -def touch(path, **kwargs): - """Touch a file or directory. - - - path (str): The file or directory to touch. - - """ - kwargs.setdefault("comment", "touch %s" % path) - - return Command("touch %s" % path, **kwargs) - - -def wait(seconds, **kwargs): - """Pause execution for a number of seconds. - - - seconds (int): The number of seconds to wait. - - """ - kwargs.setdefault("comment", "pause for %s seconds" % seconds) - - return Command("sleep %s" % seconds, **kwargs) - -# Classes - - -class Function(object): - """A function that may be used to organize related commands to be called together.""" - - def __init__(self, name, commands=None, comment=None): - """Initialize a function. - - :param name: The name of the function. - :type name: str - - :param commands: The command instances to be included in the function's output. - :type commands: list - - :param comment: A comment regarding the function. - :type comment: str - - """ - self.commands = commands or list() - self.comment = comment - self.name = name - - def to_string(self): - """Export the function as a string. - - :rtype: str - - """ - a = list() - - if self.comment is not None: - a.append("# %s" % self.comment) - - a.append("function %s()" % self.name) - a.append("{") - for command in self.commands: - a.append(indent(command.get_statement(cd=True))) - a.append("") - - a.append("}") - - return "\n".join(a) - - -class Prompt(Command): - """Prompt the user for input.""" - - def __init__(self, name, back_title="Input", choices=None, default=None, fancy=False, help_text=None, label=None, - **kwargs): - """Initialize a prompt for user input. - - :param name: The variable name. - :type name: str - - :param back_title: The back title of the input. Used only when ``dialog`` is enabled. - :type back_title: str - - :param choices: Valid choices for the variable. May be given as a list of strings or a comma separated string. - :type choices: list[str] | str - - :param default: The default value of the variable. - - :param fancy: Indicates the dialog command should be used. - :type fancy: bool - - :param help_text: Additional text to display. Only use when ``fancy`` is ``True``. - :type help_text: str - - :param label: The label of the prompt. - - """ - self.back_title = back_title - self.default = default - self.dialog_enabled = fancy - self.help_text = help_text - self.label = label or name.replace("_", " ").title() - self.variable_name = name - - if type(choices) in (list, tuple): - self.choices = choices - elif type(choices) is str: - self.choices = split_csv(choices, smart=False) - # for i in choices.split(","): - # self.choices.append(i.strip()) - else: - self.choices = None - - kwargs.setdefault("comment", "prompt user for %s input" % name) - - super().__init__(name, **kwargs) - - def get_statement(self, cd=False, suppress_comment=False): - """Get the statement using dialog or read.""" - if self.dialog_enabled: - return self._get_dialog_statement() - - return self._get_read_statement() - - def _get_dialog_statement(self): - """Get the dialog statement.""" - a = list() - - a.append('dialog --clear --backtitle "%s" --title "%s"' % (self.back_title, self.label)) - - if self.choices is not None: - a.append('--menu "%s" 15 40 %s' % (self.help_text or "Select", len(self.choices))) - count = 1 - for choice in self.choices: - a.append('"%s" %s' % (choice, count)) - count += 1 - - a.append('2>/tmp/input.txt') - else: - if self.help_text is not None: - a.append('--inputbox "%s"' % self.help_text) - else: - a.append('--inputbox ""') - - a.append('8 60 2>/tmp/input.txt') - - b = list() - - b.append('touch /tmp/input.txt') - b.append(" ".join(a)) - - b.append('%s=$( 0: - if not any_list_item(environments, command.environments): - continue - - if tags is not None: - if not any_list_item(tags, command.tags): - continue - - filtered.append(command) - - return filtered - - -def load_commands(path, filters=None, overlay="ubuntu", **kwargs): - """Load commands from a configuration file. - - :param path: The path to the configuration file. - :type path: str - - :param filters: Used to filter commands. - :type filters: dict - - :param overlay: The name of the command overlay to apply to generated commands. - :type overlay: str - - :rtype: list[scriptetease.library.commands.base.Command] | scriptetease.library.commands.base.ItemizedCommand] | - None - - :returns: A list of command instances or ``None`` if the configuration could not be loaded. - - kwargs are passed to the configuration class for instantiation. - - """ - _config = load_config(path, overlay, **kwargs) - if _config is None: - return None - - commands = _config.get_commands() - - if filters is not None: - criteria = dict() - for attribute, values in filters.items(): - criteria[attribute] = values - - commands = filter_commands(commands, **criteria) - - return commands - - -def load_config(path, overlay="ubuntu", **kwargs): - """Load a command configuration. - - :param path: The path to the configuration file. - :type path: str - - :param overlay: The name of the command overlay to apply to generated commands. - :type overlay: str - - :rtype: Config | None - - kwargs are passed to the configuration class for instantiation. - - """ - if path.endswith(".ini"): - _config = Config(path, overlay=overlay, **kwargs) - # elif path.endswith(".yml"): - # _config = YAML(path, **kwargs) - else: - log.warning("Input file format is not currently supported: %s" % path) - return None - - if not _config.load(): - log.error("Failed to load config file: %s" % path) - return None - - return _config - - -def load_variables(path, environment=None): - """Load variables from a file. - - :param path: The path to the file. - :type path: str - - :param environment: Filter variables by the given environment name. - :type environment: str - - :rtype: list[scripttease.parsers.utils.Variable] - - """ - if not os.path.exists(path): - log.warning("Path to variables file does not exist: %s" % path) - return list() - - if path.endswith(".ini"): - return _load_variables_ini(path, environment=environment) - else: - log.warning("Variable file format is not currently supports: %s" % path) - return list() - - -def _load_variables_ini(path, environment=None): - """Load variables from an INI file. See ``load_variables()``.""" - - ini = RawConfigParser() - ini.read(path) - - a = list() - for section in ini.sections(): - if ":" in section: - variable_name, _environment = section.split(":") - else: - _environment = None - variable_name = section - - _kwargs = { - 'environment': _environment, - } - for key, value in ini.items(section): - if key == "tags": - value = split_csv(value) - else: - value = smart_cast(value) - - _kwargs[key] = value - - a.append(Variable(variable_name, **_kwargs)) - - if environment is not None: - b = list() - for var in a: - if var.environment and var.environment == environment or var.environment is None: - b.append(var) - - return b - - return a - -# Classes - - -class Context(object): - """A collection of variables.""" - - def __init__(self, **kwargs): - """Initialize the context. - - kwargs are added as variable instances. - - """ - self.variables = dict() - - for key, value in kwargs.items(): - self.add(key, value) - - def __getattr__(self, item): - if item in self.variables: - return self.variables[item].value - - return None - - def __repr__(self): - return "<%s (%s)>" % (self.__class__.__name__, len(self.variables)) - - def add(self, name, value, environment=None, tags=None): - """Add a variable to the context. - - :param name: The name of the variable. - :type name: str - - :param value: The value of the variable in this context. - - :param environment: The environment name to which the variable applies. ``None`` applies to all environments. - :type environment: str - - :param tags: A list of tags that describe the variable. - :type tags: list[str] - - :rtype: scripttease.parsers.utils.Variable - - :raise: RuntimeError - :raises: ``RuntimeError`` if the variable already exists. - - """ - if name in self.variables: - raise RuntimeError("Variable already exists: %s" % name) - - v = Variable(name, value, environment=environment, tags=tags) - self.variables[name] = v - - return v - - def get(self, name, default=None): - """Get a the value of the variable from the context. - - :param name: The name of the variable. - :type name: str - - :param default: The default value to return. - - """ - if not self.has(name): - return default - - return self.variables[name].value - - def has(self, name): - """Indicates whether the named variable exists in this context, and whether the value is not ``None``. - - :rtype: bool - - """ - if name not in self.variables: - return False - - return self.variables[name].value is not None - - def join(self, variables): - """Join a list of variables to the context. - - :param variables: the list of variables to be added. - :type variables: list[scripttease.parsers.utils.Variable] - - .. note:: - This *replaces* a variable if it already exists. - - """ - for v in variables: - self.variables[v.name] = v - - def mapping(self): - """Export the context as a dictionary. - - :rtype: dict - - """ - values = dict() - for key, var in self.variables.items(): - values[key] = var.value or var.default - - return values - - def merge(self, context): - """Merge another context with this one. - - :param context: The context to be merged. - :type context: scripttease.parser.utils.Context - - .. note:: - Variables that exist in the current context are *not* replaced with variables from the provided context. - - """ - for name, var in context.variables.items(): - if not self.has(name): - self.variables[name] = var - - -class Variable(object): - """Represents a variable to be used in the context of pre-processing a config file.""" - - def __init__(self, name, value, **kwargs): - """Initialize a variable. - - :param name: The variable name. - :type name: str - - :param value: The value of the variable. - - kwargs are added as attributes of the instance. - - """ - self.name = name - self.value = value - - kwargs.setdefault("tags", list()) - self._attributes = kwargs - - def __eq__(self, other): - return self.value == other - - def __getattr__(self, item): - return self._attributes.get(item) - - def __repr__(self): - return "<%s %s>" % (self.__class__.__name__, self.name) diff --git a/scripttease/parsers/yaml.py b/scripttease/parsers/yaml.py deleted file mode 100644 index e69de29..0000000 diff --git a/scripttease/version.py b/scripttease/version.py index 4044acd..5dd6c3f 100644 --- a/scripttease/version.py +++ b/scripttease/version.py @@ -1,5 +1,5 @@ -DATE = "2022-09-24" -VERSION = "6.8.33" -MAJOR = 6 -MINOR = 8 -PATCH = 33 +DATE = "2023-03-30" +VERSION = "7.0.0" +MAJOR = 7 +MINOR = 0 +PATCH = 0 diff --git a/setup.py b/setup.py index 5c18cf6..89d4032 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ def read_file(path): setup( name='python-scripttease', - version=read_file("PEP440.txt"), + version=read_file("VERSION.txt"), description=read_file("DESCRIPTION.txt"), long_description=read_file("README.markdown"), long_description_content_type="text/markdown", diff --git a/tests/examples/apache_examples.ini b/tests/examples/apache_examples.ini index d41ff25..85909e2 100644 --- a/tests/examples/apache_examples.ini +++ b/tests/examples/apache_examples.ini @@ -1,17 +1,5 @@ [disable the default site] -apache = disable -site = default - -[enable SSL] -apache = enable -mod = ssl - -[original syntax disable the default site] apache.disable_site = default -[original syntax enable SSL] +[enable SSL] apache.enable_module = ssl - -[atlernative syntax disable the default site] -apache = site default -state = disabled diff --git a/tests/examples/bad_custom_example.ini b/tests/examples/bad_custom_example.ini new file mode 100644 index 0000000..920682e --- /dev/null +++ b/tests/examples/bad_custom_example.ini @@ -0,0 +1,9 @@ +[disable the default site] +apache.disable_site = default + +[enable SSL] +apache.enable_module = ssl + +; see test_lib_factories +[this will raise a type error because bad_custom_command doesn't accept kwargs] +bad_custom = path/to/display diff --git a/tests/examples/bad_variables.ini b/tests/examples/bad_variables.ini new file mode 100644 index 0000000..c7a2d8d --- /dev/null +++ b/tests/examples/bad_variables.ini @@ -0,0 +1,15 @@ +[domain_name +value = example.com + +[domain_tld] +value = example_com + +[debug_enabled:testing] +value = True + +[postgres_version] +value = 11 +tags = postgres + +[mailgun_domain:live] +value = mg.example.com diff --git a/tests/examples/custom_example.ini b/tests/examples/custom_example.ini new file mode 100644 index 0000000..fbed891 --- /dev/null +++ b/tests/examples/custom_example.ini @@ -0,0 +1,9 @@ +[disable the default site] +apache.disable_site = default + +[enable SSL] +apache.enable_module = ssl + +; see test_lib_factories +[this requires that a mapping be passed to the command factory] +custom = path/to/display diff --git a/tests/examples/python_examples.ini b/tests/examples/python_examples.ini index a35a966..ce09fef 100644 --- a/tests/examples/python_examples.ini +++ b/tests/examples/python_examples.ini @@ -1,3 +1,6 @@ +[here's how to install the virtualenv package] +explain = Installing virtualenv is easy. + [install the virtualenv package] pip = virtualenv tags = python-support @@ -7,6 +10,9 @@ virtualenv = python cd = /path/to/project tags = python-support +[a nice soft pillow] +screenshot = static/images/pillow.jpg + [install pillow] pip = Pillow cd = /path/to/project diff --git a/tests/examples/templates/settings.py b/tests/examples/templates/settings.py new file mode 100644 index 0000000..aa98923 --- /dev/null +++ b/tests/examples/templates/settings.py @@ -0,0 +1,3 @@ +TESTING = %(testing)s +TOTAL_TIMES = %(times)s + diff --git a/tests/examples/variables.ini b/tests/examples/variables.ini index 9fc7291..2f247c9 100644 --- a/tests/examples/variables.ini +++ b/tests/examples/variables.ini @@ -12,4 +12,4 @@ value = 11 tags = postgres [mailgun_domain:live] -value = "mg.example.com +value = mg.example.com diff --git a/tests/test_factory.py b/tests/test_factory.py deleted file mode 100644 index b1f95a5..0000000 --- a/tests/test_factory.py +++ /dev/null @@ -1,45 +0,0 @@ -import pytest -from scripttease.library.commands import Command, ItemizedCommand -from scripttease.factory import Factory - - -class TestFactory(object): - - def test_get_command(self): - f = Factory("ubuntu") - with pytest.raises(RuntimeError): - f.get_command("testing") - - f = Factory("ubuntu") - f.load() - - # Non-existent command. - c = f.get_command("nonexistent") - assert c is None - - # A good command with itemized parameters. - c = f.get_command( - "pip", - "$item", - items=["Pillow", "psycopg2-binary", "django"] - ) - assert isinstance(c, ItemizedCommand) - - # A good, normal command. - c = f.get_command("pip", "django") - assert isinstance(c, Command) - - # Command exists, but given bad arguments. - c = f.get_command("pip") - assert c is None - - def test_load(self): - f = Factory("nonexistent") - assert f.load() is False - - f = Factory("ubuntu") - assert f.load() is True - - def test_repr(self): - f = Factory("centos") - assert repr(f) == "" diff --git a/tests/test_lib_commands_base.py b/tests/test_lib_commands_base.py new file mode 100644 index 0000000..b38066a --- /dev/null +++ b/tests/test_lib_commands_base.py @@ -0,0 +1,416 @@ +from scripttease.lib.commands.base import Command, Content, ItemizedCommand, MultipleCommands, Prompt, Sudo, Template +from scripttease.lib.commands.python import python_pip + + +class TestCommand(object): + + def test_getattr(self): + c = Command("ls -ls", extra=True) + assert c.extra is True + + def test_get_statement(self): + c = Command( + "ls -ls", + comment="kitchen sink", + condition="$last_command -eq 0", + cd="/path/to/project", + prefix="source python/bin/active", + register="list_success", + stop=True, + sudo="deploy" + ) + statement = c.get_statement(cd=True) + assert "( cd" in statement + assert "sudo" in statement + assert ")" in statement + assert "# kitchen sink" in statement + assert "if [[ $last_command" in statement + assert "list_success=$?" in statement + assert "if [[ $list_success" in statement + + c = Command( + "ls -ls", + stop=True + ) + statement = c.get_statement() + assert "if [[ $?" in statement + + def test_init(self): + c = Command("ls -ls", sudo=Sudo(user="deploy")) + assert isinstance(c.sudo, Sudo) + assert c.sudo.user == "deploy" + + c = Command("ls -ls", sudo="deploy") + assert isinstance(c.sudo, Sudo) + assert c.sudo.user == "deploy" + + c = Command("ls -ls", sudo=True) + assert isinstance(c.sudo, Sudo) + assert c.sudo.user == "root" + + c = Command("ls -ls") + assert isinstance(c.sudo, Sudo) + assert c.sudo.user == "root" + assert c.sudo.enabled is False + + def test_is_itemized(self): + c = Command("ls -ls") + assert c.is_itemized is False + + def test_repr(self): + c = Command("ls -ls", comment="listing") + assert repr(c) == "" + + c = Command("ls -ls") + assert repr(c) == "" + + +class TestContent(object): + + def test_getattr(self): + c = Content("testing", extra=True) + assert c.extra is True + + def test_get_image_output(self): + c = Content("screenshot", caption="this is a test", image="static/images/testing.png") + assert "# this is a test" in c.get_output("bash") + assert "# static/images/testing.png" in c.get_output("bash") + + c.css = "img right" + c.height = 100 + c.width = 150 + assert 'this is a test" in c.get_output("html") + assert "this is a test" in c.get_output("md") + assert "this is a test" in c.get_output("rst") + assert "this is a test" in c.get_output("plain") + + c.heading = "Test" + assert "# Test: this is a test" in c.get_output("bash") + assert "

Test

" in c.get_output("html") + assert "## Test" in c.get_output("md") + assert "Test" in c.get_output("rst") + assert "====" in c.get_output("rst") + assert "***** Test *****" in c.get_output("plain") + + def test_get_output(self): + c = Content("testing") + assert c.get_output("nonexistent") is None + + def test_is_itemized(self): + c = Content("testing") + assert c.is_itemized is False + + def test_repr(self): + c = Content("testing") + assert repr(c) == "" + + def test_get_statement(self): + c = Content("screenshot", caption="this is a test", image="static/images/testing.png") + assert "# this is a test" in c.get_statement() + assert "# static/images/testing.png" in c.get_statement() + + +class TestItemizedCommand(object): + + def test_getattr(self): + c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item", extra=True) + assert c.extra is True + + def test_get_commands(self): + c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item") + commands = c.get_commands() + for i in commands: + assert isinstance(i, Command) + + def test_get_statement(self): + c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item") + statement = c.get_statement() + assert "Pillow" in statement + assert "psycopg2-binary" in statement + assert "django" in statement + + def test_is_itemized(self): + c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item") + assert c.is_itemized is True + + def test_repr(self): + c = ItemizedCommand(python_pip, ["Pillow", "psycopg2-binary", "django"], "$item") + assert repr(c) == "" + + +class TestMultipleCommands(object): + + def test_getattr(self): + commands = [ + Command("ls -ls", name="run"), + Command("pip install django", name="pip"), + Command("touch /tmp/testing.txt", name="touch") + ] + multiple = MultipleCommands(commands, extra=True) + assert multiple.extra is True + + def test_iter(self): + commands = [ + Command("ls -ls", name="run"), + Command("pip install django", name="pip"), + Command("touch /tmp/testing.txt", name="touch") + ] + multiple = MultipleCommands(commands) + + count = 0 + for c in multiple: + count += 1 + + assert count == 3 + + def test_repr(self): + commands = [ + Command("ls -ls", name="run"), + Command("pip install django", name="pip"), + Command("touch /tmp/testing.txt", name="touch") + ] + multiple = MultipleCommands(commands) + assert repr(multiple) == "" + + def test_get_statement(self): + commands = [ + Command("ls -ls", name="run"), + Command("pip install django", name="pip"), + Command("touch /tmp/testing.txt", name="touch") + ] + multiple = MultipleCommands(commands) + statement = multiple.get_statement() + assert "ls -ls" in statement + assert "pip install django" in statement + assert "touch /tmp/testing.txt" in statement + + +class TestPrompt(object): + + def test_get_dialog_statement(self): + prompt = Prompt("testing", choices=["1", "2", "3"], default="1", dialog=True, help_text="This is a test.", label="Test") + statement = prompt.get_statement() + assert "--menu" in statement + assert "This is a test." in statement + + prompt = Prompt("testing", choices="1,2,3", default="1", dialog=True, help_text="This is a test.", label="Test") + statement = prompt.get_statement() + assert "--menu" in statement + + prompt = Prompt("testing", default="1", dialog=True, help_text="This is a test.", label="Test") + statement = prompt.get_statement() + assert "--inputbox" in statement + assert "This is a test." in statement + + prompt = Prompt("testing", default="1", dialog=True, label="Test") + statement = prompt.get_statement() + assert "--inputbox" in statement + + def test_get_read_statement(self): + prompt = Prompt("testing", choices="1,2,3", default="1", help_text="This is a test.", label="Test") + statement = prompt.get_statement() + assert "select opt in" in statement + + prompt = Prompt("testing", default="1", help_text="This is a test.", label="Test") + statement = prompt.get_statement() + assert "echo -n" in statement + + +class TestSudo(object): + + def test_bool(self): + s = Sudo() + assert bool(s) is False + + s = Sudo(True) + assert bool(s) is True + + def test_str(self): + s = Sudo() + assert str(s) == "" + + s = Sudo(True) + assert str(s) == "sudo -u root" + + +class TestTemplate(object): + + def test_get_content(self): + context = { + 'testing': "yes", + 'times': 123, + } + t = Template( + "tests/examples/templates/simple.txt", + "tests/tmp/simple.txt", + backup=False, + context=context, + parser=Template.PARSER_SIMPLE + ) + content = t.get_content() + assert "I am testing? yes" in content + assert "How many times? 123" in content + + context = { + 'testing': "yes", + 'times': 123, + } + t = Template( + "tests/examples/templates/simple.sh.txt", + "tests/tmp/simple.sh", + backup=False, + context=context, + parser=Template.PARSER_SIMPLE + ) + content = t.get_content() + assert "I am testing? yes" in content + assert "How many times? 123" in content + + context = { + 'testing': "yes", + 'times': 123, + } + t = Template( + "tests/examples/templates/good.j2.txt", + "tests/tmp/good.txt", + backup=False, + context=context + ) + content = t.get_content() + assert "I am testing? yes" in content + assert "How many times? 123" in content + + t = Template("tests/examples/templates/nonexistent.j2.txt", "test/tmp/nonexistent.txt") + assert t.get_content() is None + + t = Template("tests/examples/templates/bad.j2.txt", "test/tmp/nonexistent.txt") + assert t.get_content() is None + + context = { + 'testing': True, + 'times': 3, + } + t = Template("tests/examples/templates/settings.py", "test/tmp/settings.py", context=context, parser=Template.PARSER_PYTHON) + content = t.get_content() + assert "TESTING = True" in content + assert "TOTAL_TIMES = 3" in content + + def test_get_statement(self): + context = { + 'testing': "yes", + 'times': 123, + } + t = Template( + "tests/examples/templates/simple.txt", + "tests/tmp/simple.txt", + context=context, + comment="A simple parser example.", + parser=Template.PARSER_SIMPLE, + register="template_created", + stop=True, + sudo=Sudo(user="root") + ) + s = t.get_statement() + assert "I am testing? yes" in s + assert "How many times? 123" in s + + context = { + 'testing': "yes", + 'times': 123, + } + t = Template( + "tests/examples/templates/simple.sh.txt", + "tests/tmp/simple.txt", + context=context, + parser=Template.PARSER_SIMPLE, + stop=True, + sudo="root" + ) + s = t.get_statement() + assert "I am testing? yes" in s + assert "How many times? 123" in s + + context = { + 'testing': "yes", + 'times': 123, + } + t = Template( + "tests/examples/templates/good.j2.txt", + "tests/tmp/good.txt", + context=context, + sudo=True + ) + s = t.get_statement() + assert "I am testing? yes" in s + assert "How many times? 123" in s + + t = Template( + "tests/examples/templates/simple.txt", + "tests/tmp/simple.txt", + parser="nonexistent" + ) + assert "# NO CONTENT AVAILABLE" in t.get_statement() + + def test_repr(self): + t = Template("/path/to/template.conf", "/path/to/file.conf") + assert repr(t) == "