Worked on new approach to overlays and factory.

development
Shawn Davis 4 years ago
parent dea0ee119a
commit 6fec406d04
  1. 224
      scripttease/library/_overlays.py
  2. 52
      scripttease/library/commands/__init__.py
  3. 133
      scripttease/library/commands/apache.py
  4. 119
      scripttease/library/commands/factory.py
  5. 68
      scripttease/library/factory.py
  6. 97
      scripttease/library/overlays.py
  7. 0
      scripttease/library/overlays/__init__.py
  8. 21
      scripttease/library/overlays/common.py
  9. 177
      scripttease/library/overlays/posix.py
  10. 155
      scripttease/library/overlays/ubuntu.py
  11. 29
      tests/examples/apache_examples.ini
  12. 0
      tests/test_library_commands_factory.py

@ -0,0 +1,224 @@
# Imports
from configparser import RawConfigParser
import os
from superpython.utils import parse_jinja_string
from ..constants import PATH_TO_SCRIPT_TEASE
# Exports
__all__ = (
"Overlay",
)
# Classes
class Overlay(object):
"""An overlay applies commands specific to a given operating system or platform."""
def __init__(self, name):
self.is_loaded = False
self._name = name
self._path = os.path.join(PATH_TO_SCRIPT_TEASE, "data", "overlays", "%s.ini" % name)
self._sections = dict()
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._name)
@property
def exists(self):
"""Indicates whether the overlay file exists.
:rtype: bool
"""
return os.path.exists(self._path)
def get(self, section, key, **kwargs):
"""Get the command statement for the given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
kwargs are used to parse the value of the key within the section.
:rtype: str | None
"""
if not self.has(section, key):
return None
template = self._sections[section][key]
return parse_jinja_string(template, kwargs)
def has(self, section, key):
"""Determine whether the overlay contains a given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
:rtype: bool
"""
if section not in self._sections:
return False
if key not in self._sections[section]:
return False
return True
def load(self):
"""Load the overlay.
:rtype: bool
"""
if not self.exists:
return False
ini = RawConfigParser()
ini.read(self._path)
for section in ini.sections():
self._sections[section] = dict()
for key, value in ini.items(section):
self._sections[section][key] = value
self.is_loaded = True
return True
def to_mapping(self):
"""Export the overlay as a dictionary with command names as values.
:rtype: dict
"""
d = dict()
for section in self._sections:
d[section] = list()
for command_name, statement in self._sections[section].items():
d[section].append(command_name)
class Overlay2(object):
"""An overlay applies commands specific to a given operating system or platform."""
def __init__(self, name):
self.is_loaded = False
self._name = name
self._path = os.path.join(PATH_TO_SCRIPT_TEASE, "data", "overlays", "%s.ini" % name)
self._sections = dict()
self.exists = os.path.exists(self._path)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._name)
def command_exists(self, name):
"""Determine whether a given command exists.
:param name: The name of the command to check.
:type name: str
:rtype: bool
"""
section = None
if "." in name:
section, name = name.split(".")
if section is not None:
if section in self._sections:
return name in self._sections[section]
for section in self._sections.keys():
if name in self._sections[section]:
return True
return False
def get_statement(self, name, *args, **kwargs):
pass
def get(self, section, key, **kwargs):
"""Get the command statement for the given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
kwargs are used to parse the value of the key within the section.
:rtype: str | None
"""
if not self.has(section, key):
return None
template = self._sections[section][key]
return parse_jinja_string(template, kwargs)
def has(self, section, key):
"""Determine whether the overlay contains a given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
:rtype: bool
"""
if section not in self._sections:
return False
if key not in self._sections[section]:
return False
return True
def load(self):
"""Load the overlay.
:rtype: bool
"""
if not self.exists:
return False
ini = RawConfigParser()
ini.read(self._path)
for section in ini.sections():
self._sections[section] = dict()
for command_name, statement_template in ini.items(section):
self._sections[section][command_name] = statement_template
self.is_loaded = True
return True
def to_mapping(self):
"""Export the overlay as a dictionary with command names as values.
:rtype: dict
"""
d = dict()
for section in self._sections:
d[section] = list()
for command_name, statement in self._sections[section].items():
d[section].append(command_name)

@ -1,50 +1,2 @@
# Imports from .base import Command, ItemizedCommand
from .factory import command_factory
import logging
# from ..scripts import Function
from ...constants import LOGGER_NAME
from .base import ItemizedCommand
from .mappings import MAPPING
log = logging.getLogger(LOGGER_NAME)
# Functions
def command_exists(name):
"""Indicates whether the named command exists.
:param name: The name of the command to be checked.
:type name: str
:rtype: bool
"""
return name in MAPPING
def command_factory(name, comment, overlay, *args, **kwargs):
# if name in ("func", "function"):
# kwargs['comment'] = comment
# return Function(*args, **kwargs)
if not command_exists(name):
log.warning("No mapping for command: %s" % name)
return None
_args = list(args)
kwargs['comment'] = comment
kwargs['overlay'] = overlay
log.debug("%s: %s" % (comment, kwargs))
command_class = MAPPING[name]
try:
items = kwargs.pop("items", None)
if items is not None:
return ItemizedCommand(command_class, items, *_args, **kwargs)
return command_class(*_args, **kwargs)
except (KeyError, TypeError, ValueError) as e:
log.critical("Failed to load %s command: %s" % (name, e))
return None

@ -1,9 +1,134 @@
# Imports
import logging
from ...constants import LOGGER_NAME
from .base import Command
log = logging.getLogger(LOGGER_NAME)
# Exports
__all__ = (
"MAPPING",
"ConfigTest",
"DisableModule",
"DisableSite",
"EnableModule",
"EnableSite",
"Reload",
"Restart",
"Start",
"Stop",
)
# Classes # Classes
class Apache(object): class ConfigTest(Command):
"""Run an apache config test."""
def __init__(self, overlay=None, **kwargs):
"""There is no argument."""
if overlay is not None:
statement = overlay.get("apache", "test")
else:
statement = "apachectl configtest"
kwargs.setdefault('register', "apache_checks_out")
super().__init__(statement, **kwargs)
class DisableModule(Command):
"""Disable an Apache module."""
def __init__(self, module_name, overlay=None, **kwargs):
"""Initialize the command.
:param module_name: The module name.
:type module_name: str
"""
if overlay is not None:
statement = overlay.get("apache", "disable_module", module_name=module_name)
statement = "a2dismod %s" % module_name
super().__init__(statement, **kwargs)
class DisableSite(Command):
"""Disable a virtual host."""
def __init__(self, domain_name, **kwargs):
"""Initialize the command.
:param domain_name: The domain name.
:type domain_name: str
"""
statement = "a2dissite %s.conf" % domain_name
super().__init__(statement, **kwargs)
class Enable(Command):
def __init__(self, what, name, **kwargs):
if what in ("mod", "module"):
statement = EnableModule(name, **kwargs).statement
elif what == "site":
statement = EnableSite(name, **kwargs).statement
else:
raise ValueError("Invalid Apache item to be enabled: %s" % what)
super().__init__(statement, **kwargs)
class EnableModule(Command):
"""Enable an Apache module."""
def __init__(self, module_name, **kwargs):
"""Initialize the command.
:param module_name: The module name.
:type module_name: str
"""
statement = "a2enmod %s" % module_name
super().__init__(statement, **kwargs)
class EnableSite(Command):
"""Enable a virtual host."""
def __init__(self, domain_name, **kwargs):
"""Initialize the command.
:param domain_name: The domain name.
:type domain_name: str
"""
statement = "a2ensite %s.conf" % domain_name
super().__init__(statement, **kwargs)
def __init__(self, subcommand, *args, **kwargs):
if subcommand == "disable":
pass
MAPPING = {
# 'apache': Apache,
'apache.check': ConfigTest,
'apache.config': ConfigTest,
'apache.configtest': ConfigTest,
'apache.disable': Disable,
'apache.disable_mod': DisableModule,
'apache.disable_module': DisableModule,
'apache.disable_site': DisableSite,
'apache.enable': Enable,
'apache.enable_mod': EnableModule,
'apache.enable_module': EnableModule,
'apache.enable_site': EnableSite,
'apache.mod': EnableModule,
'apache.module': EnableModule,
'apache.test': ConfigTest,
}

@ -0,0 +1,119 @@
# Imports
from importlib import import_module
import logging
# from ..scripts import Function
from ...constants import LOGGER_NAME
# from .base import ItemizedCommand
# from .mappings import MAPPING
log = logging.getLogger(LOGGER_NAME)
# Functions
def command_factory(name, comment, overlay, *args, **kwargs):
# try:
# _overlay = import_module("scripttease.library.overlays.%s" % overlay)
# except ImportError as e:
# log.error("The %s overlay could not be imported: %s" % (overlay, str(e)))
# return None
if not overlay.command_exists(name):
log.warning("Command does not exist in %s overlay: %s" % (overlay.name, name))
return None
kwargs['comment'] = comment
callback = overlay.MAPPINGS[name]
return callback(*args, **kwargs)
'''
def command_exists(name):
"""Indicates whether the named command exists.
:param name: The name of the command to be checked.
:type name: str
:rtype: bool
"""
return name in MAPPING
def command_factory(name, comment, overlay, *args, **kwargs):
# if name in ("func", "function"):
# kwargs['comment'] = comment
# return Function(*args, **kwargs)
if not command_exists(name):
log.warning("No mapping for command: %s" % name)
return None
_args = list(args)
kwargs['comment'] = comment
kwargs['overlay'] = overlay
log.debug("%s: %s" % (comment, kwargs))
command_class = MAPPING[name]
try:
items = kwargs.pop("items", None)
if items is not None:
return ItemizedCommand(command_class, items, *_args, **kwargs)
return command_class(*_args, **kwargs)
except (KeyError, TypeError, ValueError) as e:
log.critical("Failed to load %s command: %s" % (name, e))
return None
'''
#
#
#
# MAPPINGS = {
# 'apache.disable_module': apache_disable_module,
# 'apache.disable_site': apache_disable_site,
# 'apache.enable_module': apache_enable_module,
# 'apache.enable_site': apache_enable_site,
# 'apache.reload': apache_reload,
# 'apache.restart': apache_restart,
# 'apache.start': apache_start,
# 'apache.stop': apache_stop,
# 'apache.test': apache_test,
# 'copy': file_copy,
# 'pip': python_pip,
# 'virtualenv': python_virtualenv,
# # 'python': ("pip", "virtualenv"),
# # 'apache': ("disable_module", "disable_site", "enable_module", "enable_site", "test"),
# }
def nother_command_exists(name):
return name in MAPPINGS
def other_command_exists(name, section=None):
if section is not None:
if section not in MAPPINGS:
return False
return name in MAPPINGS[section]
for _section, commands in MAPPINGS.items():
if name in commands:
return True
return False
def other_command_factory(name, comment, overlay, *args, **kwargs):
if not overlay.command_exists(name):
log.warning("The %s overlay does not have a mapping for command: %s" % (overlay, name))
return None
items = kwargs.pop("items", None)
if items is not None:
return ItemizedCommand

@ -0,0 +1,68 @@
# Imports
from importlib import import_module
# Exports
__all__ = (
"Factory",
)
# Classes
class Factory(object):
"""A command factory."""
def __init__(self, overlay):
"""Initialize the factory.
:param overlay: The name of the overlay to use for generating commands.
:type overlay: str
"""
self.is_loaded = False
self.overlay = None
self._overlay = overlay
def get_command(self, name, *args, **kwargs):
"""Get a command.
:param name: The name of the command.
:type name: str
args and kwargs are passed to the initialize the command.
:rtype: scripttease.library.commands.Command | scripttease.library.commands.ItemizedCommand
"""
if not self.overlay.command_exists(name):
# log.warning("Command does not exist in %s overlay: %s" % (overlay.name, name))
return None
callback = self.overlay.MAPPINGS[name]
try:
# items = kwargs.pop("items", None)
# if items is not None:
# return ItemizedCommand(callback, items, *args, **kwargs)
return callback(*args, **kwargs)
except (KeyError, TypeError, ValueError) as e:
# log.critical("Failed to load %s command: %s" % (name, e))
return None
def load(self):
"""Load the factory.
:rtype: bool
"""
try:
self.overlay = import_module("scripttease.library.overlays.%s" % self._overlay)
self.is_loaded = True
except ImportError as e:
# log.error("The %s overlay could not be imported: %s" % (overlay, str(e)))
pass
return self.is_loaded

@ -1,97 +0,0 @@
# Imports
from configparser import RawConfigParser
import os
from superpython.utils import parse_jinja_string
from ..constants import PATH_TO_SCRIPT_TEASE
# Exports
__all__ = (
"Overlay",
)
# Classes
class Overlay(object):
"""An overlay applies commands specific to a given operating system or platform."""
def __init__(self, name):
self.is_loaded = False
self._name = name
self._path = os.path.join(PATH_TO_SCRIPT_TEASE, "data", "overlays", "%s.ini" % name)
self._sections = dict()
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self._name)
@property
def exists(self):
"""Indicates whether the overlay file exists.
:rtype: bool
"""
return os.path.exists(self._path)
def get(self, section, key, **kwargs):
"""Get the command statement for the given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
kwargs are used to parse the value of the key within the section.
:rtype: str | None
"""
if not self.has(section, key):
return None
template = self._sections[section][key]
return parse_jinja_string(template, kwargs)
def has(self, section, key):
"""Determine whether the overlay contains a given section and key.
:param section: The section name.
:type section: str
:param key: The key within the section.
:type key: str
:rtype: bool
"""
if section not in self._sections:
return False
if key not in self._sections[section]:
return False
return True
def load(self):
"""Load the overlay.
:rtype: bool
"""
if not self.exists:
return False
ini = RawConfigParser()
ini.read(self._path)
for section in ini.sections():
self._sections[section] = dict()
for key, value in ini.items(section):
self._sections[section][key] = value
self.is_loaded = True
return True

@ -0,0 +1,21 @@
from ..commands import Command
def python_pip(name, op="install", upgrade=False, venv=None, **kwargs):
if upgrade:
statement = "pip install --upgrade -y %s" % name
else:
statement = "pip %s -y %s" % (op, name)
if venv is not None:
kwargs['prefix'] = "source %s/bin/activate" % venv
kwargs.setdefault("comment", "%s %s" % (op, name))
return Command(statement, **kwargs)
def python_virtualenv(name="python", **kwargs):
kwargs.setdefault("comment", "create %s virtual environment" % name)
return Command("virtualenv %s" % name, **kwargs)

@ -0,0 +1,177 @@
from ..commands import Command
def file_append(path, content=None, **kwargs):
"""Append content to a file.
:param path: The path to the file.
:type path: str
:param content: The content to be appended.
:type content: str
"""
print("HERE")
kwargs.setdefault("comment", "append to %s" % path)
statement = 'echo "%s" >> %s' % (content or "", path)
return Command(statement, **kwargs)
def file_copy(from_path, to_path, overwrite=False, recursive=False, **kwargs):
"""Initialize the command.
:param from_path: The file or directory to be copied.
:type from_path: str
:param to_path: The location to which the file or directory should be copied.
:type to_path: str
:param overwrite: Indicates files and directories should be overwritten if they exist.
:type overwrite: bool
:param recursive: Copy sub-directories.
:type recursive: bool
"""
kwargs.setdefault("comment", "copy %s to %s" % (from_path, to_path))
a = list()
a.append("cp")
if not overwrite:
a.append("-n")
if recursive:
a.append("-R")
a.append(from_path)
a.append(to_path)
return Command(" ".join(a), **kwargs)
def mkdir(path, mode=None, recursive=True, **kwargs):
"""Initialize the command.
:param path: The path to be created.
:type path: str
:param mode: The access permissions of the new directory.
:type mode: str
:param recursive: Create all directories along the path.
:type recursive: bool
"""
kwargs.setdefault("comment", "create directory %s" % path)
statement = ["mkdir"]
if mode is not None:
statement.append("-m %s" % mode)
if recursive:
statement.append("-p")
statement.append(path)
return Command(" ".join(statement), **kwargs)
def move(from_path, to_path, **kwargs):
kwargs.setdefault("comment", "move %s to %s" % (from_path, to_path))
statement = "mv %s %s" % (from_path, to_path)
return Command(statement, **kwargs)
def perms(path, group=None, mode=None, owner=None, recursive=False, **kwargs):
"""Initialize the command.
:param path: The path to be changed.
:type path: str
:param group: The name of the group to be applied.
:type group: str
:param mode: The access permissions of the file or directory.
:type mode: str
:param owner: The name of the user to be applied.
:type owner: str
:param recursive: Create all directories along the path.
:type recursive: bool
"""
commands = list()
if group is not None:
statement = ["chgrp"]
if recursive:
statement.append("-R")
statement.append(group)
statement.append(path)
commands.append(Command(" ".join(statement), **kwargs))
if owner is not None:
statement = ["chown"]
if recursive:
statement.append("-R")
statement.append(owner)
statement.append(path)
commands.append(Command(" ".join(statement), **kwargs))
if mode is not None:
statement = ["chmod"]
if recursive:
statement.append("-R")
statement.append(str(mode))
statement.append(path)
commands.append(Command(" ".join(statement), **kwargs))
kwargs.setdefault("comment", "set permissions on %s" % path)
a = list()
for c in commands:
a.append(c.get_statement())
return Command("\n".join(a), **kwargs)
def remove(path, force=False, recursive=False, **kwargs):
"""Initialize the command.
:param path: The path to be removed.
:type path: str
:param force: Force the removal.
:type force: bool
:param recursive: Remove all directories along the path.
:type recursive: bool
"""
kwargs.setdefault("comment", "remove %s" % path)
statement = ["rm"]
if force:
statement.append("-f")
if recursive:
statement.append("-r")
statement.append(path)
return Command(" ".join(statement), **kwargs)

@ -0,0 +1,155 @@
from ..commands import Command
from .common import python_pip, python_virtualenv
from .posix import file_append, file_copy, mkdir, move, perms, remove
name = "ubuntu"
def command_exists(name):
return name in MAPPINGS
def apache_disable_module(name, **kwargs):
kwargs.setdefault("comment", "disable %s apache module" % name)
return Command("a2dismod %s" % name, **kwargs)
def apache_disable_site(name, **kwargs):
kwargs.setdefault("comment", "disable %s apache site" % name)
return Command("a2dissite %s" % name, **kwargs)
def apache_enable_module(name, **kwargs):
kwargs.setdefault("comment", "enable %s apache module" % name)
return Command("a2enmod %s" % name, **kwargs)
def apache_enable_site(name, **kwargs):
kwargs.setdefault("comment", "enable %s apache module" % name)
return Command("a2densite %s" % name, **kwargs)
def apache_reload(**kwargs):
kwargs.setdefault("comment", "reload apache")
kwargs.setdefault("register", "apache_reloaded")
return Command("service apache2 reload", **kwargs)
def apache_restart(**kwargs):
kwargs.setdefault("comment", "restart apache")
kwargs.setdefault("register", "apache_restarted")
return Command("service apache2 restart", **kwargs)
def apache_start(**kwargs):
kwargs.setdefault("comment", "start apache")
kwargs.setdefault("register", "apache_started")
return Command("service apache2 start", **kwargs)
def apache_stop(**kwargs):
kwargs.setdefault("comment", "stop apache")
return Command("service apache2 stop", **kwargs)
def apache_test(**kwargs):
kwargs.setdefault("comment", "check apache configuration")
kwargs.setdefault("register", "apache_checks_out")
return Command("apachectl configtest", **kwargs)
def service_reload(name, **kwargs):
kwargs.setdefault("comment", "reload %s service" % name)
kwargs.setdefault("register", "%s_reloaded" % name)
return Command("service %s reload" % name, **kwargs)
def service_restart(name, **kwargs):
kwargs.setdefault("comment", "restart %s service" % name)
kwargs.setdefault("register", "%s_restarted" % name)
return Command("service %s reload" % name, **kwargs)
def service_start(name, **kwargs):
kwargs.setdefault("comment", "start %s service" % name)
kwargs.setdefault("register", "%s_started" % name)
return Command("service %s start" % name, **kwargs)
def service_stop(name, **kwargs):
kwargs.setdefault("comment", "stop %s service" % name)
kwargs.setdefault("register", "%s_stopped" % name)
return Command("service %s stop" % name, **kwargs)
def system_install(name, **kwargs):
kwargs.setdefault("comment", "install system package %s" % name)
return Command("apt-get install -y %s" % name, **kwargs)
def system_reboot(**kwargs):
kwargs.setdefault("comment", "reboot the system")
return Command("reboot", **kwargs)
def system_uninstall(name, **kwargs):
kwargs.setdefault("comment", "remove system package %s" % name)
return Command("apt-get uninstall -y %s" % name, **kwargs)
def system_update(**kwargs):
kwargs.setdefault("comment", "update system package info")
return Command("apt-get update -y", **kwargs)
def system_upgrade(**kwargs):
kwargs.setdefault("comment", "upgrade the system")
return Command("apt-get upgrade -y", **kwargs)
MAPPINGS = {
'apache.disable_module': apache_disable_module,
'apache.disable_site': apache_disable_site,
'apache.enable_module': apache_enable_module,
'apache.enable_site': apache_enable_site,
'apache.reload': apache_reload,
'apache.restart': apache_restart,
'apache.start': apache_start,
'apache.stop': apache_stop,
'apache.test': apache_test,
'append': file_append,
'copy': file_copy,
'install': system_install,
'mkdir': mkdir,
'move': move,
'perms': perms,
'pip': python_pip,
'reboot': system_reboot,
'reload': service_reload,
'remove': remove,
'restart': service_restart,
'start': service_start,
'stop': service_stop,
'update': system_update,
'uninstall': system_uninstall,
'upgrade': system_upgrade,
'virtualenv': python_virtualenv,
}

@ -0,0 +1,29 @@
[disable the default site]
apache = disable
site = default
[enable SSL]
apache = enable
mod = ssl
[original syntax disable the default site]
apache.disable_site = default
[original syntax enable SSL]
apache.enable_module = ssl
[atlernative syntax disable the default site]
apache = site default
state = disabled
[apache]
disable_module = a2dismod {{ module_name }}
disable_site = a2dissite {{ domain_name }}.conf
enable_module = a2enmod {{ module_name }}
enable_site = a2ensite {{ domain_name }}.conf
reload = service apache2 reload
restart = service apache2 restart
start = service apache2 start
stop = service apache2 stop
test = apachectl configtest
Loading…
Cancel
Save