root / flowspec / forms.py @ 15d4014d
History | View | Annotate | Download (11.1 kB)
1 | 9cad4715 | Leonidas Poulopoulos | from django import forms |
---|---|---|---|
2 | 9cad4715 | Leonidas Poulopoulos | from django.utils.safestring import mark_safe |
3 | 9cad4715 | Leonidas Poulopoulos | from django.utils.translation import ugettext as _ |
4 | 9cad4715 | Leonidas Poulopoulos | from django.utils.translation import ugettext_lazy |
5 | 9cad4715 | Leonidas Poulopoulos | from django.template.defaultfilters import filesizeformat |
6 | 6d153302 | Leonidas Poulopoulos | from flowspy.flowspec.models import * |
7 | 9cad4715 | Leonidas Poulopoulos | from ipaddr import * |
8 | 6d153302 | Leonidas Poulopoulos | from django.core.urlresolvers import reverse |
9 | 97e42c7d | Leonidas Poulopoulos | from django.contrib.auth.models import User |
10 | b4401a0c | Leonidas Poulopoulos | from django.conf import settings |
11 | 052c14aa | Leonidas Poulopoulos | import datetime |
12 | bfdfac23 | Leonidas Poulopoulos | from django.core.mail import mail_admins, mail_managers, send_mail |
13 | 6d153302 | Leonidas Poulopoulos | |
14 | 9cad4715 | Leonidas Poulopoulos | |
15 | 9cad4715 | Leonidas Poulopoulos | class RouteForm(forms.ModelForm): |
16 | 9cad4715 | Leonidas Poulopoulos | # name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
|
17 | 9cad4715 | Leonidas Poulopoulos | # " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
|
18 | 9cad4715 | Leonidas Poulopoulos | # source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
19 | 9cad4715 | Leonidas Poulopoulos | # " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
|
20 | 9cad4715 | Leonidas Poulopoulos | # source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
|
21 | 9cad4715 | Leonidas Poulopoulos | # destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
|
22 | 9cad4715 | Leonidas Poulopoulos | # " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
|
23 | 9cad4715 | Leonidas Poulopoulos | # destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
|
24 | 9cad4715 | Leonidas Poulopoulos | # ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
|
25 | 6d48c46c | Leonidas Poulopoulos | |
26 | 9cad4715 | Leonidas Poulopoulos | class Meta: |
27 | 9cad4715 | Leonidas Poulopoulos | model = Route |
28 | 9cad4715 | Leonidas Poulopoulos | |
29 | 9cad4715 | Leonidas Poulopoulos | def clean_source(self): |
30 | 20aafa59 | Leonidas Poulopoulos | user = User.objects.get(pk=self.data['applier']) |
31 | 20aafa59 | Leonidas Poulopoulos | peer = user.get_profile().peer |
32 | 9cad4715 | Leonidas Poulopoulos | data = self.cleaned_data['source'] |
33 | 34cff057 | Leonidas Poulopoulos | private_error = False
|
34 | bfdfac23 | Leonidas Poulopoulos | protected_error = False
|
35 | 9cad4715 | Leonidas Poulopoulos | if data:
|
36 | 9cad4715 | Leonidas Poulopoulos | try:
|
37 | 9cad4715 | Leonidas Poulopoulos | address = IPNetwork(data) |
38 | bfdfac23 | Leonidas Poulopoulos | for net in settings.PROTECTED_SUBNETS: |
39 | bfdfac23 | Leonidas Poulopoulos | if address in IPNetwork(net): |
40 | bfdfac23 | Leonidas Poulopoulos | protected_error = True
|
41 | 20aafa59 | Leonidas Poulopoulos | mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
42 | bfdfac23 | Leonidas Poulopoulos | send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
|
43 | bfdfac23 | Leonidas Poulopoulos | mail_body, settings.SERVER_EMAIL, |
44 | 0667633c | Leonidas Poulopoulos | settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
45 | bfdfac23 | Leonidas Poulopoulos | raise forms.ValidationError("Not allowed") |
46 | 34cff057 | Leonidas Poulopoulos | if address.is_private:
|
47 | 34cff057 | Leonidas Poulopoulos | private_error = True
|
48 | 34cff057 | Leonidas Poulopoulos | raise forms.ValidationError('Private addresses not allowed') |
49 | 34cff057 | Leonidas Poulopoulos | else:
|
50 | 34cff057 | Leonidas Poulopoulos | return self.cleaned_data["source"] |
51 | 9cad4715 | Leonidas Poulopoulos | except Exception: |
52 | 34cff057 | Leonidas Poulopoulos | error_text = 'Invalid network address format'
|
53 | 34cff057 | Leonidas Poulopoulos | if private_error:
|
54 | 34cff057 | Leonidas Poulopoulos | error_text = 'Private addresses not allowed'
|
55 | bfdfac23 | Leonidas Poulopoulos | if protected_error:
|
56 | bfdfac23 | Leonidas Poulopoulos | error_text = 'You have no authority on this subnet'
|
57 | 34cff057 | Leonidas Poulopoulos | raise forms.ValidationError(error_text)
|
58 | 9cad4715 | Leonidas Poulopoulos | |
59 | 9cad4715 | Leonidas Poulopoulos | def clean_destination(self): |
60 | 20aafa59 | Leonidas Poulopoulos | user = User.objects.get(pk=self.data['applier']) |
61 | 20aafa59 | Leonidas Poulopoulos | peer = user.get_profile().peer |
62 | 9cad4715 | Leonidas Poulopoulos | data = self.cleaned_data['destination'] |
63 | b4401a0c | Leonidas Poulopoulos | error = None
|
64 | bfdfac23 | Leonidas Poulopoulos | protected_error = False
|
65 | 9cad4715 | Leonidas Poulopoulos | if data:
|
66 | 9cad4715 | Leonidas Poulopoulos | try:
|
67 | 9cad4715 | Leonidas Poulopoulos | address = IPNetwork(data) |
68 | bfdfac23 | Leonidas Poulopoulos | for net in settings.PROTECTED_SUBNETS: |
69 | bfdfac23 | Leonidas Poulopoulos | if address in IPNetwork(net): |
70 | bfdfac23 | Leonidas Poulopoulos | protected_error = True
|
71 | 20aafa59 | Leonidas Poulopoulos | mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
|
72 | bfdfac23 | Leonidas Poulopoulos | send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
|
73 | bfdfac23 | Leonidas Poulopoulos | mail_body, settings.SERVER_EMAIL, |
74 | 0667633c | Leonidas Poulopoulos | settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
|
75 | bfdfac23 | Leonidas Poulopoulos | raise forms.ValidationError("Not allowed") |
76 | b4401a0c | Leonidas Poulopoulos | if address.prefixlen < settings.PREFIX_LENGTH:
|
77 | b4401a0c | Leonidas Poulopoulos | error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
|
78 | b4401a0c | Leonidas Poulopoulos | raise forms.ValidationError('error') |
79 | 9cad4715 | Leonidas Poulopoulos | return self.cleaned_data["destination"] |
80 | 9cad4715 | Leonidas Poulopoulos | except Exception: |
81 | bfdfac23 | Leonidas Poulopoulos | error_text = 'Invalid network address format'
|
82 | b4401a0c | Leonidas Poulopoulos | if error:
|
83 | b4401a0c | Leonidas Poulopoulos | error_text = error |
84 | bfdfac23 | Leonidas Poulopoulos | if protected_error:
|
85 | bfdfac23 | Leonidas Poulopoulos | error_text = 'You have no authority on this subnet'
|
86 | b4401a0c | Leonidas Poulopoulos | raise forms.ValidationError(error_text)
|
87 | 052c14aa | Leonidas Poulopoulos | |
88 | 052c14aa | Leonidas Poulopoulos | def clean_expires(self): |
89 | 052c14aa | Leonidas Poulopoulos | date = self.cleaned_data['expires'] |
90 | 052c14aa | Leonidas Poulopoulos | if date:
|
91 | 052c14aa | Leonidas Poulopoulos | range_days = (date - datetime.date.today()).days |
92 | 052c14aa | Leonidas Poulopoulos | if range_days > 0 and range_days < 11: |
93 | 052c14aa | Leonidas Poulopoulos | return self.cleaned_data["expires"] |
94 | 052c14aa | Leonidas Poulopoulos | else:
|
95 | 052c14aa | Leonidas Poulopoulos | raise forms.ValidationError('Invalid date range') |
96 | 9cad4715 | Leonidas Poulopoulos | |
97 | 9cad4715 | Leonidas Poulopoulos | def clean(self): |
98 | 6d153302 | Leonidas Poulopoulos | name = self.cleaned_data.get('name', None) |
99 | 9cad4715 | Leonidas Poulopoulos | source = self.cleaned_data.get('source', None) |
100 | 9cad4715 | Leonidas Poulopoulos | sourceports = self.cleaned_data.get('sourceport', None) |
101 | 9cad4715 | Leonidas Poulopoulos | ports = self.cleaned_data.get('port', None) |
102 | 6d153302 | Leonidas Poulopoulos | then = self.cleaned_data.get('then', None) |
103 | 9cad4715 | Leonidas Poulopoulos | destination = self.cleaned_data.get('destination', None) |
104 | 9cad4715 | Leonidas Poulopoulos | destinationports = self.cleaned_data.get('destinationport', None) |
105 | 97e42c7d | Leonidas Poulopoulos | user = self.cleaned_data.get('applier', None) |
106 | 6d153302 | Leonidas Poulopoulos | peer = user.get_profile().peer |
107 | 6d153302 | Leonidas Poulopoulos | networks = peer.networks.all() |
108 | 97e42c7d | Leonidas Poulopoulos | mynetwork = False
|
109 | 6d153302 | Leonidas Poulopoulos | route_pk_list = [] |
110 | 97e42c7d | Leonidas Poulopoulos | if destination:
|
111 | 97e42c7d | Leonidas Poulopoulos | for network in networks: |
112 | 97e42c7d | Leonidas Poulopoulos | net = IPNetwork(network.network) |
113 | 97e42c7d | Leonidas Poulopoulos | if IPNetwork(destination) in net: |
114 | 97e42c7d | Leonidas Poulopoulos | mynetwork = True
|
115 | 97e42c7d | Leonidas Poulopoulos | if not mynetwork: |
116 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks') |
117 | 9cad4715 | Leonidas Poulopoulos | if (sourceports and ports): |
118 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports') |
119 | 9cad4715 | Leonidas Poulopoulos | if (destinationports and ports): |
120 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports') |
121 | 9cad4715 | Leonidas Poulopoulos | if sourceports and not source: |
122 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address') |
123 | 9cad4715 | Leonidas Poulopoulos | if destinationports and not destination: |
124 | 9cad4715 | Leonidas Poulopoulos | raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address') |
125 | 9cad4715 | Leonidas Poulopoulos | if not (source or sourceports or ports or destination or destinationports): |
126 | a5d4c70f | Leonidas Poulopoulos | raise forms.ValidationError('Fill at least a Rule Match Condition') |
127 | 6d48c46c | Leonidas Poulopoulos | if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS: |
128 | 6d48c46c | Leonidas Poulopoulos | raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action)) |
129 | 6d153302 | Leonidas Poulopoulos | existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='PENDING').exclude(status='ERROR').exclude(status='ADMININACTIVE') |
130 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(applier__userprofile__peer=peer) |
131 | 6d153302 | Leonidas Poulopoulos | if source:
|
132 | 6d153302 | Leonidas Poulopoulos | source = IPNetwork(source).compressed |
133 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(source=source) |
134 | 6d153302 | Leonidas Poulopoulos | else:
|
135 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(source=None)
|
136 | 6d153302 | Leonidas Poulopoulos | if sourceports:
|
137 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(sourceports, existing_routes) |
138 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
139 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
140 | 6d153302 | Leonidas Poulopoulos | else:
|
141 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(sourceport=None)
|
142 | 6d153302 | Leonidas Poulopoulos | if destinationports:
|
143 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(destinationports, existing_routes) |
144 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
145 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
146 | 6d153302 | Leonidas Poulopoulos | else:
|
147 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(destinationport=None)
|
148 | 6d153302 | Leonidas Poulopoulos | if ports:
|
149 | 6d153302 | Leonidas Poulopoulos | route_pk_list=get_matchingport_route_pks(ports, existing_routes) |
150 | 6d153302 | Leonidas Poulopoulos | if route_pk_list:
|
151 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(pk__in=route_pk_list) |
152 | 6d153302 | Leonidas Poulopoulos | else:
|
153 | 6d153302 | Leonidas Poulopoulos | existing_routes = existing_routes.filter(port=None)
|
154 | 6d153302 | Leonidas Poulopoulos | |
155 | 6d153302 | Leonidas Poulopoulos | for route in existing_routes: |
156 | 6d153302 | Leonidas Poulopoulos | if name != route.name:
|
157 | 6d153302 | Leonidas Poulopoulos | existing_url = reverse('edit-route', args=[route.name])
|
158 | 7a0ac0d1 | Leonidas Poulopoulos | if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination): |
159 | 7a0ac0d1 | Leonidas Poulopoulos | raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name)) |
160 | 97e42c7d | Leonidas Poulopoulos | return self.cleaned_data |
161 | 97e42c7d | Leonidas Poulopoulos | |
162 | 97e42c7d | Leonidas Poulopoulos | class ThenPlainForm(forms.ModelForm): |
163 | 97e42c7d | Leonidas Poulopoulos | # action = forms.CharField(initial='rate-limit')
|
164 | 97e42c7d | Leonidas Poulopoulos | class Meta: |
165 | 97e42c7d | Leonidas Poulopoulos | model = ThenAction |
166 | 97e42c7d | Leonidas Poulopoulos | |
167 | 97e42c7d | Leonidas Poulopoulos | def clean_action_value(self): |
168 | 97e42c7d | Leonidas Poulopoulos | action_value = self.cleaned_data['action_value'] |
169 | 97e42c7d | Leonidas Poulopoulos | if action_value:
|
170 | 97e42c7d | Leonidas Poulopoulos | try:
|
171 | 97e42c7d | Leonidas Poulopoulos | assert(int(action_value)) |
172 | f12b3d54 | Leonidas Poulopoulos | if int(action_value) < 50: |
173 | f12b3d54 | Leonidas Poulopoulos | raise forms.ValidationError('Rate-limiting cannot be < 50kbps') |
174 | 97e42c7d | Leonidas Poulopoulos | return "%s" %self.cleaned_data["action_value"] |
175 | 97e42c7d | Leonidas Poulopoulos | except:
|
176 | f12b3d54 | Leonidas Poulopoulos | raise forms.ValidationError('Rate-limiting should be an integer < 50') |
177 | 97e42c7d | Leonidas Poulopoulos | else:
|
178 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot be empty') |
179 | 97e42c7d | Leonidas Poulopoulos | |
180 | 97e42c7d | Leonidas Poulopoulos | def clean_action(self): |
181 | 97e42c7d | Leonidas Poulopoulos | action = self.cleaned_data['action'] |
182 | 97e42c7d | Leonidas Poulopoulos | if action != 'rate-limit': |
183 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot select something other than rate-limit') |
184 | 97e42c7d | Leonidas Poulopoulos | else:
|
185 | 97e42c7d | Leonidas Poulopoulos | return self.cleaned_data["action"] |
186 | f12b3d54 | Leonidas Poulopoulos | |
187 | 97e42c7d | Leonidas Poulopoulos | |
188 | 97e42c7d | Leonidas Poulopoulos | class PortPlainForm(forms.ModelForm): |
189 | 97e42c7d | Leonidas Poulopoulos | # action = forms.CharField(initial='rate-limit')
|
190 | 97e42c7d | Leonidas Poulopoulos | class Meta: |
191 | 97e42c7d | Leonidas Poulopoulos | model = MatchPort |
192 | 97e42c7d | Leonidas Poulopoulos | |
193 | 97e42c7d | Leonidas Poulopoulos | def clean_port(self): |
194 | 97e42c7d | Leonidas Poulopoulos | port = self.cleaned_data['port'] |
195 | 97e42c7d | Leonidas Poulopoulos | if port:
|
196 | 97e42c7d | Leonidas Poulopoulos | try:
|
197 | 97e42c7d | Leonidas Poulopoulos | assert(int(port)) |
198 | 97e42c7d | Leonidas Poulopoulos | return "%s" %self.cleaned_data["port"] |
199 | 97e42c7d | Leonidas Poulopoulos | except:
|
200 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Port should be an integer') |
201 | 97e42c7d | Leonidas Poulopoulos | else:
|
202 | 97e42c7d | Leonidas Poulopoulos | raise forms.ValidationError('Cannot be empty') |
203 | 6d153302 | Leonidas Poulopoulos | |
204 | 6d153302 | Leonidas Poulopoulos | def value_list_to_list(valuelist): |
205 | 6d153302 | Leonidas Poulopoulos | vl = [] |
206 | 6d153302 | Leonidas Poulopoulos | for val in valuelist: |
207 | 6d153302 | Leonidas Poulopoulos | vl.append(val[0])
|
208 | 6d153302 | Leonidas Poulopoulos | return vl
|
209 | 6d153302 | Leonidas Poulopoulos | |
210 | 6d153302 | Leonidas Poulopoulos | def get_matchingport_route_pks(portlist, routes): |
211 | 6d153302 | Leonidas Poulopoulos | route_pk_list = [] |
212 | 6d153302 | Leonidas Poulopoulos | ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port')) |
213 | 6d153302 | Leonidas Poulopoulos | for route in routes: |
214 | 6d153302 | Leonidas Poulopoulos | rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port')) |
215 | 6d153302 | Leonidas Poulopoulos | if rsp and rsp == ports_value_list: |
216 | 6d153302 | Leonidas Poulopoulos | route_pk_list.append(route.pk) |
217 | 6d153302 | Leonidas Poulopoulos | return route_pk_list |