root / flowspec / forms.py @ 052c14aa
History | View | Annotate | Download (8.7 kB)
1 |
from django import forms |
---|---|
2 |
from django.utils.safestring import mark_safe |
3 |
from django.utils.translation import ugettext as _ |
4 |
from django.utils.translation import ugettext_lazy |
5 |
from django.template.defaultfilters import filesizeformat |
6 |
from flowspy.flowspec.models import * |
7 |
from ipaddr import * |
8 |
from django.core.urlresolvers import reverse |
9 |
from django.contrib.auth.models import User |
10 |
import datetime |
11 |
|
12 |
|
13 |
|
14 |
class RouteForm(forms.ModelForm): |
15 |
# name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
|
16 |
# " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
|
17 |
# source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
18 |
# " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
|
19 |
# source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
|
20 |
# destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
21 |
# " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
|
22 |
# destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
|
23 |
# ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
|
24 |
class Meta: |
25 |
model = Route |
26 |
|
27 |
def clean_source(self): |
28 |
data = self.cleaned_data['source'] |
29 |
if data:
|
30 |
try:
|
31 |
address = IPNetwork(data) |
32 |
return self.cleaned_data["source"] |
33 |
except Exception: |
34 |
raise forms.ValidationError('Invalid network address format') |
35 |
|
36 |
def clean_destination(self): |
37 |
data = self.cleaned_data['destination'] |
38 |
if data:
|
39 |
try:
|
40 |
address = IPNetwork(data) |
41 |
return self.cleaned_data["destination"] |
42 |
except Exception: |
43 |
raise forms.ValidationError('Invalid network address format') |
44 |
|
45 |
def clean_expires(self): |
46 |
date = self.cleaned_data['expires'] |
47 |
if date:
|
48 |
range_days = (date - datetime.date.today()).days |
49 |
if range_days > 0 and range_days < 11: |
50 |
return self.cleaned_data["expires"] |
51 |
else:
|
52 |
raise forms.ValidationError('Invalid date range') |
53 |
|
54 |
def clean(self): |
55 |
name = self.cleaned_data.get('name', None) |
56 |
source = self.cleaned_data.get('source', None) |
57 |
sourceports = self.cleaned_data.get('sourceport', None) |
58 |
ports = self.cleaned_data.get('port', None) |
59 |
then = self.cleaned_data.get('then', None) |
60 |
destination = self.cleaned_data.get('destination', None) |
61 |
destinationports = self.cleaned_data.get('destinationport', None) |
62 |
user = self.cleaned_data.get('applier', None) |
63 |
peer = user.get_profile().peer |
64 |
networks = peer.networks.all() |
65 |
mynetwork = False
|
66 |
route_pk_list = [] |
67 |
|
68 |
if destination:
|
69 |
for network in networks: |
70 |
net = IPNetwork(network.network) |
71 |
if IPNetwork(destination) in net: |
72 |
mynetwork = True
|
73 |
if not mynetwork: |
74 |
raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks') |
75 |
if (sourceports and ports): |
76 |
raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports') |
77 |
if (destinationports and ports): |
78 |
raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports') |
79 |
if sourceports and not source: |
80 |
raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address') |
81 |
if destinationports and not destination: |
82 |
raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address') |
83 |
if not (source or sourceports or ports or destination or destinationports): |
84 |
raise forms.ValidationError('Fill at least a Route Match Condition') |
85 |
existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='PENDING').exclude(status='ERROR').exclude(status='ADMININACTIVE') |
86 |
existing_routes = existing_routes.filter(applier__userprofile__peer=peer) |
87 |
if source:
|
88 |
source = IPNetwork(source).compressed |
89 |
existing_routes = existing_routes.filter(source=source) |
90 |
else:
|
91 |
existing_routes = existing_routes.filter(source=None)
|
92 |
if sourceports:
|
93 |
route_pk_list=get_matchingport_route_pks(sourceports, existing_routes) |
94 |
if route_pk_list:
|
95 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
96 |
else:
|
97 |
existing_routes = existing_routes.filter(sourceport=None)
|
98 |
if destinationports:
|
99 |
route_pk_list=get_matchingport_route_pks(destinationports, existing_routes) |
100 |
if route_pk_list:
|
101 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
102 |
else:
|
103 |
existing_routes = existing_routes.filter(destinationport=None)
|
104 |
if ports:
|
105 |
route_pk_list=get_matchingport_route_pks(ports, existing_routes) |
106 |
if route_pk_list:
|
107 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
108 |
else:
|
109 |
existing_routes = existing_routes.filter(port=None)
|
110 |
|
111 |
for route in existing_routes: |
112 |
if name != route.name:
|
113 |
existing_url = reverse('edit-route', args=[route.name])
|
114 |
if IPNetwork(destination) in IPNetwork(route.destination): |
115 |
raise forms.ValidationError('There is an exact %s rule, %s whose destination (%s) is supernet of (or the same as) network (%s).<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, destination, existing_url, route.name)) |
116 |
if IPNetwork(route.destination) in IPNetwork(destination): |
117 |
raise forms.ValidationError('There is an exact %s rule, %s whose destination network (%s) belongs to the destination network %s.<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, destination, existing_url, route.name)) |
118 |
|
119 |
|
120 |
return self.cleaned_data |
121 |
|
122 |
class ThenPlainForm(forms.ModelForm): |
123 |
# action = forms.CharField(initial='rate-limit')
|
124 |
class Meta: |
125 |
model = ThenAction |
126 |
|
127 |
def clean_action_value(self): |
128 |
action_value = self.cleaned_data['action_value'] |
129 |
if action_value:
|
130 |
try:
|
131 |
assert(int(action_value)) |
132 |
return "%s" %self.cleaned_data["action_value"] |
133 |
except:
|
134 |
raise forms.ValidationError('Rate-limiting should be an integer') |
135 |
if int(action_value) < 50: |
136 |
raise forms.ValidationError('Rate-limiting cannot be < 50kbps') |
137 |
else:
|
138 |
raise forms.ValidationError('Cannot be empty') |
139 |
|
140 |
def clean_action(self): |
141 |
action = self.cleaned_data['action'] |
142 |
if action != 'rate-limit': |
143 |
raise forms.ValidationError('Cannot select something other than rate-limit') |
144 |
else:
|
145 |
return self.cleaned_data["action"] |
146 |
|
147 |
class PortPlainForm(forms.ModelForm): |
148 |
# action = forms.CharField(initial='rate-limit')
|
149 |
class Meta: |
150 |
model = MatchPort |
151 |
|
152 |
def clean_port(self): |
153 |
port = self.cleaned_data['port'] |
154 |
if port:
|
155 |
try:
|
156 |
assert(int(port)) |
157 |
return "%s" %self.cleaned_data["port"] |
158 |
except:
|
159 |
raise forms.ValidationError('Port should be an integer') |
160 |
else:
|
161 |
raise forms.ValidationError('Cannot be empty') |
162 |
|
163 |
def value_list_to_list(valuelist): |
164 |
vl = [] |
165 |
for val in valuelist: |
166 |
vl.append(val[0])
|
167 |
return vl
|
168 |
|
169 |
def get_matchingport_route_pks(portlist, routes): |
170 |
route_pk_list = [] |
171 |
ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port')) |
172 |
for route in routes: |
173 |
rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port')) |
174 |
if rsp and rsp == ports_value_list: |
175 |
route_pk_list.append(route.pk) |
176 |
return route_pk_list
|