1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
8 from django.core.urlresolvers import reverse
9 from django.contrib.auth.models import User
10 from django.conf import settings
12 from django.core.mail import mail_admins, mail_managers, send_mail
class RouteForm(forms.ModelForm):
    """Validate a flowspec route submitted through the web UI.

    The per-field cleaners vet the source network, the destination network
    and the expiry date.  ``clean`` then applies the cross-field rules and
    rejects a rule that overlaps one already held by the applier's peer.

    NOTE(review): the excerpt this class was rebuilt from had lost its
    indentation and its short control-flow lines (``if``/``try``/``except``/
    ``else``), the ``Meta`` declaration and the ``def clean`` header.  The
    structure below is inferred from the statements that remained -- compare
    it with the repository copy before merging.
    """
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)

    class Meta:
        # NOTE(review): not visible in the excerpt.  A ModelForm needs a
        # Meta.model, and Route is the model queried in clean() -- confirm.
        model = Route

    @staticmethod
    def _parse_network(value):
        """Return ``IPNetwork(value)`` or raise the form's format error."""
        try:
            return IPNetwork(value)
        except ValueError:
            # Only the parse is guarded, so validation errors raised later are
            # never swallowed and re-labelled as a format problem.
            # NOTE(review): assumes IPNetwork signals bad input with a
            # ValueError subclass -- confirm against the library in use.
            raise forms.ValidationError('Invalid network address format')

    def _reject_protected(self, address, data, role, subject):
        """Raise if *address* lies inside one of settings.PROTECTED_SUBNETS.

        Before raising, a notification mail naming the applier is sent to
        settings.NOTIFY_ADMIN_MAILS (delivery failures are ignored).  *role*
        is the word used in the mail body ('source' or 'destination') and
        *subject* is appended to settings.EMAIL_SUBJECT_PREFIX.
        """
        for net in settings.PROTECTED_SUBNETS:
            if address in IPNetwork(net):
                # The applier is looked up only here because the mail is the
                # sole consumer of the user and peer details.
                user = User.objects.get(pk=self.data['applier'])
                peer = user.get_profile().peer
                mail_body = "User %s %s (%s) attempted to set %s as the %s address in a firewall rule" % (
                    user.username, user.email, peer.peer_name, data, role)
                send_mail(settings.EMAIL_SUBJECT_PREFIX + subject,
                          mail_body, settings.SERVER_EMAIL,
                          settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
                raise forms.ValidationError('You have no authority on this subnet')

    def clean_source(self):
        """Validate the source network.

        Returns the submitted value when it parses as a network that is
        neither protected nor private.  Raises forms.ValidationError
        otherwise.
        """
        data = self.cleaned_data['source']
        if not data:
            # NOTE(review): the empty-value guard is reconstructed; an empty
            # source is assumed to yield None, which clean() filters on.
            return None
        address = self._parse_network(data)
        self._reject_protected(
            address, data, 'source',
            "Caught an attempt to set a protected IP/network as a source address")
        if address.is_private:
            raise forms.ValidationError('Private addresses not allowed')
        return data

    def clean_destination(self):
        """Validate the destination network.

        Returns the submitted value when it parses as a network that is not
        protected and whose prefix length is at least settings.PREFIX_LENGTH.
        Raises forms.ValidationError otherwise.
        """
        data = self.cleaned_data['destination']
        if not data:
            # NOTE(review): reconstructed guard, see clean_source.
            return None
        address = self._parse_network(data)
        self._reject_protected(
            address, data, 'destination',
            "Caught an attempt to set a protected IP/network as the destination address")
        if address.prefixlen < settings.PREFIX_LENGTH:
            # Raise the formatted message itself, not the literal 'error'.
            raise forms.ValidationError(
                "Currently no prefix lengths < %s are allowed" % settings.PREFIX_LENGTH)
        return data

    def clean_expires(self):
        """Accept an expiry date that is 1 to 10 days after today."""
        date = self.cleaned_data['expires']
        if not date:
            # NOTE(review): reconstructed guard -- an empty value passes through.
            return date
        range_days = (date - datetime.date.today()).days
        if 0 < range_days < 11:
            return date
        raise forms.ValidationError('Invalid date range')

    def clean(self):
        """Apply the cross-field rules and refuse overlapping rules.

        Returns ``self.cleaned_data``.  Raises forms.ValidationError when the
        destination is outside the peer's networks, when the port selections
        are inconsistent, when no match condition is given, when the action
        is not permitted for the user, or when an existing rule of the same
        peer overlaps the destination.
        """
        name = self.cleaned_data.get('name', None)
        source = self.cleaned_data.get('source', None)
        sourceports = self.cleaned_data.get('sourceport', None)
        ports = self.cleaned_data.get('port', None)
        then = self.cleaned_data.get('then', None)
        destination = self.cleaned_data.get('destination', None)
        destinationports = self.cleaned_data.get('destinationport', None)
        protocols = self.cleaned_data.get('protocol', None)
        user = self.cleaned_data.get('applier', None)
        if user is None:
            # 'applier' is absent from cleaned_data, so there is no peer to
            # check against; bail out instead of dereferencing None.
            return self.cleaned_data
        peer = user.get_profile().peer
        networks = peer.networks.all()

        if destination:
            wanted = IPNetwork(destination)
            if not any(wanted in IPNetwork(network.network) for network in networks):
                raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
        if sourceports and ports:
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
        if destinationports and ports:
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
        if sourceports and not source:
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
        if destinationports and not destination:
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
        if not (source or sourceports or ports or destination or destinationports):
            raise forms.ValidationError('Fill at least a Rule Match Condition')
        # 'then' may be missing or empty; only index it when it has a value.
        if then and not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
            raise forms.ValidationError('This action "%s" is not permitted' % (then[0].action))

        # Narrow the peer's live rules down to those with the same match
        # conditions as the submitted one.
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)
        if protocols:
            route_pk_list = get_matchingprotocol_route_pks(protocols, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
            else:
                existing_routes = existing_routes.filter(protocol=None)
        else:
            existing_routes = existing_routes.filter(protocol=None)
        # NOTE(review): for the three port filters the excerpt shows a single
        # fallback each; it is attached to the outer test here -- confirm.
        # NOTE(review): get_matchingport_route_pks is used for all three port
        # kinds; confirm it compares the intended relation in each case.
        if sourceports:
            route_pk_list = get_matchingport_route_pks(sourceports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(sourceport=None)
        if destinationports:
            route_pk_list = get_matchingport_route_pks(destinationports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(destinationport=None)
        if ports:
            route_pk_list = get_matchingport_route_pks(ports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(port=None)

        if destination:
            # Without a destination there is nothing to compare prefixes
            # against, and IPNetwork(None) would raise.
            new_net = IPNetwork(destination)
            for route in existing_routes:
                if name == route.name:
                    continue
                old_net = IPNetwork(route.destination)
                if new_net in old_net or old_net in new_net:
                    existing_url = reverse('edit-route', args=[route.name])
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' % (route.status, route.name, route.destination, existing_url, route.name))
        return self.cleaned_data
class ThenPlainForm(forms.ModelForm):
    """Form for a single rate-limit "then" action.

    NOTE(review): the excerpt this class was rebuilt from had lost its
    indentation, its ``Meta`` declaration and its ``if``/``try``/``except``/
    ``else`` lines; the structure is inferred from the remaining statements.
    """
#    action = forms.CharField(initial='rate-limit')

    class Meta:
        # NOTE(review): not visible in the excerpt; the model name is an
        # assumption -- confirm against the models module.
        model = ThenAction

    def clean_action_value(self):
        """Return the rate limit as a string once it is an integer >= 50.

        Raises forms.ValidationError when the value is empty, is not an
        integer, or is below 50.
        """
        action_value = self.cleaned_data['action_value']
        if not action_value:
            raise forms.ValidationError('Cannot be empty')
        try:
            # Explicit conversion instead of assert, which is stripped under
            # -O.  Only the conversion is guarded so the range error below
            # reaches the user unchanged.
            rate = int(action_value)
        except (TypeError, ValueError):
            raise forms.ValidationError('Rate-limiting should be an integer >= 50')
        if rate < 50:
            raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
        return "%s" % action_value

    def clean_action(self):
        """Accept only the 'rate-limit' action."""
        action = self.cleaned_data['action']
        if action != 'rate-limit':
            raise forms.ValidationError('Cannot select something other than rate-limit')
        return action
class PortPlainForm(forms.ModelForm):
    """Form for a single port entry.

    NOTE(review): the excerpt this class was rebuilt from had lost its
    indentation, its ``Meta`` declaration and the lines between reading the
    value and returning it; the structure is inferred from the remaining
    statements and error messages.
    """
#    action = forms.CharField(initial='rate-limit')

    class Meta:
        # NOTE(review): not visible in the excerpt; MatchPort is the port
        # model named in this file's commented-out fields -- confirm.
        model = MatchPort

    def clean_port(self):
        """Return the port as a string once it parses as an integer.

        Raises forms.ValidationError when the value is empty or not an
        integer.
        """
        port = self.cleaned_data['port']
        if not port:
            raise forms.ValidationError('Cannot be empty')
        try:
            # NOTE(review): the original check is not visible; a plain
            # integer conversion is assumed -- confirm whether 0 or
            # out-of-range ports must also be refused.
            int(port)
        except (TypeError, ValueError):
            raise forms.ValidationError('Port should be an integer')
        return "%s" % port
def value_list_to_list(valuelist):
    """Flatten an iterable of row tuples into a list of their first items.

    The callers in this file pass the result of
    ``values_list(<one field>).order_by(...)``, so each row is assumed to be
    a one-element tuple; only index 0 of every row is kept.
    """
    return [row[0] for row in valuelist]
def get_matchingport_route_pks(portlist, routes, relation='destinationport'):
    """Return the pks of the routes whose port set equals *portlist*.

    portlist -- object exposing ``values_list('port').order_by('port')``
                (the selected ports).
    routes   -- iterable of routes; each must expose the *relation*
                attribute with ``all().values_list('port').order_by('port')``.
    relation -- name of the route attribute to compare.  It defaults to
                'destinationport', the only relation this function used to
                look at, so existing two-argument calls behave as before.
                Pass 'sourceport' or 'port' to compare those sets instead.

    A route matches only when its ordered port values are non-empty and
    identical to the ordered values of *portlist*.
    """
    wanted = [row[0] for row in portlist.values_list('port').order_by('port')]
    if not wanted:
        # An empty selection can never equal a non-empty route port set, so
        # the per-route lookups are skipped.
        return []
    route_pk_list = []
    for route in routes:
        rows = getattr(route, relation).all().values_list('port').order_by('port')
        if [row[0] for row in rows] == wanted:
            route_pk_list.append(route.pk)
    return route_pk_list
def get_matchingprotocol_route_pks(protocolist, routes):
    """Return the pks of the routes whose protocol set equals *protocolist*.

    protocolist -- object exposing
                   ``values_list('protocol').order_by('protocol')``
                   (the selected protocols).
    routes      -- iterable of routes; each must expose ``protocol`` with
                   ``all().values_list('protocol').order_by('protocol')``.

    A route matches only when its ordered protocol values are non-empty and
    identical to the ordered values of *protocolist*.
    """
    wanted = [row[0] for row in protocolist.values_list('protocol').order_by('protocol')]
    if not wanted:
        # An empty selection can never equal a non-empty route protocol set.
        return []
    route_pk_list = []
    for route in routes:
        rows = route.protocol.all().values_list('protocol').order_by('protocol')
        if [row[0] for row in rows] == wanted:
            route_pk_list.append(route.pk)
    return route_pk_list