A collection of classes and commands for automated command line scripting using Python.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

326 lines
8.3 KiB

# Imports
from commonkit import any_list_item, smart_cast, split_csv
from configparser import RawConfigParser
import logging
import os
from ..constants import LOGGER_NAME
from .ini import Config
# Module-level logger, named by the ``LOGGER_NAME`` constant imported from ``..constants``.
log = logging.getLogger(LOGGER_NAME)
# Exports
# The names listed here are what ``from <this module> import *`` makes available.
__all__ = (
"filter_commands",
"load_commands",
"load_config",
"load_variables",
"Context",
"Variable",
)
# Functions
def filter_commands(commands, environments=None, tags=None):
    """Return the commands that satisfy the given environment and tag criteria.

    :param commands: The commands to be filtered.
    :type commands: list

    :param environments: Environment names to be matched. ``None`` disables this criterion. A command whose
                         own ``environments`` is empty is never excluded by it.
    :type environments: list[str]

    :param tags: Tag names to be matched. ``None`` disables this criterion.
    :type tags: list[str]

    :rtype: list
    :returns: A new list containing the matching commands in their original order.

    """
    def _is_selected(command):
        # The environment check applies only to commands that declare at least one environment.
        if environments is not None and len(command.environments) > 0:
            if not any_list_item(environments, command.environments):
                return False

        # The tag check applies to every command whenever tags are requested.
        if tags is not None and not any_list_item(tags, command.tags):
            return False

        return True

    return [command for command in commands if _is_selected(command)]
def load_commands(path, filters=None, overlay="ubuntu", **kwargs):
    """Load commands from a configuration file, optionally filtering them.

    :param path: The path to the configuration file.
    :type path: str

    :param filters: Used to filter commands. Each key/value pair is handed to ``filter_commands()`` as a
                    keyword argument.
    :type filters: dict

    :param overlay: The name of the command overlay to apply to generated commands.
    :type overlay: str

    :rtype: list[scripttease.library.commands.base.Command | scripttease.library.commands.base.ItemizedCommand] |
            None
    :returns: A list of command instances or ``None`` if the configuration could not be loaded.

    kwargs are passed to the configuration class for instantiation.

    """
    _config = load_config(path, overlay, **kwargs)
    if _config is None:
        return None

    commands = _config.get_commands()
    if filters is None:
        return commands

    # Expand the filter mapping into the keyword arguments expected by filter_commands().
    return filter_commands(commands, **dict(filters.items()))
def load_config(path, overlay="ubuntu", **kwargs):
    """Load a command configuration.

    :param path: The path to the configuration file. Only paths ending in ``.ini`` are handled.
    :type path: str

    :param overlay: The name of the command overlay to apply to generated commands.
    :type overlay: str

    :rtype: Config | None
    :returns: The loaded configuration, or ``None`` when the file extension is not handled or the
              configuration's ``load()`` reports failure. A log message is emitted in both cases.

    kwargs are passed to the configuration class for instantiation.

    """
    # NOTE: INI is the only format handled here; YAML input is not implemented.
    if not path.endswith(".ini"):
        log.warning("Input file format is not currently supported: %s" % path)
        return None

    _config = Config(path, overlay=overlay, **kwargs)
    if _config.load():
        return _config

    log.error("Failed to load config file: %s" % path)
    return None
def load_variables(path, environment=None):
    """Load variables from a file.

    :param path: The path to the file. Only paths ending in ``.ini`` are handled.
    :type path: str

    :param environment: Filter variables by the given environment name.
    :type environment: str

    :rtype: list[scripttease.parsers.utils.Variable]
    :returns: The variables found in the file. An empty list is returned (and a warning is logged) when the
              path does not exist or the file extension is not handled.

    """
    if not os.path.exists(path):
        log.warning("Path to variables file does not exist: %s" % path)
        return list()

    if path.endswith(".ini"):
        return _load_variables_ini(path, environment=environment)

    # Wording matches the equivalent warning emitted by load_config().
    log.warning("Variable file format is not currently supported: %s" % path)
    return list()
def _load_variables_ini(path, environment=None):
    """Load variables from an INI file. See ``load_variables()``.

    Each section produces one variable. A section name of the form ``name:environment`` assigns the variable
    to that environment; a plain ``name`` leaves the environment as ``None``. Every option in the section is
    passed to ``Variable`` as a keyword argument: ``tags`` is parsed with ``split_csv()`` and all other values
    with ``smart_cast()``.

    :param path: The path to the INI file.
    :type path: str

    :param environment: When given, only variables for this environment, plus those with no environment, are
                        returned.
    :type environment: str

    :rtype: list[scripttease.parsers.utils.Variable]

    .. note::
        ``Variable`` requires a ``value`` argument, so a section without a ``value`` option raises a
        ``TypeError``.

    """
    ini = RawConfigParser()
    ini.read(path)

    variables = list()
    for section in ini.sections():
        # Split on the first colon only; unpacking the result of a plain split(":") raises ValueError when the
        # section name contains more than one colon.
        variable_name, separator, _environment = section.partition(":")
        if not separator:
            _environment = None

        _kwargs = {
            'environment': _environment,
        }
        for key, value in ini.items(section):
            if key == "tags":
                value = split_csv(value)
            else:
                value = smart_cast(value)

            _kwargs[key] = value

        variables.append(Variable(variable_name, **_kwargs))

    if environment is None:
        return variables

    # Keep variables bound to the requested environment as well as those that are not bound to any environment.
    return [
        var for var in variables
        if (var.environment and var.environment == environment) or var.environment is None
    ]
# Classes
class Context(object):
    """A collection of variables."""

    def __init__(self, **kwargs):
        """Initialize the context.

        kwargs are added as variable instances.

        """
        self.variables = dict()
        for key, value in kwargs.items():
            self.add(key, value)

    def __getattr__(self, item):
        """Expose variable values as attributes; unknown names resolve to ``None``.

        :raise: AttributeError
        :raises: ``AttributeError`` for ``variables`` itself and for dunder names.

        """
        # __getattr__ only runs when normal lookup fails. If ``variables`` is what is missing (for example on an
        # instance created by copy or pickle before its state is restored), reading self.variables below would
        # recurse without end. Dunder probes such as __setstate__ must also fail normally instead of yielding None.
        if item == "variables" or (item.startswith("__") and item.endswith("__")):
            raise AttributeError(item)

        if item in self.variables:
            return self.variables[item].value

        return None

    def __repr__(self):
        return "<%s (%s)>" % (self.__class__.__name__, len(self.variables))

    def add(self, name, value, environment=None, tags=None):
        """Add a variable to the context.

        :param name: The name of the variable.
        :type name: str

        :param value: The value of the variable in this context.

        :param environment: The environment name to which the variable applies. ``None`` applies to all
                            environments.
        :type environment: str

        :param tags: A list of tags that describe the variable.
        :type tags: list[str]

        :rtype: scripttease.parsers.utils.Variable

        :raise: RuntimeError
        :raises: ``RuntimeError`` if the variable already exists.

        """
        if name in self.variables:
            raise RuntimeError("Variable already exists: %s" % name)

        v = Variable(name, value, environment=environment, tags=tags)
        self.variables[name] = v

        return v

    def get(self, name, default=None):
        """Get the value of the variable from the context.

        :param name: The name of the variable.
        :type name: str

        :param default: The default value to return when the variable is missing or its value is ``None``.

        """
        if not self.has(name):
            return default

        return self.variables[name].value

    def has(self, name):
        """Indicates whether the named variable exists in this context, and whether the value is not ``None``.

        :rtype: bool

        """
        if name not in self.variables:
            return False

        return self.variables[name].value is not None

    def join(self, variables):
        """Join a list of variables to the context.

        :param variables: the list of variables to be added.
        :type variables: list[scripttease.parsers.utils.Variable]

        .. note::
            This *replaces* a variable if it already exists.

        """
        for v in variables:
            self.variables[v.name] = v

    def mapping(self):
        """Export the context as a dictionary.

        :rtype: dict
        :returns: Variable names mapped to their value, or to the variable's ``default`` when the value is
                  ``None``.

        """
        values = dict()
        for key, var in self.variables.items():
            # Only None counts as "no value" (the same rule has() applies); falsy values such as 0, False and ""
            # are legitimate and must not be replaced by the default.
            values[key] = var.default if var.value is None else var.value

        return values

    def merge(self, context):
        """Merge another context with this one.

        :param context: The context to be merged.
        :type context: scripttease.parser.utils.Context

        .. note::
            Variables that exist in the current context are *not* replaced with variables from the provided
            context. Existence is decided by ``has()``, so a variable whose value is ``None`` is replaced.

        """
        for name, var in context.variables.items():
            if not self.has(name):
                self.variables[name] = var
class Variable(object):
    """Represents a variable to be used in the context of pre-processing a config file."""

    def __init__(self, name, value, **kwargs):
        """Initialize a variable.

        :param name: The variable name.
        :type name: str

        :param value: The value of the variable.

        kwargs are added as attributes of the instance. ``tags`` is always present and defaults to an empty
        list, including when it is passed explicitly as ``None``.

        """
        self.name = name
        self.value = value

        # setdefault() alone would keep an explicit tags=None; normalize it so ``tags`` is always a list.
        if kwargs.get("tags") is None:
            kwargs["tags"] = list()

        self._attributes = kwargs

    # Defining __eq__ without __hash__ already makes instances unhashable; this states it explicitly.
    __hash__ = None

    def __eq__(self, other):
        """Compare the variable's *value* (not the variable itself) with ``other``."""
        return self.value == other

    def __getattr__(self, item):
        """Resolve extra attributes given at construction; unknown names resolve to ``None``.

        :raise: AttributeError
        :raises: ``AttributeError`` for ``_attributes`` itself and for dunder names.

        """
        # __getattr__ only runs when normal lookup fails. If ``_attributes`` is what is missing (an instance made
        # by copy or pickle before its state is restored), reading self._attributes below would recurse without
        # end. Dunder probes such as __setstate__ or __deepcopy__ must fail normally instead of yielding None.
        if item == "_attributes" or (item.startswith("__") and item.endswith("__")):
            raise AttributeError(item)

        return self._attributes.get(item)

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.name)