# test/sniffer.py

import unittest as ut
import socket
import struct
import time
import random
import select

import dpkt

from dprobelib import sniffer

# Network interface name handed to sniffer.sniff(). "lo" is the usual name of
# the loopback interface on Linux, which is where traffic to SERVER_ADDRESS
# travels.
SNIFFER_INTERFACE = "lo"
# Address the test's UDP server socket binds to (the client sends to it).
SERVER_ADDRESS = "127.0.0.1"
# Overall time budget (in seconds) for waiting until the test packet has been
# sniffed; the test fails once this much time has passed.
MAX_WAIT = 3.0

class TestSniffer (ut.TestCase):
    """Check sniffer.sniff() by capturing UDP datagrams the test sends itself.

    Each test opens a UDP server and client socket pair on SERVER_ADDRESS,
    starts a sniffer on SNIFFER_INTERFACE, and sends a random payload from
    the client until the sniffer reports a matching packet or MAX_WAIT
    seconds have passed.
    """

    # Seconds to wait between (re)transmissions of the test datagram.
    RESEND_INTERVAL = 0.1

    def testSniffer(self):
        """The sniffed packet carries exactly the payload that was sent."""
        data = self.__makeRandomData()
        packet = self.__sniffPacket(data)
        # packet is the IP layer, packet.data the UDP layer, and
        # packet.data.data the UDP payload.
        self.assertEqual(packet.data.data, data)

    def testCaptureLength(self):
        """A packet built from a 512 byte payload is CAPTURE_LENGTH long."""
        # NOTE(review): presumably 512 bytes is larger than
        # sniffer.CAPTURE_LENGTH so that truncation is exercised -- confirm
        # against dprobelib.
        data = self.__makeRandomData(length=512)
        packet = self.__sniffPacket(data)
        self.assertEqual(sniffer.CAPTURE_LENGTH, len(str(packet)))

    def __makeRandomData(self, length=None):
        """Return a byte string of random bytes.

        length -- number of bytes to generate; 64 when None.
        """
        if length is None:
            length = 64
        values = [random.randint(0, 255) for counter in range(length)]
        # A single pack() with a repeat count yields the native byte string
        # type directly, so no string joining is needed.
        return struct.pack("%dB" % length, *values)

    def __sniffPacket(self, testData):
        """Send testData over UDP until it is sniffed; return that packet.

        The datagram is re-sent every RESEND_INTERVAL seconds because a
        single send may go out before the sniffer is capturing.  Fails the
        test if no matching packet shows up within MAX_WAIT seconds.  The
        sockets are closed before returning, whatever the outcome.
        """
        server, client, aSniffer, host, port = self.__setUpSniffer()
        # NOTE(review): the sniffer itself is not released here; its cleanup
        # API is not visible from this file.
        try:
            waitingWriters = ()
            lastWrite = 0
            timeStarted = time.time()
            while (time.time() - timeStarted) <= MAX_WAIT:
                sinceLastWrite = time.time() - lastWrite
                if sinceLastWrite >= self.RESEND_INTERVAL:
                    # Time for another send: also wait for the client to
                    # become writable.
                    waitingWriters = (client,)
                    timeout = self.RESEND_INTERVAL
                else:
                    # Sleep only until the next send is due; the min() keeps
                    # the wait bounded even if the clock steps backwards.
                    timeout = min(self.RESEND_INTERVAL,
                                  self.RESEND_INTERVAL - sinceLastWrite)
                readers, writers, ignored = select.select((aSniffer,),
                                                          waitingWriters, (),
                                                          timeout)
                if readers:
                    packet = self.__sniffAndParsePacket(aSniffer)
                    if self.__matchTestPacket(packet, host, port):
                        return packet
                if writers:
                    client.send(testData)
                    lastWrite = time.time()
                    waitingWriters = ()
            self.fail("didn't see test packet")
        finally:
            client.close()
            server.close()

    def __sniffAndParsePacket(self, aSniffer):
        """Read the next captured packet and parse it as an IP packet."""
        # The first element of the pair is not needed by these tests.
        # NOTE(review): assumes the captured bytes start at the IP header --
        # confirm against dprobelib.sniffer.
        outbound, packetBytes = aSniffer.next()
        return dpkt.ip.IP(packetBytes)

    def __matchTestPacket(self, packet, host, port):
        """Return True if packet is a UDP packet addressed to host:port."""
        return packet.dst == socket.inet_aton(host) \
               and packet.p == socket.IPPROTO_UDP and packet.data.dport == port

    def __setUpSniffer(self):
        """Create the socket pair and the sniffer used by one test.

        Returns (server, client, aSniffer, host, port), where host and port
        are the address the server socket is bound to.  If a later step
        fails, the sockets created so far are closed before re-raising.
        """
        server = self.__makeServerSocket()
        try:
            host, port = server.getsockname()
            client = self.__makeClientSocket(host, port)
            try:
                aSniffer = sniffer.sniff(SNIFFER_INTERFACE)
            except Exception:
                client.close()
                raise
        except Exception:
            server.close()
            raise
        return server, client, aSniffer, host, port

    def __makeClientSocket(self, host, port):
        """Return a UDP socket connected to host:port."""
        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Port 0 lets the operating system pick a free local port.
            client.bind(("", 0))
            client.connect((host, port))
        except Exception:
            client.close()
            raise
        return client

    def __makeServerSocket(self):
        """Return a UDP socket bound to SERVER_ADDRESS on an OS-chosen port."""
        server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Port 0 lets the operating system pick a free port; callers read
            # the actual one back with getsockname().
            server.bind((SERVER_ADDRESS, 0))
        except Exception:
            server.close()
            raise
        return server

# Generated by GNU enscript 1.6.1.