1 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2 # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
4 # Copyright © 2011-2014 Greek Research and Technology Network (GRNET S.A.)
5 # Copyright © 2011-2014 Leonidas Poulopoulos (@leopoul)
7 # Permission to use, copy, modify, and/or distribute this software for any
8 # purpose with or without fee is hereby granted, provided that the above
9 # copyright notice and this permission notice appear in all copies.
11 # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD
12 # TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
13 # FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
14 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
15 # DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
16 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
19 from django import forms
20 from django.utils.safestring import mark_safe
21 from django.utils.translation import ugettext as _
22 from django.utils.translation import ugettext_lazy
23 from django.template.defaultfilters import filesizeformat
24 from flowspec.models import *
25 from peers.models import *
26 from accounts.models import *
28 from django.core.urlresolvers import reverse
29 from django.contrib.auth.models import User
30 from django.conf import settings
32 from django.core.mail import mail_admins, mail_managers, send_mail
# ModelForm subclass for user profiles.
# NOTE(review): only the class header is visible in this listing; the body (presumably an
# inner Meta that names the model) is not shown -- confirm against the full file.
34 class UserProfileForm(forms.ModelForm):
# ModelForm subclass that validates a route (firewall rule) submission. The validators that
# follow check the individual fields (applier, source, destination, expires) and then the
# combination of fields.
38 class RouteForm(forms.ModelForm):
# The declarations below are commented out, so they have no runtime effect. They look like
# explicit field overrides (help text / label / required=False) that were disabled in favour
# of the fields generated from the model -- presumably kept for reference only.
39 # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
40 # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
41 # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
42 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
43 # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
44 # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
45 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
46 # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
47 # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
# Field validator for 'applier': returns the cleaned value, or raises
# forms.ValidationError('This field is required.').
# NOTE(review): the branch lines between the statements below are not visible in this
# listing (the embedded line numbers skip); presumably `if applier: ... else: ...` --
# confirm against the full file.
52 def clean_applier(self):
53 applier = self.cleaned_data['applier']
55 return self.cleaned_data["applier"]
# NOTE(review): this message is not wrapped in _(), unlike the messages in clean_source.
57 raise forms.ValidationError('This field is required.')
# Field validator for 'source' (the source address/network of the rule).
#
# Visible checks, in order:
#   * the value is parsed with IPNetwork();
#   * it is tested against every entry of settings.PROTECTED_SUBNETS; on a hit a notification
#     e-mail is sent and protected_error is set;
#   * address.is_private is tested;
#   * an IPv4 /32 whose last dotted octet is 0 or 255 sets one of the *_error flags.
# Returns self.cleaned_data["source"] on success, otherwise raises forms.ValidationError with
# one of the messages listed at the end.
#
# NOTE(review): this listing omits lines (the embedded line numbers skip), including what
# are presumably the try/except, the `if` guards around the error texts and the statements
# that abort after a flag is set. The control flow described here is inferred -- confirm
# against the full file.
59 def clean_source(self):
# The applier is looked up from the raw submitted data (self.data), not from cleaned_data.
# NOTE(review): presumably a missing/invalid 'applier' raises here unless guarded elsewhere.
60 user = User.objects.get(pk=self.data['applier'])
61 peer = user.get_profile().peer
62 data = self.cleaned_data['source']
# Flags recording which specific check failed, so a precise message can be chosen below.
64 protected_error = False
65 networkaddr_error = False
66 broadcast_error = False
69 address = IPNetwork(data)
70 for net in settings.PROTECTED_SUBNETS:
71 if address in IPNetwork(net):
72 protected_error = True
# Report the attempt by e-mail; fail_silently=True means a mail failure does not raise.
73 mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
74 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
75 mail_body, settings.SERVER_EMAIL,
76 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
78 if address.is_private:
# Host routes (/32) only: inspect the last dotted octet of the network address.
81 if address.version == 4 and int(address.prefixlen) == 32:
# NOTE(review): the flag names look swapped -- a last octet of 0 sets broadcast_error and
# 255 sets networkaddr_error. Confirm which message each flag selects below.
82 if int(address.network.compressed.split('.')[-1]) == 0:
83 broadcast_error = True
85 elif int(address.network.compressed.split('.')[-1]) == 255:
86 networkaddr_error = True
88 return self.cleaned_data["source"]
# Message selection: the generic text comes first; the more specific texts presumably
# override it depending on which flag was set (the guarding `if` lines are not visible).
90 error_text = _('Invalid network address format')
92 error_text = _('Private addresses not allowed')
94 error_text = _('Malformed address format. Cannot be ...255/32')
96 error_text = _('Malformed address format. Cannot be ...0/32')
98 error_text = _('You have no authority on this subnet')
99 raise forms.ValidationError(error_text)
# Field validator for 'destination' (the destination address/network of the rule).
#
# Visible checks, in order:
#   * the value is parsed with IPNetwork();
#   * it is tested against every entry of settings.PROTECTED_SUBNETS; on a hit a notification
#     e-mail is sent and protected_error is set;
#   * the prefix length is compared with settings.PREFIX_LENGTH;
#   * an IPv4 /32 whose last dotted octet is 0 or 255 sets one of the *_error flags.
# Returns self.cleaned_data["destination"] on success, otherwise raises
# forms.ValidationError with one of the messages listed at the end.
#
# NOTE(review): this listing omits lines (the embedded line numbers skip), including what
# are presumably the try/except and several `if` guards. The control flow described here is
# inferred -- confirm against the full file.
101 def clean_destination(self):
# The applier is looked up from the raw submitted data (self.data), not from cleaned_data.
102 user = User.objects.get(pk=self.data['applier'])
103 peer = user.get_profile().peer
104 data = self.cleaned_data['destination']
# Flags recording which specific check failed, so a precise message can be chosen below.
106 protected_error = False
107 networkaddr_error = False
108 broadcast_error = False
111 address = IPNetwork(data)
112 for net in settings.PROTECTED_SUBNETS:
113 if address in IPNetwork(net):
114 protected_error = True
# Report the attempt by e-mail; fail_silently=True means a mail failure does not raise.
115 mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
116 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
117 mail_body, settings.SERVER_EMAIL,
118 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
# Networks wider than the configured minimum prefix length are rejected.
# NOTE(review): the message is stored in `error`, while the visible handler below raises
# `error_text`; how `error` reaches the user is not visible here -- confirm.
120 if address.prefixlen < settings.PREFIX_LENGTH:
121 error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
# Host routes (/32) only: inspect the last dotted octet of the network address.
123 if address.version == 4 and int(address.prefixlen) == 32:
# NOTE(review): the flag names look swapped -- a last octet of 0 sets broadcast_error and
# 255 sets networkaddr_error. The messages below pair networkaddr_error with "...255/32",
# so the user-facing text appears consistent; only the names are misleading.
124 if int(address.network.compressed.split('.')[-1]) == 0:
125 broadcast_error = True
127 elif int(address.network.compressed.split('.')[-1]) == 255:
128 networkaddr_error = True
130 return self.cleaned_data["destination"]
# Message selection: generic text first, then more specific overrides (some of the guarding
# `if` lines are not visible in this listing).
132 error_text = _('Invalid network address format')
136 error_text = _('You have no authority on this subnet')
137 if networkaddr_error:
138 error_text = _('Malformed address format. Cannot be ...255/32')
140 error_text = _('Malformed address format. Cannot be ...0/32')
141 raise forms.ValidationError(error_text)
# Field validator for 'expires': the date is accepted when it lies 1 to 10 days after today
# (0 < range_days < 11); otherwise forms.ValidationError('Invalid date range') is raised.
# NOTE(review): some lines of this method are not visible in this listing (the embedded line
# numbers skip); presumably there is a guard for an empty date -- confirm.
143 def clean_expires(self):
144 date = self.cleaned_data['expires']
# NOTE(review): `datetime` is not among the explicit imports visible at the top of the file;
# presumably it arrives through one of the star imports -- confirm.
146 range_days = (date - datetime.date.today()).days
# The 10-day upper bound is hard-coded here.
147 if range_days > 0 and range_days < 11:
148 return self.cleaned_data["expires"]
# NOTE(review): this message is not wrapped in _(), unlike the messages in clean_source.
150 raise forms.ValidationError('Invalid date range')
# Cross-field validation; it ends by returning self.cleaned_data.
# NOTE(review): the `def` line is not visible in this listing (presumably
# `def clean(self):` followed by `if self.errors:`), and several if/else/try lines further
# down are omitted as well. The control flow described in these comments is inferred --
# confirm against the full file.
#
# Visible steps:
#   1. abort when field-level errors already exist;
#   2. work out the set of networks to check the destination against;
#   3. require the destination to fall inside one of them;
#   4. enforce the allowed combinations of source/destination/ports;
#   5. restrict the `then` action for non-superusers;
#   6. look for an existing route of the same peer with the same match conditions and an
#      overlapping destination, and reject the form when one is found.
# NOTE(review): the % interpolation happens inside the _() call, so the translation lookup is
# done on the already-formatted string; compare with the `_('...') % (...)` form used below.
154 raise forms.ValidationError(_('Errors in form. Please review and fix them: %s'%", ".join(self.errors)))
# Each value is None when its field is absent from cleaned_data.
155 name = self.cleaned_data.get('name', None)
156 source = self.cleaned_data.get('source', None)
157 sourceports = self.cleaned_data.get('sourceport', None)
158 ports = self.cleaned_data.get('port', None)
159 fragmenttypes = self.cleaned_data.get('fragmenttype', None)
160 then = self.cleaned_data.get('then', None)
161 destination = self.cleaned_data.get('destination', None)
162 destinationports = self.cleaned_data.get('destinationport', None)
163 protocols = self.cleaned_data.get('protocol', None)
164 user = self.cleaned_data.get('applier', None)
# NOTE(review): 'issuperuser' is read from the raw submitted data and resolved to a User by
# username. If this value is what selects the "all peers" network set below, confirm it
# cannot be supplied by an unprivileged client.
166 issuperuser = self.data['issuperuser']
167 su = User.objects.get(username=issuperuser)
# Default network set: the networks of the applier's own peer.
170 peer = user.get_profile().peer
171 networks = peer.networks.all()
# Alternative network set: the ranges of every peer (presumably the superuser case).
173 networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
# The destination must fall inside at least one of the selected networks.
177 for network in networks:
178 net = IPNetwork(network.network)
179 if IPNetwork(destination) in net:
182 raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
# Allowed combinations of match conditions.
183 if (sourceports and ports):
184 raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
185 if (destinationports and ports):
186 raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
187 if sourceports and not source:
188 raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
189 if destinationports and not destination:
190 raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
191 if not (source or sourceports or ports or destination or destinationports):
192 raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
# Non-superusers may only pick actions listed in settings.UI_USER_THEN_ACTIONS.
# NOTE(review): then[0] assumes at least one `then` action was submitted.
193 if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
194 raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action))
# Duplicate detection: start from all routes applied by users of the same peer, then narrow
# by source, protocols and ports. Each "filter(...=None)" line is presumably the else-branch
# used when the corresponding field was left empty.
195 existing_routes = Route.objects.all()
196 existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
# The source is normalised to its compressed form before it is compared with stored routes.
198 source = IPNetwork(source).compressed
199 existing_routes = existing_routes.filter(source=source)
201 existing_routes = existing_routes.filter(source=None)
203 route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
205 existing_routes = existing_routes.filter(pk__in=route_pk_list)
# NOTE(review): the next two statements are identical; presumably they sit in two different
# else-branches -- confirm that both are intended.
207 existing_routes = existing_routes.filter(protocol=None)
209 existing_routes = existing_routes.filter(protocol=None)
# NOTE(review): get_matchingport_route_pks (as visible in this file) always reads
# route.destinationport, yet it is called here for source ports and generic ports too.
# Confirm whether the source-port and port comparisons use the intended relation.
211 route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
213 existing_routes = existing_routes.filter(pk__in=route_pk_list)
215 existing_routes = existing_routes.filter(sourceport=None)
217 route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
219 existing_routes = existing_routes.filter(pk__in=route_pk_list)
221 existing_routes = existing_routes.filter(destinationport=None)
223 route_pk_list=get_matchingport_route_pks(ports, existing_routes)
225 existing_routes = existing_routes.filter(pk__in=route_pk_list)
227 existing_routes = existing_routes.filter(port=None)
# Any remaining route with a different name whose destination contains, or is contained in,
# the submitted destination is reported as a conflict.
228 for route in existing_routes:
229 if name != route.name:
230 existing_url = reverse('edit-route', args=[route.name])
231 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
# NOTE(review): the message says "exact" although the test above is containment in either
# direction. It also embeds raw HTML with route fields interpolated unescaped and is not
# wrapped in _() -- confirm how the template renders it.
232 raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
233 return self.cleaned_data
# ModelForm subclass for a single "then" action; its visible validators only accept the
# 'rate-limit' action with an integer value.
# NOTE(review): the class body between this header and the validators (presumably an inner
# Meta) is not visible in this listing.
235 class ThenPlainForm(forms.ModelForm):
# Disabled field override; it has no runtime effect.
236 # action = forms.CharField(initial='rate-limit')
# Field validator for 'action_value' (the rate-limit value): values below 50 are rejected
# with 'Rate-limiting cannot be < 50kbps'; an accepted value is returned as a string.
# NOTE(review): the if/try/except/else lines of this method are not visible in this listing
# (the embedded line numbers skip); which condition leads to each of the last two messages
# is inferred from their text -- confirm against the full file.
240 def clean_action_value(self):
241 action_value = self.cleaned_data['action_value']
# NOTE(review): `assert` is used for input validation here. Assertions are removed when
# Python runs with -O, and int("0") is falsy, so "0" fails the assert instead of reaching
# the range check below.
244 assert(int(action_value))
245 if int(action_value) < 50:
246 raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
247 return "%s" %self.cleaned_data["action_value"]
# NOTE(review): this text says "< 50" while the check above rejects values below 50; the
# intended wording is presumably ">= 50".
249 raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
251 raise forms.ValidationError(_('Cannot be empty'))
# Field validator for 'action': any value other than 'rate-limit' is rejected with a
# forms.ValidationError; otherwise the cleaned value is returned.
# NOTE(review): one line between the raise and the return is not visible in this listing
# (presumably `else:`).
253 def clean_action(self):
254 action = self.cleaned_data['action']
255 if action != 'rate-limit':
256 raise forms.ValidationError(_('Cannot select something other than rate-limit'))
258 return self.cleaned_data["action"]
# ModelForm subclass for a single port entry; its visible validator checks the numeric range.
# NOTE(review): the class body between this header and the validator (presumably an inner
# Meta) is not visible in this listing.
261 class PortPlainForm(forms.ModelForm):
# NOTE(review): this disabled line refers to an `action` field and 'rate-limit'; it looks
# like a copy-paste leftover from ThenPlainForm. It has no runtime effect.
262 # action = forms.CharField(initial='rate-limit')
# Field validator for 'port': values above 65535 or below 0 are rejected; an accepted value
# is returned as a string.
# NOTE(review): the if/try/else lines and one except line of this method are not visible in
# this listing (the embedded line numbers skip); which condition leads to each of the last
# two messages is inferred from their text -- confirm against the full file.
266 def clean_port(self):
267 port = self.cleaned_data['port']
# NOTE(review): the condition accepts 65535, while the message says "< 65535".
271 if int(port) > 65535 or int(port) < 0:
272 raise forms.ValidationError(_('Port should be < 65535 and >= 0'))
273 return "%s" %self.cleaned_data["port"]
# The range error raised above is caught and raised again with the same message, presumably
# so that a broader handler following it does not replace it.
274 except forms.ValidationError:
275 raise forms.ValidationError(_('Port should be < 65535 and >= 0'))
277 raise forms.ValidationError(_('Port should be an integer'))
279 raise forms.ValidationError(_('Cannot be empty'))
# Helper that iterates over `valuelist`. Callers in this file pass the result of
# QuerySet.values_list(<one field>) and compare the returned values with ==.
# NOTE(review): apart from the loop header, the body is not visible in this listing;
# presumably it collects the first element of each row into a plain list and returns it --
# confirm against the full file.
281 def value_list_to_list(valuelist):
283 for val in valuelist:
# Collects the primary keys of the routes whose port set equals the ports in `portlist`.
#   portlist: a queryset of port objects (values_list('port') is called on it).
#   routes:   the candidate routes to compare against.
# Both sides are ordered by 'port' before comparison, so the order of selection does not
# matter. A route with no ports never matches (`rsp` must be non-empty).
# NOTE(review): the initialisation of route_pk_list, the loop over `routes` and the return
# statement are not visible in this listing -- confirm against the full file.
287 def get_matchingport_route_pks(portlist, routes):
289 ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
# NOTE(review): the comparison always reads route.destinationport, whatever kind of ports
# `portlist` holds. The form's cross-field validation also calls this helper for source
# ports and generic ports, so those comparisons may be made against the wrong relation.
291 rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
292 if rsp and rsp == ports_value_list:
293 route_pk_list.append(route.pk)
# Collects the primary keys of the routes whose protocol set equals the protocols in
# `protocolist`.
#   protocolist: a queryset of protocol objects (values_list('protocol') is called on it).
#   routes:      the candidate routes to compare against.
# Both sides are ordered by 'protocol' before comparison. A route with no protocols never
# matches (`rsp` must be non-empty).
# NOTE(review): the initialisation of route_pk_list, the loop over `routes` and the return
# statement are not visible in this listing -- confirm against the full file.
296 def get_matchingprotocol_route_pks(protocolist, routes):
298 protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
300 rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
301 if rsp and rsp == protocols_value_list:
302 route_pk_list.append(route.pk)