Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ f289f4c8

History | View | Annotate | Download (11.9 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12
from django.core.mail import mail_admins, mail_managers, send_mail
13

    
14

    
15
class RouteForm(forms.ModelForm):
16
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
17
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
18
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
19
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
20
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
21
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
22
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
23
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
24
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25

    
26
    class Meta:
27
        model = Route
28
    
29
    def clean_source(self):
30
        user = User.objects.get(pk=self.data['applier'])
31
        peer = user.get_profile().peer
32
        data = self.cleaned_data['source']
33
        private_error = False
34
        protected_error = False
35
        if data:
36
            try:
37
                address = IPNetwork(data)
38
                for net in settings.PROTECTED_SUBNETS:
39
                    if address in IPNetwork(net):
40
                        protected_error = True
41
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
43
                              mail_body, settings.SERVER_EMAIL,
44
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
45
                        raise Exception
46
                if address.is_private:
47
                    private_error = True
48
                    raise Exception
49
                else:
50
                    return self.cleaned_data["source"]
51
            except Exception:
52
                error_text = 'Invalid network address format'
53
                if private_error:
54
                    error_text = 'Private addresses not allowed'
55
                if protected_error:
56
                    error_text = 'You have no authority on this subnet'
57
                raise forms.ValidationError(error_text)
58

    
59
    def clean_destination(self):
60
        user = User.objects.get(pk=self.data['applier'])
61
        peer = user.get_profile().peer
62
        data = self.cleaned_data['destination']
63
        error = None
64
        protected_error = False
65
        if data:
66
            try:
67
                address = IPNetwork(data)
68
                for net in settings.PROTECTED_SUBNETS:
69
                    if address in IPNetwork(net):
70
                        protected_error = True
71
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
73
                              mail_body, settings.SERVER_EMAIL,
74
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
75
                        raise Exception
76
                if address.prefixlen < settings.PREFIX_LENGTH:
77
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
78
                    raise Exception
79
                return self.cleaned_data["destination"]
80
            except Exception:
81
                error_text = 'Invalid network address format'
82
                if error:
83
                    error_text = error
84
                if protected_error:
85
                    error_text = 'You have no authority on this subnet'
86
                raise forms.ValidationError(error_text)
87
    
88
    def clean_expires(self):
89
        date = self.cleaned_data['expires']
90
        if date:
91
            range_days = (date - datetime.date.today()).days
92
            if range_days > 0 and range_days < 11:
93
                return self.cleaned_data["expires"]
94
            else:
95
                raise forms.ValidationError('Invalid date range')
96

    
97
    def clean(self):
98
        if self.errors:
99
             raise forms.ValidationError('Errors in form. Please review and fix them')
100
        name = self.cleaned_data.get('name', None)
101
        source = self.cleaned_data.get('source', None)
102
        sourceports = self.cleaned_data.get('sourceport', None)
103
        ports = self.cleaned_data.get('port', None)
104
        then = self.cleaned_data.get('then', None)
105
        destination = self.cleaned_data.get('destination', None)
106
        destinationports = self.cleaned_data.get('destinationport', None)
107
        protocols = self.cleaned_data.get('protocol', None)
108
        user = self.cleaned_data.get('applier', None)
109
        peer = user.get_profile().peer
110
        networks = peer.networks.all()
111
        mynetwork = False
112
        route_pk_list = []
113
        if destination:
114
            for network in networks:
115
                net = IPNetwork(network.network)
116
                if IPNetwork(destination) in net:
117
                    mynetwork = True
118
            if not mynetwork:
119
                 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
120
        if (sourceports and ports):
121
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
122
        if (destinationports and ports):
123
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
124
        if sourceports and not source:
125
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
126
        if destinationports and not destination:
127
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
128
        if not (source or sourceports or ports or destination or destinationports):
129
            raise forms.ValidationError('Fill at least a Rule Match Condition')
130
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
131
            raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
132
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
133
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
134
        if source:
135
            source = IPNetwork(source).compressed
136
            existing_routes = existing_routes.filter(source=source)
137
        else:
138
            existing_routes = existing_routes.filter(source=None)
139
        if protocols:
140
            route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
141
            if route_pk_list:
142
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
143
            else:
144
                existing_routes = existing_routes.filter(protocol=None)
145
        else:
146
            existing_routes = existing_routes.filter(protocol=None)
147
        if sourceports:
148
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
149
            if route_pk_list:
150
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
151
        else:
152
            existing_routes = existing_routes.filter(sourceport=None)
153
        if destinationports:
154
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
155
            if route_pk_list:
156
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
157
        else:
158
            existing_routes = existing_routes.filter(destinationport=None)
159
        if ports:
160
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
161
            if route_pk_list:
162
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
163
        else:
164
            existing_routes = existing_routes.filter(port=None)
165
        for route in existing_routes:
166
            if name != route.name:
167
                existing_url = reverse('edit-route', args=[route.name])
168
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
169
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
170
        return self.cleaned_data
171

    
172
class ThenPlainForm(forms.ModelForm):
173
#    action = forms.CharField(initial='rate-limit')
174
    class Meta:
175
        model = ThenAction
176
    
177
    def clean_action_value(self):
178
        action_value = self.cleaned_data['action_value']
179
        if action_value:
180
            try:
181
                assert(int(action_value))
182
                if int(action_value) < 50:
183
                    raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
184
                return "%s" %self.cleaned_data["action_value"]
185
            except:
186
                raise forms.ValidationError('Rate-limiting should be an integer < 50')
187
        else:
188
            raise forms.ValidationError('Cannot be empty')
189

    
190
    def clean_action(self):
191
        action = self.cleaned_data['action']
192
        if action != 'rate-limit':
193
            raise forms.ValidationError('Cannot select something other than rate-limit')
194
        else:
195
            return self.cleaned_data["action"]
196
    
197

    
198
class PortPlainForm(forms.ModelForm):
199
#    action = forms.CharField(initial='rate-limit')
200
    class Meta:
201
        model = MatchPort
202
    
203
    def clean_port(self):
204
        port = self.cleaned_data['port']
205
        if port:
206
            try:
207
                assert(int(port))
208
                return "%s" %self.cleaned_data["port"]
209
            except:
210
                raise forms.ValidationError('Port should be an integer')
211
        else:
212
            raise forms.ValidationError('Cannot be empty')
213

    
214
def value_list_to_list(valuelist):
215
    vl = []
216
    for val in valuelist:
217
        vl.append(val[0])
218
    return vl
219

    
220
def get_matchingport_route_pks(portlist, routes):
221
    route_pk_list = []
222
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
223
    for route in routes:
224
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
225
        if rsp and rsp == ports_value_list:
226
            route_pk_list.append(route.pk)
227
    return route_pk_list
228

    
229
def get_matchingprotocol_route_pks(protocolist, routes):
230
    route_pk_list = []
231
    protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
232
    for route in routes:
233
        rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
234
        if rsp and rsp == protocols_value_list:
235
            route_pk_list.append(route.pk)
236
    return route_pk_list