1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
8 from django.core.urlresolvers import reverse
9 from django.contrib.auth.models import User
10 from django.conf import settings
12 from django.core.mail import mail_admins, mail_managers, send_mail
# Django ModelForm that validates a route (firewall rule) submission: the
# source, destination and expires fields each get a field-level cleaner,
# followed by form-wide consistency and overlap checks.
# NOTE(review): this excerpt has lost its indentation and carries residual
# line-number prefixes; gaps in that numbering suggest some statements
# (e.g. try/except/else and `if` headers) are not shown. The comments below
# describe only what is visible.
15 class RouteForm(forms.ModelForm):
# Commented-out explicit field declarations (name, source, source_ports,
# destination, destination_ports, ports).
16 # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
17 # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
18 # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
19 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
20 # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
21 # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
22 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
23 # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
24 # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
# Field cleaner for `source`. The applying user is looked up from the raw
# form data (the 'applier' primary key) and the user's peer is taken from
# the user profile.
29 def clean_source(self):
30 user = User.objects.get(pk=self.data['applier'])
31 peer = user.get_profile().peer
32 data = self.cleaned_data['source']
# Set to True below when the address falls inside a protected subnet.
# Presumably it selects the final error message; the test that reads it is
# not visible in this excerpt.
34 protected_error = False
# NOTE(review): IPNetwork is not imported by name in the visible imports;
# presumably it comes from the star import or an import line not shown.
37 address = IPNetwork(data)
# Compare the requested network against every configured protected subnet.
38 for net in settings.PROTECTED_SUBNETS:
39 if address in IPNetwork(net):
40 protected_error = True
# Report the attempt by e-mail to settings.NOTIFY_ADMIN_MAILS;
# fail_silently=True suppresses mail-sending errors.
41 mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
43 mail_body, settings.SERVER_EMAIL,
44 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
# NOTE(review): the branches around the private-address test and the
# return below are not visible in this excerpt.
46 if address.is_private:
# Returns the cleaned source value unchanged.
50 return self.cleaned_data["source"]
# Candidate error messages. Which one is finally raised depends on
# conditions that are not visible here.
52 error_text = _('Invalid network address format')
54 error_text = _('Private addresses not allowed')
56 error_text = _('You have no authority on this subnet')
57 raise forms.ValidationError(error_text)
# Field cleaner for `destination`. Mirrors the source cleaner's user/peer
# lookup and protected-subnet check, and additionally tests the prefix
# length against settings.PREFIX_LENGTH.
59 def clean_destination(self):
60 user = User.objects.get(pk=self.data['applier'])
61 peer = user.get_profile().peer
62 data = self.cleaned_data['destination']
# Set to True below when the address falls inside a protected subnet.
64 protected_error = False
67 address = IPNetwork(data)
# Compare the requested network against every configured protected subnet.
68 for net in settings.PROTECTED_SUBNETS:
69 if address in IPNetwork(net):
70 protected_error = True
# Report the attempt by e-mail; fail_silently=True suppresses mail-sending
# errors.
71 mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
73 mail_body, settings.SERVER_EMAIL,
74 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
# Prefixes shorter than settings.PREFIX_LENGTH produce a dedicated message
# stored in `error`.
# NOTE(review): where `error` is initialised and read is not visible here.
76 if address.prefixlen < settings.PREFIX_LENGTH:
77 error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
# Returns the cleaned destination value unchanged.
79 return self.cleaned_data["destination"]
# Candidate error messages. The conditions choosing between them are not
# visible in this excerpt.
81 error_text = _('Invalid network address format')
85 error_text = _('You have no authority on this subnet')
86 raise forms.ValidationError(error_text)
# Field cleaner for `expires`: the value is returned when it lies 1 to 10
# days after today; the rejection message is shown below.
# NOTE(review): `datetime` is not among the visible imports; presumably it
# is imported on a line not shown.
88 def clean_expires(self):
89 date = self.cleaned_data['expires']
91 range_days = (date - datetime.date.today()).days
# The upper bound (11) is a literal, not a setting.
92 if range_days > 0 and range_days < 11:
93 return self.cleaned_data["expires"]
# NOTE(review): unlike the other messages in this class, this one is not
# passed through _() for translation.
95 raise forms.ValidationError('Invalid date range')
# NOTE(review): everything from here to `return self.cleaned_data` reads the
# fields via cleaned_data.get() and performs cross-field checks. Presumably
# this is a separate form-wide validation method whose `def` line and
# guarding condition are not visible in this excerpt.
99 raise forms.ValidationError(_('Errors in form. Please review and fix them'))
100 name = self.cleaned_data.get('name', None)
101 source = self.cleaned_data.get('source', None)
102 sourceports = self.cleaned_data.get('sourceport', None)
103 ports = self.cleaned_data.get('port', None)
104 then = self.cleaned_data.get('then', None)
105 destination = self.cleaned_data.get('destination', None)
106 destinationports = self.cleaned_data.get('destinationport', None)
107 protocols = self.cleaned_data.get('protocol', None)
# The applier's peer and the networks attached to that peer.
108 user = self.cleaned_data.get('applier', None)
109 peer = user.get_profile().peer
110 networks = peer.networks.all()
# Test the destination against each of the peer's networks. The condition
# guarding the raise below is not visible in this excerpt.
114 for network in networks:
115 net = IPNetwork(network.network)
116 if IPNetwork(destination) in net:
119 raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
# Generic ports cannot be combined with source or destination ports.
120 if (sourceports and ports):
121 raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
122 if (destinationports and ports):
123 raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
# A port match requires the corresponding address to be filled in.
124 if sourceports and not source:
125 raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
126 if destinationports and not destination:
127 raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
# At least one match condition must be present.
128 if not (source or sourceports or ports or destination or destinationports):
129 raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
# Non-superusers may only use actions listed in
# settings.UI_USER_THEN_ACTIONS. Only the first entry of `then` is checked.
130 if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
131 raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action))
# Candidate conflicting routes: same peer, status not EXPIRED, ERROR or
# ADMININACTIVE.
132 existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
133 existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
# Narrow the candidates by source, protocol, source ports, destination
# ports and generic ports in turn.
# NOTE(review): the conditions selecting between the filters in each group
# are not visible in this excerpt.
135 source = IPNetwork(source).compressed
136 existing_routes = existing_routes.filter(source=source)
138 existing_routes = existing_routes.filter(source=None)
140 route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
142 existing_routes = existing_routes.filter(pk__in=route_pk_list)
144 existing_routes = existing_routes.filter(protocol=None)
146 existing_routes = existing_routes.filter(protocol=None)
148 route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
150 existing_routes = existing_routes.filter(pk__in=route_pk_list)
152 existing_routes = existing_routes.filter(sourceport=None)
154 route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
156 existing_routes = existing_routes.filter(pk__in=route_pk_list)
158 existing_routes = existing_routes.filter(destinationport=None)
160 route_pk_list=get_matchingport_route_pks(ports, existing_routes)
162 existing_routes = existing_routes.filter(pk__in=route_pk_list)
164 existing_routes = existing_routes.filter(port=None)
# For remaining candidates with a different name, reject the submission
# when the two destination networks contain one another in either
# direction.
# NOTE(review): the error message embeds HTML and interpolates route values;
# confirm how the template renders and escapes it.
165 for route in existing_routes:
166 if name != route.name:
167 existing_url = reverse('edit-route', args=[route.name])
168 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
169 raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
170 return self.cleaned_data
# ModelForm for a "then" action. The visible cleaners restrict the action
# to 'rate-limit' and reject rate values below 50.
# NOTE(review): this excerpt has lost its indentation and some statements
# (e.g. the try/except and `if`/`else` headers) are not shown.
172 class ThenPlainForm(forms.ModelForm):
173 # action = forms.CharField(initial='rate-limit')
# Field cleaner for `action_value`; on success the value is returned
# formatted as a string.
177 def clean_action_value(self):
178 action_value = self.cleaned_data['action_value']
# NOTE(review): `assert` is removed when Python runs with -O, and it also
# fails for a value of 0 since int('0') is falsy. Consider an explicit
# check.
181 assert(int(action_value))
# Values below 50 are rejected.
182 if int(action_value) < 50:
183 raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
184 return "%s" %self.cleaned_data["action_value"]
# NOTE(review): this message says "integer < 50" while the check above
# rejects values below 50. The wording looks inverted; confirm.
186 raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
# Presumably raised when no value was supplied; the guarding condition is
# not visible in this excerpt.
188 raise forms.ValidationError(_('Cannot be empty'))
# Field cleaner for `action`: any value other than 'rate-limit' is
# rejected, otherwise the cleaned value is returned unchanged.
190 def clean_action(self):
191 action = self.cleaned_data['action']
192 if action != 'rate-limit':
193 raise forms.ValidationError(_('Cannot select something other than rate-limit'))
195 return self.cleaned_data["action"]
# ModelForm for a single port value.
# NOTE(review): this excerpt has lost its indentation and the statements
# that perform the actual check are not shown.
198 class PortPlainForm(forms.ModelForm):
# Leftover commented-out line; it refers to `action`, not to a port field.
199 # action = forms.CharField(initial='rate-limit')
# Field cleaner for `port`; on success the value is returned formatted as
# a string.
203 def clean_port(self):
204 port = self.cleaned_data['port']
208 return "%s" %self.cleaned_data["port"]
# The conditions that lead to each of the two errors below are not visible
# here. Judging by the messages, presumably a failed integer conversion and
# a missing value.
210 raise forms.ValidationError(_('Port should be an integer'))
212 raise forms.ValidationError(_('Cannot be empty'))
# Iterates over `valuelist`. The callers in this file pass the result of a
# values_list(...) query and compare the returned values with ==.
# NOTE(review): the loop body and the return statement are not visible in
# this excerpt. Presumably it builds a plain list from the query rows;
# confirm against the full file.
214 def value_list_to_list(valuelist):
216 for val in valuelist:
# Collects the primary keys of routes whose ordered port values are
# non-empty and equal to the ordered port values of `portlist`.
# NOTE(review): the loop header over `routes`, the initialisation of
# route_pk_list and the return are not visible in this excerpt.
220 def get_matchingport_route_pks(portlist, routes):
222 ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
# NOTE(review): the comparison always reads route.destinationport, yet
# RouteForm also calls this helper with source ports and generic ports.
# Confirm this is intended.
224 rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
225 if rsp and rsp == ports_value_list:
226 route_pk_list.append(route.pk)
# Collects the primary keys of routes whose ordered protocol values are
# non-empty and equal to the ordered protocol values of `protocolist`.
# NOTE(review): the loop header over `routes`, the initialisation of
# route_pk_list and the return are not visible in this excerpt.
229 def get_matchingprotocol_route_pks(protocolist, routes):
231 protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
233 rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
234 if rsp and rsp == protocols_value_list:
235 route_pk_list.append(route.pk)