Merge branch 'master' of https://code.grnet.gr/git/flowspy
[flowspy] / flowspec / forms.py
1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
7 from ipaddr import *
8 from django.core.urlresolvers import reverse
9 from django.contrib.auth.models import User
10 from django.conf import settings
11 import datetime
12 from django.core.mail import mail_admins, mail_managers, send_mail
13
14
15 class RouteForm(forms.ModelForm):
16 #    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
17 #                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
18 #    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
19 #                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
20 #    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
21 #    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
22 #                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
23 #    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
24 #    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25
26     class Meta:
27         model = Route
28     
29     def clean_source(self):
30         user = User.objects.get(pk=self.data['applier'])
31         peer = user.get_profile().peer
32         data = self.cleaned_data['source']
33         private_error = False
34         protected_error = False
35         if data:
36             try:
37                 address = IPNetwork(data)
38                 for net in settings.PROTECTED_SUBNETS:
39                     if address in IPNetwork(net):
40                         protected_error = True
41                         mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42                         send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
43                               mail_body, settings.SERVER_EMAIL,
44                               settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
45                         raise forms.ValidationError("Not allowed")
46                 if address.is_private:
47                     private_error = True
48                     raise forms.ValidationError('Private addresses not allowed')
49                 else:
50                     return self.cleaned_data["source"]
51             except Exception:
52                 error_text = 'Invalid network address format'
53                 if private_error:
54                     error_text = 'Private addresses not allowed'
55                 if protected_error:
56                     error_text = 'You have no authority on this subnet'
57                 raise forms.ValidationError(error_text)
58
59     def clean_destination(self):
60         user = User.objects.get(pk=self.data['applier'])
61         peer = user.get_profile().peer        
62         data = self.cleaned_data['destination']
63         error = None
64         protected_error = False
65         if data:
66             try:
67                 address = IPNetwork(data)
68                 for net in settings.PROTECTED_SUBNETS:
69                     if address in IPNetwork(net):
70                         protected_error = True
71                         mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72                         send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
73                               mail_body, settings.SERVER_EMAIL,
74                               settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
75                         raise forms.ValidationError("Not allowed")
76                 if address.prefixlen < settings.PREFIX_LENGTH:
77                     error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
78                     raise forms.ValidationError('error')
79                 return self.cleaned_data["destination"]
80             except Exception:
81                 error_text = 'Invalid network address format'
82                 if error:
83                     error_text = error
84                 if protected_error:
85                     error_text = 'You have no authority on this subnet'
86                 raise forms.ValidationError(error_text)
87     
88     def clean_expires(self):
89         date = self.cleaned_data['expires']
90         if date:
91             range_days = (date - datetime.date.today()).days
92             if range_days > 0 and range_days < 11:
93                 return self.cleaned_data["expires"]
94             else:
95                 raise forms.ValidationError('Invalid date range')
96
97     def clean(self):
98         name = self.cleaned_data.get('name', None)
99         source = self.cleaned_data.get('source', None)
100         sourceports = self.cleaned_data.get('sourceport', None)
101         ports = self.cleaned_data.get('port', None)
102         then = self.cleaned_data.get('then', None)
103         destination = self.cleaned_data.get('destination', None)
104         destinationports = self.cleaned_data.get('destinationport', None)
105         protocols = self.cleaned_data.get('protocol', None)
106         user = self.cleaned_data.get('applier', None)
107         peer = user.get_profile().peer
108         networks = peer.networks.all()
109         mynetwork = False
110         route_pk_list = []
111         if destination:
112             for network in networks:
113                 net = IPNetwork(network.network)
114                 if IPNetwork(destination) in net:
115                     mynetwork = True
116             if not mynetwork:
117                  raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
118         if (sourceports and ports):
119             raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
120         if (destinationports and ports):
121             raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
122         if sourceports and not source:
123             raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
124         if destinationports and not destination:
125             raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
126         if not (source or sourceports or ports or destination or destinationports):
127             raise forms.ValidationError('Fill at least a Rule Match Condition')
128         if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
129             raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
130         existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
131         existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
132         if source:
133             source = IPNetwork(source).compressed
134             existing_routes = existing_routes.filter(source=source)
135         else:
136             existing_routes = existing_routes.filter(source=None)
137         if protocols:
138             route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
139             if route_pk_list:
140                 existing_routes = existing_routes.filter(pk__in=route_pk_list)
141             else:
142                 existing_routes = existing_routes.filter(protocol=None)
143         else:
144             existing_routes = existing_routes.filter(protocol=None)
145         if sourceports:
146             route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
147             if route_pk_list:
148                 existing_routes = existing_routes.filter(pk__in=route_pk_list)
149         else:
150             existing_routes = existing_routes.filter(sourceport=None)
151         if destinationports:
152             route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
153             if route_pk_list:
154                 existing_routes = existing_routes.filter(pk__in=route_pk_list)
155         else:
156             existing_routes = existing_routes.filter(destinationport=None)
157         if ports:
158             route_pk_list=get_matchingport_route_pks(ports, existing_routes)
159             if route_pk_list:
160                 existing_routes = existing_routes.filter(pk__in=route_pk_list)              
161         else:
162             existing_routes = existing_routes.filter(port=None)
163         for route in existing_routes:
164             if name != route.name:
165                 existing_url = reverse('edit-route', args=[route.name])
166                 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
167                     raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
168         return self.cleaned_data
169
170 class ThenPlainForm(forms.ModelForm):
171 #    action = forms.CharField(initial='rate-limit')
172     class Meta:
173         model = ThenAction
174     
175     def clean_action_value(self):
176         action_value = self.cleaned_data['action_value']
177         if action_value:
178             try:
179                 assert(int(action_value))
180                 if int(action_value) < 50:
181                     raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
182                 return "%s" %self.cleaned_data["action_value"]
183             except:
184                 raise forms.ValidationError('Rate-limiting should be an integer < 50')
185         else:
186             raise forms.ValidationError('Cannot be empty')
187
188     def clean_action(self):
189         action = self.cleaned_data['action']
190         if action != 'rate-limit':
191             raise forms.ValidationError('Cannot select something other than rate-limit')
192         else:
193             return self.cleaned_data["action"]
194     
195
196 class PortPlainForm(forms.ModelForm):
197 #    action = forms.CharField(initial='rate-limit')
198     class Meta:
199         model = MatchPort
200     
201     def clean_port(self):
202         port = self.cleaned_data['port']
203         if port:
204             try:
205                 assert(int(port))
206                 return "%s" %self.cleaned_data["port"]
207             except:
208                 raise forms.ValidationError('Port should be an integer')
209         else:
210             raise forms.ValidationError('Cannot be empty')
211
212 def value_list_to_list(valuelist):
213     vl = []
214     for val in valuelist:
215         vl.append(val[0])
216     return vl
217
218 def get_matchingport_route_pks(portlist, routes):
219     route_pk_list = []
220     ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
221     for route in routes:
222         rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
223         if rsp and rsp == ports_value_list:
224             route_pk_list.append(route.pk)
225     return route_pk_list
226
227 def get_matchingprotocol_route_pks(protocolist, routes):
228     route_pk_list = []
229     protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
230     for route in routes:
231         rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
232         if rsp and rsp == protocols_value_list:
233             route_pk_list.append(route.pk)
234     return route_pk_list