1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
"""
Check for ensuring logs from pods can be queried in a reasonable amount of time.
"""
import json
import time
from uuid import uuid4
from openshift_checks import OpenShiftCheckException
from openshift_checks.logging.logging import LoggingCheck
# Default number of seconds to wait for the test log entry to show up in
# Elasticsearch; used when "openshift_check_logging_index_timeout_seconds"
# is not provided.
ES_CMD_TIMEOUT_SECONDS = 30
class LoggingIndexTime(LoggingCheck):
    """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time."""

    name = "logging_index_time"
    tags = ["health", "logging"]

    # Default namespace; run() replaces it with the value of
    # "openshift_logging_namespace" when that variable is set.
    logging_namespace = "logging"

    def run(self):
        """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs.

        Returns an empty dict on success, or a dict with "failed": True and a
        "msg" describing the problem when the timeout variable is invalid, the
        component pods cannot be retrieved, or no pod of a component is running.

        Raises OpenShiftCheckException (via the helper methods) when Kibana or
        Elasticsearch return an unexpected response, or when the entry is not
        found before the timeout expires.
        """
        try:
            log_index_timeout = int(
                self.get_var("openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS)
            )
        except (TypeError, ValueError):
            # int() raises TypeError (not ValueError) for values such as None
            # or a list, so both must be treated as invalid user input.
            return {
                "failed": True,
                "msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". '
                        'Value must be an integer representing an amount in seconds.'),
            }

        running_component_pods = {}

        # get all component pods
        self.logging_namespace = self.get_var("openshift_logging_namespace", default=self.logging_namespace)
        for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']):
            pods, error = self.get_pods_for_component(self.logging_namespace, component)

            if error:
                msg = 'Unable to retrieve pods for the {} logging component: {}'
                return {"failed": True, "changed": False, "msg": msg.format(name, error)}

            running_pods = self.running_pods(pods)

            if not running_pods:
                # Note the space after the first sentence: the two literals
                # are concatenated into a single message.
                msg = ('No {} pods in the "Running" state were found. '
                       'At least one pod is required in order to perform this check.')
                return {"failed": True, "changed": False, "msg": msg.format(name)}

            running_component_pods[component] = running_pods

        # Only the first running pod of each component is used.
        uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0])
        self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout)
        return {}

    def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs):
        """Retry an Elasticsearch query every second until query success, or a defined
        length of time has passed.

        Raises OpenShiftCheckException when no match for `uuid` is found and
        another one-second wait would go past the deadline.
        """
        deadline = time.time() + timeout_secs
        interval = 1
        while not self.query_es_from_es(es_pod, uuid):
            # Give up if sleeping for another interval would exceed the deadline.
            if time.time() + interval > deadline:
                msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s."
                raise OpenShiftCheckException(msg.format(uuid, timeout_secs))
            time.sleep(interval)

    def curl_kibana_with_uuid(self, kibana_pod):
        """curl Kibana with a unique uuid.

        Requests a path made of a freshly generated uuid from inside the given
        Kibana pod and expects a JSON response whose "statusCode" is 404.
        Returns the uuid that was requested.

        Raises OpenShiftCheckException when the response is not JSON, has no
        "statusCode" entry, or carries a status code other than 404.
        """
        uuid = self.generate_uuid()
        pod_name = kibana_pod["metadata"]["name"]
        exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}"
        exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid)

        error_str = self.exec_oc(self.logging_namespace, exec_cmd, [])

        try:
            response = json.loads(error_str)
        except (TypeError, ValueError):
            msg = ('invalid response returned from Kibana request (Non-JSON output):\n'
                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
            raise OpenShiftCheckException(msg)

        try:
            error_code = response["statusCode"]
        except (KeyError, TypeError):
            # TypeError: valid JSON that is not an object (e.g. null or a list).
            msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n'
                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
            raise OpenShiftCheckException(msg)

        if error_code != 404:
            msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.'
            raise OpenShiftCheckException(msg.format(error_code))

        return uuid

    def query_es_from_es(self, es_pod, uuid):
        """curl the Elasticsearch pod and look for a unique uuid in its logs.

        Runs a `_count` query for messages containing `uuid` against the
        "project.<logging namespace>*" indices from inside the given pod and
        returns the value of the "count" entry of the JSON response.

        Raises OpenShiftCheckException when the response is not JSON or has no
        "count" entry.
        """
        pod_name = es_pod["metadata"]["name"]
        exec_cmd = (
            "exec {pod_name} -- curl --max-time 30 -s -f "
            "--cacert /etc/elasticsearch/secret/admin-ca "
            "--cert /etc/elasticsearch/secret/admin-cert "
            "--key /etc/elasticsearch/secret/admin-key "
            "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}"
        )
        exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid)
        result = self.exec_oc(self.logging_namespace, exec_cmd, [])

        try:
            response = json.loads(result)
        except (TypeError, ValueError):
            msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}'
            raise OpenShiftCheckException(msg.format(exec_cmd, result))

        try:
            count = response["count"]
        except (KeyError, TypeError):
            # TypeError: valid JSON that is not an object (e.g. null or a list).
            msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}'
            raise OpenShiftCheckException(msg.format(exec_cmd, result))

        return count

    @staticmethod
    def running_pods(pods):
        """Filter pods that are running."""
        return [pod for pod in pods if pod['status']['phase'] == 'Running']

    @staticmethod
    def generate_uuid():
        """Wrap uuid generator. Allows for testing with expected values."""
        return str(uuid4())
|