1 from django import forms
2 from django.utils.safestring import mark_safe
3 from django.utils.translation import ugettext as _
4 from django.utils.translation import ugettext_lazy
5 from django.template.defaultfilters import filesizeformat
6 from flowspy.flowspec.models import *
7 from flowspy.peers.models import *
9 from django.core.urlresolvers import reverse
10 from django.contrib.auth.models import User
11 from django.conf import settings
13 from django.core.mail import mail_admins, mail_managers, send_mail
# RouteForm: a ModelForm whose visible validators cover the applier, source,
# destination and expires fields, followed by cross-field checks that end by
# returning self.cleaned_data.
# NOTE(review): every line in this listing starts with a number, nothing is
# indented, and the numbers skip (26-29, 32, 34, ...). Whatever sat on the
# skipped lines (guards such as if/else/try/except, possibly the `def` header
# of the cross-field section) is not visible here. The comments below describe
# the visible statements only; every guess about hidden lines is marked.
# NOTE(review): IPNetwork and datetime are used below but no visible import
# line names them - presumably a star import or a skipped line; confirm.
16 class RouteForm(forms.ModelForm):
# Commented-out explicit field declarations (name, source, source_ports,
# destination, destination_ports, ports) kept from an earlier revision.
17 # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
18 # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
19 # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
20 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
21 # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
22 # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
23 # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
24 # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
25 # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
# clean_applier: reads cleaned_data['applier'], then either returns it or
# raises ValidationError('This field is required.').
# NOTE(review): the condition choosing between the two is not visible -
# presumably a truthiness test on `applier`; confirm.
30 def clean_applier(self):
31 applier = self.cleaned_data['applier']
33 return self.cleaned_data["applier"]
35 raise forms.ValidationError('This field is required.')
# clean_source: validates the submitted source network.
# The user is looked up from self.data['applier'] (the raw submitted value,
# not cleaned_data) and the peer is taken from that user's profile.
37 def clean_source(self):
38 user = User.objects.get(pk=self.data['applier'])
39 peer = user.get_profile().peer
40 data = self.cleaned_data['source']
42 protected_error = False
# Parse the value and test it against every entry of
# settings.PROTECTED_SUBNETS. On a match the flag is set and a notification is
# mailed to settings.NOTIFY_ADMIN_MAILS; fail_silently=True keeps mail
# delivery problems from surfacing to the user.
45 address = IPNetwork(data)
46 for net in settings.PROTECTED_SUBNETS:
47 if address in IPNetwork(net):
48 protected_error = True
49 mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
50 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
51 mail_body, settings.SERVER_EMAIL,
52 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
# NOTE(review): what happens when the address is private is not visible;
# the 'Private addresses not allowed' text below suggests it is rejected.
54 if address.is_private:
58 return self.cleaned_data["source"]
# Three candidate messages, then a single raise. NOTE(review): the later two
# assignments are presumably guarded by hidden conditions (private /
# protected); as listed the last one would always win - confirm.
60 error_text = 'Invalid network address format'
62 error_text = 'Private addresses not allowed'
64 error_text = 'You have no authority on this subnet'
65 raise forms.ValidationError(error_text)
# clean_destination: same shape as clean_source for the destination network,
# with a minimum prefix length check instead of the private-address check.
67 def clean_destination(self):
68 user = User.objects.get(pk=self.data['applier'])
69 peer = user.get_profile().peer
70 data = self.cleaned_data['destination']
72 protected_error = False
# Protected-subnet check and admin notification, as for the source address.
75 address = IPNetwork(data)
76 for net in settings.PROTECTED_SUBNETS:
77 if address in IPNetwork(net):
78 protected_error = True
79 mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
80 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
81 mail_body, settings.SERVER_EMAIL,
82 settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
# Prefixes shorter than settings.PREFIX_LENGTH get a dedicated message in
# `error`. NOTE(review): how `error` reaches the raise below is not visible.
84 if address.prefixlen < settings.PREFIX_LENGTH:
85 error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
87 return self.cleaned_data["destination"]
89 error_text = 'Invalid network address format'
93 error_text = 'You have no authority on this subnet'
94 raise forms.ValidationError(error_text)
# clean_expires: accepts an expiry date that lies between 1 and 10 days after
# today (range_days > 0 and < 11); otherwise 'Invalid date range' is raised.
96 def clean_expires(self):
97 date = self.cleaned_data['expires']
99 range_days = (date - datetime.date.today()).days
100 if range_days > 0 and range_days < 11:
101 return self.cleaned_data["expires"]
103 raise forms.ValidationError('Invalid date range')
# Cross-field validation. NOTE(review): the header of this section is not
# visible - presumably `def clean(self)` with the raise below guarded by a
# check for existing field errors; confirm.
107 raise forms.ValidationError('Errors in form. Please review and fix them')
# Collect the cleaned values; each defaults to None when absent.
108 name = self.cleaned_data.get('name', None)
109 source = self.cleaned_data.get('source', None)
110 sourceports = self.cleaned_data.get('sourceport', None)
111 ports = self.cleaned_data.get('port', None)
112 then = self.cleaned_data.get('then', None)
113 destination = self.cleaned_data.get('destination', None)
114 destinationports = self.cleaned_data.get('destinationport', None)
115 protocols = self.cleaned_data.get('protocol', None)
116 user = self.cleaned_data.get('applier', None)
# 'issuperuser' is read from the raw submitted data and resolved to a User.
# NOTE(review): `su` is not used on any visible line; if a hidden line
# branches on it, verify a client-chosen value cannot widen the networks
# selected below.
118 issuperuser = self.data['issuperuser']
119 su = User.objects.get(username=issuperuser)
# Candidate networks: either the applier's peer networks or the ranges of all
# peers. NOTE(review): the condition selecting between them is not visible.
122 peer = user.get_profile().peer
123 networks = peer.networks.all()
125 networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
# The destination is tested for containment in each candidate network.
# NOTE(review): the bookkeeping between the test and the raise (flag, break,
# guard) is not visible.
129 for network in networks:
130 net = IPNetwork(network.network)
131 if IPNetwork(destination) in net:
134 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
# Match-condition rules: generic ports exclude source/destination ports, a
# port match needs its address, and at least one condition must be given.
135 if (sourceports and ports):
136 raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
137 if (destinationports and ports):
138 raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
139 if sourceports and not source:
140 raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
141 if destinationports and not destination:
142 raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
143 if not (source or sourceports or ports or destination or destinationports):
144 raise forms.ValidationError('Fill at least a Rule Match Condition')
# Non-superusers may only pick actions listed in settings.UI_USER_THEN_ACTIONS.
# Only the first selected action is examined. NOTE(review): `then` defaults to
# None above, in which case then[0] would raise TypeError - confirm it is
# always populated at this point.
145 if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
146 raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
# Narrow the peer's existing routes by source, protocol, source port,
# destination port and port. NOTE(review): each group below is presumably the
# arms of a hidden `if <field>: ... else: ...`; the protocol=None filter
# appears twice in a row - confirm against the full file.
147 existing_routes = Route.objects.all()
148 existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
150 source = IPNetwork(source).compressed
151 existing_routes = existing_routes.filter(source=source)
153 existing_routes = existing_routes.filter(source=None)
155 route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
157 existing_routes = existing_routes.filter(pk__in=route_pk_list)
159 existing_routes = existing_routes.filter(protocol=None)
161 existing_routes = existing_routes.filter(protocol=None)
163 route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
165 existing_routes = existing_routes.filter(pk__in=route_pk_list)
167 existing_routes = existing_routes.filter(sourceport=None)
169 route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
171 existing_routes = existing_routes.filter(pk__in=route_pk_list)
173 existing_routes = existing_routes.filter(destinationport=None)
175 route_pk_list=get_matchingport_route_pks(ports, existing_routes)
177 existing_routes = existing_routes.filter(pk__in=route_pk_list)
179 existing_routes = existing_routes.filter(port=None)
# Any remaining route with a different name whose destination contains, or is
# contained in, the new destination is reported as a conflict with a link to
# its edit page.
# NOTE(review): the message embeds HTML and interpolates route.name and
# route.destination unescaped; confirm how the template renders it.
180 for route in existing_routes:
181 if name != route.name:
182 existing_url = reverse('edit-route', args=[route.name])
183 if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
184 raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
185 return self.cleaned_data
# ThenPlainForm: ModelForm with validators for the action_value and action
# fields of a rate-limit action.
# NOTE(review): numbered lines are skipped in this listing (189-191, 194-195,
# 200, 202, ...); guards and any try/except on those lines are not visible.
187 class ThenPlainForm(forms.ModelForm):
188 # action = forms.CharField(initial='rate-limit')
# clean_action_value: requires an integer-like value of at least 50 and
# returns it formatted as a string.
192 def clean_action_value(self):
193 action_value = self.cleaned_data['action_value']
# NOTE(review): assert is used as validation here - it fails for a value of 0
# (int() result is falsy) and is removed entirely when Python runs with -O.
196 assert(int(action_value))
197 if int(action_value) < 50:
198 raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
199 return "%s" %self.cleaned_data["action_value"]
# NOTE(review): this message says "< 50" while the rule above rejects values
# below 50. If it sits in a broad except (not visible), it would also replace
# the more precise message raised above - confirm.
201 raise forms.ValidationError('Rate-limiting should be an integer < 50')
203 raise forms.ValidationError('Cannot be empty')
# clean_action: only the 'rate-limit' action is accepted; it is returned
# unchanged.
205 def clean_action(self):
206 action = self.cleaned_data['action']
207 if action != 'rate-limit':
208 raise forms.ValidationError('Cannot select something other than rate-limit')
210 return self.cleaned_data["action"]
# PortPlainForm: ModelForm with a validator for the port field.
# NOTE(review): numbered lines are skipped in this listing (215-217, 220-222,
# 224, 226); the checks that pick between the return and the two raises below
# are not visible.
213 class PortPlainForm(forms.ModelForm):
214 # action = forms.CharField(initial='rate-limit')
# clean_port: returns the cleaned port formatted as a string, or raises
# 'Port should be an integer' / 'Cannot be empty'.
# NOTE(review): presumably an int() conversion and an emptiness test guard
# these outcomes; confirm.
218 def clean_port(self):
219 port = self.cleaned_data['port']
223 return "%s" %self.cleaned_data["port"]
225 raise forms.ValidationError('Port should be an integer')
227 raise forms.ValidationError('Cannot be empty')
# value_list_to_list: iterates over `valuelist`.
# NOTE(review): only the signature and the loop header are visible in this
# listing; the accumulator, loop body and return value are not. Callers in
# this file pass values_list(...) querysets and compare the results with ==,
# so it presumably returns a plain list built from each row - confirm.
229 def value_list_to_list(valuelist):
231 for val in valuelist:
# get_matchingport_route_pks: collects the primary keys of routes whose
# ordered port values are non-empty and equal to the ordered port values of
# `portlist`.
# NOTE(review): the initialisation of route_pk_list, the loop over `routes`
# and the return are on lines not visible in this listing.
# NOTE(review): the comparison always reads route.destinationport, yet
# RouteForm also calls this helper with source ports and generic ports -
# those calls would be matched against destination ports; confirm intent.
235 def get_matchingport_route_pks(portlist, routes):
237 ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
239 rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
240 if rsp and rsp == ports_value_list:
241 route_pk_list.append(route.pk)
# get_matchingprotocol_route_pks: collects the primary keys of routes whose
# ordered protocol values are non-empty and equal to the ordered protocol
# values of `protocolist`.
# NOTE(review): the initialisation of route_pk_list, the loop over `routes`
# and the return are on lines not visible in this listing.
244 def get_matchingprotocol_route_pks(protocolist, routes):
246 protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
248 rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
249 if rsp and rsp == protocols_value_list:
250 route_pk_list.append(route.pk)