본문 바로가기

Language/python

scapy :: sniff 예제

특정 UDP 포트로 들어오는 데이터를 캡처하여 sequential하게 증가하는 특정값을 모니터링하는 소프트웨어이다.

import socket
import struct
import time
from threading import Thread
from scapy.all import *

DATA_FMT = "!IHHH%ds"
UDP_PORT = 8080
captured_list = []
start_msginx = -1
expected_msginx = -1
missing_cnt = 0
captured_cnt = 0

def pkg_callback(pkg):
    global start_msginx
    global missing_cnt
    global expected_msginx
    global captured_cnt
   
    udp_layer = pkg.getlayer("UDP")
    if udp_layer.dport != UDP_PORT:
        return
    captured_cnt = captured_cnt + 1
    udp_size = udp_layer.len
    dl_payload_size = udp_size - 8 - 10
    dl_timestamp, dl_bport, dl_msginx, dl_crc, dl_payload = struct.unpack(DATA_FMT%dl_payload_size, str(udp_layer.payload))
    if start_msginx == -1:
        start_msginx = dl_msginx
        print "DL MsgInx Start from %d" % ( start_msginx )
        expected_msginx = start_msginx + 1
        expected_msginx = expected_msginx % 0x10000
        return
    if expected_msginx != dl_msginx:
        print "Expected MsgInx ( %d ), Received MsgInx ( %d )" % (expected_msginx, dl_msginx)
        temp_missing_cnt = 0
        if dl_msginx < expected_msginx:
            temp_missing_cnt = 0xffff + dl_msginx - expected_msginx
        else:
            temp_missing_cnt = dl_msginx - expected_msginx
        missing_cnt = missing_cnt + temp_missing_cnt
    expected_msginx = dl_msginx + 1
    expected_msginx = expected_msginx % 0x10000

class UDP_SNIFFER(Thread):
    def __init__(self):
        Thread.__init__(self)
                                     
    def run(self):
      
        while 1:
            choice = raw_input("menu> ")
            if choice.strip() == "":
                continue
            if choice.strip() == "exit":
                break
            if choice.strip() == "start":
                print "Start to capture packets..."
                self.startCapture()
                continue
        print "Exiting..." 
   def startCapture(self):
        global captured_list
        global start_msginx
        global missing_cnt
        global expected_msginx
        global captured_cnt

        # initialize all variables.
        captured_list = []
        start_msginx = -1
        expected_msginx = -1
        missing_cnt = 0
        captured_cnt = 0

        # try to capture N packets.
        start_time = time.time()
        sniff(prn=pkg_callback, filter="udp and port %d" % (UDP_PORT), count=10000)
        end_time = time.time()
        gap = end_time - start_time

        # print result
        print "Sniffing is done. Result ( %d / %d ) during %f" % ( missing_cnt, captured_cnt, gap )

if __name__ == "__main__":
    upr = UDP_SNIFFER()
    upr.start()
    upr.join()
    exit(0)