Added release 0.8.7 details in ChangeLog file
[flowspy] / flowspec / forms.py
1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
7 from flowspy.peers.models import *
8 from ipaddr import *
9 from django.core.urlresolvers import reverse
10 from django.contrib.auth.models import User
11 from django.conf import settings
12 import datetime
13 from django.core.mail import mail_admins, mail_managers, send_mail
14
15
class RouteForm(forms.ModelForm):
    """ModelForm that validates a ``Route`` before it is saved.

    The ``clean_<field>`` hooks validate the applier, the source and
    destination networks and the expiry date. ``clean`` then applies the
    cross-field rules and rejects a rule that overlaps an existing route
    of the applier's peer.

    Settings read by this form: ``PROTECTED_SUBNETS``, ``PREFIX_LENGTH``,
    ``UI_USER_THEN_ACTIONS``, ``EMAIL_SUBJECT_PREFIX``, ``SERVER_EMAIL`` and
    ``NOTIFY_ADMIN_MAILS``.
    """

    class Meta:
        model = Route

    def _parse_network(self, data):
        """Parse user-supplied *data* as an IP network.

        Raises ``forms.ValidationError`` when ``IPNetwork`` rejects the
        text. Only ``ValueError`` is translated, so unrelated failures are
        no longer reported to the user as a formatting problem.
        """
        try:
            return IPNetwork(data)
        except ValueError:
            raise forms.ValidationError('Invalid network address format')

    def _reject_if_protected(self, address, data, role, subject):
        """Refuse *address* when it lies inside a protected subnet.

        On a hit, a notification mail describing the attempt is sent
        (``fail_silently=True``) and ``forms.ValidationError`` is raised.

        *data* is the raw text the user typed, *role* is ``"source"`` or
        ``"destination"`` (used in the mail body) and *subject* is the mail
        subject without the configured prefix.
        """
        for net in settings.PROTECTED_SUBNETS:
            if address in IPNetwork(net):
                # The applier is only needed for the notification, so it is
                # looked up here rather than on every validation run. This
                # also keeps a missing/invalid applier from crashing the
                # common path; clean_applier reports that as a form error.
                user = User.objects.get(pk=self.data['applier'])
                peer = user.get_profile().peer
                mail_body = "User %s %s (%s) attempted to set %s as the %s address in a firewall rule" % (
                    user.username, user.email, peer.peer_name, data, role)
                send_mail(settings.EMAIL_SUBJECT_PREFIX + subject,
                          mail_body, settings.SERVER_EMAIL,
                          settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
                raise forms.ValidationError('You have no authority on this subnet')

    def clean_applier(self):
        """Require an applier; return it unchanged when present."""
        applier = self.cleaned_data['applier']
        if not applier:
            raise forms.ValidationError('This field is required.')
        return applier

    def clean_source(self):
        """Validate the optional source network.

        Returns the submitted value when it parses as a network, is outside
        every protected subnet and is not a private range. Returns ``None``
        when no source was given. Raises ``forms.ValidationError`` otherwise.
        """
        data = self.cleaned_data['source']
        if not data:
            return None
        address = self._parse_network(data)
        self._reject_if_protected(
            address, data, "source",
            "Caught an attempt to set a protected IP/network as a source address")
        if address.is_private:
            raise forms.ValidationError('Private addresses not allowed')
        return data

    def clean_destination(self):
        """Validate the destination network.

        Returns the submitted value when it parses as a network, is outside
        every protected subnet and its prefix length is at least
        ``settings.PREFIX_LENGTH``. Returns ``None`` when no destination was
        given. Raises ``forms.ValidationError`` otherwise.
        """
        data = self.cleaned_data['destination']
        if not data:
            return None
        address = self._parse_network(data)
        self._reject_if_protected(
            address, data, "destination",
            "Caught an attempt to set a protected IP/network as the destination address")
        if address.prefixlen < settings.PREFIX_LENGTH:
            raise forms.ValidationError(
                "Currently no prefix lengths < %s are allowed" % settings.PREFIX_LENGTH)
        return data

    def clean_expires(self):
        """Accept an expiry date 1 to 10 days after today.

        Returns ``None`` when no date was given; raises
        ``forms.ValidationError`` for dates outside that window.
        """
        date = self.cleaned_data['expires']
        if not date:
            return None
        range_days = (date - datetime.date.today()).days
        if 0 < range_days < 11:
            return date
        raise forms.ValidationError('Invalid date range')

    def clean(self):
        """Apply cross-field rules and reject overlapping rules.

        Returns ``self.cleaned_data`` on success and raises
        ``forms.ValidationError`` on the first violated rule.
        """
        if self.errors:
            raise forms.ValidationError('Errors in form. Please review and fix them')
        name = self.cleaned_data.get('name', None)
        source = self.cleaned_data.get('source', None)
        sourceports = self.cleaned_data.get('sourceport', None)
        ports = self.cleaned_data.get('port', None)
        then = self.cleaned_data.get('then', None)
        destination = self.cleaned_data.get('destination', None)
        destinationports = self.cleaned_data.get('destinationport', None)
        protocols = self.cleaned_data.get('protocol', None)
        user = self.cleaned_data.get('applier', None)

        # 'issuperuser' arrives with the submitted data, so the account it
        # names must really be a superuser before the network scope is
        # widened. NOTE(review): this cannot prove that the submitter *is*
        # that account; the view should enforce that.
        acts_as_superuser = False
        try:
            su = User.objects.get(username=self.data['issuperuser'])
        except (KeyError, User.DoesNotExist):
            pass
        else:
            acts_as_superuser = bool(su.is_superuser)

        peer = user.get_profile().peer
        if acts_as_superuser:
            networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
        else:
            networks = peer.networks.all()

        if destination:
            destination_net = IPNetwork(destination)
            if not any(destination_net in IPNetwork(network.network) for network in networks):
                raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
        if sourceports and ports:
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
        if destinationports and ports:
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
        if sourceports and not source:
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
        if destinationports and not destination:
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
        if not (source or sourceports or ports or destination or destinationports):
            raise forms.ValidationError('Fill at least a Rule Match Condition')
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
            raise forms.ValidationError('This action "%s" is not permitted' % (then[0].action))

        # Narrow the peer's existing routes down to those with the same
        # match conditions as the submitted rule.
        existing_routes = Route.objects.all()
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)
        if protocols:
            matching_pks = get_matchingprotocol_route_pks(protocols, existing_routes)
            if matching_pks:
                existing_routes = existing_routes.filter(pk__in=matching_pks)
            else:
                existing_routes = existing_routes.filter(protocol=None)
        else:
            existing_routes = existing_routes.filter(protocol=None)
        # NOTE(review): get_matchingport_route_pks is called the same way
        # for source ports, destination ports and generic ports. Confirm the
        # helper compares against the intended port relation in each case.
        if sourceports:
            matching_pks = get_matchingport_route_pks(sourceports, existing_routes)
            if matching_pks:
                existing_routes = existing_routes.filter(pk__in=matching_pks)
        else:
            existing_routes = existing_routes.filter(sourceport=None)
        if destinationports:
            matching_pks = get_matchingport_route_pks(destinationports, existing_routes)
            if matching_pks:
                existing_routes = existing_routes.filter(pk__in=matching_pks)
        else:
            existing_routes = existing_routes.filter(destinationport=None)
        if ports:
            matching_pks = get_matchingport_route_pks(ports, existing_routes)
            if matching_pks:
                existing_routes = existing_routes.filter(pk__in=matching_pks)
        else:
            existing_routes = existing_routes.filter(port=None)

        for route in existing_routes:
            if name == route.name:
                # Editing a rule must not collide with itself.
                continue
            existing_url = reverse('edit-route', args=[route.name])
            # NOTE(review): destination is parsed here even though earlier
            # checks treat it as optional; confirm it can never be empty
            # when a candidate route exists.
            new_net = IPNetwork(destination)
            old_net = IPNetwork(route.destination)
            if new_net in old_net or old_net in new_net:
                raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' % (route.status, route.name, route.destination, existing_url, route.name))
        return self.cleaned_data
186
class ThenPlainForm(forms.ModelForm):
    """ModelForm for a ``ThenAction`` restricted to rate-limit actions."""

    class Meta:
        model = ThenAction

    def clean_action_value(self):
        """Require an integer rate of at least 50 and return it as a string.

        Raises ``forms.ValidationError`` when the value is empty, is not an
        integer, or is below 50.
        """
        action_value = self.cleaned_data['action_value']
        if not action_value:
            raise forms.ValidationError('Cannot be empty')
        # Convert explicitly instead of using ``assert``, which is removed
        # when Python runs with -O.
        try:
            rate = int(action_value)
        except (TypeError, ValueError):
            raise forms.ValidationError('Rate-limiting should be an integer >= 50')
        # Checked outside the try block so this message reaches the user
        # instead of being replaced by the "not an integer" one.
        if rate < 50:
            raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
        return "%s" % action_value

    def clean_action(self):
        """Accept only the ``rate-limit`` action."""
        action = self.cleaned_data['action']
        if action != 'rate-limit':
            raise forms.ValidationError('Cannot select something other than rate-limit')
        return action
211     
212
class PortPlainForm(forms.ModelForm):
    """ModelForm for a ``MatchPort`` that requires a non-zero integer port."""

    class Meta:
        model = MatchPort

    def clean_port(self):
        """Return the port as a string after checking it is an integer.

        Raises ``forms.ValidationError`` when the value is empty, cannot be
        converted with ``int()``, or converts to zero.
        """
        port = self.cleaned_data['port']
        if not port:
            raise forms.ValidationError('Cannot be empty')
        # Convert explicitly instead of using ``assert``, which is removed
        # when Python runs with -O and would then let any text through.
        try:
            port_number = int(port)
        except (TypeError, ValueError):
            raise forms.ValidationError('Port should be an integer')
        if port_number == 0:
            # Zero was rejected before (the assert failed on it); keep
            # rejecting it with the same message.
            raise forms.ValidationError('Port should be an integer')
        return "%s" % port
228
def value_list_to_list(valuelist):
    """Return a list holding the first element of every row in *valuelist*."""
    return [row[0] for row in valuelist]
234
def get_matchingport_route_pks(portlist, routes, port_field='destinationport'):
    """Return the pks of *routes* whose port set equals *portlist*.

    *portlist* and each route's port relation are read through
    ``values_list('port').order_by('port')``, and the resulting ordered
    port values are compared for equality. Routes with no ports on the
    chosen relation never match.

    *port_field* names the relation on each route to compare against. It
    defaults to ``'destinationport'``, which is what this function always
    used, so existing callers behave as before. Pass another relation name
    (for example ``'sourceport'`` or ``'port'``) to match on that instead.
    """
    wanted_ports = [row[0] for row in portlist.values_list('port').order_by('port')]
    route_pk_list = []
    for route in routes:
        relation = getattr(route, port_field)
        route_ports = [row[0] for row in relation.all().values_list('port').order_by('port')]
        if route_ports and route_ports == wanted_ports:
            route_pk_list.append(route.pk)
    return route_pk_list
243
def get_matchingprotocol_route_pks(protocolist, routes):
    """Return the pks of *routes* whose protocol set equals *protocolist*.

    Both sides are read through ``values_list('protocol')`` ordered by
    ``protocol`` and compared as lists. Routes without any protocol never
    match.
    """
    wanted = value_list_to_list(
        protocolist.values_list('protocol').order_by('protocol'))
    matching_pks = []
    for candidate in routes:
        candidate_protocols = value_list_to_list(
            candidate.protocol.all().values_list('protocol').order_by('protocol'))
        if not candidate_protocols:
            continue
        if candidate_protocols == wanted:
            matching_pks.append(candidate.pk)
    return matching_pks