Minor documentation fix
[flowspy] / flowspec / forms.py
1 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2 # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
3
4 # Copyright © 2011-2014 Greek Research and Technology Network (GRNET S.A.)
5 # Copyright © 2011-2014 Leonidas Poulopoulos (@leopoul)
6
7 # Permission to use, copy, modify, and/or distribute this software for any
8 # purpose with or without fee is hereby granted, provided that the above
9 # copyright notice and this permission notice appear in all copies.
10
11 # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD
12 # TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
13 # FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
14 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
15 # DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
16 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
17 # SOFTWARE.
18
19 from django import forms
20 from django.utils.safestring import mark_safe
21 from django.utils.translation import ugettext as _
22 from django.utils.translation import ugettext_lazy
23 from django.template.defaultfilters import filesizeformat
24 from flowspec.models import *
25 from peers.models import *
26 from accounts.models import *
27 from ipaddr import *
28 from django.core.urlresolvers import reverse
29 from django.contrib.auth.models import User
30 from django.conf import settings
31 import datetime
32 from django.core.mail import mail_admins, mail_managers, send_mail
33
34 class UserProfileForm(forms.ModelForm):
35     class Meta:
36         model = UserProfile
37
38 class RouteForm(forms.ModelForm):
39 #    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
40 #                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
41 #    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
42 #                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
43 #    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
44 #    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
45 #                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
46 #    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
47 #    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
48
49     class Meta:
50         model = Route
51
52     def clean_applier(self):
53         applier = self.cleaned_data['applier']
54         if applier:
55             return self.cleaned_data["applier"]
56         else:
57             raise forms.ValidationError('This field is required.')
58
59     def clean_source(self):
60         user = User.objects.get(pk=self.data['applier'])
61         peer = user.get_profile().peer
62         data = self.cleaned_data['source']
63         private_error = False
64         protected_error = False
65         networkaddr_error = False
66         broadcast_error = False
67         if data:
68             try:
69                 address = IPNetwork(data)
70                 for net in settings.PROTECTED_SUBNETS:
71                     if address in IPNetwork(net):
72                         protected_error = True
73                         mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
74                         send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
75                               mail_body, settings.SERVER_EMAIL,
76                               settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
77                         raise Exception
78                 if address.is_private:
79                     private_error = True
80                     raise Exception
81                 if address.version == 4 and int(address.prefixlen) == 32:
82                     if int(address.network.compressed.split('.')[-1]) == 0:
83                         broadcast_error = True
84                         raise Exception
85                     elif int(address.network.compressed.split('.')[-1]) == 255:
86                         networkaddr_error = True
87                         raise Exception
88                 return self.cleaned_data["source"]
89             except Exception:
90                 error_text = _('Invalid network address format')
91                 if private_error:
92                     error_text = _('Private addresses not allowed')
93                 if networkaddr_error:
94                     error_text = _('Malformed address format. Cannot be ...255/32')
95                 if broadcast_error:
96                     error_text = _('Malformed address format. Cannot be ...0/32')
97                 if protected_error:
98                     error_text = _('You have no authority on this subnet')
99                 raise forms.ValidationError(error_text)
100
101     def clean_destination(self):
102         user = User.objects.get(pk=self.data['applier'])
103         peer = user.get_profile().peer
104         data = self.cleaned_data['destination']
105         error = None
106         protected_error = False
107         networkaddr_error = False
108         broadcast_error = False
109         if data:
110             try:
111                 address = IPNetwork(data)
112                 for net in settings.PROTECTED_SUBNETS:
113                     if address in IPNetwork(net):
114                         protected_error = True
115                         mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
116                         send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
117                               mail_body, settings.SERVER_EMAIL,
118                               settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
119                         raise Exception
120                 if address.prefixlen < settings.PREFIX_LENGTH:
121                     error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
122                     raise Exception
123                 if address.version == 4 and int(address.prefixlen) == 32:
124                     if int(address.network.compressed.split('.')[-1]) == 0:
125                         broadcast_error = True
126                         raise Exception
127                     elif int(address.network.compressed.split('.')[-1]) == 255:
128                         networkaddr_error = True
129                         raise Exception
130                 return self.cleaned_data["destination"]
131             except Exception:
132                 error_text = _('Invalid network address format')
133                 if error:
134                     error_text = error
135                 if protected_error:
136                     error_text = _('You have no authority on this subnet')
137                 if networkaddr_error:
138                     error_text = _('Malformed address format. Cannot be ...255/32')
139                 if broadcast_error:
140                     error_text = _('Malformed address format. Cannot be ...0/32')
141                 raise forms.ValidationError(error_text)
142     
143     def clean_expires(self):
144         date = self.cleaned_data['expires']
145         if date:
146             range_days = (date - datetime.date.today()).days
147             if range_days > 0 and range_days < 11:
148                 return self.cleaned_data["expires"]
149             else:
150                 raise forms.ValidationError('Invalid date range')
151
152     def clean(self):
153         if self.errors:
154              raise forms.ValidationError(_('Errors in form. Please review and fix them: %s'%", ".join(self.errors)))
155         name = self.cleaned_data.get('name', None)
156         source = self.cleaned_data.get('source', None)
157         sourceports = self.cleaned_data.get('sourceport', None)
158         ports = self.cleaned_data.get('port', None)
159         fragmenttypes = self.cleaned_data.get('fragmenttype', None)
160         then = self.cleaned_data.get('then', None)
161         destination = self.cleaned_data.get('destination', None)
162         destinationports = self.cleaned_data.get('destinationport', None)
163         protocols = self.cleaned_data.get('protocol', None)
164         user = self.cleaned_data.get('applier', None)
165         try:
166             issuperuser = self.data['issuperuser']
167             su = User.objects.get(username=issuperuser)
168         except:
169             issuperuser = None
170         peer = user.get_profile().peer
171         networks = peer.networks.all()
172         if issuperuser:
173             networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
174         mynetwork = False
175         route_pk_list = []
176         if destination:
177             for network in networks:
178                 net = IPNetwork(network.network)
179                 if IPNetwork(destination) in net:
180                     mynetwork = True
181             if not mynetwork:
182                 raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
183         if (sourceports and ports):
184             raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
185         if (destinationports and ports):
186             raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
187         if sourceports and not source:
188             raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
189         if destinationports and not destination:
190             raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
191         if not (source or sourceports or ports or destination or destinationports):
192             raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
193         if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
194             raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action))
195         existing_routes = Route.objects.all()
196         existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
197         if source:
198             source = IPNetwork(source).compressed
199             existing_routes = existing_routes.filter(source=source)
200         else:
201             existing_routes = existing_routes.filter(source=None)
202         if protocols:
203             route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
204             if route_pk_list:
205                 existing_routes = existing_routes.filter(pk__in=route_pk_list)
206             else:
207                 existing_routes = existing_routes.filter(protocol=None)
208         else:
209             existing_routes = existing_routes.filter(protocol=None)
210         if sourceports:
211             route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
212             if route_pk_list:
213                 existing_routes = existing_routes.filter(pk__in=route_pk_list)
214         else:
215             existing_routes = existing_routes.filter(sourceport=None)
216         if destinationports:
217             route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
218             if route_pk_list:
219                 existing_routes = existing_routes.filter(pk__in=route_pk_list)
220         else:
221             existing_routes = existing_routes.filter(destinationport=None)
222         if ports:
223             route_pk_list=get_matchingport_route_pks(ports, existing_routes)
224             if route_pk_list:
225                 existing_routes = existing_routes.filter(pk__in=route_pk_list)              
226         else:
227             existing_routes = existing_routes.filter(port=None)
228         for route in existing_routes:
229             if name != route.name:
230                 existing_url = reverse('edit-route', args=[route.name])
231                 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
232                     raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
233         return self.cleaned_data
234
235 class ThenPlainForm(forms.ModelForm):
236 #    action = forms.CharField(initial='rate-limit')
237     class Meta:
238         model = ThenAction
239     
240     def clean_action_value(self):
241         action_value = self.cleaned_data['action_value']
242         if action_value:
243             try:
244                 assert(int(action_value))
245                 if int(action_value) < 50:
246                     raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
247                 return "%s" %self.cleaned_data["action_value"]
248             except:
249                 raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
250         else:
251             raise forms.ValidationError(_('Cannot be empty'))
252
253     def clean_action(self):
254         action = self.cleaned_data['action']
255         if action != 'rate-limit':
256             raise forms.ValidationError(_('Cannot select something other than rate-limit'))
257         else:
258             return self.cleaned_data["action"]
259     
260
261 class PortPlainForm(forms.ModelForm):
262 #    action = forms.CharField(initial='rate-limit')
263     class Meta:
264         model = MatchPort
265     
266     def clean_port(self):
267         port = self.cleaned_data['port']
268         if port:
269             try:
270                 p = int(port)
271                 if int(port) > 65535 or int(port) < 0:
272                     raise forms.ValidationError(_('Port should be < 65535 and >= 0'))
273                 return "%s" %self.cleaned_data["port"]
274             except forms.ValidationError:
275                 raise forms.ValidationError(_('Port should be < 65535 and >= 0'))
276             except:
277                 raise forms.ValidationError(_('Port should be an integer'))
278         else:
279             raise forms.ValidationError(_('Cannot be empty'))
280
281 def value_list_to_list(valuelist):
282     vl = []
283     for val in valuelist:
284         vl.append(val[0])
285     return vl
286
287 def get_matchingport_route_pks(portlist, routes):
288     route_pk_list = []
289     ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
290     for route in routes:
291         rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
292         if rsp and rsp == ports_value_list:
293             route_pk_list.append(route.pk)
294     return route_pk_list
295
296 def get_matchingprotocol_route_pks(protocolist, routes):
297     route_pk_list = []
298     protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
299     for route in routes:
300         rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
301         if rsp and rsp == protocols_value_list:
302             route_pk_list.append(route.pk)
303     return route_pk_list