특정 UDP 포트로 들어오는 데이터를 캡처하여 sequential하게 증가하는 특정값을 모니터링하는 소프트웨어이다.
import socket
import struct
import time
from threading import Thread
from scapy.all import *
DATA_FMT = "!IHHH%ds"
UDP_PORT = 8080
captured_list = []
start_msginx = -1
expected_msginx = -1
missing_cnt = 0
captured_cnt = 0
def pkg_callback(pkg):
global start_msginx
global missing_cnt
global expected_msginx
global captured_cnt
udp_layer = pkg.getlayer("UDP")
if udp_layer.dport != UDP_PORT:
return
captured_cnt = captured_cnt + 1
udp_size = udp_layer.len
dl_payload_size = udp_size - 8 - 10
dl_timestamp, dl_bport, dl_msginx, dl_crc, dl_payload = struct.unpack(DATA_FMT%dl_payload_size, str(udp_layer.payload))
if start_msginx == -1:
start_msginx = dl_msginx
print "DL MsgInx Start from %d" % ( start_msginx )
expected_msginx = start_msginx + 1
expected_msginx = expected_msginx % 0x10000
return
if expected_msginx != dl_msginx:
print "Expected MsgInx ( %d ), Received MsgInx ( %d )" % (expected_msginx, dl_msginx)
temp_missing_cnt = 0
if dl_msginx < expected_msginx:
temp_missing_cnt = 0xffff + dl_msginx - expected_msginx
else:
temp_missing_cnt = dl_msginx - expected_msginx
missing_cnt = missing_cnt + temp_missing_cnt
expected_msginx = dl_msginx + 1
expected_msginx = expected_msginx % 0x10000
class UDP_SNIFFER(Thread):
def __init__(self):
Thread.__init__(self)
def run(self):
while 1:
choice = raw_input("menu> ")
if choice.strip() == "":
continue
if choice.strip() == "exit":
break
if choice.strip() == "start":
print "Start to capture packets..."
self.startCapture()
continue
print "Exiting..."
def startCapture(self):
global captured_list
global start_msginx
global missing_cnt
global expected_msginx
global captured_cnt
# initialize all variables.
captured_list = []
start_msginx = -1
expected_msginx = -1
missing_cnt = 0
captured_cnt = 0
# try to capture N packets.
start_time = time.time()
sniff(prn=pkg_callback, filter="udp and port %d" % (UDP_PORT), count=10000)
end_time = time.time()
gap = end_time - start_time
# print result
print "Sniffing is done. Result ( %d / %d ) during %f" % ( missing_cnt, captured_cnt, gap )
if __name__ == "__main__":
upr = UDP_SNIFFER()
upr.start()
upr.join()
exit(0)
import struct
import time
from threading import Thread
from scapy.all import *
DATA_FMT = "!IHHH%ds"
UDP_PORT = 8080
captured_list = []
start_msginx = -1
expected_msginx = -1
missing_cnt = 0
captured_cnt = 0
def pkg_callback(pkg):
global start_msginx
global missing_cnt
global expected_msginx
global captured_cnt
udp_layer = pkg.getlayer("UDP")
if udp_layer.dport != UDP_PORT:
return
captured_cnt = captured_cnt + 1
udp_size = udp_layer.len
dl_payload_size = udp_size - 8 - 10
dl_timestamp, dl_bport, dl_msginx, dl_crc, dl_payload = struct.unpack(DATA_FMT%dl_payload_size, str(udp_layer.payload))
if start_msginx == -1:
start_msginx = dl_msginx
print "DL MsgInx Start from %d" % ( start_msginx )
expected_msginx = start_msginx + 1
expected_msginx = expected_msginx % 0x10000
return
if expected_msginx != dl_msginx:
print "Expected MsgInx ( %d ), Received MsgInx ( %d )" % (expected_msginx, dl_msginx)
temp_missing_cnt = 0
if dl_msginx < expected_msginx:
temp_missing_cnt = 0xffff + dl_msginx - expected_msginx
else:
temp_missing_cnt = dl_msginx - expected_msginx
missing_cnt = missing_cnt + temp_missing_cnt
expected_msginx = dl_msginx + 1
expected_msginx = expected_msginx % 0x10000
class UDP_SNIFFER(Thread):
def __init__(self):
Thread.__init__(self)
def run(self):
while 1:
choice = raw_input("menu> ")
if choice.strip() == "":
continue
if choice.strip() == "exit":
break
if choice.strip() == "start":
print "Start to capture packets..."
self.startCapture()
continue
print "Exiting..."
def startCapture(self):
global captured_list
global start_msginx
global missing_cnt
global expected_msginx
global captured_cnt
# initialize all variables.
captured_list = []
start_msginx = -1
expected_msginx = -1
missing_cnt = 0
captured_cnt = 0
# try to capture N packets.
start_time = time.time()
sniff(prn=pkg_callback, filter="udp and port %d" % (UDP_PORT), count=10000)
end_time = time.time()
gap = end_time - start_time
# print result
print "Sniffing is done. Result ( %d / %d ) during %f" % ( missing_cnt, captured_cnt, gap )
if __name__ == "__main__":
upr = UDP_SNIFFER()
upr.start()
upr.join()
exit(0)