root / flowspec / forms.py @ 1b996627
History | View | Annotate | Download (12 kB)
1 | 9cad4715 | Leonidas Poulopoulos | from django import forms |
---|---|---|---|
2 | 9cad4715 | Leonidas Poulopoulos | from django.utils.safestring import mark_safe |
3 | 9cad4715 | Leonidas Poulopoulos | from django.utils.translation import ugettext as _ |
4 | 9cad4715 | Leonidas Poulopoulos | from django.utils.translation import ugettext_lazy |
5 | 9cad4715 | Leonidas Poulopoulos | from django.template.defaultfilters import filesizeformat |
6 | 6d153302 | Leonidas Poulopoulos | from flowspy.flowspec.models import * |
7 | 9cad4715 | Leonidas Poulopoulos | from ipaddr import * |
8 | 6d153302 | Leonidas Poulopoulos | from django.core.urlresolvers import reverse |
9 | 97e42c7d | Leonidas Poulopoulos | from django.contrib.auth.models import User |
10 | b4401a0c | Leonidas Poulopoulos | from django.conf import settings |
11 | 052c14aa | Leonidas Poulopoulos | import datetime |
12 | bfdfac23 | Leonidas Poulopoulos | from django.core.mail import mail_admins, mail_managers, send_mail |
13 | 1b996627 | Leonidas Poulopoulos | from django.forms.models import modelformset_factory |
14 | 6d153302 | Leonidas Poulopoulos | |
15 | 9cad4715 | Leonidas Poulopoulos | |
16 | 9cad4715 | Leonidas Poulopoulos | class RouteForm(forms.ModelForm): |
17 | 9cad4715 | Leonidas Poulopoulos | # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
|
18 | 9cad4715 | Leonidas Poulopoulos | # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
|
19 | 9cad4715 | Leonidas Poulopoulos | # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
20 | 9cad4715 | Leonidas Poulopoulos | # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
|
21 | 9cad4715 | Leonidas Poulopoulos | # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
|
22 | 9cad4715 | Leonidas Poulopoulos | # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
23 | 9cad4715 | Leonidas Poulopoulos | # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
|
24 | 9cad4715 | Leonidas Poulopoulos | # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
|
25 | 9cad4715 | Leonidas Poulopoulos | # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
|
26 | 6d48c46c | Leonidas Poulopoulos | |
27 | 9cad4715 | Leonidas Poulopoulos | class Meta: |
28 | 9cad4715 | Leonidas Poulopoulos | model = Route |
29 | 9cad4715 | Leonidas Poulopoulos | |
30 | 9cad4715 | Leonidas Poulopoulos | def clean_source(self): |
31 | 20aafa59 | Leonidas Poulopoulos | user = User.objects.get(pk=self.data['applier']) |
32 | 20aafa59 | Leonidas Poulopoulos | peer = user.get_profile().peer |
33 | 9cad4715 | Leonidas Poulopoulos | data = self.cleaned_data['source'] |
34 | 34cff057 | Leonidas Poulopoulos | private_error = False
|
35 | bfdfac23 | Leonidas Poulopoulos | protected_error = False
|
36 | 9cad4715 | Leonidas Poulopoulos | if data:
|
37 | 9cad4715 | Leonidas Poulopoulos | try:
|
38 | 9cad4715 | Leonidas Poulopoulos | address = IPNetwork(data) |
39 | bfdfac23 | Leonidas Poulopoulos | for net in settings.PROTECTED_SUBNETS: |
40 | bfdfac23 | Leonidas Poulopoulos | if address in IPNetwork(net): |
41 | bfdfac23 | Leonidas Poulopoulos | protected_error = True
|
42 | 20aafa59 | Leonidas Poulopoulos | mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
43 | bfdfac23 | Leonidas Poulopoulos | send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
|
44 | bfdfac23 | Leonidas Poulopoulos | mail_body, settings.SERVER_EMAIL, |
45 | 0667633c | Leonidas Poulopoulos | settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
46 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
47 | 34cff057 | Leonidas Poulopoulos | if address.is_private:
|
48 | 34cff057 | Leonidas Poulopoulos | private_error = True
|
49 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
50 | 34cff057 | Leonidas Poulopoulos | else:
|
51 | 34cff057 | Leonidas Poulopoulos | return self.cleaned_data["source"] |
52 | 9cad4715 | Leonidas Poulopoulos | except Exception: |
53 | 34cff057 | Leonidas Poulopoulos | error_text = 'Invalid network address format'
|
54 | 34cff057 | Leonidas Poulopoulos | if private_error:
|
55 | 34cff057 | Leonidas Poulopoulos | error_text = 'Private addresses not allowed'
|
56 | bfdfac23 | Leonidas Poulopoulos | if protected_error:
|
57 | bfdfac23 | Leonidas Poulopoulos | error_text = 'You have no authority on this subnet'
|
58 | 34cff057 | Leonidas Poulopoulos | raise forms.ValidationError(error_text)
|
59 | 9cad4715 | Leonidas Poulopoulos | |
60 | 9cad4715 | Leonidas Poulopoulos | def clean_destination(self): |
61 | 20aafa59 | Leonidas Poulopoulos | user = User.objects.get(pk=self.data['applier']) |
62 | f289f4c8 | Leonidas Poulopoulos | peer = user.get_profile().peer |
63 | 9cad4715 | Leonidas Poulopoulos | data = self.cleaned_data['destination'] |
64 | b4401a0c | Leonidas Poulopoulos | error = None
|
65 | bfdfac23 | Leonidas Poulopoulos | protected_error = False
|
66 | 9cad4715 | Leonidas Poulopoulos | if data:
|
67 | 9cad4715 | Leonidas Poulopoulos | try:
|
68 | 9cad4715 | Leonidas Poulopoulos | address = IPNetwork(data) |
69 | bfdfac23 | Leonidas Poulopoulos | for net in settings.PROTECTED_SUBNETS: |
70 | bfdfac23 | Leonidas Poulopoulos | if address in IPNetwork(net): |
71 | bfdfac23 | Leonidas Poulopoulos | protected_error = True
|
72 | 20aafa59 | Leonidas Poulopoulos | mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
73 | bfdfac23 | Leonidas Poulopoulos | send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
|
74 | bfdfac23 | Leonidas Poulopoulos | mail_body, settings.SERVER_EMAIL, |
75 | 0667633c | Leonidas Poulopoulos | settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
76 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
77 | b4401a0c | Leonidas Poulopoulos | if address.prefixlen < settings.PREFIX_LENGTH:
|
78 | b4401a0c | Leonidas Poulopoulos | error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
|
79 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
80 | 9cad4715 | Leonidas Poulopoulos | return self.cleaned_data["destination"] |
81 | 9cad4715 | Leonidas Poulopoulos | except Exception: |
82 | bfdfac23 | Leonidas Poulopoulos | error_text = 'Invalid network address format'
|
83 | b4401a0c | Leonidas Poulopoulos | if error:
|
84 | b4401a0c | Leonidas Poulopoulos | error_text = error |
85 | bfdfac23 | Leonidas Poulopoulos | if protected_error:
|
86 | bfdfac23 | Leonidas Poulopoulos | error_text = 'You have no authority on this subnet'
|
87 | b4401a0c | Leonidas Poulopoulos | raise forms.ValidationError(error_text)
|
88 | 052c14aa | Leonidas Poulopoulos | |
89 | 052c14aa | Leonidas Poulopoulos | def clean_expires(self): |
90 | 052c14aa | Leonidas Poulopoulos | date = self.cleaned_data['expires'] |
91 | 052c14aa | Leonidas Poulopoulos | if date:
|
92 | 052c14aa | Leonidas Poulopoulos | range_days = (date - datetime.date.today()).days |
93 | 052c14aa | Leonidas Poulopoulos | if range_days > 0 and range_days < 11: |
94 | 052c14aa | Leonidas Poulopoulos | return self.cleaned_data["expires"] |
95 | 052c14aa | Leonidas Poulopoulos | else:
|
96 | 052c14aa | Leonidas Poulopoulos | raise forms.ValidationError('Invalid date range') |
97 | 9cad4715 | Leonidas Poulopoulos | |
98 | 9cad4715 | Leonidas Poulopoulos | def clean(self): |
99 | f289f4c8 | Leonidas Poulopoulos | if self.errors: |
100 | f289f4c8 | Leonidas Poulopoulos | raise forms.ValidationError('Errors in form. Please review and fix them') |
101 | 6d153302 | Leonidas Poulopoulos | name = self.cleaned_data.get('name', None) |
102 | 9cad4715 | Leonidas Poulopoulos | source = self.cleaned_data.get('source', None) |
103 | 9cad4715 | Leonidas Poulopoulos | sourceports = self.cleaned_data.get('sourceport', None) |
104 | 9cad4715 | Leonidas Poulopoulos | ports = self.cleaned_data.get('port', None) |
105 | 6d153302 | Leonidas Poulopoulos | then = self.cleaned_data.get('then', None) |
106 | 9cad4715 | Leonidas Poulopoulos | destination = self.cleaned_data.get('destination', None) |
107 | 9cad4715 | Leonidas Poulopoulos | destinationports = self.cleaned_data.get('destinationport', None) |
108 | dbdc30ec | Leonidas Poulopoulos | protocols = self.cleaned_data.get('protocol', None) |
109 | 97e42c7d | Leonidas Poulopoulos | user = self.cleaned_data.get('applier', None) |
110 | 6d153302 | Leonidas Poulopoulos | peer = user.get_profile().peer |
111 | 6d153302 | Leonidas Poulopoulos | networks = peer.networks.all() |
112 | 97e42c7d | Leonidas Poulopoulos | mynetwork = False
|
113 | 6d153302 | Leonidas Poulopoulos | route_pk_list = [] |
114 | 97e42c7d | Leonidas Poulopoulos | if destination:
|
115 | 97e42c7d | Leonidas Poulopoulos | for network in networks: |
116 | 97e42c7d | Leonidas Poulopoulos | net = IPNetwork(network.network) |
117 | 97e42c7d | Leonidas Poulopoulos | if IPNetwork(destination) in net: |
118 | 97e42c7d | Leonidas Poulopoulos | mynetwork = True
|
119 | 97e42c7d | Leonidas Poulopoulos | if not mynetwork: |
120 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks') |
121 | 9cad4715 | Leonidas Poulopoulos | if (sourceports and ports): |
122 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports') |
123 | 9cad4715 | Leonidas Poulopoulos | if (destinationports and ports): |
124 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports') |
125 | 9cad4715 | Leonidas Poulopoulos | if sourceports and not source: |
126 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address') |
127 | 9cad4715 | Leonidas Poulopoulos | if destinationports and not destination: |
128 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address') |
129 | 9cad4715 | Leonidas Poulopoulos | if not (source or sourceports or ports or destination or destinationports): |
130 | a5d4c70f | Leonidas Poulopoulos | raise forms.ValidationError('Fill at least a Rule Match Condition') |
131 | 6d48c46c | Leonidas Poulopoulos | if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS: |
132 | 6d48c46c | Leonidas Poulopoulos | raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action)) |
133 | dbdc30ec | Leonidas Poulopoulos | existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE') |
134 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(applier__userprofile__peer=peer) |
135 | 6d153302 | Leonidas Poulopoulos | if source:
|
136 | 6d153302 | Leonidas Poulopoulos | source = IPNetwork(source).compressed |
137 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(source=source) |
138 | 6d153302 | Leonidas Poulopoulos | else:
|
139 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(source=None)
|
140 | dbdc30ec | Leonidas Poulopoulos | if protocols:
|
141 | dbdc30ec | Leonidas Poulopoulos | route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes) |
142 | dbdc30ec | Leonidas Poulopoulos | if route_pk_list:
|
143 | dbdc30ec | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
144 | dbdc30ec | Leonidas Poulopoulos | else:
|
145 | dbdc30ec | Leonidas Poulopoulos | existing_routes = existing_routes.filter(protocol=None)
|
146 | dbdc30ec | Leonidas Poulopoulos | else:
|
147 | dbdc30ec | Leonidas Poulopoulos | existing_routes = existing_routes.filter(protocol=None)
|
148 | 6d153302 | Leonidas Poulopoulos | if sourceports:
|
149 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(sourceports, existing_routes) |
150 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
151 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
152 | 6d153302 | Leonidas Poulopoulos | else:
|
153 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(sourceport=None)
|
154 | 6d153302 | Leonidas Poulopoulos | if destinationports:
|
155 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(destinationports, existing_routes) |
156 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
157 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
158 | 6d153302 | Leonidas Poulopoulos | else:
|
159 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(destinationport=None)
|
160 | 6d153302 | Leonidas Poulopoulos | if ports:
|
161 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(ports, existing_routes) |
162 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
163 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
164 | 6d153302 | Leonidas Poulopoulos | else:
|
165 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(port=None)
|
166 | 6d153302 | Leonidas Poulopoulos | for route in existing_routes: |
167 | 6d153302 | Leonidas Poulopoulos | if name != route.name:
|
168 | 6d153302 | Leonidas Poulopoulos | existing_url = reverse('edit-route', args=[route.name])
|
169 | 7a0ac0d1 | Leonidas Poulopoulos | if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination): |
170 | 7a0ac0d1 | Leonidas Poulopoulos | raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name)) |
171 | 97e42c7d | Leonidas Poulopoulos | return self.cleaned_data |
172 | 97e42c7d | Leonidas Poulopoulos | |
173 | 97e42c7d | Leonidas Poulopoulos | class ThenPlainForm(forms.ModelForm): |
174 | 97e42c7d | Leonidas Poulopoulos | # action = forms.CharField(initial='rate-limit')
|
175 | 97e42c7d | Leonidas Poulopoulos | class Meta: |
176 | 97e42c7d | Leonidas Poulopoulos | model = ThenAction |
177 | 97e42c7d | Leonidas Poulopoulos | |
178 | 97e42c7d | Leonidas Poulopoulos | def clean_action_value(self): |
179 | 97e42c7d | Leonidas Poulopoulos | action_value = self.cleaned_data['action_value'] |
180 | 97e42c7d | Leonidas Poulopoulos | if action_value:
|
181 | 97e42c7d | Leonidas Poulopoulos | try:
|
182 | 97e42c7d | Leonidas Poulopoulos | assert(int(action_value)) |
183 | f12b3d54 | Leonidas Poulopoulos | if int(action_value) < 50: |
184 | f12b3d54 | Leonidas Poulopoulos | raise forms.ValidationError('Rate-limiting cannot be < 50kbps') |
185 | 97e42c7d | Leonidas Poulopoulos | return "%s" %self.cleaned_data["action_value"] |
186 | 97e42c7d | Leonidas Poulopoulos | except:
|
187 | f12b3d54 | Leonidas Poulopoulos | raise forms.ValidationError('Rate-limiting should be an integer < 50') |
188 | 97e42c7d | Leonidas Poulopoulos | else:
|
189 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot be empty') |
190 | 97e42c7d | Leonidas Poulopoulos | |
191 | 97e42c7d | Leonidas Poulopoulos | def clean_action(self): |
192 | 97e42c7d | Leonidas Poulopoulos | action = self.cleaned_data['action'] |
193 | 97e42c7d | Leonidas Poulopoulos | if action != 'rate-limit': |
194 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot select something other than rate-limit') |
195 | 97e42c7d | Leonidas Poulopoulos | else:
|
196 | 97e42c7d | Leonidas Poulopoulos | return self.cleaned_data["action"] |
197 | f12b3d54 | Leonidas Poulopoulos | |
198 | 97e42c7d | Leonidas Poulopoulos | |
199 | 97e42c7d | Leonidas Poulopoulos | class PortPlainForm(forms.ModelForm): |
200 | 97e42c7d | Leonidas Poulopoulos | # action = forms.CharField(initial='rate-limit')
|
201 | 97e42c7d | Leonidas Poulopoulos | class Meta: |
202 | 97e42c7d | Leonidas Poulopoulos | model = MatchPort |
203 | 97e42c7d | Leonidas Poulopoulos | |
204 | 97e42c7d | Leonidas Poulopoulos | def clean_port(self): |
205 | 97e42c7d | Leonidas Poulopoulos | port = self.cleaned_data['port'] |
206 | 97e42c7d | Leonidas Poulopoulos | if port:
|
207 | 97e42c7d | Leonidas Poulopoulos | try:
|
208 | 97e42c7d | Leonidas Poulopoulos | assert(int(port)) |
209 | 97e42c7d | Leonidas Poulopoulos | return "%s" %self.cleaned_data["port"] |
210 | 97e42c7d | Leonidas Poulopoulos | except:
|
211 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Port should be an integer') |
212 | 97e42c7d | Leonidas Poulopoulos | else:
|
213 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot be empty') |
214 | 6d153302 | Leonidas Poulopoulos | |
215 | 1b996627 | Leonidas Poulopoulos | RuleFormSet = modelformset_factory(Route, extra=2)
|
216 | 1b996627 | Leonidas Poulopoulos | |
217 | 6d153302 | Leonidas Poulopoulos | def value_list_to_list(valuelist): |
218 | 6d153302 | Leonidas Poulopoulos | vl = [] |
219 | 6d153302 | Leonidas Poulopoulos | for val in valuelist: |
220 | 6d153302 | Leonidas Poulopoulos | vl.append(val[0])
|
221 | 6d153302 | Leonidas Poulopoulos | return vl
|
222 | 6d153302 | Leonidas Poulopoulos | |
223 | 6d153302 | Leonidas Poulopoulos | def get_matchingport_route_pks(portlist, routes): |
224 | 6d153302 | Leonidas Poulopoulos | route_pk_list = [] |
225 | 6d153302 | Leonidas Poulopoulos | ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port')) |
226 | 6d153302 | Leonidas Poulopoulos | for route in routes: |
227 | 6d153302 | Leonidas Poulopoulos | rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port')) |
228 | 6d153302 | Leonidas Poulopoulos | if rsp and rsp == ports_value_list: |
229 | 6d153302 | Leonidas Poulopoulos | route_pk_list.append(route.pk) |
230 | dbdc30ec | Leonidas Poulopoulos | return route_pk_list
|
231 | dbdc30ec | Leonidas Poulopoulos | |
232 | dbdc30ec | Leonidas Poulopoulos | def get_matchingprotocol_route_pks(protocolist, routes): |
233 | dbdc30ec | Leonidas Poulopoulos | route_pk_list = [] |
234 | dbdc30ec | Leonidas Poulopoulos | protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol')) |
235 | dbdc30ec | Leonidas Poulopoulos | for route in routes: |
236 | dbdc30ec | Leonidas Poulopoulos | rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol')) |
237 | dbdc30ec | Leonidas Poulopoulos | if rsp and rsp == protocols_value_list: |
238 | dbdc30ec | Leonidas Poulopoulos | route_pk_list.append(route.pk) |
239 | 6d153302 | Leonidas Poulopoulos | return route_pk_list |