|
|
|
# Imports
|
|
|
|
|
|
|
|
from commonkit import any_list_item, smart_cast, split_csv
|
|
|
|
from configparser import RawConfigParser
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
from ..constants import LOGGER_NAME
|
|
|
|
from .ini import Config
|
|
|
|
|
|
|
|
log = logging.getLogger(LOGGER_NAME)
|
|
|
|
|
|
|
|
# Exports
|
|
|
|
|
|
|
|
__all__ = (
|
|
|
|
"filter_commands",
|
|
|
|
"load_commands",
|
|
|
|
"load_config",
|
|
|
|
"load_variables",
|
|
|
|
"Context",
|
|
|
|
"Variable",
|
|
|
|
)
|
|
|
|
|
|
|
|
# Functions
|
|
|
|
|
|
|
|
|
|
|
|
def filter_commands(commands, environments=None, tags=None):
|
|
|
|
"""Filter commands based on the given criteria.
|
|
|
|
|
|
|
|
:param commands: The commands to be filtered.
|
|
|
|
:type commands: list
|
|
|
|
|
|
|
|
:param environments: Environment names to be matched.
|
|
|
|
:type environments: list[str]
|
|
|
|
|
|
|
|
:param tags: Tag names to be matched.
|
|
|
|
:type tags: list[str]
|
|
|
|
|
|
|
|
"""
|
|
|
|
filtered = list()
|
|
|
|
for command in commands:
|
|
|
|
if environments is not None and len(command.environments) > 0:
|
|
|
|
if not any_list_item(environments, command.environments):
|
|
|
|
continue
|
|
|
|
|
|
|
|
if tags is not None:
|
|
|
|
if not any_list_item(tags, command.tags):
|
|
|
|
continue
|
|
|
|
|
|
|
|
filtered.append(command)
|
|
|
|
|
|
|
|
return filtered
|
|
|
|
|
|
|
|
|
|
|
|
def load_commands(path, filters=None, overlay="ubuntu", **kwargs):
|
|
|
|
"""Load commands from a configuration file.
|
|
|
|
|
|
|
|
:param path: The path to the configuration file.
|
|
|
|
:type path: str
|
|
|
|
|
|
|
|
:param filters: Used to filter commands.
|
|
|
|
:type filters: dict
|
|
|
|
|
|
|
|
:param overlay: The name of the command overlay to apply to generated commands.
|
|
|
|
:type overlay: str
|
|
|
|
|
|
|
|
:rtype: list[scriptetease.library.commands.base.Command] | scriptetease.library.commands.base.ItemizedCommand] |
|
|
|
|
None
|
|
|
|
|
|
|
|
:returns: A list of command instances or ``None`` if the configuration could not be loaded.
|
|
|
|
|
|
|
|
kwargs are passed to the configuration class for instantiation.
|
|
|
|
|
|
|
|
"""
|
|
|
|
_config = load_config(path, overlay, **kwargs)
|
|
|
|
if _config is None:
|
|
|
|
return None
|
|
|
|
|
|
|
|
commands = _config.get_commands()
|
|
|
|
|
|
|
|
if filters is not None:
|
|
|
|
criteria = dict()
|
|
|
|
for attribute, values in filters.items():
|
|
|
|
criteria[attribute] = values
|
|
|
|
|
|
|
|
commands = filter_commands(commands, **criteria)
|
|
|
|
|
|
|
|
return commands
|
|
|
|
|
|
|
|
|
|
|
|
def load_config(path, overlay="ubuntu", **kwargs):
|
|
|
|
"""Load a command configuration.
|
|
|
|
|
|
|
|
:param path: The path to the configuration file.
|
|
|
|
:type path: str
|
|
|
|
|
|
|
|
:param overlay: The name of the command overlay to apply to generated commands.
|
|
|
|
:type overlay: str
|
|
|
|
|
|
|
|
:rtype: Config | None
|
|
|
|
|
|
|
|
kwargs are passed to the configuration class for instantiation.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if path.endswith(".ini"):
|
|
|
|
_config = Config(path, overlay=overlay, **kwargs)
|
|
|
|
# elif path.endswith(".yml"):
|
|
|
|
# _config = YAML(path, **kwargs)
|
|
|
|
else:
|
|
|
|
log.warning("Input file format is not currently supported: %s" % path)
|
|
|
|
return None
|
|
|
|
|
|
|
|
if not _config.load():
|
|
|
|
log.error("Failed to load config file: %s" % path)
|
|
|
|
return None
|
|
|
|
|
|
|
|
return _config
|
|
|
|
|
|
|
|
|
|
|
|
def load_variables(path, environment=None):
|
|
|
|
"""Load variables from a file.
|
|
|
|
|
|
|
|
:param path: The path to the file.
|
|
|
|
:type path: str
|
|
|
|
|
|
|
|
:param environment: Filter variables by the given environment name.
|
|
|
|
:type environment: str
|
|
|
|
|
|
|
|
:rtype: list[scripttease.parsers.utils.Variable]
|
|
|
|
|
|
|
|
"""
|
|
|
|
if not os.path.exists(path):
|
|
|
|
log.warning("Path to variables file does not exist: %s" % path)
|
|
|
|
return list()
|
|
|
|
|
|
|
|
if path.endswith(".ini"):
|
|
|
|
return _load_variables_ini(path, environment=environment)
|
|
|
|
else:
|
|
|
|
log.warning("Variable file format is not currently supports: %s" % path)
|
|
|
|
return list()
|
|
|
|
|
|
|
|
|
|
|
|
def _load_variables_ini(path, environment=None):
|
|
|
|
"""Load variables from an INI file. See ``load_variables()``."""
|
|
|
|
|
|
|
|
ini = RawConfigParser()
|
|
|
|
ini.read(path)
|
|
|
|
|
|
|
|
a = list()
|
|
|
|
for section in ini.sections():
|
|
|
|
if ":" in section:
|
|
|
|
variable_name, _environment = section.split(":")
|
|
|
|
else:
|
|
|
|
_environment = None
|
|
|
|
variable_name = section
|
|
|
|
|
|
|
|
_kwargs = {
|
|
|
|
'environment': _environment,
|
|
|
|
}
|
|
|
|
for key, value in ini.items(section):
|
|
|
|
if key == "tags":
|
|
|
|
value = split_csv(value)
|
|
|
|
else:
|
|
|
|
value = smart_cast(value)
|
|
|
|
|
|
|
|
_kwargs[key] = value
|
|
|
|
|
|
|
|
a.append(Variable(variable_name, **_kwargs))
|
|
|
|
|
|
|
|
if environment is not None:
|
|
|
|
b = list()
|
|
|
|
for var in a:
|
|
|
|
if var.environment and var.environment == environment or var.environment is None:
|
|
|
|
b.append(var)
|
|
|
|
|
|
|
|
return b
|
|
|
|
|
|
|
|
return a
|
|
|
|
|
|
|
|
# Classes
|
|
|
|
|
|
|
|
|
|
|
|
class Context(object):
|
|
|
|
"""A collection of variables."""
|
|
|
|
|
|
|
|
def __init__(self, **kwargs):
|
|
|
|
"""Initialize the context.
|
|
|
|
|
|
|
|
kwargs are added as variable instances.
|
|
|
|
|
|
|
|
"""
|
|
|
|
self.variables = dict()
|
|
|
|
|
|
|
|
for key, value in kwargs.items():
|
|
|
|
self.add(key, value)
|
|
|
|
|
|
|
|
def __getattr__(self, item):
|
|
|
|
if item in self.variables:
|
|
|
|
return self.variables[item].value
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return "<%s (%s)>" % (self.__class__.__name__, len(self.variables))
|
|
|
|
|
|
|
|
def add(self, name, value, environment=None, tags=None):
|
|
|
|
"""Add a variable to the context.
|
|
|
|
|
|
|
|
:param name: The name of the variable.
|
|
|
|
:type name: str
|
|
|
|
|
|
|
|
:param value: The value of the variable in this context.
|
|
|
|
|
|
|
|
:param environment: The environment name to which the variable applies. ``None`` applies to all environments.
|
|
|
|
:type environment: str
|
|
|
|
|
|
|
|
:param tags: A list of tags that describe the variable.
|
|
|
|
:type tags: list[str]
|
|
|
|
|
|
|
|
:rtype: scripttease.parsers.utils.Variable
|
|
|
|
|
|
|
|
:raise: RuntimeError
|
|
|
|
:raises: ``RuntimeError`` if the variable already exists.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if name in self.variables:
|
|
|
|
raise RuntimeError("Variable already exists: %s" % name)
|
|
|
|
|
|
|
|
v = Variable(name, value, environment=environment, tags=tags)
|
|
|
|
self.variables[name] = v
|
|
|
|
|
|
|
|
return v
|
|
|
|
|
|
|
|
def get(self, name, default=None):
|
|
|
|
"""Get a the value of the variable from the context.
|
|
|
|
|
|
|
|
:param name: The name of the variable.
|
|
|
|
:type name: str
|
|
|
|
|
|
|
|
:param default: The default value to return.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if not self.has(name):
|
|
|
|
return default
|
|
|
|
|
|
|
|
return self.variables[name].value
|
|
|
|
|
|
|
|
def has(self, name):
|
|
|
|
"""Indicates whether the named variable exists in this context, and whether the value is not ``None``.
|
|
|
|
|
|
|
|
:rtype: bool
|
|
|
|
|
|
|
|
"""
|
|
|
|
if name not in self.variables:
|
|
|
|
return False
|
|
|
|
|
|
|
|
return self.variables[name].value is not None
|
|
|
|
|
|
|
|
def join(self, variables):
|
|
|
|
"""Join a list of variables to the context.
|
|
|
|
|
|
|
|
:param variables: the list of variables to be added.
|
|
|
|
:type variables: list[scripttease.parsers.utils.Variable]
|
|
|
|
|
|
|
|
.. note::
|
|
|
|
This *replaces* a variable if it already exists.
|
|
|
|
|
|
|
|
"""
|
|
|
|
for v in variables:
|
|
|
|
self.variables[v.name] = v
|
|
|
|
|
|
|
|
def mapping(self):
|
|
|
|
"""Export the context as a dictionary.
|
|
|
|
|
|
|
|
:rtype: dict
|
|
|
|
|
|
|
|
"""
|
|
|
|
values = dict()
|
|
|
|
for key, var in self.variables.items():
|
|
|
|
values[key] = var.value or var.default
|
|
|
|
|
|
|
|
return values
|
|
|
|
|
|
|
|
def merge(self, context):
|
|
|
|
"""Merge another context with this one.
|
|
|
|
|
|
|
|
:param context: The context to be merged.
|
|
|
|
:type context: scripttease.parser.utils.Context
|
|
|
|
|
|
|
|
.. note::
|
|
|
|
Variables that exist in the current context are *not* replaced with variables from the provided context.
|
|
|
|
|
|
|
|
"""
|
|
|
|
for name, var in context.variables.items():
|
|
|
|
if not self.has(name):
|
|
|
|
self.variables[name] = var
|
|
|
|
|
|
|
|
|
|
|
|
class Variable(object):
|
|
|
|
"""Represents a variable to be used in the context of pre-processing a config file."""
|
|
|
|
|
|
|
|
def __init__(self, name, value, **kwargs):
|
|
|
|
"""Initialize a variable.
|
|
|
|
|
|
|
|
:param name: The variable name.
|
|
|
|
:type name: str
|
|
|
|
|
|
|
|
:param value: The value of the variable.
|
|
|
|
|
|
|
|
kwargs are added as attributes of the instance.
|
|
|
|
|
|
|
|
"""
|
|
|
|
self.name = name
|
|
|
|
self.value = value
|
|
|
|
|
|
|
|
kwargs.setdefault("tags", list())
|
|
|
|
self._attributes = kwargs
|
|
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
|
return self.value == other
|
|
|
|
|
|
|
|
def __getattr__(self, item):
|
|
|
|
return self._attributes.get(item)
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return "<%s %s>" % (self.__class__.__name__, self.name)
|