2 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
3 #Copyright © 2011-2013 Greek Research and Technology Network (GRNET S.A.)
5 #Developed by Leonidas Poulopoulos (leopoul-at-noc-dot-grnet-dot-gr),
8 #Permission to use, copy, modify, and/or distribute this software for any
9 #purpose with or without fee is hereby granted, provided that the above
10 #copyright notice and this permission notice appear in all copies.
12 #THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH REGARD
13 #TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
14 #FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
15 #CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
16 #DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 #ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
20 from django import forms
21 from django.utils.safestring import mark_safe
22 from django.utils.translation import ugettext as _
23 from django.utils.translation import ugettext_lazy
24 from django.template.defaultfilters import filesizeformat
25 from flowspy.flowspec.models import *
26 from flowspy.peers.models import *
27 from flowspy.accounts.models import *
29 from django.core.urlresolvers import reverse
30 from django.contrib.auth.models import User
31 from django.conf import settings
33 from django.core.mail import mail_admins, mail_managers, send_mail
35 class UserProfileForm(forms.ModelForm):
39 class RouteForm(forms.ModelForm):
40 # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
41 # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
42 # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
43 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
44 # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
45 # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
46 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
47 # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
48 # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
def clean_applier(self):
    """Validate that an applier was supplied for the rule.

    Returns:
        The cleaned ``applier`` value, unchanged.

    Raises:
        forms.ValidationError: if the cleaned value is empty.
    """
    applier = self.cleaned_data['applier']
    if not applier:
        # Run the message through the translation machinery, as the other
        # validators of this form do.
        raise forms.ValidationError(_('This field is required.'))
    return applier
def clean_source(self):
    """Validate the optional source network of the rule.

    A non-empty value must parse as an IP network, must not lie inside any
    of ``settings.PROTECTED_SUBNETS`` and must not be a private address.
    An attempt to use a protected network is reported by e-mail to
    ``settings.NOTIFY_ADMIN_MAILS`` before the form error is raised.

    Returns:
        The cleaned ``source`` value, or ``None`` when it is empty.

    Raises:
        forms.ValidationError: if the value is malformed, protected or
            private.
    """
    data = self.cleaned_data['source']
    if not data:
        return None

    # Only parsing/inspection lives inside the try, so a rejected address is
    # reported with its own message instead of being funnelled through a
    # catch-all handler.
    try:
        address = IPNetwork(data)
        is_protected = any(address in IPNetwork(net)
                           for net in settings.PROTECTED_SUBNETS)
        is_private = address.is_private
    except Exception:
        raise forms.ValidationError(_('Invalid network address format'))

    if is_protected:
        # The applier and peer are needed for the alert only, so they are
        # looked up here rather than on every validation.
        user = User.objects.get(pk=self.data['applier'])
        peer = user.get_profile().peer
        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" % (user.username, user.email, peer.peer_name, data)
        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
                  mail_body, settings.SERVER_EMAIL,
                  settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
        raise forms.ValidationError(_('You have no authority on this subnet'))
    if is_private:
        raise forms.ValidationError(_('Private addresses not allowed'))
    return data
def clean_destination(self):
    """Validate the optional destination network of the rule.

    A non-empty value must parse as an IP network, must not lie inside any
    of ``settings.PROTECTED_SUBNETS`` and must have a prefix length of at
    least ``settings.PREFIX_LENGTH``. An attempt to use a protected network
    is reported by e-mail to ``settings.NOTIFY_ADMIN_MAILS`` before the form
    error is raised.

    Returns:
        The cleaned ``destination`` value, or ``None`` when it is empty.

    Raises:
        forms.ValidationError: if the value is malformed, protected or its
            prefix is too short.
    """
    data = self.cleaned_data['destination']
    if not data:
        return None

    # Only parsing/inspection lives inside the try, so each rejection keeps
    # its own message instead of being re-dispatched through flags.
    try:
        address = IPNetwork(data)
        is_protected = any(address in IPNetwork(net)
                           for net in settings.PROTECTED_SUBNETS)
        prefix_too_short = address.prefixlen < settings.PREFIX_LENGTH
    except Exception:
        raise forms.ValidationError(_('Invalid network address format'))

    if is_protected:
        # The applier and peer are needed for the alert only, so they are
        # looked up here rather than on every validation.
        user = User.objects.get(pk=self.data['applier'])
        peer = user.get_profile().peer
        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" % (user.username, user.email, peer.peer_name, data)
        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
                  mail_body, settings.SERVER_EMAIL,
                  settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
        raise forms.ValidationError(_('You have no authority on this subnet'))
    if prefix_too_short:
        raise forms.ValidationError(_("Currently no prefix lengths < %s are allowed") % settings.PREFIX_LENGTH)
    return data
def clean_expires(self):
    """Validate the optional expiry date of the rule.

    A non-empty date must lie between 1 and 10 days after today
    (inclusive), as measured by ``datetime.date.today()``.

    Returns:
        The cleaned ``expires`` value, or ``None`` when it is empty.

    Raises:
        forms.ValidationError: if the date is outside the allowed window.
    """
    expires = self.cleaned_data['expires']
    if not expires:
        return None
    days_ahead = (expires - datetime.date.today()).days
    if not 0 < days_ahead < 11:
        # Translated like the other validators of this form.
        raise forms.ValidationError(_('Invalid date range'))
    return expires
def clean(self):
    """Cross-field validation of a rule after the per-field checks.

    Checks, in order:

    * no field-level errors are pending;
    * a given destination lies inside one of the networks of the applier's
      peer (any peer's network when a superuser is acting);
    * the source/destination/generic port selections are consistent with
      each other and with the addresses supplied;
    * at least one match condition is filled in;
    * the first selected action is allowed for a non-superuser applier;
    * no other rule of the same peer with the same source, protocols and
      ports has a destination overlapping the submitted one.

    Returns:
        ``self.cleaned_data`` when every check passes.

    Raises:
        forms.ValidationError: on the first failed check.
    """
    if self.errors:
        raise forms.ValidationError(_('Errors in form. Please review and fix them'))

    cleaned = self.cleaned_data
    name = cleaned.get('name', None)
    source = cleaned.get('source', None)
    sourceports = cleaned.get('sourceport', None)
    ports = cleaned.get('port', None)
    then = cleaned.get('then', None)
    destination = cleaned.get('destination', None)
    destinationports = cleaned.get('destinationport', None)
    protocols = cleaned.get('protocol', None)
    user = cleaned.get('applier', None)

    # 'issuperuser' arrives with the submitted data and names a user. It is
    # honoured only when that user exists AND is a superuser; checking mere
    # existence would let any valid username unlock every peer's networks.
    try:
        su = User.objects.get(username=self.data['issuperuser'])
    except (KeyError, User.DoesNotExist):
        acting_superuser = False
    else:
        acting_superuser = su.is_superuser

    peer = user.get_profile().peer
    if acting_superuser:
        networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
    else:
        networks = peer.networks.all()

    # Parse the destination once; it is reused by the overlap check below.
    dest_net = IPNetwork(destination) if destination else None
    if dest_net is not None:
        if not any(dest_net in IPNetwork(network.network) for network in networks):
            raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))

    if (sourceports and ports):
        raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
    if (destinationports and ports):
        raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
    if sourceports and not source:
        raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
    if destinationports and not destination:
        raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
    if not (source or sourceports or ports or destination or destinationports):
        raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
    if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
        raise forms.ValidationError(_('This action "%s" is not permitted') % (then[0].action))

    # Narrow the peer's existing rules down to those with the same match
    # conditions as the submitted one.
    existing_routes = Route.objects.all()
    existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
    if source:
        source = IPNetwork(source).compressed
        existing_routes = existing_routes.filter(source=source)
    else:
        existing_routes = existing_routes.filter(source=None)

    if protocols:
        route_pk_list = get_matchingprotocol_route_pks(protocols, existing_routes)
        if route_pk_list:
            existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(protocol=None)
    else:
        existing_routes = existing_routes.filter(protocol=None)

    # NOTE(review): get_matchingport_route_pks compares the selection with
    # each route's destination ports for all three kinds of selection.
    port_selections = (
        (sourceports, 'sourceport'),
        (destinationports, 'destinationport'),
        (ports, 'port'),
    )
    for selected, field_name in port_selections:
        if selected:
            route_pk_list = get_matchingport_route_pks(selected, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(**{field_name: None})

    for route in existing_routes:
        if name == route.name:
            # The rule being edited never conflicts with itself.
            continue
        if dest_net is None:
            # Without a destination there is no prefix to compare.
            continue
        route_net = IPNetwork(route.destination)
        if dest_net in route_net or route_net in dest_net:
            existing_url = reverse('edit-route', args=[route.name])
            raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' % (route.status, route.name, route.destination, existing_url, route.name))
    return self.cleaned_data
210 class ThenPlainForm(forms.ModelForm):
211 # action = forms.CharField(initial='rate-limit')
def clean_action_value(self):
    """Validate the rate-limit value of a "then" action.

    The value must be non-empty, must convert with ``int()`` and must be at
    least 50.

    Returns:
        The cleaned value formatted as a string.

    Raises:
        forms.ValidationError: if the value is empty, not an integer, or
            below 50.
    """
    action_value = self.cleaned_data['action_value']
    if not action_value:
        raise forms.ValidationError(_('Cannot be empty'))
    # Convert explicitly instead of relying on ``assert``, which is removed
    # when Python runs with -O. Only the conversion is guarded, so the
    # "< 50kbps" error below can never be replaced by the generic one.
    try:
        rate = int(action_value)
    except (TypeError, ValueError):
        # NOTE(review): the wording "< 50" looks inverted with respect to
        # the check below; kept as is so existing translations still match.
        raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
    if rate < 50:
        raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
    return "%s" % action_value
def clean_action(self):
    """Accept ``rate-limit`` as the only selectable action.

    Returns:
        The cleaned ``action`` value.

    Raises:
        forms.ValidationError: if any other action was chosen.
    """
    chosen = self.cleaned_data['action']
    if chosen == 'rate-limit':
        return self.cleaned_data["action"]
    raise forms.ValidationError(_('Cannot select something other than rate-limit'))
236 class PortPlainForm(forms.ModelForm):
237 # action = forms.CharField(initial='rate-limit')
def clean_port(self):
    """Validate a port value.

    The value must be non-empty and must convert with ``int()``.

    Returns:
        The cleaned value formatted as a string.

    Raises:
        forms.ValidationError: if the value is empty or not an integer.
    """
    port = self.cleaned_data['port']
    if not port:
        raise forms.ValidationError(_('Cannot be empty'))
    # Guard only the conversion and catch only the errors int() raises for
    # bad input, so unrelated failures are not reported as a bad port.
    try:
        int(port)
    except (TypeError, ValueError):
        raise forms.ValidationError(_('Port should be an integer'))
    return "%s" % port
def value_list_to_list(valuelist):
    """Flatten an iterable of rows into a list of their first elements.

    Args:
        valuelist: iterable of indexable rows; callers in this module pass
            the 1-tuples produced by ``values_list('<field>')``.

    Returns:
        A new list holding ``row[0]`` for every row, in iteration order
        (empty when ``valuelist`` is empty).
    """
    return [row[0] for row in valuelist]
def get_matchingport_route_pks(portlist, routes, relation='destinationport'):
    """Return the pks of routes whose port set equals the selected ports.

    Args:
        portlist: queryset of the selected ports; compared by its ``port``
            values ordered by ``port``.
        routes: iterable of routes to inspect.
        relation: name of the route attribute holding the port set to
            compare against. Defaults to ``'destinationport'``, which is
            what every existing caller gets; pass ``'sourceport'`` or
            ``'port'`` to compare a different set.

    Returns:
        List of ``route.pk`` for every route whose port set is non-empty and
        has exactly the same ordered ``port`` values as ``portlist``.
    """
    # flat=True yields the bare values, so no tuple unpacking is needed.
    wanted = list(portlist.values_list('port', flat=True).order_by('port'))
    matching_pks = []
    for route in routes:
        route_ports = list(
            getattr(route, relation).all()
            .values_list('port', flat=True).order_by('port')
        )
        if route_ports and route_ports == wanted:
            matching_pks.append(route.pk)
    return matching_pks
def get_matchingprotocol_route_pks(protocolist, routes):
    """Return the pks of routes whose protocol set equals the selection.

    Args:
        protocolist: queryset of the selected protocols; compared by its
            ``protocol`` values ordered by ``protocol``.
        routes: iterable of routes to inspect.

    Returns:
        List of ``route.pk`` for every route whose protocol set is non-empty
        and has exactly the same ordered ``protocol`` values as
        ``protocolist``.
    """
    # flat=True yields the bare values, so no tuple unpacking is needed.
    wanted = list(protocolist.values_list('protocol', flat=True).order_by('protocol'))
    matching_pks = []
    for route in routes:
        route_protocols = list(
            route.protocol.all()
            .values_list('protocol', flat=True).order_by('protocol')
        )
        if route_protocols and route_protocols == wanted:
            matching_pks.append(route.pk)
    return matching_pks