You might now ask: why not just send a crafted RST packet from scapy? My answer is that the divert socket is more reliable, and that you might prefer not to send anything at all to the source IP you want to block: an RST packet is enough to tell it that something (some application) is alive on the other side. On the other hand, a combination of both would be the best fit here, because it avoids long timeouts while waiting for an answer, as well as duplicated requests (some browsers retry a lot).
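As a rough illustration of the RST half of that combination, here is a minimal sketch that builds (but does not send) a reset for a packet you decided to drop. It is an assumption-laden example, not part of DivertSocket.py: it uses only the Python 3 standard library instead of scapy, the helper names `checksum` and `build_rst_reply` are made up for this post, it only handles IPv4, and it follows the usual TCP reset rule (answer an ACK-less segment with RST|ACK, otherwise reuse the peer's ACK number as the sequence number).

```python
import socket
import struct


def checksum(data):
    """Internet checksum (one's complement sum of 16-bit words)."""
    if len(data) % 2:
        data += b"\x00"
    total = sum(struct.unpack("!%dH" % (len(data) // 2), data))
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


def build_rst_reply(pkt):
    """Hypothetical helper: return raw IPv4+TCP bytes resetting `pkt`'s sender."""
    ihl = (pkt[0] & 0x0F) * 4                      # IP header length in bytes
    src, dst = pkt[12:16], pkt[16:20]
    sport, dport, seq, ack, off_flags = struct.unpack("!HHIIH", pkt[ihl:ihl + 14])
    flags = off_flags & 0x1FF
    data_off = (off_flags >> 12) * 4               # TCP header length in bytes
    payload_len = len(pkt) - ihl - data_off
    # SYN and FIN each consume one sequence number.
    seg_len = payload_len + bool(flags & 0x02) + bool(flags & 0x01)
    if flags & 0x10:                               # incoming segment carries an ACK
        rst_seq, rst_ack, rst_flags = ack, 0, 0x04           # RST
    else:
        rst_seq, rst_ack, rst_flags = 0, (seq + seg_len) & 0xFFFFFFFF, 0x14  # RST|ACK
    # Addresses and ports are swapped: the reset pretends to come from the peer.
    tcp = struct.pack("!HHIIHHHH", dport, sport, rst_seq, rst_ack,
                      (5 << 12) | rst_flags, 0, 0, 0)
    pseudo = dst + src + struct.pack("!BBH", 0, socket.IPPROTO_TCP, len(tcp))
    tcp = tcp[:16] + struct.pack("!H", checksum(pseudo + tcp)) + tcp[18:]
    ip = struct.pack("!BBHHHBBH4s4s", 0x45, 0, 20 + len(tcp), 0, 0, 64,
                     socket.IPPROTO_TCP, 0, dst, src)
    ip = ip[:10] + struct.pack("!H", checksum(ip)) + ip[12:]
    return ip + tcp
```

How the resulting bytes get injected (a raw socket, scapy's send(), or the divert socket itself) is left open here, and whether you want to emit anything at all is exactly the trade-off discussed above.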
To test it (on Mac OS X), you need to have scapy installed. Then create a divert rule similar to this one (be careful: it queues every TCP packet at ipfw, and it might break your connections if you don't attach a program that issues a verdict on them in time):
one@macuto2$ sudo ipfw add divert 3282 tcp from any to any
Password:
00100 divert 3282 ip from any to any proto tcp
Now run the attached script as follows:
sudo python2.6 DivertSocket.py 3282
It should start fetching packets and passing them to the packet handler function, which is where you should place your logic. You might want to try blocking requests by, for example, IP address (just for testing) or payload content, but remember that this doesn't reassemble TCP streams, so there would be much more to do for content inspection. Anyway, I hope someone will have some fun testing it.
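For the IP address test mentioned above, something along these lines could work as the decision logic. It is only a sketch: `BLOCKED_SOURCES`, `source_ip` and `verdict_for` are names invented for this example, the blocked address is a documentation address, and it assumes the buffer starts with a plain IPv4 header (which is what the divert socket hands over). Dropping buffers that are too short to be an IPv4 header is my own choice here; you may prefer to let them through.

```python
import socket

# Hypothetical blocklist, using an address reserved for documentation.
BLOCKED_SOURCES = {"192.0.2.10"}


def source_ip(buf):
    """Return the IPv4 source address of a raw packet as a dotted string."""
    # Bytes 12..15 of the IPv4 header hold the source address.
    return socket.inet_ntoa(buf[12:16])


def verdict_for(buf, blocked=BLOCKED_SOURCES):
    """True means reinject the packet, False means drop it."""
    if len(buf) < 20:
        # Too short to be an IPv4 header: drop (an assumption, see above).
        return False
    return source_ip(buf) not in blocked
```

Inside the packet handler of the script below, the result would be passed as the third argument, as in ds.setVeredict(buf, addr, verdict_for(buf)).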
DivertSocket.py
-- cut here --
import socket
import sys
import re
from scapy.all import *

if not hasattr(socket, "IPPROTO_DIVERT"):
    # Define the constant if this Python build does not provide it
    socket.IPPROTO_DIVERT = 254


class DivertSocket:
    def __init__(self, port, delegateFunc=None):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_DIVERT)
        # Here the addr can be any. The important one is the port
        self.sock.bind(("0.0.0.0", port))
        # Maximum size of an IP packet
        self.bufsize = 65535
        # Set blocking
        self.sock.setblocking(True)
        # Register callback
        self.delegateFunc = delegateFunc
        self.__loop = 1

    def start(self, default=0):
        self.fetchPackets(default)

    def fetchPackets(self, default=0):
        while self.__loop:
            buf, addr = self.sock.recvfrom(self.bufsize)
            # If we registered a delegate function, call it
            if self.delegateFunc is not None:
                self.delegateFunc(buf, addr)
            # Else reinject the packet if the default behavior allows it
            else:
                print "Warning, no functions registered for inspection!"
                if default:
                    self.__sendPacket(buf, addr)
                else:
                    print "You need to implement a callback function for inspection"
                    sys.exit(-1)

    def setVeredict(self, buf, addr, veredict=False):
        # A true verdict reinjects the packet; a false one silently drops it
        if veredict:
            if self.__sendPacket(buf, addr) is False:
                print "Pkt not sent. Weird.. Need to see which packet causes this error"

    def __sendPacket(self, buf, addr=None):
        try:
            if addr:
                return self.sock.sendto(buf, addr)
            # else try to send it raw anyway..
            return self.sock.send(buf)
        except KeyboardInterrupt:
            print "Stopping Engine..."
            sys.exit(0)
        except:
            print "Could not send packet..."
            return False

    def stop(self):
        self.__loop = 0
        self.sock.close()


def pktHandler(buf, addr):
    # Parse the raw bytes with scapy and dump the packet (display() prints by itself)
    p = IP(buf)
    p.display()
    # Accept everything; put your own logic here
    ds.setVeredict(buf, addr, True)


ds = DivertSocket(int(sys.argv[1]), pktHandler)
ds.start()
-- stop here --
BTW, I would be very pleased if someone could test it under Linux using iptables. Sooner or later I'll try it anyway.
Happy hacking!
;-)