root / flowspec / forms.py @ f5cd0730
History | View | Annotate | Download (15.1 kB)
1 |
# -*- coding: utf-8 -*- vim:fileencoding=utf-8:
|
---|---|
2 |
# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
|
3 |
|
4 |
# Copyright © 2011-2014 Greek Research and Technology Network (GRNET S.A.)
|
5 |
# Copyright © 2011-2014 Leonidas Poulopoulos (@leopoul)
|
6 |
#
|
7 |
# Permission to use, copy, modify, and/or distribute this software for any
|
8 |
# purpose with or without fee is hereby granted, provided that the above
|
9 |
# copyright notice and this permission notice appear in all copies.
|
10 |
#
|
11 |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD
|
12 |
# TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
|
13 |
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
|
14 |
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
|
15 |
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
|
16 |
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
|
17 |
# SOFTWARE.
|
18 |
|
19 |
from django import forms |
20 |
from django.utils.safestring import mark_safe |
21 |
from django.utils.translation import ugettext as _ |
22 |
from django.utils.translation import ugettext_lazy |
23 |
from django.template.defaultfilters import filesizeformat |
24 |
from flowspec.models import * |
25 |
from peers.models import * |
26 |
from accounts.models import * |
27 |
from ipaddr import * |
28 |
from django.core.urlresolvers import reverse |
29 |
from django.contrib.auth.models import User |
30 |
from django.conf import settings |
31 |
import datetime |
32 |
from django.core.mail import mail_admins, mail_managers, send_mail |
33 |
|
34 |
class UserProfileForm(forms.ModelForm): |
35 |
class Meta: |
36 |
model = UserProfile |
37 |
|
38 |
class RouteForm(forms.ModelForm): |
39 |
# name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
|
40 |
# " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
|
41 |
# source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
42 |
# " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
|
43 |
# source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
|
44 |
# destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
45 |
# " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
|
46 |
# destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
|
47 |
# ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
|
48 |
|
49 |
class Meta: |
50 |
model = Route |
51 |
|
52 |
def clean_applier(self): |
53 |
applier = self.cleaned_data['applier'] |
54 |
if applier:
|
55 |
return self.cleaned_data["applier"] |
56 |
else:
|
57 |
raise forms.ValidationError('This field is required.') |
58 |
|
59 |
def clean_source(self): |
60 |
user = User.objects.get(pk=self.data['applier']) |
61 |
peer = user.get_profile().peer |
62 |
data = self.cleaned_data['source'] |
63 |
private_error = False
|
64 |
protected_error = False
|
65 |
networkaddr_error = False
|
66 |
broadcast_error = False
|
67 |
if data:
|
68 |
try:
|
69 |
address = IPNetwork(data) |
70 |
for net in settings.PROTECTED_SUBNETS: |
71 |
if address in IPNetwork(net): |
72 |
protected_error = True
|
73 |
mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
74 |
send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
|
75 |
mail_body, settings.SERVER_EMAIL, |
76 |
settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
77 |
raise Exception |
78 |
if address.is_private:
|
79 |
private_error = True
|
80 |
raise Exception |
81 |
if address.version == 4 and int(address.prefixlen) == 32: |
82 |
if int(address.network.compressed.split('.')[-1]) == 0: |
83 |
broadcast_error = True
|
84 |
raise Exception |
85 |
elif int(address.network.compressed.split('.')[-1]) == 255: |
86 |
networkaddr_error = True
|
87 |
raise Exception |
88 |
return self.cleaned_data["source"] |
89 |
except Exception: |
90 |
error_text = _('Invalid network address format')
|
91 |
if private_error:
|
92 |
error_text = _('Private addresses not allowed')
|
93 |
if networkaddr_error:
|
94 |
error_text = _('Malformed address format. Cannot be ...255/32')
|
95 |
if broadcast_error:
|
96 |
error_text = _('Malformed address format. Cannot be ...0/32')
|
97 |
if protected_error:
|
98 |
error_text = _('You have no authority on this subnet')
|
99 |
raise forms.ValidationError(error_text)
|
100 |
|
101 |
def clean_destination(self): |
102 |
user = User.objects.get(pk=self.data['applier']) |
103 |
peer = user.get_profile().peer |
104 |
data = self.cleaned_data['destination'] |
105 |
error = None
|
106 |
protected_error = False
|
107 |
networkaddr_error = False
|
108 |
broadcast_error = False
|
109 |
if data:
|
110 |
try:
|
111 |
address = IPNetwork(data) |
112 |
for net in settings.PROTECTED_SUBNETS: |
113 |
if address in IPNetwork(net): |
114 |
protected_error = True
|
115 |
mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
116 |
send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
|
117 |
mail_body, settings.SERVER_EMAIL, |
118 |
settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
119 |
raise Exception |
120 |
if address.prefixlen < settings.PREFIX_LENGTH:
|
121 |
error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
|
122 |
raise Exception |
123 |
if address.version == 4 and int(address.prefixlen) == 32: |
124 |
if int(address.network.compressed.split('.')[-1]) == 0: |
125 |
broadcast_error = True
|
126 |
raise Exception |
127 |
elif int(address.network.compressed.split('.')[-1]) == 255: |
128 |
networkaddr_error = True
|
129 |
raise Exception |
130 |
return self.cleaned_data["destination"] |
131 |
except Exception: |
132 |
error_text = _('Invalid network address format')
|
133 |
if error:
|
134 |
error_text = error |
135 |
if protected_error:
|
136 |
error_text = _('You have no authority on this subnet')
|
137 |
if networkaddr_error:
|
138 |
error_text = _('Malformed address format. Cannot be ...255/32')
|
139 |
if broadcast_error:
|
140 |
error_text = _('Malformed address format. Cannot be ...0/32')
|
141 |
raise forms.ValidationError(error_text)
|
142 |
|
143 |
def clean_expires(self): |
144 |
date = self.cleaned_data['expires'] |
145 |
if date:
|
146 |
range_days = (date - datetime.date.today()).days |
147 |
if range_days > 0 and range_days < 11: |
148 |
return self.cleaned_data["expires"] |
149 |
else:
|
150 |
raise forms.ValidationError('Invalid date range') |
151 |
|
152 |
def clean(self): |
153 |
if self.errors: |
154 |
raise forms.ValidationError(_('Errors in form. Please review and fix them: %s'%", ".join(self.errors))) |
155 |
name = self.cleaned_data.get('name', None) |
156 |
source = self.cleaned_data.get('source', None) |
157 |
sourceports = self.cleaned_data.get('sourceport', None) |
158 |
ports = self.cleaned_data.get('port', None) |
159 |
fragmenttypes = self.cleaned_data.get('fragmenttype', None) |
160 |
then = self.cleaned_data.get('then', None) |
161 |
destination = self.cleaned_data.get('destination', None) |
162 |
destinationports = self.cleaned_data.get('destinationport', None) |
163 |
protocols = self.cleaned_data.get('protocol', None) |
164 |
user = self.cleaned_data.get('applier', None) |
165 |
try:
|
166 |
issuperuser = self.data['issuperuser'] |
167 |
su = User.objects.get(username=issuperuser) |
168 |
except:
|
169 |
issuperuser = None
|
170 |
peer = user.get_profile().peer |
171 |
networks = peer.networks.all() |
172 |
if issuperuser:
|
173 |
networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct() |
174 |
mynetwork = False
|
175 |
route_pk_list = [] |
176 |
if destination:
|
177 |
for network in networks: |
178 |
net = IPNetwork(network.network) |
179 |
if IPNetwork(destination) in net: |
180 |
mynetwork = True
|
181 |
if not mynetwork: |
182 |
raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')) |
183 |
if (sourceports and ports): |
184 |
raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')) |
185 |
if (destinationports and ports): |
186 |
raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')) |
187 |
if sourceports and not source: |
188 |
raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')) |
189 |
if destinationports and not destination: |
190 |
raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')) |
191 |
if not (source or sourceports or ports or destination or destinationports): |
192 |
raise forms.ValidationError(_('Fill at least a Rule Match Condition')) |
193 |
if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS: |
194 |
raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action)) |
195 |
existing_routes = Route.objects.all() |
196 |
existing_routes = existing_routes.filter(applier__userprofile__peer=peer) |
197 |
if source:
|
198 |
source = IPNetwork(source).compressed |
199 |
existing_routes = existing_routes.filter(source=source) |
200 |
else:
|
201 |
existing_routes = existing_routes.filter(source=None)
|
202 |
if protocols:
|
203 |
route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes) |
204 |
if route_pk_list:
|
205 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
206 |
else:
|
207 |
existing_routes = existing_routes.filter(protocol=None)
|
208 |
else:
|
209 |
existing_routes = existing_routes.filter(protocol=None)
|
210 |
if sourceports:
|
211 |
route_pk_list=get_matchingport_route_pks(sourceports, existing_routes) |
212 |
if route_pk_list:
|
213 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
214 |
else:
|
215 |
existing_routes = existing_routes.filter(sourceport=None)
|
216 |
if destinationports:
|
217 |
route_pk_list=get_matchingport_route_pks(destinationports, existing_routes) |
218 |
if route_pk_list:
|
219 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
220 |
else:
|
221 |
existing_routes = existing_routes.filter(destinationport=None)
|
222 |
if ports:
|
223 |
route_pk_list=get_matchingport_route_pks(ports, existing_routes) |
224 |
if route_pk_list:
|
225 |
existing_routes = existing_routes.filter(pk__in=route_pk_list) |
226 |
else:
|
227 |
existing_routes = existing_routes.filter(port=None)
|
228 |
for route in existing_routes: |
229 |
if name != route.name:
|
230 |
existing_url = reverse('edit-route', args=[route.name])
|
231 |
if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination): |
232 |
raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name)) |
233 |
return self.cleaned_data |
234 |
|
235 |
class ThenPlainForm(forms.ModelForm): |
236 |
# action = forms.CharField(initial='rate-limit')
|
237 |
class Meta: |
238 |
model = ThenAction |
239 |
|
240 |
def clean_action_value(self): |
241 |
action_value = self.cleaned_data['action_value'] |
242 |
if action_value:
|
243 |
try:
|
244 |
assert(int(action_value)) |
245 |
if int(action_value) < 50: |
246 |
raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps')) |
247 |
return "%s" %self.cleaned_data["action_value"] |
248 |
except:
|
249 |
raise forms.ValidationError(_('Rate-limiting should be an integer < 50')) |
250 |
else:
|
251 |
raise forms.ValidationError(_('Cannot be empty')) |
252 |
|
253 |
def clean_action(self): |
254 |
action = self.cleaned_data['action'] |
255 |
if action != 'rate-limit': |
256 |
raise forms.ValidationError(_('Cannot select something other than rate-limit')) |
257 |
else:
|
258 |
return self.cleaned_data["action"] |
259 |
|
260 |
|
261 |
class PortPlainForm(forms.ModelForm): |
262 |
# action = forms.CharField(initial='rate-limit')
|
263 |
class Meta: |
264 |
model = MatchPort |
265 |
|
266 |
def clean_port(self): |
267 |
port = self.cleaned_data['port'] |
268 |
if port:
|
269 |
try:
|
270 |
p = int(port)
|
271 |
if int(port) > 65535 or int(port) < 0: |
272 |
raise forms.ValidationError(_('Port should be < 65535 and >= 0')) |
273 |
return "%s" %self.cleaned_data["port"] |
274 |
except forms.ValidationError:
|
275 |
raise forms.ValidationError(_('Port should be < 65535 and >= 0')) |
276 |
except:
|
277 |
raise forms.ValidationError(_('Port should be an integer')) |
278 |
else:
|
279 |
raise forms.ValidationError(_('Cannot be empty')) |
280 |
|
281 |
def value_list_to_list(valuelist): |
282 |
vl = [] |
283 |
for val in valuelist: |
284 |
vl.append(val[0])
|
285 |
return vl
|
286 |
|
287 |
def get_matchingport_route_pks(portlist, routes): |
288 |
route_pk_list = [] |
289 |
ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port')) |
290 |
for route in routes: |
291 |
rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port')) |
292 |
if rsp and rsp == ports_value_list: |
293 |
route_pk_list.append(route.pk) |
294 |
return route_pk_list
|
295 |
|
296 |
def get_matchingprotocol_route_pks(protocolist, routes): |
297 |
route_pk_list = [] |
298 |
protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol')) |
299 |
for route in routes: |
300 |
rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol')) |
301 |
if rsp and rsp == protocols_value_list: |
302 |
route_pk_list.append(route.pk) |
303 |
return route_pk_list
|