path: root/roles/openshift_health_checker
diff options
authorRodolfo Carvalho <>2017-07-20 15:09:51 +0200
committerGitHub <>2017-07-20 15:09:51 +0200
commite29eab4e9813c46f4738a81e8698b001bd5981b9 (patch)
treecd5348ee1625531af182166daa4325e00b14c2f8 /roles/openshift_health_checker
parenta6a0621d7b21e04c278f0a79e2c30dfec7b261d8 (diff)
parent78955891fe5279d497730a49fe19d69e22b43a8b (diff)
Merge pull request #4316 from juanvallejo/jvallejo/add-increased-etcd-traffic-check
add check to detect increased etcd traffic
Diffstat (limited to 'roles/openshift_health_checker')
4 files changed, 434 insertions, 0 deletions
diff --git a/roles/openshift_health_checker/library/ b/roles/openshift_health_checker/library/
new file mode 100644
index 000000000..3631f71c8
--- /dev/null
+++ b/roles/openshift_health_checker/library/
@@ -0,0 +1,150 @@
+"""Interface to journalctl."""
+from time import time
+import json
+import re
+import subprocess
+from ansible.module_utils.basic import AnsibleModule
+class InvalidMatcherRegexp(Exception):
+ """Exception class for invalid matcher regexp."""
+ pass
+class InvalidLogEntry(Exception):
+ """Exception class for invalid / non-json log entries."""
+ pass
+class LogInputSubprocessError(Exception):
+ """Exception class for errors that occur while executing a subprocess."""
+ pass
+def main():
+ """Scan a given list of "log_matchers" for journalctl messages containing given patterns.
+ "log_matchers" is a list of dicts consisting of three keys that help fine-tune log searching:
+ 'start_regexp', 'regexp', and 'unit'.
+ Sample "log_matchers" list:
+ [
+ {
+ 'start_regexp': r'Beginning of systemd unit',
+ 'regexp': r'the specific log message to find',
+ 'unit': 'etcd',
+ }
+ ]
+ """
+ module = AnsibleModule(
+ argument_spec=dict(
+ log_count_limit=dict(type="int", default=500),
+ log_matchers=dict(type="list", required=True),
+ ),
+ )
+ timestamp_limit_seconds = time() - 60 * 60 # 1 hour
+ log_count_limit = module.params["log_count_limit"]
+ log_matchers = module.params["log_matchers"]
+ matched_regexp, errors = get_log_matches(log_matchers, log_count_limit, timestamp_limit_seconds)
+ module.exit_json(
+ changed=False,
+ failed=bool(errors),
+ errors=errors,
+ matched=matched_regexp,
+ )
+def get_log_matches(matchers, log_count_limit, timestamp_limit_seconds):
+ """Return a list of up to log_count_limit matches for each matcher.
+ Log entries are only considered if newer than timestamp_limit_seconds.
+ """
+ matched_regexp = []
+ errors = []
+ for matcher in matchers:
+ try:
+ log_output = get_log_output(matcher)
+ except LogInputSubprocessError as err:
+ errors.append(str(err))
+ continue
+ try:
+ matched = find_matches(log_output, matcher, log_count_limit, timestamp_limit_seconds)
+ if matched:
+ matched_regexp.append(matcher.get("regexp", ""))
+ except InvalidMatcherRegexp as err:
+ errors.append(str(err))
+ except InvalidLogEntry as err:
+ errors.append(str(err))
+ return matched_regexp, errors
+def get_log_output(matcher):
+ """Return an iterator on the logs of a given matcher."""
+ try:
+ cmd_output = subprocess.Popen(list([
+ '/bin/journalctl',
+ '-ru', matcher.get("unit", ""),
+ '--output', 'json',
+ ]), stdout=subprocess.PIPE)
+ return iter(cmd_output.stdout.readline, '')
+ except subprocess.CalledProcessError as exc:
+ msg = "Could not obtain journalctl logs for the specified systemd unit: {}: {}"
+ raise LogInputSubprocessError(msg.format(matcher.get("unit", "<missing>"), str(exc)))
+ except OSError as exc:
+ raise LogInputSubprocessError(str(exc))
+def find_matches(log_output, matcher, log_count_limit, timestamp_limit_seconds):
+ """Return log messages matched in iterable log_output by a given matcher.
+ Ignore any log_output items older than timestamp_limit_seconds.
+ """
+ try:
+ regexp = re.compile(matcher.get("regexp", ""))
+ start_regexp = re.compile(matcher.get("start_regexp", ""))
+ except re.error as err:
+ msg = "A log matcher object was provided with an invalid regular expression: {}"
+ raise InvalidMatcherRegexp(msg.format(str(err)))
+ matched = None
+ for log_count, line in enumerate(log_output):
+ if log_count >= log_count_limit:
+ break
+ try:
+ obj = json.loads(line)
+ # don't need to look past the most recent service restart
+ if start_regexp.match(obj["MESSAGE"]):
+ break
+ log_timestamp_seconds = float(obj["__REALTIME_TIMESTAMP"]) / 1000000
+ if log_timestamp_seconds < timestamp_limit_seconds:
+ break
+ if regexp.match(obj["MESSAGE"]):
+ matched = line
+ break
+ except ValueError:
+ msg = "Log entry for systemd unit {} contained invalid json syntax: {}"
+ raise InvalidLogEntry(msg.format(matcher.get("unit"), line))
+ return matched
+if __name__ == '__main__':
+ main()
diff --git a/roles/openshift_health_checker/openshift_checks/ b/roles/openshift_health_checker/openshift_checks/
new file mode 100644
index 000000000..40c87873d
--- /dev/null
+++ b/roles/openshift_health_checker/openshift_checks/
@@ -0,0 +1,47 @@
+"""Check that scans journalctl for messages caused as a symptom of increased etcd traffic."""
+from openshift_checks import OpenShiftCheck, get_var
+class EtcdTraffic(OpenShiftCheck):
+ """Check if host is being affected by an increase in etcd traffic."""
+ name = "etcd_traffic"
+ tags = ["health", "etcd"]
+ @classmethod
+ def is_active(cls, task_vars):
+ """Skip hosts that do not have etcd in their group names."""
+ group_names = get_var(task_vars, "group_names", default=[])
+ valid_group_names = "etcd" in group_names
+ version = get_var(task_vars, "openshift", "common", "short_version")
+ valid_version = version in ("3.4", "3.5", "1.4", "1.5")
+ return super(EtcdTraffic, cls).is_active(task_vars) and valid_group_names and valid_version
+ def run(self, tmp, task_vars):
+ is_containerized = get_var(task_vars, "openshift", "common", "is_containerized")
+ unit = "etcd_container" if is_containerized else "etcd"
+ log_matchers = [{
+ "start_regexp": r"Starting Etcd Server",
+ "regexp": r"etcd: sync duration of [^,]+, expected less than 1s",
+ "unit": unit
+ }]
+ match = self.execute_module("search_journalctl", {
+ "log_matchers": log_matchers,
+ }, task_vars)
+ if match.get("matched"):
+ msg = ("Higher than normal etcd traffic detected.\n"
+ "OpenShift 3.4 introduced an increase in etcd traffic.\n"
+ "Upgrading to OpenShift 3.6 is recommended in order to fix this issue.\n"
+ "Please refer to for more information.")
+ return {"failed": True, "msg": msg}
+ if match.get("failed"):
+ return {"failed": True, "msg": "\n".join(match.get("errors"))}
+ return {}
diff --git a/roles/openshift_health_checker/test/ b/roles/openshift_health_checker/test/
new file mode 100644
index 000000000..287175e29
--- /dev/null
+++ b/roles/openshift_health_checker/test/
@@ -0,0 +1,80 @@
+import pytest
+from openshift_checks.etcd_traffic import EtcdTraffic
+@pytest.mark.parametrize('group_names,version,is_active', [
+ (['masters'], "3.5", False),
+ (['masters'], "3.6", False),
+ (['nodes'], "3.4", False),
+ (['etcd'], "3.4", True),
+ (['etcd'], "3.5", True),
+ (['etcd'], "3.1", False),
+ (['masters', 'nodes'], "3.5", False),
+ (['masters', 'etcd'], "3.5", True),
+ ([], "3.4", False),
+def test_is_active(group_names, version, is_active):
+ task_vars = dict(
+ group_names=group_names,
+ openshift=dict(
+ common=dict(short_version=version),
+ ),
+ )
+ assert EtcdTraffic.is_active(task_vars=task_vars) == is_active
+@pytest.mark.parametrize('group_names,matched,failed,extra_words', [
+ (["masters"], True, True, ["Higher than normal", "traffic"]),
+ (["masters", "etcd"], False, False, []),
+ (["etcd"], False, False, []),
+def test_log_matches_high_traffic_msg(group_names, matched, failed, extra_words):
+ def execute_module(module_name, args, task_vars):
+ return {
+ "matched": matched,
+ "failed": failed,
+ }
+ task_vars = dict(
+ group_names=group_names,
+ openshift=dict(
+ common=dict(service_type="origin", is_containerized=False),
+ )
+ )
+ check = EtcdTraffic(execute_module=execute_module)
+ result =, task_vars=task_vars)
+ for word in extra_words:
+ assert word in result.get("msg", "")
+ assert result.get("failed", False) == failed
+@pytest.mark.parametrize('is_containerized,expected_unit_value', [
+ (False, "etcd"),
+ (True, "etcd_container"),
+def test_systemd_unit_matches_deployment_type(is_containerized, expected_unit_value):
+ task_vars = dict(
+ openshift=dict(
+ common=dict(is_containerized=is_containerized),
+ )
+ )
+ def execute_module(module_name, args, task_vars):
+ assert module_name == "search_journalctl"
+ matchers = args["log_matchers"]
+ for matcher in matchers:
+ assert matcher["unit"] == expected_unit_value
+ return {"failed": False}
+ check = EtcdTraffic(execute_module=execute_module)
+, task_vars=task_vars)
+def fake_execute_module(*args):
+ raise AssertionError('this function should not be called')
diff --git a/roles/openshift_health_checker/test/ b/roles/openshift_health_checker/test/
new file mode 100644
index 000000000..724928aa1
--- /dev/null
+++ b/roles/openshift_health_checker/test/
@@ -0,0 +1,157 @@
+import pytest
+import search_journalctl
+def canned_search_journalctl(get_log_output=None):
+ """Create a search_journalctl object with canned get_log_output method"""
+ module = search_journalctl
+ if get_log_output:
+ module.get_log_output = get_log_output
+ return module
+def get_timestamp(modifier=0):
+ return DEFAULT_TIMESTAMP + modifier
+def get_timestamp_microseconds(modifier=0):
+ return get_timestamp(modifier) * 1000000
+def create_test_log_object(stamp, msg):
+ return '{{"__REALTIME_TIMESTAMP": "{}", "MESSAGE": "{}"}}'.format(stamp, msg)
+@pytest.mark.parametrize('name,matchers,log_input,expected_matches,expected_errors', [
+ (
+ 'test with valid params',
+ [
+ {
+ "start_regexp": r"Sample Logs Beginning",
+ "regexp": r"test log message",
+ "unit": "test",
+ },
+ ],
+ [
+ create_test_log_object(get_timestamp_microseconds(), "test log message"),
+ create_test_log_object(get_timestamp_microseconds(), "Sample Logs Beginning"),
+ ],
+ ["test log message"],
+ [],
+ ),
+ (
+ 'test with invalid json in log input',
+ [
+ {
+ "start_regexp": r"Sample Logs Beginning",
+ "regexp": r"test log message",
+ "unit": "test-unit",
+ },
+ ],
+ [
+ '{__REALTIME_TIMESTAMP: ' + str(get_timestamp_microseconds()) + ', "MESSAGE": "test log message"}',
+ ],
+ [],
+ [
+ ["invalid json", "test-unit", "test log message"],
+ ],
+ ),
+ (
+ 'test with invalid regexp',
+ [
+ {
+ "start_regexp": r"Sample Logs Beginning",
+ "regexp": r"test [ log message",
+ "unit": "test",
+ },
+ ],
+ [
+ create_test_log_object(get_timestamp_microseconds(), "test log message"),
+ create_test_log_object(get_timestamp_microseconds(), "sample log message"),
+ create_test_log_object(get_timestamp_microseconds(), "fake log message"),
+ create_test_log_object(get_timestamp_microseconds(), "dummy log message"),
+ create_test_log_object(get_timestamp_microseconds(), "Sample Logs Beginning"),
+ ],
+ [],
+ [
+ ["invalid regular expression"],
+ ],
+ ),
+], ids=lambda argval: argval[0])
+def test_get_log_matches(name, matchers, log_input, expected_matches, expected_errors):
+ def get_log_output(matcher):
+ return log_input
+ module = canned_search_journalctl(get_log_output)
+ matched_regexp, errors = module.get_log_matches(matchers, 500, 60 * 60)
+ assert set(matched_regexp) == set(expected_matches)
+ assert len(expected_errors) == len(errors)
+ for idx, partial_err_set in enumerate(expected_errors):
+ for partial_err_msg in partial_err_set:
+ assert partial_err_msg in errors[idx]
+@pytest.mark.parametrize('name,matcher,log_count_lim,stamp_lim_seconds,log_input,expected_match', [
+ (
+ 'test with matching log message, but out of bounds of log_count_lim',
+ {
+ "start_regexp": r"Sample Logs Beginning",
+ "regexp": r"dummy log message",
+ "unit": "test",
+ },
+ 3,
+ get_timestamp(-100 * 60 * 60),
+ [
+ create_test_log_object(get_timestamp_microseconds(), "test log message"),
+ create_test_log_object(get_timestamp_microseconds(), "sample log message"),
+ create_test_log_object(get_timestamp_microseconds(), "fake log message"),
+ create_test_log_object(get_timestamp_microseconds(), "dummy log message"),
+ create_test_log_object(get_timestamp_microseconds(), "Sample Logs Beginning"),
+ ],
+ None,
+ ),
+ (
+ 'test with matching log message, but with timestamp too old',
+ {
+ "start_regexp": r"Sample Logs Beginning",
+ "regexp": r"dummy log message",
+ "unit": "test",
+ },
+ 100,
+ get_timestamp(-10),
+ [
+ create_test_log_object(get_timestamp_microseconds(), "test log message"),
+ create_test_log_object(get_timestamp_microseconds(), "sample log message"),
+ create_test_log_object(get_timestamp_microseconds(), "fake log message"),
+ create_test_log_object(get_timestamp_microseconds(-1000), "dummy log message"),
+ create_test_log_object(get_timestamp_microseconds(-1000), "Sample Logs Beginning"),
+ ],
+ None,
+ ),
+ (
+ 'test with matching log message, and timestamp within time limit',
+ {
+ "start_regexp": r"Sample Logs Beginning",
+ "regexp": r"dummy log message",
+ "unit": "test",
+ },
+ 100,
+ get_timestamp(-1010),
+ [
+ create_test_log_object(get_timestamp_microseconds(), "test log message"),
+ create_test_log_object(get_timestamp_microseconds(), "sample log message"),
+ create_test_log_object(get_timestamp_microseconds(), "fake log message"),
+ create_test_log_object(get_timestamp_microseconds(-1000), "dummy log message"),
+ create_test_log_object(get_timestamp_microseconds(-1000), "Sample Logs Beginning"),
+ ],
+ create_test_log_object(get_timestamp_microseconds(-1000), "dummy log message"),
+ ),
+], ids=lambda argval: argval[0])
+def test_find_matches_skips_logs(name, matcher, log_count_lim, stamp_lim_seconds, log_input, expected_match):
+ match = search_journalctl.find_matches(log_input, matcher, log_count_lim, stamp_lim_seconds)
+ assert match == expected_match