root / flowspec / forms.py @ 2a2ea58f
History | View | Annotate | Download (11.9 kB)
1 | 9cad4715 | Leonidas Poulopoulos | from django import forms |
---|---|---|---|
2 | 9cad4715 | Leonidas Poulopoulos | from django.utils.safestring import mark_safe |
3 | 9cad4715 | Leonidas Poulopoulos | from django.utils.translation import ugettext as _ |
4 | 9cad4715 | Leonidas Poulopoulos | from django.utils.translation import ugettext_lazy |
5 | 9cad4715 | Leonidas Poulopoulos | from django.template.defaultfilters import filesizeformat |
6 | 6d153302 | Leonidas Poulopoulos | from flowspy.flowspec.models import * |
7 | 9cad4715 | Leonidas Poulopoulos | from ipaddr import * |
8 | 6d153302 | Leonidas Poulopoulos | from django.core.urlresolvers import reverse |
9 | 97e42c7d | Leonidas Poulopoulos | from django.contrib.auth.models import User |
10 | b4401a0c | Leonidas Poulopoulos | from django.conf import settings |
11 | 052c14aa | Leonidas Poulopoulos | import datetime |
12 | bfdfac23 | Leonidas Poulopoulos | from django.core.mail import mail_admins, mail_managers, send_mail |
13 | 6d153302 | Leonidas Poulopoulos | |
14 | 9cad4715 | Leonidas Poulopoulos | |
15 | 9cad4715 | Leonidas Poulopoulos | class RouteForm(forms.ModelForm): |
16 | 9cad4715 | Leonidas Poulopoulos | # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
|
17 | 9cad4715 | Leonidas Poulopoulos | # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
|
18 | 9cad4715 | Leonidas Poulopoulos | # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
19 | 9cad4715 | Leonidas Poulopoulos | # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
|
20 | 9cad4715 | Leonidas Poulopoulos | # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
|
21 | 9cad4715 | Leonidas Poulopoulos | # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
22 | 9cad4715 | Leonidas Poulopoulos | # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
|
23 | 9cad4715 | Leonidas Poulopoulos | # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
|
24 | 9cad4715 | Leonidas Poulopoulos | # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
|
25 | 6d48c46c | Leonidas Poulopoulos | |
26 | 9cad4715 | Leonidas Poulopoulos | class Meta: |
27 | 9cad4715 | Leonidas Poulopoulos | model = Route |
28 | 9cad4715 | Leonidas Poulopoulos | |
29 | 9cad4715 | Leonidas Poulopoulos | def clean_source(self): |
30 | 20aafa59 | Leonidas Poulopoulos | user = User.objects.get(pk=self.data['applier']) |
31 | 20aafa59 | Leonidas Poulopoulos | peer = user.get_profile().peer |
32 | 9cad4715 | Leonidas Poulopoulos | data = self.cleaned_data['source'] |
33 | 34cff057 | Leonidas Poulopoulos | private_error = False
|
34 | bfdfac23 | Leonidas Poulopoulos | protected_error = False
|
35 | 9cad4715 | Leonidas Poulopoulos | if data:
|
36 | 9cad4715 | Leonidas Poulopoulos | try:
|
37 | 9cad4715 | Leonidas Poulopoulos | address = IPNetwork(data) |
38 | bfdfac23 | Leonidas Poulopoulos | for net in settings.PROTECTED_SUBNETS: |
39 | bfdfac23 | Leonidas Poulopoulos | if address in IPNetwork(net): |
40 | bfdfac23 | Leonidas Poulopoulos | protected_error = True
|
41 | 20aafa59 | Leonidas Poulopoulos | mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
42 | bfdfac23 | Leonidas Poulopoulos | send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
|
43 | bfdfac23 | Leonidas Poulopoulos | mail_body, settings.SERVER_EMAIL, |
44 | 0667633c | Leonidas Poulopoulos | settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
45 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
46 | 34cff057 | Leonidas Poulopoulos | if address.is_private:
|
47 | 34cff057 | Leonidas Poulopoulos | private_error = True
|
48 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
49 | 34cff057 | Leonidas Poulopoulos | else:
|
50 | 34cff057 | Leonidas Poulopoulos | return self.cleaned_data["source"] |
51 | 9cad4715 | Leonidas Poulopoulos | except Exception: |
52 | 34cff057 | Leonidas Poulopoulos | error_text = 'Invalid network address format'
|
53 | 34cff057 | Leonidas Poulopoulos | if private_error:
|
54 | 34cff057 | Leonidas Poulopoulos | error_text = 'Private addresses not allowed'
|
55 | bfdfac23 | Leonidas Poulopoulos | if protected_error:
|
56 | bfdfac23 | Leonidas Poulopoulos | error_text = 'You have no authority on this subnet'
|
57 | 34cff057 | Leonidas Poulopoulos | raise forms.ValidationError(error_text)
|
58 | 9cad4715 | Leonidas Poulopoulos | |
59 | 9cad4715 | Leonidas Poulopoulos | def clean_destination(self): |
60 | 20aafa59 | Leonidas Poulopoulos | user = User.objects.get(pk=self.data['applier']) |
61 | f289f4c8 | Leonidas Poulopoulos | peer = user.get_profile().peer |
62 | 9cad4715 | Leonidas Poulopoulos | data = self.cleaned_data['destination'] |
63 | b4401a0c | Leonidas Poulopoulos | error = None
|
64 | bfdfac23 | Leonidas Poulopoulos | protected_error = False
|
65 | 9cad4715 | Leonidas Poulopoulos | if data:
|
66 | 9cad4715 | Leonidas Poulopoulos | try:
|
67 | 9cad4715 | Leonidas Poulopoulos | address = IPNetwork(data) |
68 | bfdfac23 | Leonidas Poulopoulos | for net in settings.PROTECTED_SUBNETS: |
69 | bfdfac23 | Leonidas Poulopoulos | if address in IPNetwork(net): |
70 | bfdfac23 | Leonidas Poulopoulos | protected_error = True
|
71 | 20aafa59 | Leonidas Poulopoulos | mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
72 | bfdfac23 | Leonidas Poulopoulos | send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
|
73 | bfdfac23 | Leonidas Poulopoulos | mail_body, settings.SERVER_EMAIL, |
74 | 0667633c | Leonidas Poulopoulos | settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
75 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
76 | b4401a0c | Leonidas Poulopoulos | if address.prefixlen < settings.PREFIX_LENGTH:
|
77 | b4401a0c | Leonidas Poulopoulos | error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
|
78 | f289f4c8 | Leonidas Poulopoulos | raise Exception |
79 | 9cad4715 | Leonidas Poulopoulos | return self.cleaned_data["destination"] |
80 | 9cad4715 | Leonidas Poulopoulos | except Exception: |
81 | bfdfac23 | Leonidas Poulopoulos | error_text = 'Invalid network address format'
|
82 | b4401a0c | Leonidas Poulopoulos | if error:
|
83 | b4401a0c | Leonidas Poulopoulos | error_text = error |
84 | bfdfac23 | Leonidas Poulopoulos | if protected_error:
|
85 | bfdfac23 | Leonidas Poulopoulos | error_text = 'You have no authority on this subnet'
|
86 | b4401a0c | Leonidas Poulopoulos | raise forms.ValidationError(error_text)
|
87 | 052c14aa | Leonidas Poulopoulos | |
88 | 052c14aa | Leonidas Poulopoulos | def clean_expires(self): |
89 | 052c14aa | Leonidas Poulopoulos | date = self.cleaned_data['expires'] |
90 | 052c14aa | Leonidas Poulopoulos | if date:
|
91 | 052c14aa | Leonidas Poulopoulos | range_days = (date - datetime.date.today()).days |
92 | 052c14aa | Leonidas Poulopoulos | if range_days > 0 and range_days < 11: |
93 | 052c14aa | Leonidas Poulopoulos | return self.cleaned_data["expires"] |
94 | 052c14aa | Leonidas Poulopoulos | else:
|
95 | 052c14aa | Leonidas Poulopoulos | raise forms.ValidationError('Invalid date range') |
96 | 9cad4715 | Leonidas Poulopoulos | |
97 | 9cad4715 | Leonidas Poulopoulos | def clean(self): |
98 | f289f4c8 | Leonidas Poulopoulos | if self.errors: |
99 | f289f4c8 | Leonidas Poulopoulos | raise forms.ValidationError('Errors in form. Please review and fix them') |
100 | 6d153302 | Leonidas Poulopoulos | name = self.cleaned_data.get('name', None) |
101 | 9cad4715 | Leonidas Poulopoulos | source = self.cleaned_data.get('source', None) |
102 | 9cad4715 | Leonidas Poulopoulos | sourceports = self.cleaned_data.get('sourceport', None) |
103 | 9cad4715 | Leonidas Poulopoulos | ports = self.cleaned_data.get('port', None) |
104 | 6d153302 | Leonidas Poulopoulos | then = self.cleaned_data.get('then', None) |
105 | 9cad4715 | Leonidas Poulopoulos | destination = self.cleaned_data.get('destination', None) |
106 | 9cad4715 | Leonidas Poulopoulos | destinationports = self.cleaned_data.get('destinationport', None) |
107 | dbdc30ec | Leonidas Poulopoulos | protocols = self.cleaned_data.get('protocol', None) |
108 | 97e42c7d | Leonidas Poulopoulos | user = self.cleaned_data.get('applier', None) |
109 | 6d153302 | Leonidas Poulopoulos | peer = user.get_profile().peer |
110 | 6d153302 | Leonidas Poulopoulos | networks = peer.networks.all() |
111 | 97e42c7d | Leonidas Poulopoulos | mynetwork = False
|
112 | 6d153302 | Leonidas Poulopoulos | route_pk_list = [] |
113 | 97e42c7d | Leonidas Poulopoulos | if destination:
|
114 | 97e42c7d | Leonidas Poulopoulos | for network in networks: |
115 | 97e42c7d | Leonidas Poulopoulos | net = IPNetwork(network.network) |
116 | 97e42c7d | Leonidas Poulopoulos | if IPNetwork(destination) in net: |
117 | 97e42c7d | Leonidas Poulopoulos | mynetwork = True
|
118 | 97e42c7d | Leonidas Poulopoulos | if not mynetwork: |
119 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks') |
120 | 9cad4715 | Leonidas Poulopoulos | if (sourceports and ports): |
121 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports') |
122 | 9cad4715 | Leonidas Poulopoulos | if (destinationports and ports): |
123 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports') |
124 | 9cad4715 | Leonidas Poulopoulos | if sourceports and not source: |
125 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address') |
126 | 9cad4715 | Leonidas Poulopoulos | if destinationports and not destination: |
127 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address') |
128 | 9cad4715 | Leonidas Poulopoulos | if not (source or sourceports or ports or destination or destinationports): |
129 | a5d4c70f | Leonidas Poulopoulos | raise forms.ValidationError('Fill at least a Rule Match Condition') |
130 | 6d48c46c | Leonidas Poulopoulos | if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS: |
131 | 6d48c46c | Leonidas Poulopoulos | raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action)) |
132 | dbdc30ec | Leonidas Poulopoulos | existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE') |
133 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(applier__userprofile__peer=peer) |
134 | 6d153302 | Leonidas Poulopoulos | if source:
|
135 | 6d153302 | Leonidas Poulopoulos | source = IPNetwork(source).compressed |
136 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(source=source) |
137 | 6d153302 | Leonidas Poulopoulos | else:
|
138 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(source=None)
|
139 | dbdc30ec | Leonidas Poulopoulos | if protocols:
|
140 | dbdc30ec | Leonidas Poulopoulos | route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes) |
141 | dbdc30ec | Leonidas Poulopoulos | if route_pk_list:
|
142 | dbdc30ec | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
143 | dbdc30ec | Leonidas Poulopoulos | else:
|
144 | dbdc30ec | Leonidas Poulopoulos | existing_routes = existing_routes.filter(protocol=None)
|
145 | dbdc30ec | Leonidas Poulopoulos | else:
|
146 | dbdc30ec | Leonidas Poulopoulos | existing_routes = existing_routes.filter(protocol=None)
|
147 | 6d153302 | Leonidas Poulopoulos | if sourceports:
|
148 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(sourceports, existing_routes) |
149 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
150 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
151 | 6d153302 | Leonidas Poulopoulos | else:
|
152 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(sourceport=None)
|
153 | 6d153302 | Leonidas Poulopoulos | if destinationports:
|
154 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(destinationports, existing_routes) |
155 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
156 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
157 | 6d153302 | Leonidas Poulopoulos | else:
|
158 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(destinationport=None)
|
159 | 6d153302 | Leonidas Poulopoulos | if ports:
|
160 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(ports, existing_routes) |
161 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
162 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
163 | 6d153302 | Leonidas Poulopoulos | else:
|
164 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(port=None)
|
165 | 6d153302 | Leonidas Poulopoulos | for route in existing_routes: |
166 | 6d153302 | Leonidas Poulopoulos | if name != route.name:
|
167 | 6d153302 | Leonidas Poulopoulos | existing_url = reverse('edit-route', args=[route.name])
|
168 | 7a0ac0d1 | Leonidas Poulopoulos | if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination): |
169 | 7a0ac0d1 | Leonidas Poulopoulos | raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name)) |
170 | 97e42c7d | Leonidas Poulopoulos | return self.cleaned_data |
171 | 97e42c7d | Leonidas Poulopoulos | |
172 | 97e42c7d | Leonidas Poulopoulos | class ThenPlainForm(forms.ModelForm): |
173 | 97e42c7d | Leonidas Poulopoulos | # action = forms.CharField(initial='rate-limit')
|
174 | 97e42c7d | Leonidas Poulopoulos | class Meta: |
175 | 97e42c7d | Leonidas Poulopoulos | model = ThenAction |
176 | 97e42c7d | Leonidas Poulopoulos | |
177 | 97e42c7d | Leonidas Poulopoulos | def clean_action_value(self): |
178 | 97e42c7d | Leonidas Poulopoulos | action_value = self.cleaned_data['action_value'] |
179 | 97e42c7d | Leonidas Poulopoulos | if action_value:
|
180 | 97e42c7d | Leonidas Poulopoulos | try:
|
181 | 97e42c7d | Leonidas Poulopoulos | assert(int(action_value)) |
182 | f12b3d54 | Leonidas Poulopoulos | if int(action_value) < 50: |
183 | f12b3d54 | Leonidas Poulopoulos | raise forms.ValidationError('Rate-limiting cannot be < 50kbps') |
184 | 97e42c7d | Leonidas Poulopoulos | return "%s" %self.cleaned_data["action_value"] |
185 | 97e42c7d | Leonidas Poulopoulos | except:
|
186 | f12b3d54 | Leonidas Poulopoulos | raise forms.ValidationError('Rate-limiting should be an integer < 50') |
187 | 97e42c7d | Leonidas Poulopoulos | else:
|
188 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot be empty') |
189 | 97e42c7d | Leonidas Poulopoulos | |
190 | 97e42c7d | Leonidas Poulopoulos | def clean_action(self): |
191 | 97e42c7d | Leonidas Poulopoulos | action = self.cleaned_data['action'] |
192 | 97e42c7d | Leonidas Poulopoulos | if action != 'rate-limit': |
193 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot select something other than rate-limit') |
194 | 97e42c7d | Leonidas Poulopoulos | else:
|
195 | 97e42c7d | Leonidas Poulopoulos | return self.cleaned_data["action"] |
196 | f12b3d54 | Leonidas Poulopoulos | |
197 | 97e42c7d | Leonidas Poulopoulos | |
198 | 97e42c7d | Leonidas Poulopoulos | class PortPlainForm(forms.ModelForm): |
199 | 97e42c7d | Leonidas Poulopoulos | # action = forms.CharField(initial='rate-limit')
|
200 | 97e42c7d | Leonidas Poulopoulos | class Meta: |
201 | 97e42c7d | Leonidas Poulopoulos | model = MatchPort |
202 | 97e42c7d | Leonidas Poulopoulos | |
203 | 97e42c7d | Leonidas Poulopoulos | def clean_port(self): |
204 | 97e42c7d | Leonidas Poulopoulos | port = self.cleaned_data['port'] |
205 | 97e42c7d | Leonidas Poulopoulos | if port:
|
206 | 97e42c7d | Leonidas Poulopoulos | try:
|
207 | 97e42c7d | Leonidas Poulopoulos | assert(int(port)) |
208 | 97e42c7d | Leonidas Poulopoulos | return "%s" %self.cleaned_data["port"] |
209 | 97e42c7d | Leonidas Poulopoulos | except:
|
210 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Port should be an integer') |
211 | 97e42c7d | Leonidas Poulopoulos | else:
|
212 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot be empty') |
213 | 6d153302 | Leonidas Poulopoulos | |
214 | 6d153302 | Leonidas Poulopoulos | def value_list_to_list(valuelist): |
215 | 6d153302 | Leonidas Poulopoulos | vl = [] |
216 | 6d153302 | Leonidas Poulopoulos | for val in valuelist: |
217 | 6d153302 | Leonidas Poulopoulos | vl.append(val[0])
|
218 | 6d153302 | Leonidas Poulopoulos | return vl
|
219 | 6d153302 | Leonidas Poulopoulos | |
220 | 6d153302 | Leonidas Poulopoulos | def get_matchingport_route_pks(portlist, routes): |
221 | 6d153302 | Leonidas Poulopoulos | route_pk_list = [] |
222 | 6d153302 | Leonidas Poulopoulos | ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port')) |
223 | 6d153302 | Leonidas Poulopoulos | for route in routes: |
224 | 6d153302 | Leonidas Poulopoulos | rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port')) |
225 | 6d153302 | Leonidas Poulopoulos | if rsp and rsp == ports_value_list: |
226 | 6d153302 | Leonidas Poulopoulos | route_pk_list.append(route.pk) |
227 | dbdc30ec | Leonidas Poulopoulos | return route_pk_list
|
228 | dbdc30ec | Leonidas Poulopoulos | |
229 | dbdc30ec | Leonidas Poulopoulos | def get_matchingprotocol_route_pks(protocolist, routes): |
230 | dbdc30ec | Leonidas Poulopoulos | route_pk_list = [] |
231 | dbdc30ec | Leonidas Poulopoulos | protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol')) |
232 | dbdc30ec | Leonidas Poulopoulos | for route in routes: |
233 | dbdc30ec | Leonidas Poulopoulos | rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol')) |
234 | dbdc30ec | Leonidas Poulopoulos | if rsp and rsp == protocols_value_list: |
235 | dbdc30ec | Leonidas Poulopoulos | route_pk_list.append(route.pk) |
236 | 6d153302 | Leonidas Poulopoulos | return route_pk_list |