1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
7 from flowspy.peers.models import *
9 from django.core.urlresolvers import reverse
10 from django.contrib.auth.models import User
11 from django.conf import settings
13 from django.core.mail import mail_admins, mail_managers, send_mail
16 class RouteForm(forms.ModelForm):
17 # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
18 # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
19 # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
20 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
21 # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
22 # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
23 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
24 # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
25 # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
30 def clean_source(self):
31 user = User.objects.get(pk=self.data['applier'])
32 peer = user.get_profile().peer
33 data = self.cleaned_data['source']
35 protected_error = False
38 address = IPNetwork(data)
39 for net in settings.PROTECTED_SUBNETS:
40 if address in IPNetwork(net):
41 protected_error = True
42 mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
43 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
44 mail_body, settings.SERVER_EMAIL,
45 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
47 if address.is_private:
51 return self.cleaned_data["source"]
53 error_text = 'Invalid network address format'
55 error_text = 'Private addresses not allowed'
57 error_text = 'You have no authority on this subnet'
58 raise forms.ValidationError(error_text)
60 def clean_destination(self):
61 user = User.objects.get(pk=self.data['applier'])
62 peer = user.get_profile().peer
63 data = self.cleaned_data['destination']
65 protected_error = False
68 address = IPNetwork(data)
69 for net in settings.PROTECTED_SUBNETS:
70 if address in IPNetwork(net):
71 protected_error = True
72 mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
73 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
74 mail_body, settings.SERVER_EMAIL,
75 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
77 if address.prefixlen < settings.PREFIX_LENGTH:
78 error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
80 return self.cleaned_data["destination"]
82 error_text = 'Invalid network address format'
86 error_text = 'You have no authority on this subnet'
87 raise forms.ValidationError(error_text)
89 def clean_expires(self):
90 date = self.cleaned_data['expires']
92 range_days = (date - datetime.date.today()).days
93 if range_days > 0 and range_days < 11:
94 return self.cleaned_data["expires"]
96 raise forms.ValidationError('Invalid date range')
100 raise forms.ValidationError('Errors in form. Please review and fix them')
101 name = self.cleaned_data.get('name', None)
102 source = self.cleaned_data.get('source', None)
103 sourceports = self.cleaned_data.get('sourceport', None)
104 ports = self.cleaned_data.get('port', None)
105 then = self.cleaned_data.get('then', None)
106 destination = self.cleaned_data.get('destination', None)
107 destinationports = self.cleaned_data.get('destinationport', None)
108 protocols = self.cleaned_data.get('protocol', None)
109 user = self.cleaned_data.get('applier', None)
110 peer = user.get_profile().peer
111 networks = peer.networks.all()
112 if user.is_superuser:
113 networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
117 for network in networks:
118 net = IPNetwork(network.network)
119 if IPNetwork(destination) in net:
122 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
123 if (sourceports and ports):
124 raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
125 if (destinationports and ports):
126 raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
127 if sourceports and not source:
128 raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
129 if destinationports and not destination:
130 raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
131 if not (source or sourceports or ports or destination or destinationports):
132 raise forms.ValidationError('Fill at least a Rule Match Condition')
133 if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
134 raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
135 existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
136 existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
138 source = IPNetwork(source).compressed
139 existing_routes = existing_routes.filter(source=source)
141 existing_routes = existing_routes.filter(source=None)
143 route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
145 existing_routes = existing_routes.filter(pk__in=route_pk_list)
147 existing_routes = existing_routes.filter(protocol=None)
149 existing_routes = existing_routes.filter(protocol=None)
151 route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
153 existing_routes = existing_routes.filter(pk__in=route_pk_list)
155 existing_routes = existing_routes.filter(sourceport=None)
157 route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
159 existing_routes = existing_routes.filter(pk__in=route_pk_list)
161 existing_routes = existing_routes.filter(destinationport=None)
163 route_pk_list=get_matchingport_route_pks(ports, existing_routes)
165 existing_routes = existing_routes.filter(pk__in=route_pk_list)
167 existing_routes = existing_routes.filter(port=None)
168 for route in existing_routes:
169 if name != route.name:
170 existing_url = reverse('edit-route', args=[route.name])
171 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
172 raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
173 return self.cleaned_data
175 class ThenPlainForm(forms.ModelForm):
176 # action = forms.CharField(initial='rate-limit')
180 def clean_action_value(self):
181 action_value = self.cleaned_data['action_value']
184 assert(int(action_value))
185 if int(action_value) < 50:
186 raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
187 return "%s" %self.cleaned_data["action_value"]
189 raise forms.ValidationError('Rate-limiting should be an integer < 50')
191 raise forms.ValidationError('Cannot be empty')
193 def clean_action(self):
194 action = self.cleaned_data['action']
195 if action != 'rate-limit':
196 raise forms.ValidationError('Cannot select something other than rate-limit')
198 return self.cleaned_data["action"]
201 class PortPlainForm(forms.ModelForm):
202 # action = forms.CharField(initial='rate-limit')
206 def clean_port(self):
207 port = self.cleaned_data['port']
211 return "%s" %self.cleaned_data["port"]
213 raise forms.ValidationError('Port should be an integer')
215 raise forms.ValidationError('Cannot be empty')
217 def value_list_to_list(valuelist):
219 for val in valuelist:
223 def get_matchingport_route_pks(portlist, routes):
225 ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
227 rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
228 if rsp and rsp == ports_value_list:
229 route_pk_list.append(route.pk)
232 def get_matchingprotocol_route_pks(protocolist, routes):
234 protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
236 rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
237 if rsp and rsp == protocols_value_list:
238 route_pk_list.append(route.pk)