1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
"""
Ansible action plugin to execute health checks in OpenShift clusters.
"""
# pylint: disable=wrong-import-position,missing-docstring,invalid-name
import sys
import os
from collections import defaultdict
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
from ansible.plugins.action import ActionBase
# Augment sys.path so that we can import checks from a directory relative to
# this action plugin.
sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
from openshift_checks import OpenShiftCheck, OpenShiftCheckException, load_checks # noqa: E402
class ActionModule(ActionBase):
    """Action plugin that runs the requested OpenShift health checks.

    The checks to run are read from the task's ``checks`` argument, resolved
    against every known ``OpenShiftCheck`` subclass, executed one by one, and
    their individual results are collected under ``result["checks"]``.
    """

    def run(self, tmp=None, task_vars=None):
        """Run the requested checks and return the aggregated task result.

        Args:
            tmp: Passed through to the parent ``run`` and to each check's
                ``run``.
            task_vars: Dict of task variables; must contain ``openshift``
                and ``ansible_host``. ``None`` is treated as an empty dict.

        Returns:
            The result dict from the parent ``run``, extended with:
            ``checks`` (check name -> per-check result dict), ``changed``
            (true if any check reported a change) and, on failure,
            ``failed``/``msg``.
        """
        result = super(ActionModule, self).run(tmp, task_vars)

        if task_vars is None:
            task_vars = {}

        if "openshift" not in task_vars:
            result["failed"] = True
            result["msg"] = "'openshift' is undefined, did 'openshift_facts' run?"
            return result

        # Both loading and resolving can raise OpenShiftCheckException
        # (duplicate check names, unknown check/tag names). Keep them in the
        # same try block so either problem is reported as a failed result
        # instead of escaping the plugin as an unhandled exception.
        try:
            known_checks = self.load_known_checks()
            requested = self._task.args.get("checks", [])
            resolved_checks = resolve_checks(requested, known_checks.values())
        except OpenShiftCheckException as e:
            result["failed"] = True
            result["msg"] = str(e)
            return result

        result["checks"] = check_results = {}

        # resolve_checks returns a set; sort it so checks run (and their
        # banners print) in a reproducible order.
        for check_name in sorted(resolved_checks):
            display.banner("CHECK [{} : {}]".format(check_name, task_vars["ansible_host"]))
            check = known_checks[check_name]

            if check.is_active(task_vars):
                try:
                    r = check.run(tmp, task_vars)
                except OpenShiftCheckException as e:
                    # A check signalling an error counts as a failed check,
                    # not as a crash of the whole task.
                    r = {}
                    r["failed"] = True
                    r["msg"] = str(e)
            else:
                r = {"skipped": True}

            check_results[check_name] = r

            if r.get("failed", False):
                result["failed"] = True
                result["msg"] = "One or more checks failed"

        result["changed"] = any(r.get("changed", False) for r in check_results.values())
        return result

    def load_known_checks(self):
        """Instantiate every known check and index the instances by name.

        Returns:
            Dict mapping each check's ``name`` to an instance of the check,
            constructed with this plugin's ``_execute_module``.

        Raises:
            OpenShiftCheckException: If two check classes share a name.
        """
        load_checks()

        known_checks = {}
        for cls in OpenShiftCheck.subclasses():
            check_name = cls.name
            if check_name in known_checks:
                # known_checks stores instances, so go back to the class to
                # name the conflicting definition in the error message.
                other_cls = known_checks[check_name].__class__
                raise OpenShiftCheckException(
                    "non-unique check name '{}' in: '{}.{}' and '{}.{}'".format(
                        check_name,
                        cls.__module__, cls.__name__,
                        other_cls.__module__, other_cls.__name__))
            known_checks[check_name] = cls(execute_module=self._execute_module)
        return known_checks
def resolve_checks(names, all_checks):
    """Return the set of check names selected by ``names``.

    Each entry of ``names`` is either a plain check name or a tag reference
    written as ``"@tag"``. A tag reference expands to every check whose
    ``tags`` contain that tag.

    Args:
        names: An iterable of strings. A single bare string is treated as
            one name instead of being iterated character by character.
        all_checks: An iterable of check classes/instances exposing ``name``
            and ``tags``. It is traversed only once, so one-shot iterables
            are accepted.

    Returns:
        A set of check names.

    Raises:
        OpenShiftCheckException: If ``names`` contains an unknown check name
            or an unknown tag name.
    """
    # (str, type(u"")) matches text strings on both Python 2 and Python 3.
    if isinstance(names, (str, type(u""))):
        names = [names]
    else:
        # Materialize once: names is scanned twice below.
        names = list(names)

    # Single pass over all_checks to build both indexes, so a generator or
    # other one-shot iterable is not exhausted before the tags are read.
    known_check_names = set()
    tag_to_checks = defaultdict(set)
    for check in all_checks:
        known_check_names.add(check.name)
        for tag in check.tags:
            tag_to_checks[tag].add(check.name)

    check_names = set(name for name in names if not name.startswith('@'))
    tag_names = set(name[1:] for name in names if name.startswith('@'))

    unknown_check_names = check_names - known_check_names
    # Snapshot the keys before any lookup below can add entries to the
    # defaultdict.
    unknown_tag_names = tag_names - set(tag_to_checks)

    if unknown_check_names or unknown_tag_names:
        msg = []
        if unknown_check_names:
            msg.append('Unknown check names: {}.'.format(', '.join(sorted(unknown_check_names))))
        if unknown_tag_names:
            msg.append('Unknown tag names: {}.'.format(', '.join(sorted(unknown_tag_names))))
        msg.append('Make sure there is no typo in the playbook and no files are missing.')
        raise OpenShiftCheckException('\n'.join(msg))

    resolved = set(check_names)
    for tag in tag_names:
        resolved.update(tag_to_checks[tag])

    return resolved
|