8f17e73d1ba2c7cbc29f8dfe29624097bf964961
[flowspy] / flowspec / forms.py
1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
7 from ipaddr import *
8 from django.core.urlresolvers import reverse
9 from django.contrib.auth.models import User
10 from django.conf import settings
11 import datetime
12 from django.core.mail import mail_admins, mail_managers, send_mail
13
14
class RouteForm(forms.ModelForm):
    """Model form that validates a ``Route`` (firewall rule) submission.

    The per-field cleaners validate the source/destination networks and the
    expiration date.  ``clean`` then enforces the cross-field rules and
    rejects a rule that overlaps a non-expired rule of the same peer.
    """

    class Meta:
        model = Route

    def _applier_and_peer(self):
        """Return ``(user, peer)`` for the raw ``applier`` value of the form.

        The user is looked up from ``self.data`` (the unvalidated submission),
        exactly as the field cleaners need it before ``applier`` is cleaned.
        """
        user = User.objects.get(pk=self.data['applier'])
        return user, user.get_profile().peer

    @staticmethod
    def _parse_network(value):
        """Parse ``value`` with ``IPNetwork`` or raise a ``ValidationError``.

        Only ``ValueError`` (what ``ipaddr`` raises for malformed input) is
        translated, so unrelated failures are no longer reported to the user
        as a formatting problem.
        """
        try:
            return IPNetwork(value)
        except ValueError:
            raise forms.ValidationError('Invalid network address format')

    @staticmethod
    def _reject_protected(address, raw_value, user, peer, role, subject_tail):
        """Refuse ``address`` if it lies inside ``settings.PROTECTED_SUBNETS``.

        On a hit a notification mail is sent to ``settings.NOTIFY_ADMIN_MAILS``
        (failures to send are ignored) and a ``ValidationError`` is raised.
        ``role`` is interpolated into the mail body and ``subject_tail``
        completes the mail subject.
        """
        for net in settings.PROTECTED_SUBNETS:
            if address in IPNetwork(net):
                mail_body = "User %s %s (%s) attempted to set %s as the %s address in a firewall rule" % (
                    user.username, user.email, peer.peer_name, raw_value, role)
                send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as " + subject_tail,
                          mail_body, settings.SERVER_EMAIL,
                          settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
                raise forms.ValidationError('You have no authority on this subnet')

    @staticmethod
    def _matching_port_route_pks(selected_ports, routes, relation):
        """Return pks of ``routes`` whose ``relation`` holds exactly ``selected_ports``.

        ``relation`` is the name of the route's port relation to compare
        (``'sourceport'``, ``'destinationport'`` or ``'port'``).  Routes with
        no ports in that relation never match.
        """
        wanted = [row[0] for row in selected_ports.values_list('port').order_by('port')]
        matching = []
        for route in routes:
            rows = getattr(route, relation).all().values_list('port').order_by('port')
            current = [row[0] for row in rows]
            if current and current == wanted:
                matching.append(route.pk)
        return matching

    @staticmethod
    def _destinations_overlap(candidate, existing):
        """Tell whether two destination values cover each other.

        Two networks overlap when either one contains the other.  When a
        destination is missing there is nothing to parse, so only two missing
        destinations are considered to coincide (the same exact-match rule the
        source filter uses).
        NOTE(review): confirm this is the desired treatment of rules without
        a destination; previously this case raised an unhandled ValueError.
        """
        if not candidate or not existing:
            return not candidate and not existing
        candidate_net = IPNetwork(candidate)
        existing_net = IPNetwork(existing)
        return candidate_net in existing_net or existing_net in candidate_net

    def clean_source(self):
        """Validate the optional source network.

        Returns the submitted value, or ``None`` when no source was given.
        Raises ``ValidationError`` for malformed, protected or private
        networks.
        """
        user, peer = self._applier_and_peer()
        data = self.cleaned_data['source']
        if not data:
            return None
        address = self._parse_network(data)
        self._reject_protected(address, data, user, peer,
                               'source', 'a source address')
        if address.is_private:
            raise forms.ValidationError('Private addresses not allowed')
        return data

    def clean_destination(self):
        """Validate the optional destination network.

        Returns the submitted value, or ``None`` when no destination was
        given.  Raises ``ValidationError`` for malformed or protected networks
        and for prefixes shorter than ``settings.PREFIX_LENGTH``.
        """
        user, peer = self._applier_and_peer()
        data = self.cleaned_data['destination']
        if not data:
            return None
        address = self._parse_network(data)
        self._reject_protected(address, data, user, peer,
                               'destination', 'the destination address')
        if address.prefixlen < settings.PREFIX_LENGTH:
            raise forms.ValidationError("Currently no prefix lengths < %s are allowed" % settings.PREFIX_LENGTH)
        return data

    def clean_expires(self):
        """Accept an expiration date 1 to 10 days after today.

        Returns the date, or ``None`` when no date was given.
        """
        date = self.cleaned_data['expires']
        if not date:
            return None
        range_days = (date - datetime.date.today()).days
        if 0 < range_days < 11:
            return date
        raise forms.ValidationError('Invalid date range')

    def clean(self):
        """Enforce the cross-field rules and refuse overlapping rules.

        Returns ``self.cleaned_data``; raises ``ValidationError`` on the first
        violated rule.
        """
        if self.errors:
            raise forms.ValidationError('Errors in form. Please review and fix them')
        cleaned = self.cleaned_data
        name = cleaned.get('name', None)
        source = cleaned.get('source', None)
        sourceports = cleaned.get('sourceport', None)
        ports = cleaned.get('port', None)
        then = cleaned.get('then', None)
        destination = cleaned.get('destination', None)
        destinationports = cleaned.get('destinationport', None)
        protocols = cleaned.get('protocol', None)
        user = cleaned.get('applier', None)
        peer = user.get_profile().peer

        if destination:
            wanted = IPNetwork(destination)
            owned = any(wanted in IPNetwork(network.network)
                        for network in peer.networks.all())
            if not owned:
                raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
        if sourceports and ports:
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
        if destinationports and ports:
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
        if sourceports and not source:
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
        if destinationports and not destination:
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
        if not (source or sourceports or ports or destination or destinationports):
            raise forms.ValidationError('Fill at least a Rule Match Condition')
        if not user.is_superuser:
            # Every selected action must be permitted, not only the first one.
            for then_action in then or ():
                if then_action.action not in settings.UI_USER_THEN_ACTIONS:
                    raise forms.ValidationError('This action "%s" is not permitted' % (then_action.action))

        # Narrow the peer's live rules down to those with the same match
        # conditions as the submitted one.
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)
        if protocols:
            route_pk_list = get_matchingprotocol_route_pks(protocols, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
            else:
                existing_routes = existing_routes.filter(protocol=None)
        else:
            existing_routes = existing_routes.filter(protocol=None)
        # Each port selection is compared with the route relation of the same
        # kind (source ports with source ports, and so on).
        # NOTE(review): when no route has the same port set the candidates are
        # left unnarrowed, as before -- confirm that this is intended.
        for selected, relation in ((sourceports, 'sourceport'),
                                   (destinationports, 'destinationport'),
                                   (ports, 'port')):
            if selected:
                route_pk_list = self._matching_port_route_pks(selected, existing_routes, relation)
                if route_pk_list:
                    existing_routes = existing_routes.filter(pk__in=route_pk_list)
            else:
                existing_routes = existing_routes.filter(**{relation: None})

        for route in existing_routes:
            if name == route.name:
                # The rule being edited must not collide with itself.
                continue
            if self._destinations_overlap(destination, route.destination):
                existing_url = reverse('edit-route', args=[route.name])
                raise forms.ValidationError("Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href='%s'>%s</a>" % (route.status, route.name, route.destination, existing_url, route.name))
        return self.cleaned_data
171
class ThenPlainForm(forms.ModelForm):
    """Model form for a ``ThenAction`` that only accepts rate-limiting."""

    class Meta:
        model = ThenAction

    def clean_action_value(self):
        """Validate the rate limit and return it as a string.

        Raises ``ValidationError`` when the value is empty, is not an integer
        or is below 50.
        """
        action_value = self.cleaned_data['action_value']
        if not action_value:
            raise forms.ValidationError('Cannot be empty')
        # Only the conversion is guarded, so the "< 50kbps" error raised below
        # reaches the user instead of being replaced by the generic message.
        try:
            rate = int(action_value)
        except (TypeError, ValueError):
            raise forms.ValidationError('Rate-limiting should be an integer >= 50')
        if rate < 50:
            raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
        return "%s" % action_value

    def clean_action(self):
        """Return the action, accepting nothing but ``'rate-limit'``."""
        action = self.cleaned_data['action']
        if action != 'rate-limit':
            raise forms.ValidationError('Cannot select something other than rate-limit')
        return action
196     
197
class PortPlainForm(forms.ModelForm):
    """Model form for a single ``MatchPort`` entry."""

    class Meta:
        model = MatchPort

    def clean_port(self):
        """Validate the port and return it as a string.

        Raises ``ValidationError`` when the value is empty, is not an integer
        or is zero.
        """
        port = self.cleaned_data['port']
        if not port:
            raise forms.ValidationError('Cannot be empty')
        # An explicit conversion keeps the check active under ``python -O``,
        # where the previous ``assert`` would have been stripped.
        try:
            number = int(port)
        except (TypeError, ValueError):
            raise forms.ValidationError('Port should be an integer')
        if number == 0:
            # Zero was rejected by the earlier assert-based check as well.
            raise forms.ValidationError('Port should be an integer')
        return "%s" % port
213
def value_list_to_list(valuelist):
    """Return a list holding the first element of every item in ``valuelist``."""
    return [item[0] for item in valuelist]
219
def get_matchingport_route_pks(portlist, routes, relation='destinationport'):
    """Return the pks of the routes whose port set equals ``portlist``.

    ``portlist`` must offer ``values_list('port').order_by('port')``; the
    ordered port values of both sides are compared as lists.  ``relation``
    names the route attribute that holds the ports to compare.  It defaults
    to ``'destinationport'``, the only relation this function used to read;
    pass ``'sourceport'`` or ``'port'`` to match source or generic ports.
    Routes without any port in that relation never match.
    """
    wanted = [row[0] for row in portlist.values_list('port').order_by('port')]
    route_pk_list = []
    for route in routes:
        rows = getattr(route, relation).all().values_list('port').order_by('port')
        current = [row[0] for row in rows]
        if current and current == wanted:
            route_pk_list.append(route.pk)
    return route_pk_list
228
def get_matchingprotocol_route_pks(protocolist, routes):
    """Return the pks of the routes whose protocol set equals ``protocolist``.

    The ordered ``protocol`` values of ``protocolist`` are compared with the
    ordered ``protocol`` values of each route; routes without any protocol
    never match.
    """
    wanted = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
    matching_pks = []
    for candidate in routes:
        ordered = candidate.protocol.all().values_list('protocol').order_by('protocol')
        current = value_list_to_list(ordered)
        if not current or current != wanted:
            continue
        matching_pks.append(candidate.pk)
    return matching_pks