Правила для файервола хранятся в виде текста.
firewall.py
# -*- coding: cp1251 -*-
import re
ruleDict = {}
ruleList = []
class IPError:
"""
Исключение возвращающее неправильное значение IP адреса или маски подсети.
"""
def __init__(self,errValue):
self.errIP = errValue
regxIP = re.compile(r'''\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.
(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.
(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.
(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b''', re.VERBOSE)
def netmask(addr, mask):
"""
На основании введенного IP адреса и маски возвращает значения адреса подсети
и широковещательного адреса.
"""
ADDR = addr.strip().split('.')
MASK = mask.strip().split('.')
subnet = []
brdcst = []
shortMask = False
for i in range(len(ADDR)):
if int(MASK[i]) == 255 and shortMask == False:
subnet.append(str(int(ADDR[i]) & int(MASK[i])))
brdcst.append(str(int(ADDR[i]) | 255 - int(MASK[i])))
elif int(MASK[i]) == 0 and shortMask == True:
subnet.append(str(int(ADDR[i]) & int(MASK[i])))
brdcst.append(str(int(ADDR[i]) | 255 - int(MASK[i])))
elif int(MASK[i]) < 255 and shortMask == False:
shortMask = True
subnet.append(str(int(ADDR[i]) & int(MASK[i])))
brdcst.append(str(int(ADDR[i]) | 255 - int(MASK[i])))
else:
raise IPError('Неправильно задана маска подсети: ' + mask)
return '.'.join(subnet), '.'.join(brdcst)
def add(ruleStr):
"""
Добавляет правило для файервола в словарь.
"""
try:
Rule, srcAddr, srcMask, dstAddr, dstMask = ruleStr.strip().split()
except ValueError:
raise IPError('Неправильное количество параметров.')
for srcIP in [srcAddr, srcMask, dstAddr, dstMask]:
if not regxIP.search(srcIP):
raise IPError('Неправильно задан параметр: ' + srcIP)
try:
subNetSrc, brdCstSrc = netmask(srcAddr, srcMask)
subNetDst, brdCstDst = netmask(dstAddr, dstMask)
except IPError, i:
raise IPError(i.errIP)
if srcAddr == subNetSrc or srcAddr == brdCstSrc:
raise IPError('Адрес источника является адресом подсети или широковещательным адресом.')
elif dstAddr == subNetDst or dstAddr == brdCstDst:
raise IPError('Адрес источника является адресом подсети или широковещательным адресом.')
if Rule.lower() == 'permit':
ruleDict[srcAddr + ' -> ' + dstAddr] = True
ruleList.append(ruleStr.strip())
elif Rule.lower() == 'deny':
ruleDict[srcAddr + ' -> ' + dstAddr] = False
ruleList.append(ruleStr.strip())
else:
raise IPError('\nНеправильно задано ключевое слово: ' + Rule.lower())
def list():
"""
Выводит на экран список правил для файервола.
"""
ruleNum = 1
print '\nПравила для файервола:'
if len(ruleList) == 0:
print '\nНи одного правила еще не задано.\n'
else:
for strRule in ruleList:
print '%2i %s' % (ruleNum, strRule)
ruleNum += 1
print '\n'
def filter (src, dst):
"""
Фильтрует пакет на основании адреса отправителя и адреса получателя.
Возвращает значение True | False | Строку с ошибкой.
"""
if regxIP.search(src) and regxIP.search(dst):
filterKey = src + ' -> ' + dst
if filterKey in ruleDict.keys():
return ruleDict[filterKey]
else:
return 'For this query rule not exist.'
else:
return 'Wrong data.'
def delete(delRule):
"""
По номеру удаляет правило из списка.
"""
if delRule.isdigit() and 0 < int(delRule) <= len(ruleList):
delKey = ruleList[int(delRule)-1].strip().split()
ruleList.remove(ruleList[int(delRule)-1])
ruleDict.pop(delKey[1] + ' -> ' + delKey[3])
else:
pass
def backup(fileDB,act):
"""
Очищает файл с правилами для файервола.
"""
if act == 'clear':
try:
DB = open(fileDB,'w')
except IOError:
print 'Файл невозможно стереть.'
finally:
DB.close()
if act == 'save':
try:
rulesDB = open(fileDB,'w')
for i in ruleList:
rulesDB.write(i+'\n')
except IOError:
print 'Правило не добавлено в файл для постоянного хранения.'
finally:
rulesDB.close()
def readDbFile(fileDB):
"""
Считывает правила из файла fileDB,
должен находится в том же каталоге что и модуль.
Возвращает словарь ruleDict.
"""
ruleDict = {}
ruleList = []
for ruleLine in open(fileDB,'r'):
rule, srcaddr, srcmask, dstaddr, dstmask = ruleLine.strip().split()
ruleList.append(ruleLine.strip())
if rule.lower() == 'permit':
ruleDict[srcaddr + ' -> ' + dstaddr] = True
elif rule.lower() == 'deny':
ruleDict[srcaddr + ' -> ' + dstaddr] = False
start_firewall.py
# -*- coding: cp1251 -*-
import firewall
while True:
print 'МЕНЮ'
print '1 - Просмотреть список правил для файервола.'
print '2 - Добавить правило.'
print '3 - Удалить правило.'
print '4 - Проверить IP пакет.'
print '5 - Сохранить введенные правила в базу правил файервола.'
print '6 - Очистить базу правил файервола.'
print '7 - Восстановить базу правил из файла. Существующая база будет утеряна.'
inputKey = raw_input('Выберите действие: ')
if inputKey == 'q':
break
elif inputKey == '1':
firewall.list()
elif inputKey == '2':
inputStr = raw_input('Введите правило: ')
try:
firewall.add(inputStr)
except firewall.IPError, error:
print '%s' % (error.errIP)
elif inputKey == '3':
inputStr = raw_input('Введите номер строки для удаления: ')
firewall.delete(inputStr)
elif inputKey == '4':
inSrc, inDst = raw_input('Введите через пробле srcAddr и dstAddr: ').strip().split()
print firewall.filter(inSrc, inDst)
elif inputKey == '5':
firewall.backup('rules.db','save')
elif inputKey == '6':
firewall.backup('rules.db','clear')
elif inputKey == '7':
firewall.readDbFile('rules.db')