#!/usr/bin/env python3

import unittest
from framework import VppTestCase
from asfframework import VppTestRunner
from util import ppp

from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from vpp_papi import VppEnum


class TestSyslog(VppTestCase):
    """Syslog Protocol Test Cases"""

    # CLI keywords for "test syslog", indexed by numeric facility value.
    # Positions 10 and 15 are intentionally empty strings: this table has no
    # keyword for those two facility values.
    _FACILITY_CLI_NAMES = (
        "kernel",
        "user-level",
        "mail-system",
        "system-daemons",
        "security-authorization",
        "syslogd",
        "line-printer",
        "network-news",
        "uucp",
        "clock-daemon",
        "",
        "ftp-daemon",
        "ntp-subsystem",
        "log-audit",
        "log-alert",
        "",
        "local0",
        "local1",
        "local2",
        "local3",
        "local4",
        "local5",
        "local6",
        "local7",
    )

    # CLI keywords for "test syslog", indexed by numeric severity value.
    _SEVERITY_CLI_NAMES = (
        "emergency",
        "alert",
        "critical",
        "error",
        "warning",
        "notice",
        "informational",
        "debug",
    )

    @property
    def SYSLOG_SEVERITY(self):
        # Shorthand for the API severity enum used by the filter calls.
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        super(TestSyslog, cls).setUpClass()

        try:
            # A single packet-generator interface acts as both the syslog
            # source address and the path towards the collector.
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
        except Exception:
            # Undo the parent set-up before propagating the failure.
            super(TestSyslog, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(TestSyslog, cls).tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
        """
        Generate syslog message

        Builds a "test syslog ..." CLI command from the arguments and runs
        it through ``self.vapi.cli``.

        :param facility: facility value (index into the facility keywords)
        :param severity: severity level (index into the severity keywords)
        :param appname: application name that originates the message
        :param msgid: message identifier
        :param sd: structured data, mapping SD-ID to a mapping of
                   parameter name to value (optional)
        :param msg: free-form message (optional)
        """
        cli_parts = [
            "test syslog %s %s %s %s"
            % (
                self._FACILITY_CLI_NAMES[facility],
                self._SEVERITY_CLI_NAMES[severity],
                appname,
                msgid,
            )
        ]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                cli_parts.append(" sd-id %s" % (sd_id))
                for name, value in sd_params.items():
                    cli_parts.append(" sd-param %s %s" % (name, value))
        if msg is not None:
            cli_parts.append(" %s" % (msg))
        self.vapi.cli("".join(cli_parts))

    def syslog_verify(
        self, data, facility, severity, appname, msgid, sd=None, msg=None
    ):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse`` and
        asserts every parsed field against the expected values. The message
        version is expected to be 1 and the hostname to be pg0's local IPv4
        address.

        :param data: syslog message (raw bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originates the message
        :param msgid: message identifier
        :param sd: structured data (optional)
        :param msg: free-form message (optional)
        :raises ParseError: when the payload cannot be parsed (the error is
                            logged first)
        """
        text = data.decode("utf-8")
        # A message without structured data is compared against an empty dict.
        expected_sd = {} if sd is None else sd

        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(parsed.facility, facility)
        self.assertEqual(parsed.severity, severity)
        self.assertEqual(parsed.appname, appname)
        self.assertEqual(parsed.msgid, msgid)
        self.assertEqual(parsed.msg, msg)
        self.assertEqual(parsed.sd, expected_sd)
        self.assertEqual(parsed.version, 1)
        self.assertEqual(parsed.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """Syslog Protocol test"""
        # --- sender configuration: only the addresses are supplied ---------
        self.vapi.syslog_set_sender(
            src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = "test"
        msgid = "testMsg"
        msg = "this is message"
        sd1 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
        }
        sd2 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
            "examplePriority@32473": {"class": "high"},
        }

        # --- message without structured data --------------------------------
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet to the log before failing the test.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local7,
            SyslogSeverity.info,
            appname,
            msgid,
            None,
            msg,
        )

        # --- severity filter: an info message must not be sent once the
        #     filter is set to WARN ------------------------------------------
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        filter_cfg = self.vapi.syslog_get_filter()
        self.assertEqual(
            filter_cfg.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN
        )
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        self.pg0.assert_nothing_captured()

        # --- warning message with one structured-data element --------------
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
        )
        capture = self.pg0.get_capture(1)
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local6,
            SyslogSeverity.warning,
            appname,
            msgid,
            sd1,
            msg,
        )

        # --- non-default collector port ------------------------------------
        self.vapi.syslog_set_sender(
            self.pg0.local_ip4, self.pg0.remote_ip4, collector_port=12345
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        # --- error message with two structured-data elements and no text ---
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            # Dump the offending packet to the log before failing the test.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local5,
            SyslogSeverity.err,
            appname,
            msgid,
            sd2,
            None,
        )


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)