From 693e822b3779d695677d5bdc55a6b87e359285a9 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 1 Feb 2017 17:01:47 +0200 Subject: added tests for capture few tweaks Signed-off-by: imarom --- .../regression/stateless_tests/stl_capture_test.py | 269 +++++++++++++++++++++ .../trex_control_plane/stl/console/trex_capture.py | 77 +++++- .../trex_control_plane/stl/console/trex_console.py | 27 +-- .../stl/trex_stl_lib/trex_stl_client.py | 112 ++++++--- .../stl/trex_stl_lib/trex_stl_types.py | 2 +- .../stl/trex_stl_lib/utils/common.py | 26 ++ 6 files changed, 442 insertions(+), 71 deletions(-) create mode 100644 scripts/automation/regression/stateless_tests/stl_capture_test.py diff --git a/scripts/automation/regression/stateless_tests/stl_capture_test.py b/scripts/automation/regression/stateless_tests/stl_capture_test.py new file mode 100644 index 00000000..d8248dd7 --- /dev/null +++ b/scripts/automation/regression/stateless_tests/stl_capture_test.py @@ -0,0 +1,269 @@ +#!/router/bin/python +from .stl_general_test import CStlGeneral_Test, CTRexScenario +from trex_stl_lib.api import * +import os, sys +import pprint + +def ip2num (ip_str): + return struct.unpack('>L', socket.inet_pton(socket.AF_INET, ip_str))[0] + +def num2ip (ip_num): + return socket.inet_ntoa(struct.pack('>L', ip_num)) + +def ip_add (ip_str, cnt): + return num2ip(ip2num(ip_str) + cnt) + + +class STLCapture_Test(CStlGeneral_Test): + """Tests for capture packets""" + + def setUp(self): + CStlGeneral_Test.setUp(self) + + if not self.is_loopback: + self.skip('capture tests are skipped on a non-loopback machine') + + assert 'bi' in CTRexScenario.stl_ports_map + + self.c = CTRexScenario.stl_trex + + self.tx_port, self.rx_port = CTRexScenario.stl_ports_map['bi'][0] + + self.c.connect() + self.c.reset(ports = [self.tx_port, self.rx_port]) + + self.pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example') + + self.percentage = 5 if self.is_virt_nics else 50 + + + @classmethod + def tearDownClass(cls): + if CTRexScenario.stl_init_error: + return + # connect back at end of tests + if not cls.is_connected(): + CTRexScenario.stl_trex.connect() + + + # a simple capture test - inject packets and see the packets arrived the same + def test_basic_capture (self): + pkt_count = 100 + + try: + # move to service mode + self.c.set_service_mode(ports = self.rx_port) + # start a capture + rc = self.c.start_capture(rx_ports = [self.rx_port], limit = pkt_count) + + # inject few packets with a VM + vm = STLScVmRaw( [STLVmFlowVar ( "ip_src", min_value="16.0.0.0", max_value="16.255.255.255", size=4, step = 7, op = "inc"), + STLVmWrFlowVar (fv_name="ip_src", pkt_offset= "IP.src"), + STLVmFixIpv4(offset = "IP") + ] + ); + + pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example', + vm = vm) + + stream = STLStream(name = 'burst', + packet = pkt, + mode = STLTXSingleBurst(total_pkts = pkt_count, + percentage = self.percentage) + ) + + self.c.add_streams(ports = self.tx_port, streams = [stream]) + + self.c.start(ports = self.tx_port, force = True) + self.c.wait_on_traffic(ports = self.tx_port) + + pkt_list = [] + self.c.stop_capture(rc['id'], output = pkt_list) + + assert (len(pkt_list) == pkt_count) + + # generate all the values that should be + expected_src_ips = [ip_add('16.0.0.0', i * 7) for i in range(pkt_count)] + + for i, pkt in enumerate(pkt_list): + pkt_scapy = Ether(pkt['binary']) + pkt_ts = pkt['ts'] + + assert('IP' in pkt_scapy) + assert(pkt_scapy['IP'].src in expected_src_ips) + + # remove the match + del expected_src_ips[expected_src_ips.index(pkt_scapy['IP'].src)] + + + except STLError as e: + assert False , '{0}'.format(e) + + finally: + self.c.remove_all_captures() + self.c.set_service_mode(ports = self.rx_port, enabled = False) + + + + + # in this test we apply captures under traffic multiple times + def test_stress_capture (self): + pkt_count = 100 + + try: + # move to service mode + self.c.set_service_mode(ports = self.rx_port) + + # start heavy traffic + pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example') + + stream = STLStream(name = 'burst', + packet = pkt, + mode = STLTXCont(percentage = self.percentage) + ) + + self.c.add_streams(ports = self.tx_port, streams = [stream]) + self.c.start(ports = self.tx_port, force = True) + + captures = [{'capture_id': None, 'limit': 50}, {'capture_id': None, 'limit': 80}, {'capture_id': None, 'limit': 100}] + + for i in range(0, 100): + # start a few captures + for capture in captures: + capture['capture_id'] = self.c.start_capture(rx_ports = [self.rx_port], limit = capture['limit'])['id'] + + # a little time to wait for captures to be full + server_captures = self.c.get_capture_status() + + for capture in captures: + capture_id = capture['capture_id'] + + # make sure the server registers us and we are full + assert(capture['capture_id'] in server_captures.keys()) + assert(server_captures[capture_id]['count'] == capture['limit']) + + # fetch packets + pkt_list = [] + self.c.stop_capture(capture['capture_id'], pkt_list) + assert (len(pkt_list) == capture['limit']) + + # a little sanity per packet + for pkt in pkt_list: + scapy_pkt = Ether(pkt['binary']) + assert(scapy_pkt['IP'].src == '16.0.0.1') + assert(scapy_pkt['IP'].dst == '48.0.0.1') + + except STLError as e: + assert False , '{0}'.format(e) + + finally: + self.c.remove_all_captures() + self.c.set_service_mode(ports = self.rx_port, enabled = False) + + + # in this test we capture and analyze the ARP request / response + def test_arp_capture (self): + if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4': + return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port)) + + if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4': + return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port)) + + try: + # move to service mode + self.c.set_service_mode(ports = [self.tx_port, self.rx_port]) + + # start a capture + capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 2) + + # generate an ARP request + self.c.arp(ports = self.tx_port) + + pkts = [] + self.c.stop_capture(capture_info['id'], output = pkts) + + assert len(pkts) == 2 + + # find the correct order + if pkts[0]['port'] == self.rx_port: + request = pkts[0] + response = pkts[1] + else: + request = pkts[1] + response = pkts[0] + + assert request['port'] == self.rx_port + assert response['port'] == self.tx_port + + arp_request, arp_response = Ether(request['binary']), Ether(response['binary']) + assert 'ARP' in arp_request + assert 'ARP' in arp_response + + assert arp_request['ARP'].op == 1 + assert arp_response['ARP'].op == 2 + + assert arp_request['ARP'].pdst == arp_response['ARP'].psrc + + + except STLError as e: + assert False , '{0}'.format(e) + + finally: + self.c.remove_all_captures() + self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False) + + + # test PING + def test_ping_capture (self): + if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4': + return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port)) + + if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4': + return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port)) + + try: + # move to service mode + self.c.set_service_mode(ports = [self.tx_port, self.rx_port]) + + # start a capture + capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 100) + + # generate an ARP request + tx_ipv4 = self.c.get_port_attr(port = self.tx_port)['src_ipv4'] + rx_ipv4 = self.c.get_port_attr(port = self.rx_port)['src_ipv4'] + + count = 50 + + self.c.ping_ip(src_port = self.tx_port, dst_ipv4 = rx_ipv4, pkt_size = 1500, count = count, interval_sec = 0.01) + + pkts = [] + self.c.stop_capture(capture_info['id'], output = pkts) + + req_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.rx_port] + res_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.tx_port] + assert len(req_pkts) == count + assert len(res_pkts) == count + + for req_pkt in req_pkts: + assert 'ICMP' in req_pkt + assert req_pkt['IP'].src == tx_ipv4 + assert req_pkt['IP'].dst == rx_ipv4 + assert req_pkt['ICMP'].type == 8 + assert len(req_pkt) == 1500 + + for res_pkt in res_pkts: + assert 'ICMP' in res_pkt + assert res_pkt['IP'].src == rx_ipv4 + assert res_pkt['IP'].dst == tx_ipv4 + assert res_pkt['ICMP'].type == 0 + assert len(res_pkt) == 1500 + + + except STLError as e: + assert False , '{0}'.format(e) + + finally: + self.c.remove_all_captures() + self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False) + + diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index 2132458e..b6943912 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -3,6 +3,9 @@ from trex_stl_lib.utils import parsing_opts, text_tables import threading import tempfile import select +from distutils import spawn +from subprocess import Popen +import subprocess # defines a generic monitor writer class CaptureMonitorWriter(object): @@ -88,7 +91,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): # make sure to restore the logger self.logger.prompt_redraw() - + # a pipe based monitor class CaptureMonitorWriterPipe(CaptureMonitorWriter): def __init__ (self, logger, start_ts): @@ -100,16 +103,27 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): # generate a temp fifo pipe self.fifo_name = tempfile.mktemp() + self.wireshark_pid = None + try: self.logger.pre_cmd('Starting pipe capture monitor') os.mkfifo(self.fifo_name) self.logger.post_cmd(RC_OK) - self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) + # try to locate wireshark on the machine + self.wireshark_exe = self.locate_wireshark() + + # we found wireshark - try to launch a process + if self.wireshark_exe: + self.wireshark_pid = self.launch_wireshark() + + # did we succeed ? + if not self.wireshark_pid: + self.logger.log(format_text("*** Please manually run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) - self.logger.pre_cmd("Waiting for Wireshark pipe connection") # blocks until pipe is connected + self.logger.pre_cmd("Waiting for Wireshark pipe connection") self.fifo = os.open(self.fifo_name, os.O_WRONLY) self.logger.post_cmd(RC_OK()) @@ -125,17 +139,60 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): self.is_init = True + except KeyboardInterrupt as e: self.deinit() self.logger.post_cmd(RC_ERR("")) raise STLError("*** pipe monitor aborted...cleaning up") - + except OSError as e: self.deinit() self.logger.post_cmd(RC_ERR("")) raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) + + def locate_wireshark (self): + self.logger.pre_cmd('Trying to locate Wireshark') + wireshark_exe = spawn.find_executable('wireshark') + self.logger.post_cmd(RC_OK() if wireshark_exe else RC_ERR()) + + if not wireshark_exe: + return None + + dumpcap = os.path.join(os.path.dirname(wireshark_exe), 'dumpcap') + + self.logger.pre_cmd("Checking permissions on '{}'".format(dumpcap)) + if not os.access(dumpcap, os.X_OK): + self.logger.post_cmd(RC_ERR('bad permissions on dumpcap')) + return None + + self.logger.post_cmd(RC_OK()) + + return wireshark_exe + + # try to launch wireshark... returns true on success + def launch_wireshark (self): + cmd = '{0} -k -i {1}'.format(self.wireshark_exe, self.fifo_name) + self.logger.pre_cmd("Launching '{0}'".format(cmd)) + + try: + devnull = open(os.devnull, 'w') + self.wireshark_pid = Popen(cmd.split(), + stdout = devnull, + stderr = devnull, + stdin = subprocess.PIPE, + preexec_fn = os.setpgrp, + close_fds = True) + + self.logger.post_cmd(RC_OK()) + return True + + except OSError as e: + self.wireshark_pid = None + self.logger.post_cmd(RC_ERR()) + return False + def deinit (self): try: @@ -260,7 +317,7 @@ class CaptureMonitor(object): return # make sure the capture is active on the server - captures = [x['id'] for x in self.client.get_capture_status()] + captures = self.client.get_capture_status().keys() if capture_id not in captures: return @@ -493,8 +550,7 @@ class CaptureManager(object): def parse_record_stop (self, opts): - captures = self.c.get_capture_status() - ids = [c['id'] for c in captures] + ids = self.c.get_capture_status().keys() if self.monitor and (opts.capture_id == self.monitor.get_capture_id()): self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id)) @@ -564,15 +620,14 @@ class CaptureManager(object): mon_table.set_cols_align(["c"] * 6) mon_table.set_cols_width([15] * 6) - for elem in data: - id = elem['id'] + for capture_id, elem in data.items(): - if self.monitor and (self.monitor.get_capture_id() == id): + if self.monitor and (self.monitor.get_capture_id() == capture_id): row = self.monitor.get_mon_row() mon_table.add_rows([row], header=False) else: - row = [id, + row = [capture_id, format_text(elem['state'], 'bold'), '[{0}/{1}]'.format(elem['count'], elem['limit']), format_num(elem['bytes'], suffix = 'B'), diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index d36ce7b0..5e68cdf0 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -39,7 +39,7 @@ except: from trex_stl_lib.api import * from trex_stl_lib.utils.text_opts import * -from trex_stl_lib.utils.common import user_input, get_current_user +from trex_stl_lib.utils.common import user_input, get_current_user, set_window_always_on_top from trex_stl_lib.utils import parsing_opts from .trex_capture import CaptureManager @@ -74,31 +74,6 @@ class ConsoleLogger(LoggerApi): self.flush() -def set_window_always_on_top (title): - # we need the GDK module, if not available - ignroe this command - try: - if sys.version_info < (3,0): - from gtk import gdk - else: - #from gi.repository import Gdk as gdk - return - - except ImportError: - return - - # search the window and set it as above - root = gdk.get_default_root_window() - - for id in root.property_get('_NET_CLIENT_LIST')[2]: - w = gdk.window_foreign_new(id) - if w: - name = w.property_get('WM_NAME')[2] - if name == title: - w.set_keep_above(True) - gdk.window_process_all_updates() - break - - class TRexGeneralCmd(cmd.Cmd): def __init__(self): cmd.Cmd.__init__(self) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 215c0253..c88a68b2 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -829,8 +829,7 @@ class STLClient(object): rc.add(self.ports[port_id].set_attr(**port_attr_dict)) return rc - - + def __set_rx_queue (self, port_id_list, size): port_id_list = self.__ports(port_id_list) rc = RC() @@ -1894,15 +1893,16 @@ class STLClient(object): @__api_check(True) - def ping_ip (self, src_port, dst_ipv4, pkt_size = 64, count = 5): + def ping_ip (self, src_port, dst_ipv4, pkt_size = 64, count = 5, interval_sec = 1): """ Pings an IP address through a port :parameters: - src_port - on which port_id to send the ICMP PING request - dst_ipv4 - which IP to ping - pkt_size - packet size to use - count - how many times to ping + src_port - on which port_id to send the ICMP PING request + dst_ipv4 - which IP to ping + pkt_size - packet size to use + count - how many times to ping + interval_sec - how much time to wait between pings :raises: + :exc:`STLError` @@ -1919,7 +1919,8 @@ class STLClient(object): raise STLError("pkt_size should be a value between 64 and 9216: '{0}'".format(pkt_size)) validate_type('count', count, int) - + validate_type('interval_sec', interval_sec, (int, float)) + self.logger.pre_cmd("Pinging {0} from port {1} with {2} bytes of data:".format(dst_ipv4, src_port, pkt_size)) @@ -1935,7 +1936,7 @@ class STLClient(object): self.logger.log(rc.data()) if i != (count - 1): - time.sleep(1) + time.sleep(interval_sec) @@ -2938,7 +2939,27 @@ class STLClient(object): + @__api_check(True) + def get_port_attr (self, port): + """ + get the port attributes currently set + + :parameters: + ports - for which ports to configure service mode on/off + + + :raises: + + :exe:'STLError' + + """ + validate_type('port', port, int) + if port not in self.get_all_ports(): + raise STLError("'{0}' is not a valid port id".format(port)) + + return self.ports[port].get_formatted_info() + + @__api_check(True) def set_service_mode (self, ports = None, enabled = True): """ @@ -2999,11 +3020,12 @@ class STLClient(object): if not rc: raise STLError(rc) - + # alias + arp = resolve @__api_check(True) - def start_capture (self, tx_ports, rx_ports, limit = 1000, mode = 'fixed'): + def start_capture (self, tx_ports = None, rx_ports = None, limit = 1000, mode = 'fixed'): """ Starts a low rate packet capturing on the server @@ -3033,6 +3055,11 @@ class STLClient(object): + :exe:'STLError' """ + + # default values for TX / RX ports + tx_ports = tx_ports if tx_ports is not None else [] + rx_ports = rx_ports if rx_ports is not None else [] + # TODO: remove this when TX is implemented if tx_ports: raise STLError('TX port capturing is not yet implemented') @@ -3072,7 +3099,7 @@ class STLClient(object): @__api_check(True) - def stop_capture (self, capture_id, output_filename = None): + def stop_capture (self, capture_id, output = None): """ Stops an active capture and optionally save it to a PCAP file @@ -3080,9 +3107,10 @@ class STLClient(object): capture_id: int an active capture ID to stop - output_filename: str - output filename to save capture - if 'None', all captured packets will be discarded + output: None/ str / list + if output is None - all the packets will be discarded + if output is a 'str' - it will be interpeted as output filename + if it is a list, the API will populate the list with packet objects :raises: + :exe:'STLError' @@ -3094,6 +3122,10 @@ class STLClient(object): # 2. fetching # 3. saving to file + + validate_type('capture_id', capture_id, (int)) + validate_type('output', output, (type(None), str, list)) + # stop self.logger.pre_cmd("Stopping packet capture {0}".format(capture_id)) @@ -3105,9 +3137,9 @@ class STLClient(object): # pkt count pkt_count = rc.data()['pkt_count'] - # fetch packets - if output_filename: - self.__fetch_capture_packets(capture_id, output_filename, pkt_count) + # fetch packets + if output is not None: + self.__fetch_capture_packets(capture_id, output, pkt_count) # remove self.logger.pre_cmd("Removing PCAP capture {0} from server".format(capture_id)) @@ -3119,12 +3151,18 @@ class STLClient(object): # fetch packets from the server and save them to a file - def __fetch_capture_packets (self, capture_id, output_filename, pkt_count): - self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) - + def __fetch_capture_packets (self, capture_id, output, pkt_count): + write_to_file = isinstance(output, basestring) + + self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output if write_to_file else 'list')) + # create a PCAP file - writer = RawPcapWriter(output_filename, linktype = 1) - writer._write_header(None) + if write_to_file: + writer = RawPcapWriter(output, linktype = 1) + writer._write_header(None) + else: + # clear the list + del output[:] pending = pkt_count rc = RC_OK() @@ -3145,11 +3183,15 @@ class STLClient(object): # write packets for pkt in pkts: - # split the server timestamp relative to the capture start time - ts_sec, ts_usec = sec_split_usec(pkt['ts'] - start_ts) + ts = pkt['ts'] - start_ts - pkt_bin = base64.b64decode(pkt['binary']) - writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) + pkt['binary'] = base64.b64decode(pkt['binary']) + + if write_to_file: + ts_sec, ts_usec = sec_split_usec(ts) + writer._write_packet(pkt['binary'], sec = ts_sec, usec = ts_usec) + else: + output.append(pkt) @@ -3161,16 +3203,20 @@ class STLClient(object): @__api_check(True) def get_capture_status (self): """ - returns a list of all active captures - each element in the list is an object containing - info about the capture + Returns a dictionary where each key is an capture ID + Each value is an object describing the capture """ rc = self._transmit("capture", params = {'command': 'status'}) if not rc: raise STLError(rc) - return rc.data() + # reformat as dictionary + output = {} + for c in rc.data(): + output[c['id']] = c + + return output @__api_check(True) @@ -3182,9 +3228,9 @@ class STLClient(object): self.logger.pre_cmd("Removing all packet captures from server") - for c in captures: + for capture_id in captures.keys(): # remove - rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': c['id']}) + rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': capture_id}) if not rc: raise STLError(rc) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py index 7ac508a2..2358a38f 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py @@ -110,7 +110,7 @@ class RC(): def RC_OK(data = ""): return RC(True, data) -def RC_ERR (err): +def RC_ERR (err = ""): return RC(False, err) def RC_WARN (warn): diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py index 72d3fa9f..7cb94b28 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py @@ -124,6 +124,32 @@ def bitfield_to_list (bf): return rc +def set_window_always_on_top (title): + # we need the GDK module, if not available - ignroe this command + try: + if sys.version_info < (3,0): + from gtk import gdk + else: + #from gi.repository import Gdk as gdk + return + + except ImportError: + return + + # search the window and set it as above + root = gdk.get_default_root_window() + + for id in root.property_get('_NET_CLIENT_LIST')[2]: + w = gdk.window_foreign_new(id) + if w: + name = w.property_get('WM_NAME')[2] + if title in name: + w.set_keep_above(True) + gdk.window_process_all_updates() + break + + def bitfield_to_str (bf): lst = bitfield_to_list(bf) return "-" if not lst else ', '.join([str(x) for x in lst]) + -- cgit 1.2.3-korg