aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries
diff options
context:
space:
mode:
authorjuraj.linkes <juraj.linkes@pantheon.tech>2019-05-24 10:58:52 +0200
committerPeter Mikus <pmikus@cisco.com>2019-05-28 13:59:12 +0000
commitcf64b95272442e47a2d0fe7eed89b4e99c1f6bb5 (patch)
treee4436b9aaa592dad024abc56ff5cd287717734e4 /resources/libraries
parent66d62a7020a69f7a7c4d274b7bdd9e91de41aa1a (diff)
Use threads for fw setup and cleanup
Using multiprocessing sometimes causes scp to hang when copying files. Threading is also more lightweight. Change-Id: I047b4835bbf1584c80469b27af5394d89087e8a9 Signed-off-by: juraj.linkes <juraj.linkes@pantheon.tech>
Diffstat (limited to 'resources/libraries')
-rw-r--r--resources/libraries/python/SetupFramework.py116
1 files changed, 62 insertions, 54 deletions
diff --git a/resources/libraries/python/SetupFramework.py b/resources/libraries/python/SetupFramework.py
index 9b50b90bc5..c9207d9658 100644
--- a/resources/libraries/python/SetupFramework.py
+++ b/resources/libraries/python/SetupFramework.py
@@ -16,15 +16,15 @@ nodes. All tasks required to be run before the actual tests are started is
supposed to end up here.
"""
+import datetime
+from os import environ, remove
+from os.path import basename
from shlex import split
-from subprocess import Popen, PIPE, call
-from multiprocessing import Pool
+from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
-from os.path import basename
-from os import environ
+import threading
from robot.api import logger
-from robot.libraries.BuiltIn import BuiltIn
from resources.libraries.python.ssh import SSH
from resources.libraries.python.Constants import Constants as con
@@ -138,18 +138,20 @@ def create_env_directory_at_node(node):
logger.console('Virtualenv on {0} created'.format(node['host']))
-def setup_node(args):
- """Run all set-up methods for a node.
-
- This method is used as map_async parameter. It receives tuple with all
- parameters as passed to map_async function.
+def setup_node(node, tarball, remote_tarball, results=None):
+ """Copy a tarball to a node and extract it.
- :param args: All parameters needed to setup one node.
- :type args: tuple
+ :param node: A node where the tarball will be copied and extracted.
+ :param tarball: Local path of tarball to be copied.
+ :param remote_tarball: Remote path of the tarball.
+ :param results: A list where to store the result of node setup, optional.
+ :type node: dict
+ :type tarball: str
+ :type remote_tarball: str
+ :type results: list
:returns: True - success, False - error
:rtype: bool
"""
- tarball, remote_tarball, node = args
try:
copy_tarball_to_node(tarball, node)
extract_tarball_at_node(remote_tarball, node)
@@ -158,20 +160,24 @@ def setup_node(args):
except RuntimeError as exc:
logger.error("Node {0} setup failed, error:'{1}'"
.format(node['host'], exc.message))
- return False
+ result = False
else:
logger.console('Setup of node {0} done'.format(node['host']))
- return True
+ result = True
+
+ if isinstance(results, list):
+ results.append(result)
+ return result
def delete_local_tarball(tarball):
"""Delete local tarball to prevent disk pollution.
- :param tarball: Path to tarball to upload.
+ :param tarball: Path of local tarball to delete.
:type tarball: str
:returns: nothing
"""
- call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
+ remove(tarball)
def delete_framework_dir(node):
@@ -192,14 +198,13 @@ def delete_framework_dir(node):
.format(node))
-def cleanup_node(node):
- """Run all clean-up methods for a node.
-
- This method is used as map_async parameter. It receives tuple with all
- parameters as passed to map_async function.
+def cleanup_node(node, results=None):
+ """Delete a tarball from a node.
- :param node: Node to do cleanup on.
+ :param node: A node where the tarball will be delete.
+ :param results: A list where to store the result of node cleanup, optional.
:type node: dict
+ :type results: list
:returns: True - success, False - error
:rtype: bool
"""
@@ -207,10 +212,14 @@ def cleanup_node(node):
delete_framework_dir(node)
except RuntimeError:
logger.error("Cleanup of node {0} failed".format(node['host']))
- return False
+ result = False
else:
logger.console('Cleanup of node {0} done'.format(node['host']))
- return True
+ result = True
+
+ if isinstance(results, list):
+ results.append(result)
+ return result
class SetupFramework(object):
@@ -236,27 +245,27 @@ class SetupFramework(object):
logger.trace(msg)
remote_tarball = "/tmp/{0}".format(basename(tarball))
- # Turn off logging since we use multiprocessing
- log_level = BuiltIn().set_log_level('NONE')
- params = ((tarball, remote_tarball, node) for node in nodes.values())
- pool = Pool(processes=len(nodes))
- result = pool.map_async(setup_node, params)
- pool.close()
- pool.join()
+ results = []
+ threads = []
- # Turn on logging
- BuiltIn().set_log_level(log_level)
+ for node in nodes.values():
+ thread = threading.Thread(target=setup_node, args=(tarball,
+ remote_tarball,
+ node,
+ results))
+ thread.start()
+ threads.append(thread)
logger.info(
- 'Executing node setups in parallel, waiting for processes to end')
- result.wait()
+ 'Executing node setups in parallel, waiting for threads to end')
+
+ for thread in threads:
+ thread.join()
- results = result.get()
- node_success = all(results)
logger.info('Results: {0}'.format(results))
delete_local_tarball(tarball)
- if node_success:
+ if all(results):
logger.console('All nodes are ready')
else:
raise RuntimeError('Failed to setup framework')
@@ -267,32 +276,31 @@ class CleanupFramework(object):
@staticmethod
def cleanup_framework(nodes):
- """Perform cleaning on each node.
+ """Perform cleanup on each node.
:param nodes: Topology nodes.
:type nodes: dict
:raises RuntimeError: If cleanup framework failed.
"""
- # Turn off logging since we use multiprocessing
- log_level = BuiltIn().set_log_level('NONE')
- params = (node for node in nodes.values())
- pool = Pool(processes=len(nodes))
- result = pool.map_async(cleanup_node, params)
- pool.close()
- pool.join()
- # Turn on logging
- BuiltIn().set_log_level(log_level)
+ results = []
+ threads = []
+
+ for node in nodes.values():
+ thread = threading.Thread(target=cleanup_node,
+ args=(node, results))
+ thread.start()
+ threads.append(thread)
logger.info(
- 'Executing node cleanups in parallel, waiting for processes to end')
- result.wait()
+ 'Executing node cleanups in parallel, waiting for threads to end')
+
+ for thread in threads:
+ thread.join()
- results = result.get()
- node_success = all(results)
logger.info('Results: {0}'.format(results))
- if node_success:
+ if all(results):
logger.console('All nodes cleaned up')
else:
raise RuntimeError('Failed to cleaned up framework')