Superuser can interact with all networks from all peers
[flowspy] / flowspec / forms.py
1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
7 from flowspy.peers.models import *
8 from ipaddr import *
9 from django.core.urlresolvers import reverse
10 from django.contrib.auth.models import User
11 from django.conf import settings
12 import datetime
13 from django.core.mail import mail_admins, mail_managers, send_mail
14
15
class RouteForm(forms.ModelForm):
    """Validate a ``Route`` (firewall rule) submitted by a user.

    Field-level cleaners check the source and destination networks and the
    expiry date; ``clean`` then applies the cross-field rules and refuses a
    rule that duplicates or overlaps another rule of the same peer whose
    status is not EXPIRED, ERROR or ADMININACTIVE.
    """

    class Meta:
        model = Route

    def _parse_network(self, value):
        """Return ``value`` parsed as an ``IPNetwork``.

        Raises ``forms.ValidationError`` when ``value`` cannot be parsed.
        Only the parse itself is guarded, so unrelated failures are not
        misreported to the user as a format problem.
        """
        try:
            return IPNetwork(value)
        except ValueError:
            raise forms.ValidationError('Invalid network address format')

    def _reject_protected(self, address, value, subject, body_template):
        """Refuse ``address`` when it lies inside a protected subnet.

        On the first hit in ``settings.PROTECTED_SUBNETS`` a notification is
        mailed to ``settings.NOTIFY_ADMIN_MAILS`` and
        ``forms.ValidationError`` is raised.

        ``subject`` is appended to ``settings.EMAIL_SUBJECT_PREFIX``;
        ``body_template`` is %-formatted with
        ``(username, email, peer name, value)``.
        """
        for net in settings.PROTECTED_SUBNETS:
            if address in IPNetwork(net):
                # The applier is only needed for the alert, so it is looked
                # up here rather than on every validation.
                user = User.objects.get(pk=self.data['applier'])
                peer = user.get_profile().peer
                mail_body = body_template % (
                    user.username, user.email, peer.peer_name, value)
                send_mail(settings.EMAIL_SUBJECT_PREFIX + subject,
                          mail_body, settings.SERVER_EMAIL,
                          settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
                raise forms.ValidationError('You have no authority on this subnet')

    def _same_port_route_pks(self, selected_ports, routes, relation):
        """Return pks of ``routes`` whose ``relation`` ports equal the selection.

        ``relation`` is the name of the route's port relation to compare
        (``'sourceport'``, ``'destinationport'`` or ``'port'``).  Routes with
        no ports in that relation never match.
        """
        wanted = value_list_to_list(
            selected_ports.values_list('port').order_by('port'))
        matching_pks = []
        for route in routes:
            route_ports = value_list_to_list(
                getattr(route, relation).all().values_list('port').order_by('port'))
            if route_ports and route_ports == wanted:
                matching_pks.append(route.pk)
        return matching_pks

    def clean_source(self):
        """Validate the optional source network.

        Returns the submitted value, or ``None`` when the field is empty.
        Raises ``forms.ValidationError`` for an unparsable network, a
        network inside a protected subnet, or a private network.
        """
        data = self.cleaned_data['source']
        if not data:
            return None
        address = self._parse_network(data)
        self._reject_protected(
            address, data,
            "Caught an attempt to set a protected IP/network as a source address",
            "User %s %s (%s) attempted to set %s as the source address in a firewall rule")
        if address.is_private:
            raise forms.ValidationError('Private addresses not allowed')
        return data

    def clean_destination(self):
        """Validate the optional destination network.

        Returns the submitted value, or ``None`` when the field is empty.
        Raises ``forms.ValidationError`` for an unparsable network, a
        network inside a protected subnet, or a prefix shorter than
        ``settings.PREFIX_LENGTH``.
        """
        data = self.cleaned_data['destination']
        if not data:
            return None
        address = self._parse_network(data)
        self._reject_protected(
            address, data,
            "Caught an attempt to set a protected IP/network as the destination address",
            "User %s %s (%s) attempted to set %s as the destination address in a firewall rule")
        if address.prefixlen < settings.PREFIX_LENGTH:
            raise forms.ValidationError(
                "Currently no prefix lengths < %s are allowed" % settings.PREFIX_LENGTH)
        return data

    def clean_expires(self):
        """Accept an expiry date between 1 and 10 days after today.

        Returns the date, or ``None`` when the field is empty.  Raises
        ``forms.ValidationError`` for any other date.
        """
        expires = self.cleaned_data['expires']
        if not expires:
            return None
        days_ahead = (expires - datetime.date.today()).days
        if 0 < days_ahead < 11:
            return expires
        raise forms.ValidationError('Invalid date range')

    def clean(self):
        """Apply the cross-field rules and reject overlapping rules.

        Checks, in order: no field errors so far; the destination lies in
        one of the applier's peer networks (any peer's networks for a
        superuser); port selections are mutually consistent and accompanied
        by the matching address; at least one match condition is present;
        the action is permitted for non-superusers; no other rule of the
        same peer has the same source, protocols and ports together with an
        overlapping destination.

        Returns ``self.cleaned_data``; raises ``forms.ValidationError`` on
        the first failed check.
        """
        if self.errors:
            raise forms.ValidationError('Errors in form. Please review and fix them')
        cleaned = self.cleaned_data
        name = cleaned.get('name', None)
        source = cleaned.get('source', None)
        sourceports = cleaned.get('sourceport', None)
        ports = cleaned.get('port', None)
        then = cleaned.get('then', None)
        destination = cleaned.get('destination', None)
        destinationports = cleaned.get('destinationport', None)
        protocols = cleaned.get('protocol', None)
        user = cleaned.get('applier', None)
        peer = user.get_profile().peer
        if user.is_superuser:
            networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
        else:
            networks = peer.networks.all()

        # Parse the destination once; it is reused by the overlap check below.
        destination_net = None
        if destination:
            destination_net = IPNetwork(destination)
            if not any(destination_net in IPNetwork(network.network)
                       for network in networks):
                raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')

        if sourceports and ports:
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
        if destinationports and ports:
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
        if sourceports and not source:
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
        if destinationports and not destination:
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
        if not (source or sourceports or ports or destination or destinationports):
            raise forms.ValidationError('Fill at least a Rule Match Condition')
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
            raise forms.ValidationError('This action "%s" is not permitted' % (then[0].action))

        # Narrow the peer's other rules down to those with the same match
        # conditions apart from the destination.
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)

        protocol_pks = []
        if protocols:
            protocol_pks = get_matchingprotocol_route_pks(protocols, existing_routes)
        if protocol_pks:
            existing_routes = existing_routes.filter(pk__in=protocol_pks)
        else:
            existing_routes = existing_routes.filter(protocol=None)

        # Each selection is compared against its own relation on the route
        # (source ports with source ports, and so on).
        port_selections = (
            (sourceports, 'sourceport'),
            (destinationports, 'destinationport'),
            (ports, 'port'),
        )
        for selected, relation in port_selections:
            if selected:
                port_pks = self._same_port_route_pks(selected, existing_routes, relation)
                if port_pks:
                    existing_routes = existing_routes.filter(pk__in=port_pks)
            else:
                existing_routes = existing_routes.filter(**{relation: None})

        for route in existing_routes:
            if name == route.name:
                # Same name: presumably the rule being edited, so skip it.
                continue
            if destination_net is not None and route.destination:
                route_net = IPNetwork(route.destination)
                overlapping = (destination_net in route_net
                               or route_net in destination_net)
            else:
                # No prefix to compare on at least one side.  Two rules that
                # both lack a destination are identical; otherwise the rules
                # are not treated as overlapping.
                overlapping = destination_net is None and not route.destination
            if overlapping:
                existing_url = reverse('edit-route', args=[route.name])
                raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' % (route.status, route.name, route.destination, existing_url, route.name))
        return self.cleaned_data
174
class ThenPlainForm(forms.ModelForm):
    """ModelForm for ``ThenAction`` that only accepts rate-limit actions."""

    class Meta:
        model = ThenAction

    def clean_action_value(self):
        """Validate the rate-limit value.

        Returns the submitted value as a string.  Raises
        ``forms.ValidationError`` when the value is empty, is not an
        integer, or is below 50.
        """
        action_value = self.cleaned_data['action_value']
        if not action_value:
            raise forms.ValidationError('Cannot be empty')
        # Only the conversion is guarded, so the range error below reaches
        # the user instead of being replaced by the format error.
        try:
            rate = int(action_value)
        except (TypeError, ValueError):
            raise forms.ValidationError('Rate-limiting should be an integer >= 50')
        if rate < 50:
            raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
        return "%s" % action_value

    def clean_action(self):
        """Return the action, which must be ``'rate-limit'``.

        Raises ``forms.ValidationError`` for any other action.
        """
        action = self.cleaned_data['action']
        if action != 'rate-limit':
            raise forms.ValidationError('Cannot select something other than rate-limit')
        return action
199     
200
class PortPlainForm(forms.ModelForm):
    """ModelForm for ``MatchPort`` that accepts a single numeric port."""

    class Meta:
        model = MatchPort

    def clean_port(self):
        """Validate the port number.

        Returns the submitted value as a string.  Raises
        ``forms.ValidationError`` when the value is empty, is not an
        integer, or lies outside 1-65535.
        """
        port = self.cleaned_data['port']
        if not port:
            raise forms.ValidationError('Cannot be empty')
        try:
            number = int(port)
        except (TypeError, ValueError):
            raise forms.ValidationError('Port should be an integer')
        # Explicit range check instead of an assert, which is stripped
        # under -O and reported port 0 as "not an integer".
        if not 1 <= number <= 65535:
            raise forms.ValidationError('Port should be an integer between 1 and 65535')
        return "%s" % port
216
def value_list_to_list(valuelist):
    """Return a list holding the first element of every item in ``valuelist``.

    Used in this module to flatten single-column ``values_list`` results,
    whose items are 1-tuples, into a plain list of values.
    """
    return [row[0] for row in valuelist]
222
def get_matchingport_route_pks(portlist, routes, field='destinationport'):
    """Return the pks of ``routes`` whose ports equal those in ``portlist``.

    ``portlist`` must support ``values_list('port').order_by('port')``.
    ``field`` names the port relation read from each route.  It defaults to
    ``'destinationport'``, which is what this function always compared
    against; pass the name of another port relation (presumably
    ``'sourceport'`` or ``'port'`` -- confirm against the ``Route`` model)
    to compare that one instead.

    A route with no ports in the chosen relation never matches.
    """
    wanted_ports = value_list_to_list(
        portlist.values_list('port').order_by('port'))
    route_pk_list = []
    for route in routes:
        route_ports = value_list_to_list(
            getattr(route, field).all().values_list('port').order_by('port'))
        if route_ports and route_ports == wanted_ports:
            route_pk_list.append(route.pk)
    return route_pk_list
231
def get_matchingprotocol_route_pks(protocolist, routes):
    """Return the pks of ``routes`` whose protocols equal ``protocolist``.

    Both sides are reduced to their ``protocol`` values ordered by
    ``protocol`` and compared as lists.  A route with no protocols never
    matches.
    """
    def ordered_protocols(queryset):
        # Flatten the single-column rows into a plain ordered list.
        return value_list_to_list(
            queryset.values_list('protocol').order_by('protocol'))

    wanted = ordered_protocols(protocolist)
    matching_pks = []
    for candidate in routes:
        candidate_protocols = ordered_protocols(candidate.protocol.all())
        if not candidate_protocols:
            continue
        if candidate_protocols == wanted:
            matching_pks.append(candidate.pk)
    return matching_pks