1 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2 # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
4 # Copyright (C) 2010-2014 GRNET S.A.
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from django import forms
21 from django.utils.safestring import mark_safe
22 from django.utils.translation import ugettext as _
23 from django.utils.translation import ugettext_lazy
24 from django.template.defaultfilters import filesizeformat
25 from flowspec.models import *
26 from peers.models import *
27 from accounts.models import *
29 from django.core.urlresolvers import reverse
30 from django.contrib.auth.models import User
31 from django.conf import settings
33 from django.core.mail import mail_admins, mail_managers, send_mail
35 class UserProfileForm(forms.ModelForm):
39 class RouteForm(forms.ModelForm):
40 # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
41 # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
42 # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
43 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
44 # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
45 # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
46 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
47 # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
48 # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
53 def clean_applier(self):
54 applier = self.cleaned_data['applier']
56 return self.cleaned_data["applier"]
58 raise forms.ValidationError('This field is required.')
60 def clean_source(self):
61 user = User.objects.get(pk=self.data['applier'])
62 peer = user.get_profile().peer
63 data = self.cleaned_data['source']
65 protected_error = False
66 networkaddr_error = False
67 broadcast_error = False
70 address = IPNetwork(data)
71 for net in settings.PROTECTED_SUBNETS:
72 if address in IPNetwork(net):
73 protected_error = True
74 mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
75 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
76 mail_body, settings.SERVER_EMAIL,
77 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
79 if address.is_private:
82 if address.version == 4 and int(address.prefixlen) == 32:
83 if int(address.network.compressed.split('.')[-1]) == 0:
84 broadcast_error = True
86 elif int(address.network.compressed.split('.')[-1]) == 255:
87 networkaddr_error = True
89 return self.cleaned_data["source"]
91 error_text = _('Invalid network address format')
93 error_text = _('Private addresses not allowed')
95 error_text = _('Malformed address format. Cannot be ...255/32')
97 error_text = _('Malformed address format. Cannot be ...0/32')
99 error_text = _('You have no authority on this subnet')
100 raise forms.ValidationError(error_text)
102 def clean_destination(self):
103 user = User.objects.get(pk=self.data['applier'])
104 peer = user.get_profile().peer
105 data = self.cleaned_data['destination']
107 protected_error = False
108 networkaddr_error = False
109 broadcast_error = False
112 address = IPNetwork(data)
113 for net in settings.PROTECTED_SUBNETS:
114 if address in IPNetwork(net):
115 protected_error = True
116 mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
117 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
118 mail_body, settings.SERVER_EMAIL,
119 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
121 if address.prefixlen < settings.PREFIX_LENGTH:
122 error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
124 if address.version == 4 and int(address.prefixlen) == 32:
125 if int(address.network.compressed.split('.')[-1]) == 0:
126 broadcast_error = True
128 elif int(address.network.compressed.split('.')[-1]) == 255:
129 networkaddr_error = True
131 return self.cleaned_data["destination"]
133 error_text = _('Invalid network address format')
137 error_text = _('You have no authority on this subnet')
138 if networkaddr_error:
139 error_text = _('Malformed address format. Cannot be ...255/32')
141 error_text = _('Malformed address format. Cannot be ...0/32')
142 raise forms.ValidationError(error_text)
144 def clean_expires(self):
145 date = self.cleaned_data['expires']
147 range_days = (date - datetime.date.today()).days
148 if range_days > 0 and range_days < 11:
149 return self.cleaned_data["expires"]
151 raise forms.ValidationError('Invalid date range')
155 raise forms.ValidationError(_('Errors in form. Please review and fix them: %s'%", ".join(self.errors)))
156 name = self.cleaned_data.get('name', None)
157 source = self.cleaned_data.get('source', None)
158 sourceports = self.cleaned_data.get('sourceport', None)
159 ports = self.cleaned_data.get('port', None)
160 fragmenttypes = self.cleaned_data.get('fragmenttype', None)
161 then = self.cleaned_data.get('then', None)
162 destination = self.cleaned_data.get('destination', None)
163 destinationports = self.cleaned_data.get('destinationport', None)
164 protocols = self.cleaned_data.get('protocol', None)
165 user = self.cleaned_data.get('applier', None)
167 issuperuser = self.data['issuperuser']
168 su = User.objects.get(username=issuperuser)
171 peer = user.get_profile().peer
172 networks = peer.networks.all()
174 networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
178 for network in networks:
179 net = IPNetwork(network.network)
180 if IPNetwork(destination) in net:
183 raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
184 if (sourceports and ports):
185 raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
186 if (destinationports and ports):
187 raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
188 if sourceports and not source:
189 raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
190 if destinationports and not destination:
191 raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
192 if not (source or sourceports or ports or destination or destinationports):
193 raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
194 if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
195 raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action))
196 existing_routes = Route.objects.all()
197 existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
199 source = IPNetwork(source).compressed
200 existing_routes = existing_routes.filter(source=source)
202 existing_routes = existing_routes.filter(source=None)
204 route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
206 existing_routes = existing_routes.filter(pk__in=route_pk_list)
208 existing_routes = existing_routes.filter(protocol=None)
210 existing_routes = existing_routes.filter(protocol=None)
212 route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
214 existing_routes = existing_routes.filter(pk__in=route_pk_list)
216 existing_routes = existing_routes.filter(sourceport=None)
218 route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
220 existing_routes = existing_routes.filter(pk__in=route_pk_list)
222 existing_routes = existing_routes.filter(destinationport=None)
224 route_pk_list=get_matchingport_route_pks(ports, existing_routes)
226 existing_routes = existing_routes.filter(pk__in=route_pk_list)
228 existing_routes = existing_routes.filter(port=None)
229 for route in existing_routes:
230 if name != route.name:
231 existing_url = reverse('edit-route', args=[route.name])
232 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
233 raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
234 return self.cleaned_data
236 class ThenPlainForm(forms.ModelForm):
237 # action = forms.CharField(initial='rate-limit')
241 def clean_action_value(self):
242 action_value = self.cleaned_data['action_value']
245 assert(int(action_value))
246 if int(action_value) < 50:
247 raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
248 return "%s" %self.cleaned_data["action_value"]
250 raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
252 raise forms.ValidationError(_('Cannot be empty'))
254 def clean_action(self):
255 action = self.cleaned_data['action']
256 if action != 'rate-limit':
257 raise forms.ValidationError(_('Cannot select something other than rate-limit'))
259 return self.cleaned_data["action"]
262 class PortPlainForm(forms.ModelForm):
263 # action = forms.CharField(initial='rate-limit')
267 def clean_port(self):
268 port = self.cleaned_data['port']
272 if int(port) > 65535 or int(port) < 0:
273 raise forms.ValidationError(_('Port should be < 65535 and >= 0'))
274 return "%s" %self.cleaned_data["port"]
275 except forms.ValidationError:
276 raise forms.ValidationError(_('Port should be < 65535 and >= 0'))
278 raise forms.ValidationError(_('Port should be an integer'))
280 raise forms.ValidationError(_('Cannot be empty'))
282 def value_list_to_list(valuelist):
284 for val in valuelist:
288 def get_matchingport_route_pks(portlist, routes):
290 ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
292 rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
293 if rsp and rsp == ports_value_list:
294 route_pk_list.append(route.pk)
297 def get_matchingprotocol_route_pks(protocolist, routes):
299 protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
301 rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
302 if rsp and rsp == protocols_value_list:
303 route_pk_list.append(route.pk)