Files
ultimate-ee/context/_build/scripts/introspect.py
root a54cdcc0ee
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 7s
test
2024-07-15 16:59:56 +02:00

401 lines
14 KiB
Python

import argparse
import logging
import os
import sys
import yaml
import requirements
import importlib.metadata
base_collections_path = '/usr/share/ansible/collections'
default_file = 'execution-environment.yml'
logger = logging.getLogger(__name__)
def line_is_empty(line):
return bool((not line.strip()) or line.startswith('#'))
def read_req_file(path):
"""Provide some minimal error and display handling for file reading"""
if not os.path.exists(path):
print('Expected requirements file not present at: {0}'.format(os.path.abspath(path)))
with open(path, 'r') as f:
return f.read()
def pip_file_data(path):
pip_content = read_req_file(path)
pip_lines = []
for line in pip_content.split('\n'):
if line_is_empty(line):
continue
if line.startswith('-r') or line.startswith('--requirement'):
_, new_filename = line.split(None, 1)
new_path = os.path.join(os.path.dirname(path or '.'), new_filename)
pip_lines.extend(pip_file_data(new_path))
else:
pip_lines.append(line)
return pip_lines
def bindep_file_data(path):
sys_content = read_req_file(path)
sys_lines = []
for line in sys_content.split('\n'):
if line_is_empty(line):
continue
sys_lines.append(line)
return sys_lines
def process_collection(path):
"""Return a tuple of (python_dependencies, system_dependencies) for the
collection install path given.
Both items returned are a list of dependencies.
:param str path: root directory of collection (this would contain galaxy.yml file)
"""
CD = CollectionDefinition(path)
py_file = CD.get_dependency('python')
pip_lines = []
if py_file:
pip_lines = pip_file_data(os.path.join(path, py_file))
sys_file = CD.get_dependency('system')
bindep_lines = []
if sys_file:
bindep_lines = bindep_file_data(os.path.join(path, sys_file))
return (pip_lines, bindep_lines)
def process(data_dir=base_collections_path, user_pip=None, user_bindep=None):
paths = []
path_root = os.path.join(data_dir, 'ansible_collections')
# build a list of all the valid collection paths
if os.path.exists(path_root):
for namespace in sorted(os.listdir(path_root)):
if not os.path.isdir(os.path.join(path_root, namespace)):
continue
for name in sorted(os.listdir(os.path.join(path_root, namespace))):
collection_dir = os.path.join(path_root, namespace, name)
if not os.path.isdir(collection_dir):
continue
files_list = os.listdir(collection_dir)
if 'galaxy.yml' in files_list or 'MANIFEST.json' in files_list:
paths.append(collection_dir)
# populate the requirements content
py_req = {}
sys_req = {}
for path in paths:
col_pip_lines, col_sys_lines = process_collection(path)
CD = CollectionDefinition(path)
namespace, name = CD.namespace_name()
key = '{}.{}'.format(namespace, name)
if col_pip_lines:
py_req[key] = col_pip_lines
if col_sys_lines:
sys_req[key] = col_sys_lines
# add on entries from user files, if they are given
if user_pip:
col_pip_lines = pip_file_data(user_pip)
if col_pip_lines:
py_req['user'] = col_pip_lines
if user_bindep:
col_sys_lines = bindep_file_data(user_bindep)
if col_sys_lines:
sys_req['user'] = col_sys_lines
return {
'python': py_req,
'system': sys_req
}
def has_content(candidate_file):
"""Beyond checking that the candidate exists, this also assures
that the file has something other than whitespace,
which can cause errors when given to pip.
"""
if not os.path.exists(candidate_file):
return False
with open(candidate_file, 'r') as f:
content = f.read()
return bool(content.strip().strip('\n'))
class CollectionDefinition:
"""This class represents the dependency metadata for a collection
should be replaced by logic to hit the Galaxy API if made available
"""
def __init__(self, collection_path):
self.reference_path = collection_path
meta_file = os.path.join(collection_path, 'meta', default_file)
if os.path.exists(meta_file):
with open(meta_file, 'r') as f:
self.raw = yaml.safe_load(f)
else:
self.raw = {'version': 1, 'dependencies': {}}
# Automatically infer requirements for collection
for entry, filename in [('python', 'requirements.txt'), ('system', 'bindep.txt')]:
candidate_file = os.path.join(collection_path, filename)
if has_content(candidate_file):
self.raw['dependencies'][entry] = filename
def target_dir(self):
namespace, name = self.namespace_name()
return os.path.join(
base_collections_path, 'ansible_collections',
namespace, name
)
def namespace_name(self):
"Returns 2-tuple of namespace and name"
path_parts = [p for p in self.reference_path.split(os.path.sep) if p]
return tuple(path_parts[-2:])
def get_dependency(self, entry):
"""A collection is only allowed to reference a file by a relative path
which is relative to the collection root
"""
req_file = self.raw.get('dependencies', {}).get(entry)
if req_file is None:
return None
elif os.path.isabs(req_file):
raise RuntimeError(
'Collections must specify relative paths for requirements files. '
'The file {0} specified by {1} violates this.'.format(
req_file, self.reference_path
)
)
return req_file
def simple_combine(reqs):
"""Given a dictionary of requirement lines keyed off collections,
return a list with the most basic of de-duplication logic,
and comments indicating the sources based off the collection keys
"""
consolidated = []
fancy_lines = []
for collection, lines in reqs.items():
for line in lines:
if line_is_empty(line):
continue
base_line = line.split('#')[0].strip()
if base_line in consolidated:
i = consolidated.index(base_line)
fancy_lines[i] += ', {}'.format(collection)
else:
fancy_line = base_line + ' # from collection {}'.format(collection)
consolidated.append(base_line)
fancy_lines.append(fancy_line)
return fancy_lines
def parse_args(args=sys.argv[1:]):
parser = argparse.ArgumentParser(
prog='introspect',
description=(
'ansible-builder introspection; injected and used during execution environment build'
)
)
subparsers = parser.add_subparsers(help='The command to invoke.', dest='action')
subparsers.required = True
create_introspect_parser(subparsers)
args = parser.parse_args(args)
return args
def run_introspect(args, logger):
data = process(args.folder, user_pip=args.user_pip, user_bindep=args.user_bindep)
if args.sanitize:
logger.info('# Sanitized dependencies for %s', args.folder)
data_for_write = data
data['python'] = sanitize_requirements(data['python'])
data['system'] = simple_combine(data['system'])
else:
logger.info('# Dependency data for %s', args.folder)
data_for_write = data.copy()
data_for_write['python'] = simple_combine(data['python'])
data_for_write['system'] = simple_combine(data['system'])
print('---')
print(yaml.dump(data, default_flow_style=False))
if args.write_pip and data.get('python'):
write_file(args.write_pip, data_for_write.get('python') + [''])
if args.write_bindep and data.get('system'):
write_file(args.write_bindep, data_for_write.get('system') + [''])
sys.exit(0)
def create_introspect_parser(parser):
introspect_parser = parser.add_parser(
'introspect',
help='Introspects collections in folder.',
description=(
'Loops over collections in folder and returns data about dependencies. '
'This is used internally and exposed here for verification. '
'This is targeted toward collection authors and maintainers.'
)
)
introspect_parser.add_argument('--sanitize', action='store_true',
help=('Sanitize and de-duplicate requirements. '
'This is normally done separately from the introspect script, but this '
'option is given to more accurately test collection content.'))
introspect_parser.add_argument(
'folder', default=base_collections_path, nargs='?',
help=(
'Ansible collections path(s) to introspect. '
'This should have a folder named ansible_collections inside of it.'
)
)
# Combine user requirements and collection requirements into single file
# in the future, could look into passing multilple files to
# python-builder scripts to be fed multiple files as opposed to this
introspect_parser.add_argument(
'--user-pip', dest='user_pip',
help='An additional file to combine with collection pip requirements.'
)
introspect_parser.add_argument(
'--user-bindep', dest='user_bindep',
help='An additional file to combine with collection bindep requirements.'
)
introspect_parser.add_argument(
'--write-pip', dest='write_pip',
help='Write the combined pip requirements file to this location.'
)
introspect_parser.add_argument(
'--write-bindep', dest='write_bindep',
help='Write the combined bindep requirements file to this location.'
)
return introspect_parser
EXCLUDE_REQUIREMENTS = frozenset((
# obviously already satisfied or unwanted
'ansible', 'ansible-base', 'python', 'ansible-core',
# general python test requirements
'tox', 'pycodestyle', 'yamllint', 'pylint',
'flake8', 'pytest', 'pytest-xdist', 'coverage', 'mock', 'testinfra',
# test requirements highly specific to Ansible testing
'ansible-lint', 'molecule', 'galaxy-importer', 'voluptuous',
# already present in image for py3 environments
'yaml', 'pyyaml', 'json',
))
def sanitize_requirements(collection_py_reqs):
"""
Cleanup Python requirements by removing duplicates and excluded packages.
The user requirements file will go through the deduplication process, but
skips the special package exclusion process.
:param dict collection_py_reqs: A dict of lists of Python requirements, keyed
by fully qualified collection name. The special key `user` holds requirements
from the user specified requirements file from the ``--user-pip`` CLI option.
:returns: A finalized list of sanitized Python requirements.
"""
# de-duplication
consolidated = []
seen_pkgs = set()
for collection, lines in collection_py_reqs.items():
try:
for req in requirements.parse('\n'.join(lines)):
if req.specifier:
req.name = importlib.metadata.Prepared(req.name).normalized
req.collections = [collection] # add backref for later
if req.name is None:
consolidated.append(req)
continue
if req.name in seen_pkgs:
for prior_req in consolidated:
if req.name == prior_req.name:
prior_req.specs.extend(req.specs)
prior_req.collections.append(collection)
break
continue
consolidated.append(req)
seen_pkgs.add(req.name)
except Exception as e:
logger.warning('Warning: failed to parse requirements from %s, error: %s', collection, e)
# removal of unwanted packages
sanitized = []
for req in consolidated:
# Exclude packages, unless it was present in the user supplied requirements.
if req.name and req.name.lower() in EXCLUDE_REQUIREMENTS and 'user' not in req.collections:
logger.debug('# Excluding requirement %s from %s', req.name, req.collections)
continue
if req.vcs or req.uri:
# Requirement like git+ or http return as-is
new_line = req.line
elif req.name:
specs = ['{0}{1}'.format(cmp, ver) for cmp, ver in req.specs]
new_line = req.name + ','.join(specs)
else:
raise RuntimeError('Could not process {0}'.format(req.line))
sanitized.append(new_line + ' # from collection {}'.format(','.join(req.collections)))
return sanitized
def write_file(filename: str, lines: list) -> bool:
parent_dir = os.path.dirname(filename)
if parent_dir and not os.path.exists(parent_dir):
logger.warning('Creating parent directory for %s', filename)
os.makedirs(parent_dir)
new_text = '\n'.join(lines)
if os.path.exists(filename):
with open(filename, 'r') as f:
if f.read() == new_text:
logger.debug("File %s is already up-to-date.", filename)
return False
else:
logger.warning('File %s had modifications and will be rewritten', filename)
with open(filename, 'w') as f:
f.write(new_text)
return True
def main():
args = parse_args()
if args.action == 'introspect':
run_introspect(args, logger)
logger.error("An error has occurred.")
sys.exit(1)
if __name__ == '__main__':
main()