aboutsummaryrefslogtreecommitdiffstats
path: root/resources/tools/wrk/wrk.py
diff options
context:
space:
mode:
authorTibor Frank <tifrank@cisco.com>2017-12-04 16:41:57 +0100
committerTibor Frank <tifrank@cisco.com>2018-01-10 15:35:01 +0100
commita95c54b7821596402e0aa7136cd7d1de71a5b187 (patch)
treef1c941b06bb05069af1b3f587b5bcfc8bc22ef3a /resources/tools/wrk/wrk.py
parentec120d957cfec192d30e84a0d337198153214a70 (diff)
CSIT-866: wrk onboarding in CSIT
- CSIT-867: Low Level Description - CSIT-868: wrk traffic profile - parsing - CSIT-869: wrk implementation into CSIT Change-Id: I65e1037f5ae05b3a5b2020e4a6c54462766ae1b4 Signed-off-by: Tibor Frank <tifrank@cisco.com>
Diffstat (limited to 'resources/tools/wrk/wrk.py')
-rw-r--r--resources/tools/wrk/wrk.py291
1 files changed, 291 insertions, 0 deletions
diff --git a/resources/tools/wrk/wrk.py b/resources/tools/wrk/wrk.py
new file mode 100644
index 0000000000..33cfd08174
--- /dev/null
+++ b/resources/tools/wrk/wrk.py
@@ -0,0 +1,291 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""wrk implementation into CSIT framework.
+"""
+
+import re
+
+from robot.api import logger
+
+from resources.libraries.python.ssh import SSH
+from resources.libraries.python.topology import NodeType
+from resources.libraries.python.CpuUtils import CpuUtils
+from resources.libraries.python.constants import Constants
+
+from resources.tools.wrk.wrk_traffic_profile_parser import WrkTrafficProfile
+from resources.tools.wrk.wrk_errors import WrkError
+
+
+REGEX_LATENCY_STATS = \
+ r"Latency\s*" \
+ r"(\d*\.*\d*\S*)\s*" \
+ r"(\d*\.*\d*\S*)\s*" \
+ r"(\d*\.*\d*\S*)\s*" \
+ r"(\d*\.*\d*\%)"
+REGEX_RPS_STATS = \
+ r"Req/Sec\s*" \
+ r"(\d*\.*\d*\S*)\s*" \
+ r"(\d*\.*\d*\S*)\s*" \
+ r"(\d*\.*\d*\S*)\s*" \
+ r"(\d*\.*\d*\%)"
+REGEX_RPS = r"Requests/sec:\s*" \
+ r"(\d*\.*\S*)"
+REGEX_BW = r"Transfer/sec:\s*" \
+ r"(\d*\.*\S*)"
+REGEX_LATENCY_DIST = \
+ r"Latency Distribution\n" \
+ r"\s*50\%\s*(\d*\.*\d*\D*)\n" \
+ r"\s*75\%\s*(\d*\.*\d*\D*)\n" \
+ r"\s*90\%\s*(\d*\.*\d*\D*)\n" \
+ r"\s*99\%\s*(\d*\.*\d*\D*)\n"
+
+# Split number and multiplicand, e.g. 14.25k --> 14.25 and k
+REGEX_NUM = r"(\d*\.*\d*)(\D*)"
+
+
+def install_wrk(tg_node):
+ """Install wrk on the TG node.
+
+ :param tg_node: Traffic generator node.
+ :type tg_node: dict
+ :raises: RuntimeError if the given node is not a TG node or if the
+ installation fails.
+ """
+
+ if tg_node['type'] != NodeType.TG:
+ raise RuntimeError('Node type is not a TG.')
+
+ ssh = SSH()
+ ssh.connect(tg_node)
+
+ ret, _, _ = ssh.exec_command(
+ "sudo -E "
+ "sh -c '{0}/resources/tools/wrk/wrk_utils.sh install false'".
+ format(Constants.REMOTE_FW_DIR), timeout=1800)
+ if int(ret) != 0:
+ raise RuntimeError('Installation of wrk on TG node failed.')
+
+
+def destroy_wrk(tg_node):
+ """Destroy wrk on the TG node.
+
+ :param tg_node: Traffic generator node.
+ :type tg_node: dict
+ :raises: RuntimeError if the given node is not a TG node or the removal of
+ wrk failed.
+ """
+
+ if tg_node['type'] != NodeType.TG:
+ raise RuntimeError('Node type is not a TG.')
+
+ ssh = SSH()
+ ssh.connect(tg_node)
+
+ ret, _, _ = ssh.exec_command(
+ "sudo -E "
+ "sh -c '{0}/resources/tools/wrk/wrk_utils.sh destroy'".
+ format(Constants.REMOTE_FW_DIR), timeout=1800)
+ if int(ret) != 0:
+ raise RuntimeError('Removal of wrk from the TG node failed.')
+
+
+def run_wrk(tg_node, profile_name, tg_numa, test_type):
+ """Send the traffic as defined in the profile.
+
+ :param tg_node: Traffic generator node.
+ :param profile_name: The name of wrk traffic profile.
+ :param tg_numa: Numa node on which wrk will run.
+ :param test_type: The type of the tests: cps, rps, bw
+ :type profile_name: str
+ :type tg_node: dict
+ :type tg_numa: int
+ :type test_type: str
+ :returns: Message with measured data.
+ :rtype: str
+ :raises: RuntimeError if node type is not a TG.
+ """
+
+ if tg_node['type'] != NodeType.TG:
+ raise RuntimeError('Node type is not a TG.')
+
+ # Parse and validate the profile
+ profile_path = ("resources/traffic_profiles/wrk/{0}.yaml".
+ format(profile_name))
+ profile = WrkTrafficProfile(profile_path).traffic_profile
+
+ cores = CpuUtils.cpu_list_per_node(tg_node, tg_numa)
+ first_cpu = cores[profile["first-cpu"]]
+
+ if len(profile["urls"]) == 1 and profile["cpus"] == 1:
+ params = [
+ "traffic_1_url_1_core",
+ str(first_cpu),
+ str(profile["nr-of-threads"]),
+ str(profile["nr-of-connections"]),
+ "{0}s".format(profile["duration"]),
+ "'{0}'".format(profile["header"]),
+ str(profile["timeout"]),
+ str(profile["script"]),
+ str(profile["latency"]),
+ "'{0}'".format(" ".join(profile["urls"]))
+ ]
+ elif len(profile["urls"]) == profile["cpus"]:
+ params = [
+ "traffic_n_urls_n_cores",
+ str(first_cpu),
+ str(profile["nr-of-threads"]),
+ str(profile["nr-of-connections"]),
+ "{0}s".format(profile["duration"]),
+ "'{0}'".format(profile["header"]),
+ str(profile["timeout"]),
+ str(profile["script"]),
+ str(profile["latency"]),
+ "'{0}'".format(" ".join(profile["urls"]))
+ ]
+ else:
+ params = [
+ "traffic_n_urls_m_cores",
+ str(first_cpu),
+ str(profile["cpus"] / len(profile["urls"])),
+ str(profile["nr-of-threads"]),
+ str(profile["nr-of-connections"]),
+ "{0}s".format(profile["duration"]),
+ "'{0}'".format(profile["header"]),
+ str(profile["timeout"]),
+ str(profile["script"]),
+ str(profile["latency"]),
+ "'{0}'".format(" ".join(profile["urls"]))
+ ]
+ args = " ".join(params)
+
+ ssh = SSH()
+ ssh.connect(tg_node)
+
+ ret, stdout, _ = ssh.exec_command(
+ "{0}/resources/tools/wrk/wrk_utils.sh {1}".
+ format(Constants.REMOTE_FW_DIR, args), timeout=1800)
+ if int(ret) != 0:
+ raise RuntimeError('wrk runtime error.')
+
+ stats = _parse_wrk_output(stdout)
+
+ log_msg = "\nMeasured values:\n"
+ if test_type == "cps":
+ log_msg += "Connections/sec: Avg / Stdev / Max / +/- Stdev\n"
+ for item in stats["rps-stats-lst"]:
+ log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
+ log_msg += "Total cps: {0}cps\n".format(stats["rps-sum"])
+ elif test_type == "rps":
+ log_msg += "Requests/sec: Avg / Stdev / Max / +/- Stdev\n"
+ for item in stats["rps-stats-lst"]:
+ log_msg += "{0} / {1} / {2} / {3}\n".format(*item)
+ log_msg += "Total rps: {0}cps\n".format(stats["rps-sum"])
+ elif test_type == "bw":
+ log_msg += "Transfer/sec: {0}Bps".format(stats["bw-sum"])
+
+ logger.info(log_msg)
+
+ return log_msg
+
+
+def _parse_wrk_output(msg):
+ """Parse the wrk stdout with the results.
+
+ :param msg: stdout of wrk.
+ :type msg: str
+ :returns: Parsed results.
+ :rtype: dict
+ :raises: WrkError if the message does not include the results.
+ """
+
+ if "Thread Stats" not in msg:
+ raise WrkError("The output of wrk does not include the results.")
+
+ msg_lst = msg.splitlines(False)
+
+ stats = {
+ "latency-dist-lst": list(),
+ "latency-stats-lst": list(),
+ "rps-stats-lst": list(),
+ "rps-lst": list(),
+ "bw-lst": list(),
+ "rps-sum": 0,
+ "bw-sum": None
+ }
+
+ for line in msg_lst:
+ if "Latency Distribution" in line:
+ # Latency distribution - 50%, 75%, 90%, 99%
+ pass
+ elif "Latency" in line:
+ # Latency statistics - Avg, Stdev, Max, +/- Stdev
+ pass
+ elif "Req/Sec" in line:
+ # rps statistics - Avg, Stdev, Max, +/- Stdev
+ stats["rps-stats-lst"].append((
+ _evaluate_number(re.search(REGEX_RPS_STATS, line).group(1)),
+ _evaluate_number(re.search(REGEX_RPS_STATS, line).group(2)),
+ _evaluate_number(re.search(REGEX_RPS_STATS, line).group(3)),
+ _evaluate_number(re.search(REGEX_RPS_STATS, line).group(4))))
+ elif "Requests/sec:" in line:
+ # rps (cps)
+ stats["rps-lst"].append(
+ _evaluate_number(re.search(REGEX_RPS, line).group(1)))
+ elif "Transfer/sec:" in line:
+ # BW
+ stats["bw-lst"].append(
+ _evaluate_number(re.search(REGEX_BW, line).group(1)))
+
+ for item in stats["rps-stats-lst"]:
+ stats["rps-sum"] += item[0]
+ stats["bw-sum"] = sum(stats["bw-lst"])
+
+ return stats
+
+
+def _evaluate_number(num):
+ """Evaluate the numeric value of the number with multiplicands, e.g.:
+ 12.25k --> 12250
+
+ :param num: Number to evaluate.
+ :type num: str
+ :returns: Evaluated number.
+ :rtype: float
+ :raises: WrkError if it is not possible to evaluate the given number.
+ """
+
+ val = re.search(REGEX_NUM, num)
+ try:
+ val_num = float(val.group(1))
+ except ValueError:
+ raise WrkError("The output of wrk does not include the results "
+ "or the format of results has changed.")
+ val_mul = val.group(2).lower()
+ if val_mul:
+ if "k" in val_mul:
+ val_num *= 1000
+ elif "m" in val_mul:
+ val_num *= 1000000
+ elif "g" in val_mul:
+ val_num *= 1000000000
+ elif "b" in val_mul:
+ pass
+ elif "%" in val_mul:
+ pass
+ elif "" in val_mul:
+ pass
+ else:
+ raise WrkError("The multiplicand {0} is not defined.".
+ format(val_mul))
+ return val_num