import unittest as ut
import socket
import struct
import time
import random
import select
import dpkt
from dprobelib import sniffer
# Name of the network interface passed to sniffer.sniff() by the tests.
SNIFFER_INTERFACE = "lo"
# IPv4 address the tests' UDP server socket is bound to.
SERVER_ADDRESS = "127.0.0.1"
# Maximum time a blocking operation should wait (in seconds).
# TestSniffer uses it as the overall deadline of its send-and-sniff loop.
MAX_WAIT = 3.0
class TestSniffer(ut.TestCase):
    """Exercise ``sniffer.sniff`` with UDP datagrams sent to ``SERVER_ADDRESS``.

    Every test sends a datagram from a connected client socket to a server
    socket bound on ``SERVER_ADDRESS`` and then reads packets from a sniffer
    opened on ``SNIFFER_INTERFACE`` until that datagram shows up.

    The code avoids Python-2-only constructs (``iterator.next()``, building
    binary data with ``"".join``) so it runs on Python 2.7 and Python 3.
    """

    # Port 0 asks the operating system to choose a free port.
    _ANY_PORT = 0
    # Seconds to wait before sending the test datagram again.
    _RESEND_INTERVAL = 0.1
    # Payload size, in bytes, used when no explicit length is requested.
    _DEFAULT_DATA_LENGTH = 64

    def testSniffer(self):
        """The UDP payload of the sniffed packet equals the payload sent."""
        data = self.__makeRandomData()
        packet = self.__sniffPacket(data)
        self.assertEqual(packet.data.data, data)

    def testCaptureLength(self):
        """A 512-byte payload yields a packet of ``sniffer.CAPTURE_LENGTH`` bytes."""
        data = self.__makeRandomData(length=512)
        packet = self.__sniffPacket(data)
        # bytes() is an alias of str() on Python 2 and serialises the packet
        # to its wire form on Python 3.
        self.assertEqual(sniffer.CAPTURE_LENGTH, len(bytes(packet)))

    def __makeRandomData(self, length=None):
        """Return ``length`` random bytes (``_DEFAULT_DATA_LENGTH`` if None)."""
        if length is None:
            length = self._DEFAULT_DATA_LENGTH
        # bytearray accepts integers in 0..255 on both Python 2 and 3;
        # bytes() then yields the native binary string type.
        return bytes(bytearray(random.randint(0, 255)
                               for _ in range(length)))

    def __sniffPacket(self, testData):
        """Send ``testData`` until the sniffer reports it, then return it.

        The datagram is sent immediately and again every
        ``_RESEND_INTERVAL`` seconds, while ``select`` waits for the sniffer
        to become readable.  The first sniffed packet accepted by
        ``__matchTestPacket`` is returned.  If none arrives within about
        ``MAX_WAIT`` seconds the test fails.

        The sockets created for the exchange are closed by the cleanups
        registered when they were made.
        NOTE(review): the sniffer object itself is not released here; this
        file does not show how (or whether) it should be closed.
        """
        # The server socket is not read from in this method; it is only
        # needed for the address the client sends to.
        _server, client, aSniffer, host, port = self.__setUpSniffer()
        waitingWriters = ()
        # 0 means "never sent": the first iteration always sees a send due.
        lastWrite = 0
        timeStarted = time.time()
        while (time.time() - timeStarted) <= MAX_WAIT:
            sinceLastWrite = time.time() - lastWrite
            if sinceLastWrite >= self._RESEND_INTERVAL:
                # A (re)send is due: also wake up once the client is writable.
                waitingWriters = (client,)
                timeout = self._RESEND_INTERVAL
            else:
                # Sleep until the next resend is due instead of polling with
                # a near-zero timeout; min() guards against a clock that
                # stepped backwards.
                timeout = min(self._RESEND_INTERVAL,
                              self._RESEND_INTERVAL - sinceLastWrite)
            readers, writers, _ = select.select((aSniffer,),
                                                waitingWriters, (),
                                                timeout)
            if readers:
                packet = self.__sniffAndParsePacket(aSniffer)
                if self.__matchTestPacket(packet, host, port):
                    return packet
            if writers:
                client.send(testData)
                lastWrite = time.time()
                waitingWriters = ()
        self.fail("didn't see test packet")

    def __sniffAndParsePacket(self, aSniffer):
        """Read the next item from ``aSniffer`` and parse it as an IP packet.

        The sniffer yields 2-tuples whose second element holds the raw
        packet bytes; the first element is not used here.
        """
        # next() works for Python 2 iterators (.next) and Python 3 ones.
        _outbound, packetBytes = next(aSniffer)
        return dpkt.ip.IP(packetBytes)

    def __matchTestPacket(self, packet, host, port):
        """Return True if ``packet`` is a UDP packet addressed to host:port."""
        return (packet.dst == socket.inet_aton(host)
                and packet.p == socket.IPPROTO_UDP
                and packet.data.dport == port)

    def __setUpSniffer(self):
        """Create the server socket, client socket and sniffer for one test.

        Returns a ``(server, client, aSniffer, host, port)`` tuple where
        ``host``/``port`` is the address the server socket is bound to.
        """
        server = self.__makeServerSocket()
        host, port = server.getsockname()
        client = self.__makeClientSocket(host, port)
        aSniffer = sniffer.sniff(SNIFFER_INTERFACE)
        return server, client, aSniffer, host, port

    def __makeClientSocket(self, host, port):
        """Return a UDP socket connected to ``(host, port)``.

        The socket is closed automatically when the test finishes.
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Register the cleanup first so the socket is closed even when
        # bind() or connect() raises.
        self.addCleanup(client.close)
        client.bind(("", self._ANY_PORT))
        client.connect((host, port))
        return client

    def __makeServerSocket(self):
        """Return a UDP socket bound to ``SERVER_ADDRESS`` on a free port.

        The socket is closed automatically when the test finishes.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Register the cleanup first so the socket is closed even when
        # bind() raises.
        self.addCleanup(server.close)
        server.bind((SERVER_ADDRESS, self._ANY_PORT))
        return server
# Generated by GNU enscript 1.6.1.