path: root/src/plugins/builtinurl/builtins.c
author     Alexander Chernavin <achernavin@netgate.com>  2023-11-07 11:25:21 +0000
committer  Matthew Smith <mgsmith@netgate.com>  2023-11-12 21:52:13 +0000
commit     64d6463d2eac0c0fe434f3a7aa56fe4d85c046d9 (patch)
tree       89871a62760cb89dfb37993fcf5b6d825cb6d0eb /src/plugins/builtinurl/builtins.c
parent     4aaedaa2ffaf0cee9ceae62c785f940b6fe23535 (diff)
flowprobe: fix tx flows generated for rewritten traffic
Currently, if IPFIX record generation is enabled on an interface in the TX direction and rewritten traffic whose Ethernet header has moved during the rewrite is sent from that interface, the generated TX flows contain fields with wrong and zero values. For example, this can be observed when traffic is rewritten from a subinterface to a hardware interface (i.e. when tags are removed). A TX flow generated in this case will have wrong L2 fields because of an incorrectly located Ethernet header, and zero L3/L4 fields because the Ethernet type will match neither IP4 nor IP6.

The same code is executed to generate flows for both input and output features, and the same mechanism is used to identify the Ethernet header in the buffer's data. However, such general code usually handles the buffer's data conditionally based on the direction: for most input features, the buffer's current_data will likely point to the IP header, while for most output features it will likely point to the Ethernet header.

With this fix:

- Keep relying on ethernet_buffer_get_header() to locate the Ethernet header for input features, and start using vlib_buffer_get_current() to locate it for output features. The latter accounts for the Ethernet header's position change in the buffer's data when the packet has been rewritten.
- After fixing the Ethernet header determination, the L3/L4 fields would still contain non-zero but incorrect data, because the IP header determination needs to be fixed too. It currently relies on the Ethernet header always being located at the beginning of the buffer's data, so that l2_hdr_sz can be used as the IP header offset; this may not hold after rewriting. So start calculating the actual offset of the IP header in the buffer's data.
- Add a unit test to cover the case.

Type: fix
Change-Id: Icf3f9e6518912d06dff0d5aa48e103b3dc94edb7
Signed-off-by: Alexander Chernavin <achernavin@netgate.com>
Diffstat (limited to 'src/plugins/builtinurl/builtins.c')
0 files changed, 0 insertions, 0 deletions
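The fix described in the commit message amounts to deriving the IP header offset from where the Ethernet header actually sits in the buffer, instead of assuming the header starts at offset 0 and reusing a cached l2_hdr_sz. Below is a minimal sketch of that idea in plain Python (illustrative only, not the VPP implementation; the function name, the set of handled tag types, and the buffer layout are assumptions):

import struct

VLAN_TPIDS = (0x8100, 0x88A8, 0x9100)  # common 802.1Q/802.1ad tag protocol IDs

def l3_offset(buf, eth_offset):
    # Walk from where the Ethernet header actually starts: 12 bytes of MAC
    # addresses, then zero or more 4-byte VLAN tags, then the EtherType.
    off = eth_offset + 12
    ethertype = struct.unpack_from("!H", buf, off)[0]
    while ethertype in VLAN_TPIDS:
        off += 4
        ethertype = struct.unpack_from("!H", buf, off)[0]
    # The IP header begins right after the last EtherType field.
    return off + 2, ethertype

For an untagged IPv4 frame starting at offset 0, l3_offset(frame, 0) returns (14, 0x0800); with one 802.1Q tag still present it returns (18, 0x0800).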
#! /usr/bin/env python

## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license

from __future__ import with_statement

import scapy.utils
from scapy.config import conf
import os,thread,select
import subprocess
import itertools
import collections
import time
from scapy.error import log_interactive,warning
import Queue

class PipeEngine:
    pipes = {}
    @classmethod
    def list_pipes(cls):
        for pn,pc in sorted(cls.pipes.items()):
            doc = pc.__doc__ or ""
            if doc:
                doc = doc.splitlines()[0]
            print "%20s: %s" % (pn, doc)
    @classmethod
    def list_pipes_detailed(cls):
        for pn,pc in sorted(cls.pipes.items()):
            if pc.__doc__:
                print "###### %s\n %s" % (pn ,pc.__doc__)
            else:
                print "###### %s" % pn
    
    def __init__(self, *pipes):
        self.active_pipes = set()
        self.active_sources = set()
        self.active_drains = set()
        self.active_sinks = set()
        self._add_pipes(*pipes)
        self.thread_lock = thread.allocate_lock()
        self.command_lock = thread.allocate_lock()
        self.__fdr,self.__fdw = os.pipe()
        self.threadid = None
    def __getattr__(self, attr):
        if attr.startswith("spawn_"):
            dname = attr[6:]
            if dname in self.pipes:
                def f(*args, **kargs):
                    k = self.pipes[dname]
                    p = k(*args, **kargs)
                    self.add(p)
                    return p
                return f
        raise AttributeError(attr)

    def add_one_pipe(self, pipe):
        self.active_pipes.add(pipe)
        if isinstance(pipe, Source):
            self.active_sources.add(pipe)
        if isinstance(pipe, Drain):
            self.active_drains.add(pipe)
        if isinstance(pipe, Sink):
            self.active_sinks.add(pipe)

    def get_pipe_list(self, pipe):
        def flatten(p, l):
            l.add(p)
            for q in p.sources|p.sinks|p.high_sources|p.high_sinks:
                if q not in l:
                    flatten(q, l)
        pl = set()
        flatten(pipe, pl)
        return pl

    def _add_pipes(self, *pipes):
        pl = set()
        for p in pipes:
            pl |= self.get_pipe_list(p)
        pl -= self.active_pipes
        for q in pl:
            self.add_one_pipe(q)
        return pl
            

    def run(self):
        log_interactive.info("Pipe engine thread started.")
        try:
            for p in self.active_pipes:
                p.start()
            sources = self.active_sources
            sources.add(self.__fdr)
            exhausted = set([])
            RUN=True
            STOP_IF_EXHAUSTED = False
            while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1):
                fds,fdo,fde=select.select(sources,[],[])
                for fd in fds:
                    if fd is self.__fdr:
                        cmd = os.read(self.__fdr,1)
                        if cmd == "X":
                            RUN=False
                            break
                        elif cmd == "B":
                            STOP_IF_EXHAUSTED = True
                        elif cmd == "A":
                            sources = self.active_sources-exhausted
                            sources.add(self.__fdr)
                        else:
                            warning("Unknown internal pipe engine command: %r. Ignoring." % cmd)
                    elif fd in sources:
                        try:
                            fd.deliver()
                        except Exception,e:
                            log_interactive.exception("piping from %s failed: %s" % (fd.name, e))
                        else:
                            if fd.exhausted():
                                exhausted.add(fd)
                                sources.remove(fd)
        except KeyboardInterrupt:
            pass
        finally:
            try:
                for p in self.active_pipes:
                    p.stop()
            finally:
                self.thread_lock.release()
                log_interactive.info("Pipe engine thread stopped.")

    def start(self):
        if self.thread_lock.acquire(0):
            self.threadid = thread.start_new_thread(self.run,())
        else:
            warning("Pipe engine already running")
    def wait_and_stop(self):
        self.stop(_cmd="B")
    def stop(self, _cmd="X"):
        try:
            with self.command_lock:
                if self.threadid is not None:
                    os.write(self.__fdw, _cmd)
                    while not self.thread_lock.acquire(0):
                        time.sleep(0.01) # interruptible wait for thread to terminate
                    self.thread_lock.release() # (not using .join() because it needs 'threading' module)
                else:
                    warning("Pipe engine thread not running")
        except KeyboardInterrupt:
            print "Interrupted by user."

    def add(self, *pipes):
        pipes = self._add_pipes(*pipes)
        with self.command_lock:
            if self.threadid is not None:
                for p in pipes:
                    p.start()
                os.write(self.__fdw, "A")
    
    def graph(self,**kargs):
        g=['digraph "pipe" {',"\tnode [shape=rectangle];",]
        for p in self.active_pipes:
            g.append('\t"%i" [label="%s"];' % (id(p), p.name))
        g.append("")
        g.append("\tedge [color=blue, arrowhead=vee];")
        for p in self.active_pipes:
            for q in p.sinks:
                g.append('\t"%i" -> "%i";' % (id(p), id(q)))
        g.append("")
        g.append("\tedge [color=red, arrowhead=veevee];")
        for p in self.active_pipes:
            for q in p.high_sinks:
                g.append('\t"%i" -> "%i" [color="red"];' % (id(p), id(q)))
        g.append('}')
        graph = "\n".join(g)
        scapy.utils.do_graph(graph, **kargs) 
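
# Example usage (illustrative sketch): a PipeEngine runs every reachable pipe in
# a background thread, and attribute lookups of the form "spawn_<PipeClass>"
# (see __getattr__ above) create a pipe and register it with the engine in one
# call.
#
#   pe = PipeEngine()
#   feeder = CLIFeeder(name="feeder")
#   sink = pe.spawn_ConsoleSink(name="console")  # ConsoleSink(...) + pe.add(...)
#   feeder > sink
#   pe.add(feeder)
#   pe.start()
#   feeder.send("hello")    # delivered to the ConsoleSink by the engine thread
#   time.sleep(1)           # give the engine thread time to deliver
#   pe.stop()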


class _ConnectorLogic(object):
    def __init__(self):
        self.sources = set()
        self.sinks = set()
        self.high_sources = set()
        self.high_sinks = set()

    def __lt__(self, other):
        other.sinks.add(self)
        self.sources.add(other)
        return other
    def __gt__(self, other):
        self.sinks.add(other)
        other.sources.add(self)
        return other
    def __eq__(self, other):
        self > other
        other > self
        return other

    def __lshift__(self, other):
        self.high_sources.add(other)
        other.high_sinks.add(self)
        return other
    def __rshift__(self, other):
        self.high_sinks.add(other)
        other.high_sources.add(self)
        return other
    def __floordiv__(self, other):
        self >> other
        other >> self
        return other
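
# The operator overloads above form the wiring DSL used between pipes:
#
#   a > b        # low channel:  a -> b
#   a < b        # low channel:  b -> a
#   a == b       # low channel, both directions
#   a >> b       # high channel: a ->> b
#   a << b       # high channel: b ->> a
#   a // b       # high channel, both directions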


class Pipe(_ConnectorLogic):
    class __metaclass__(type):
        def __new__(cls, name, bases, dct):
            c = type.__new__(cls, name, bases, dct)
            PipeEngine.pipes[name] = c
            return c
    def __init__(self, name=None):
        _ConnectorLogic.__init__(self)
        if name is None:
            name = "%s" % (self.__class__.__name__)
        self.name = name
    def _send(self, msg):
        for s in self.sinks:
            s.push(msg)
    def _high_send(self, msg):
        for s in self.high_sinks:
            s.high_push(msg)

    def __repr__(self):
        ct = conf.color_theme
        s = "%s%s" % (ct.punct("<"), ct.layer_name(self.name))
        if self.sources or self.sinks:
            s+= " %s" % ct.punct("[")
            if self.sources:
                s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.sources),
                              ct.field_value(">"))
            s += ct.layer_name("#")
            if self.sinks:
                s+="%s%s" % (ct.field_value(">"),
                             ct.punct(",").join(ct.field_name(s.name) for s in self.sinks))
            s += ct.punct("]")

        if self.high_sources or self.high_sinks:
            s+= " %s" % ct.punct("[")
            if self.high_sources:
                s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.high_sources),
                              ct.field_value(">>"))
            s += ct.layer_name("#")
            if self.high_sinks:
                s+="%s%s" % (ct.field_value(">>"),
                             ct.punct(",").join(ct.field_name(s.name) for s in self.high_sinks))
            s += ct.punct("]")


        s += ct.punct(">")
        return s

class Source(Pipe):
    def __init__(self, name=None):
        Pipe.__init__(self, name=name)
        self.is_exhausted = False
    def _read_message(self):
        return Message()
    def deliver(self):
        msg = self._read_message()
        self._send(msg)
    def fileno(self):
        return None
    def exhausted(self):
        return self.is_exhausted
    def start(self):
        pass
    def stop(self):
        pass

class Drain(Pipe):
    """Repeat messages from low/high entries to (resp.) low/high exits
     +-------+
  >>-|-------|->>
     |       |
   >-|-------|->
     +-------+
"""
    def push(self, msg):
        self._send(msg)
    def high_push(self, msg):
        self._high_send(msg)
    def start(self):
        pass
    def stop(self):
        pass

class Sink(Pipe):
    def push(self, msg):
        pass
    def high_push(self, msg):
        pass
    def start(self):
        pass
    def stop(self):
        pass


class AutoSource(Source):
    def __init__(self, name=None):
        Source.__init__(self, name=name)
        self.__fdr,self.__fdw = os.pipe()
        self._queue = collections.deque()
    def fileno(self):
        return self.__fdr
    def _gen_data(self, msg):
        self._queue.append((msg,False))
        self._wake_up()
    def _gen_high_data(self, msg):
        self._queue.append((msg,True))
        self._wake_up()
    def _wake_up(self):
        os.write(self.__fdw,"x")
    def deliver(self):
        os.read(self.__fdr,1)
        try:
            msg,high = self._queue.popleft()
        except IndexError: #empty queue. Exhausted source
            pass
        else:
            if high:
                self._high_send(msg)
            else:
                self._send(msg)

class ThreadGenSource(AutoSource):
    def __init__(self, name=None):
        AutoSource.__init__(self, name=name)
        self.RUN = False
    def generate(self):
        pass
    def start(self):
        self.RUN = True
        thread.start_new_thread(self.generate,())
    def stop(self):
        self.RUN = False


        
class ConsoleSink(Sink):
    """Print messages on low and high entries
     +-------+
  >>-|--.    |->>
     | print |
   >-|--'    |->
     +-------+
"""
    def push(self, msg):
        print ">%r" % msg
    def high_push(self, msg):
        print ">>%r" % msg

class RawConsoleSink(Sink):
    """Print messages on low and high entries
     +-------+
  >>-|--.    |->>
     | write |
   >-|--'    |->
     +-------+
"""
    def __init__(self, name=None, newlines=True):
        Sink.__init__(self, name=name)
        self.newlines = newlines
    def push(self, msg):
        if self.newlines:
            msg += "\n"
        os.write(1, str(msg))
    def high_push(self, msg):
        if self.newlines:
            msg += "\n"
        os.write(1, str(msg))

class CLIFeeder(AutoSource):
    """Send messages from python command line
     +--------+
  >>-|        |->>
     | send() |
   >-|   `----|->
     +--------+
"""
    def send(self, msg):
        self._gen_data(msg)
    def close(self):
        self.is_exhausted = True

class CLIHighFeeder(CLIFeeder):
    """Send messages from python command line to high output
     +--------+
  >>-|   .----|->>
     | send() |
   >-|        |->
     +--------+
"""
    def send(self, msg):
        self._gen_high_data(msg)


class PeriodicSource(ThreadGenSource):
    """Generage messages periodically on low exit
     +-------+
  >>-|       |->>
     | msg,T |
   >-|  `----|->
     +-------+
"""
    def __init__(self, msg, period, period2=0, name=None):
        ThreadGenSource.__init__(self,name=name)
        if not hasattr(msg, "__iter__"):
            msg=[msg]
        self.msg = msg
        self.period = period
        self.period2 = period2
    def generate(self):
        while self.RUN:
            empty_gen = True
            for m in self.msg:
                empty_gen = False
                self._gen_data(m)
                time.sleep(self.period)
            if empty_gen:
                self.is_exhausted = True
                self._wake_up()
            time.sleep(self.period2)
        
class TermSink(Sink):
    """Print messages on low and high entries on a separate terminal
     +-------+
  >>-|--.    |->>
     | print |
   >-|--'    |->
     +-------+
"""
    def __init__(self, name=None, keepterm=True, newlines=True, openearly=True):
        Sink.__init__(self, name=name)
        self.keepterm = keepterm
        self.newlines = newlines
        self.openearly = openearly
        self.opened = False
        if self.openearly:
            self.start()

    def start(self):
        if not self.opened:
            self.opened = True
            self.__r,self.__w = os.pipe()
            cmd = ["xterm"]
            if self.name is not None:
                cmd.extend(["-title",self.name])
            if self.keepterm:
                cmd.append("-hold")
            cmd.extend(["-e", "cat 0<&%i" % self.__r])
            self.__p = subprocess.Popen(cmd)
            os.close(self.__r)
    def stop(self):
        if not self.keepterm:
            self.opened = False
            os.close(self.__w)
            self.__p.kill()
            self.__p.wait()
    def _print(self, s):
        if self.newlines:
            s+="\n"
        os.write(self.__w, s)
            
    def push(self, msg):
        self._print(str(msg))
    def high_push(self, msg):
        self._print(str(msg))
    

class QueueSink(Sink):
    """Collect messages from high and low entries and queue them. Messages are unqueued with the .recv() method.
     +-------+
  >>-|--.    |->>
     | queue |
   >-|--'    |->
     +-------+
"""
    def __init__(self, name=None):
        Sink.__init__(self, name=name)
        self.q = Queue.Queue()
    def push(self, msg):
        self.q.put(msg)
    def high_push(self, msg):
        self.q.put(msg)
    def recv(self):
        while True:
            try:
                return self.q.get(True, timeout=0.1)
            except Queue.Empty:
                pass
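
# Example usage (illustrative sketch): a QueueSink lets another thread consume
# piped messages synchronously via .recv(), which polls the internal queue until
# a message is available.
#
#   feeder = CLIFeeder(name="feeder")
#   qsink = QueueSink(name="queue")
#   feeder > qsink
#   pe = PipeEngine(feeder)
#   pe.start()
#   feeder.send("ping")
#   msg = qsink.recv()      # blocks (polling every 0.1 s) until "ping" arrives
#   pe.stop()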


class TransformDrain(Drain):
    """Apply a function to messages on low and high entry
     +-------+
  >>-|--[f]--|->>
     |       |
   >-|--[f]--|->
     +-------+
"""
    def __init__(self, f, name=None):
        Drain.__init__(self, name=name)
        self.f = f
    def push(self, msg):
        self._send(self.f(msg))
    def high_push(self, msg):
        self._high_send(self.f(msg))

class UpDrain(Drain):
    """Repeat messages from low entry to high exit
     +-------+
  >>-|    ,--|->>
     |   /   |
   >-|--'    |->
     +-------+
"""
    def push(self, msg):
        self._high_send(msg)
    def high_push(self, msg):
        pass

class DownDrain(Drain):
    """Repeat messages from high entry to low exit
     +-------+
  >>-|--.    |->>
     |   \   |
   >-|    `--|->
     +-------+
"""
    def push(self, msg):
        pass
    def high_push(self, msg):
        self._send(msg)
        

def _testmain():
    s = PeriodicSource("hello", 1, name="src")
    d1 = Drain(name="d1")
    c = ConsoleSink(name="c")
    tf = TransformDrain(lambda x:"Got %r" % x)
    t = TermSink(name="t", keepterm=False)

    s > d1 > c
    d1 > tf > t

    p = PipeEngine(s)

    p.graph(type="png",target="> /tmp/pipe.png")

    p.start()
    print p.threadid
    time.sleep(5)
    p.stop()


if __name__ == "__main__":
    _testmain()