Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ f1bd3b97

History | View | Annotate | Download (13.7 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12
from django.core.mail import mail_admins, mail_managers, send_mail
13
from django.forms.models import modelformset_factory
14

    
15

    
16
class RouteForm(forms.ModelForm):
17
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
18
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
19
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
20
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
21
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
22
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
23
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
24
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
25
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
26

    
27
    class Meta:
28
        model = Route
29
    
30
    def clean_source(self):
31
        user = User.objects.get(pk=self.data['applier'])
32
        peer = user.get_profile().peer
33
        data = self.cleaned_data['source']
34
        private_error = False
35
        protected_error = False
36
        if data:
37
            try:
38
                address = IPNetwork(data)
39
                for net in settings.PROTECTED_SUBNETS:
40
                    if address in IPNetwork(net):
41
                        protected_error = True
42
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
43
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
44
                              mail_body, settings.SERVER_EMAIL,
45
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
46
                        raise Exception
47
                if address.is_private:
48
                    private_error = True
49
                    raise Exception
50
                else:
51
                    return self.cleaned_data["source"]
52
            except Exception:
53
                error_text = 'Invalid network address format'
54
                if private_error:
55
                    error_text = 'Private addresses not allowed'
56
                if protected_error:
57
                    error_text = 'You have no authority on this subnet'
58
                raise forms.ValidationError(error_text)
59

    
60
    def clean_destination(self):
61
        user = User.objects.get(pk=self.data['applier'])
62
        peer = user.get_profile().peer
63
        data = self.cleaned_data['destination']
64
        error = None
65
        protected_error = False
66
        if data:
67
            try:
68
                address = IPNetwork(data)
69
                for net in settings.PROTECTED_SUBNETS:
70
                    if address in IPNetwork(net):
71
                        protected_error = True
72
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
73
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
74
                              mail_body, settings.SERVER_EMAIL,
75
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
76
                        raise Exception
77
                if address.prefixlen < settings.PREFIX_LENGTH:
78
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
79
                    raise Exception
80
                return self.cleaned_data["destination"]
81
            except Exception:
82
                error_text = 'Invalid network address format'
83
                if error:
84
                    error_text = error
85
                if protected_error:
86
                    error_text = 'You have no authority on this subnet'
87
                raise forms.ValidationError(error_text)
88

    
89
    def clean_expires(self):
90
        date = self.cleaned_data['expires']
91
        if date:
92
            range_days = (date - datetime.date.today()).days
93
            if range_days > 0 and range_days < 11:
94
                return self.cleaned_data["expires"]
95
            else:
96
                raise forms.ValidationError('Invalid date range')
97

    
98
    def clean(self):
99
        if self.errors:
100
             raise forms.ValidationError('Errors in form. Please review and fix them')
101
        name = self.cleaned_data.get('name', None)
102
        source = self.cleaned_data.get('source', None)
103
        sourceports = self.cleaned_data.get('sourceport', None)
104
        ports = self.cleaned_data.get('port', None)
105
        then = self.cleaned_data.get('then', None)
106
        destination = self.cleaned_data.get('destination', None)
107
        destinationports = self.cleaned_data.get('destinationport', None)
108
        protocols = self.cleaned_data.get('protocol', None)
109
        user = self.cleaned_data.get('applier', None)
110
        peer = user.get_profile().peer
111
        networks = peer.networks.all()
112
        mynetwork = False
113
        route_pk_list = []
114
        if destination:
115
            for network in networks:
116
                net = IPNetwork(network.network)
117
                if IPNetwork(destination) in net:
118
                    mynetwork = True
119
            if not mynetwork:
120
                 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
121
        if (sourceports and ports):
122
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
123
        if (destinationports and ports):
124
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
125
        if sourceports and not source:
126
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
127
        if destinationports and not destination:
128
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
129
        if not (source or sourceports or ports or destination or destinationports):
130
            raise forms.ValidationError('Fill at least a Rule Match Condition')
131
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
132
            raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
133
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
134
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
135
        if source:
136
            source = IPNetwork(source).compressed
137
            existing_routes = existing_routes.filter(source=source)
138
        else:
139
            existing_routes = existing_routes.filter(source=None)
140
        if protocols:
141
            route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
142
            if route_pk_list:
143
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
144
            else:
145
                existing_routes = existing_routes.filter(protocol=None)
146
        else:
147
            existing_routes = existing_routes.filter(protocol=None)
148
        if sourceports:
149
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
150
            if route_pk_list:
151
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
152
        else:
153
            existing_routes = existing_routes.filter(sourceport=None)
154
        if destinationports:
155
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
156
            if route_pk_list:
157
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
158
        else:
159
            existing_routes = existing_routes.filter(destinationport=None)
160
        if ports:
161
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
162
            if route_pk_list:
163
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
164
        else:
165
            existing_routes = existing_routes.filter(port=None)
166
        for route in existing_routes:
167
            if name != route.name:
168
                existing_url = reverse('edit-route', args=[route.name])
169
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
170
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
171
        return self.cleaned_data
172

    
173
class ThenPlainForm(forms.ModelForm):
174
#    action = forms.CharField(initial='rate-limit')
175
    class Meta:
176
        model = ThenAction
177

    
178
    def clean_action_value(self):
179
        action_value = self.cleaned_data['action_value']
180
        if action_value:
181
            try:
182
                assert(int(action_value))
183
                if int(action_value) < 50:
184
                    raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
185
                return "%s" %self.cleaned_data["action_value"]
186
            except:
187
                raise forms.ValidationError('Rate-limiting should be an integer < 50')
188
        else:
189
            raise forms.ValidationError('Cannot be empty')
190

    
191
    def clean_action(self):
192
        action = self.cleaned_data['action']
193
        if action != 'rate-limit':
194
            raise forms.ValidationError('Cannot select something other than rate-limit')
195
        else:
196
            return self.cleaned_data["action"]
197

    
198

    
199
class PortPlainForm(forms.ModelForm):
200
#    action = forms.CharField(initial='rate-limit')
201
    class Meta:
202
        model = MatchPort
203

    
204
    def clean_port(self):
205
        port = self.cleaned_data['port']
206
        if port:
207
            try:
208
                assert(int(port))
209
                return "%s" %self.cleaned_data["port"]
210
            except:
211
                raise forms.ValidationError('Port should be an integer')
212
        else:
213
            raise forms.ValidationError('Cannot be empty')
214

    
215
class MyRuleFormSet(forms.models.BaseModelFormSet):
216
    def clean(self):
217
        if any(self.errors):
218
            return
219
        super(MyRuleFormSet, self).clean()
220
        rules = []
221
        sources = []
222
        destinations = []
223
        protocols = []
224
        sourceports = []
225
        destinationports = []
226
        ports = []
227
        errors = []
228
        same_source_nets = False
229
        same_dest_nets = False
230
        for i in range(0, self.total_form_count()):
231
            rule = {}
232
            form = self.forms[i]
233
            source = IPNetwork(form.cleaned_data['source'])
234
            destination = IPNetwork(form.cleaned_data['destination'])
235
            protocol = form.cleaned_data['protocol']
236
            sourceport = form.cleaned_data['sourceport']
237
            destinationport = form.cleaned_data['destinationport']
238
            port = form.cleaned_data['port']
239
            rule = {
240
                  'source' : source,
241
                  'destination' : destination,
242
                  'protocol' : protocol,
243
                  'sourceport' : sourceport,
244
                  'destinationport' : destinationport,
245
                  'port' : port,
246
                  }
247
            if sources:
248
                for s in sources:
249
                    if (s in source) or (source in s):
250
                        same_source_nets = True
251
            else:
252
                sources.append(source)
253
            if destinations:
254
                for d in destinations:
255
                    if (d in destination) or (destination in d):
256
                        same_dest_nets = True
257
            else:
258
                destinations.append(destination)
259
            if same_source_nets:
260
                raise forms.ValidationError("Detected same sources")
261
            rules.append(rule)
262
        raise forms.ValidationError("%s" %rules)
263

    
264
RuleFormSet = modelformset_factory(Route, form=RouteForm, formset=MyRuleFormSet)
265

    
266
def value_list_to_list(valuelist):
267
    vl = []
268
    for val in valuelist:
269
        vl.append(val[0])
270
    return vl
271

    
272
def get_matchingport_route_pks(portlist, routes):
273
    route_pk_list = []
274
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
275
    for route in routes:
276
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
277
        if rsp and rsp == ports_value_list:
278
            route_pk_list.append(route.pk)
279
    return route_pk_list
280

    
281
def get_matchingprotocol_route_pks(protocolist, routes):
282
    route_pk_list = []
283
    protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
284
    for route in routes:
285
        rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
286
        if rsp and rsp == protocols_value_list:
287
            route_pk_list.append(route.pk)
288
    return route_pk_list