#!/usr/bin/env python3

import unittest
from framework import VppTestCase, VppTestRunner
from util import ppp
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from vpp_papi import VppEnum


class TestSyslog(VppTestCase):
    """ Syslog Protocol Test Cases """

    @property
    def SYSLOG_SEVERITY(self):
        """Shortcut to the ``vl_api_syslog_severity_t`` enum of the API."""
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        """Create one packet-generator interface and bring it up with IPv4.

        If any step of the interface setup raises, the parent class
        teardown is invoked before the exception is propagated.
        """
        super(TestSyslog, cls).setUpClass()

        try:
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
        except Exception:
            # Undo the parent class setup before letting the failure surface.
            super(TestSyslog, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TestSyslog, cls).tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None,
                        msg=None):
        """
        Generate syslog message

        Builds a ``test syslog ...`` command line and passes it to
        ``self.vapi.cli``.

        :param facility: facility value (used as an index into the
                         facility keyword table below)
        :param severity: severity level (used as an index into the
                         severity keyword table below)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); mapping of SD-ID to a
                   mapping of parameter name to value
        :param msg: free-form message (optional)
        """
        # Keyword tables indexed by the numeric facility / severity value.
        # Indices 10 and 15 of the facility table are empty strings.
        facility_str = ['kernel', 'user-level', 'mail-system',
                        'system-daemons', 'security-authorization', 'syslogd',
                        'line-printer', 'network-news', 'uucp', 'clock-daemon',
                        '', 'ftp-daemon', 'ntp-subsystem', 'log-audit',
                        'log-alert', '', 'local0', 'local1', 'local2',
                        'local3', 'local4', 'local5', 'local6', 'local7']

        severity_str = ['emergency', 'alert', 'critical', 'error', 'warning',
                        'notice', 'informational', 'debug']

        # Collect the space-separated command fragments and join them once.
        cli_parts = ["test syslog %s %s %s %s" % (facility_str[facility],
                                                  severity_str[severity],
                                                  appname,
                                                  msgid)]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                cli_parts.append("sd-id %s" % sd_id)
                for name, value in sd_params.items():
                    cli_parts.append("sd-param %s %s" % (name, value))
        if msg is not None:
            cli_parts.append("%s" % msg)

        self.vapi.cli(" ".join(cli_parts))

    def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
                      msg=None):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse``
        and asserts every parsed field against the expected values. The
        version is expected to be 1 and the hostname is expected to equal
        ``self.pg0.local_ip4``.

        :param data: syslog message (bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); ``None`` is compared as an
                   empty dict
        :param msg: free-form message (optional)
        :raises ParseError: when the payload cannot be parsed; the error is
                            logged before being re-raised
        """
        text = data.decode('utf-8')
        if sd is None:
            sd = {}

        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(parsed.facility, facility)
        self.assertEqual(parsed.severity, severity)
        self.assertEqual(parsed.appname, appname)
        self.assertEqual(parsed.msgid, msgid)
        self.assertEqual(parsed.msg, msg)
        self.assertEqual(parsed.sd, sd)
        self.assertEqual(parsed.version, 1)
        self.assertEqual(parsed.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """ Syslog Protocol test """
        # Configure the sender and check the values reported back,
        # including those that were not set explicitly.
        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4,
                                    collector_address=self.pg0.remote_ip4)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address),
                         self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = 'test'
        msgid = 'testMsg'
        msg = 'this is message'
        sd1 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'}}
        sd2 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'},
               'examplePriority@32473': {'class': 'high'}}

        # Step 1: message without structured data; one packet is expected
        # and its IP/UDP headers and payload are checked.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet to the log, then fail as before.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local7,
                           SyslogSeverity.info,
                           appname,
                           msgid,
                           None,
                           msg)

        # Step 2: with the filter set to WARN, an info-level message is
        # expected to produce no packet.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(
            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        filter_cfg = self.vapi.syslog_get_filter()
        self.assertEqual(filter_cfg.severity,
                         self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        self.pg0.assert_nothing_captured()

        # Step 3: a warning-level message carrying one structured-data
        # element is expected to be sent.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local6,
                             SyslogSeverity.warning,
                             appname,
                             msgid,
                             sd1,
                             msg)
        capture = self.pg0.get_capture(1)
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local6,
                           SyslogSeverity.warning,
                           appname,
                           msgid,
                           sd1,
                           msg)

        # Step 4: change the collector port, then send a message with two
        # structured-data elements and no free-form text.
        self.vapi.syslog_set_sender(self.pg0.local_ip4,
                                    self.pg0.remote_ip4,
                                    collector_port=12345)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local5,
                             SyslogSeverity.err,
                             appname,
                             msgid,
                             sd2,
                             None)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            # Dump the offending packet to the log, then fail as before.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local5,
                           SyslogSeverity.err,
                           appname,
                           msgid,
                           sd2,
                           None)


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)