Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ 88a6afb7

History | View | Annotate | Download (12.4 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from flowspy.peers.models import *
8
from ipaddr import *
9
from django.core.urlresolvers import reverse
10
from django.contrib.auth.models import User
11
from django.conf import settings
12
import datetime
13
from django.core.mail import mail_admins, mail_managers, send_mail
14

    
15

    
16
class RouteForm(forms.ModelForm):
17
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
18
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
19
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
20
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
21
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
22
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
23
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
24
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
25
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
26

    
27
    class Meta:
28
        model = Route
29

    
30
    def clean_applier(self):
31
        applier = self.cleaned_data['applier']
32
        if applier:
33
            return self.cleaned_data["applier"]
34
        else:
35
            raise forms.ValidationError('This field is required.')
36

    
37
    def clean_source(self):
38
        user = User.objects.get(pk=self.data['applier'])
39
        peer = user.get_profile().peer
40
        data = self.cleaned_data['source']
41
        private_error = False
42
        protected_error = False
43
        if data:
44
            try:
45
                address = IPNetwork(data)
46
                for net in settings.PROTECTED_SUBNETS:
47
                    if address in IPNetwork(net):
48
                        protected_error = True
49
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
50
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
51
                              mail_body, settings.SERVER_EMAIL,
52
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
53
                        raise Exception
54
                if address.is_private:
55
                    private_error = True
56
                    raise Exception
57
                else:
58
                    return self.cleaned_data["source"]
59
            except Exception:
60
                error_text = 'Invalid network address format'
61
                if private_error:
62
                    error_text = 'Private addresses not allowed'
63
                if protected_error:
64
                    error_text = 'You have no authority on this subnet'
65
                raise forms.ValidationError(error_text)
66

    
67
    def clean_destination(self):
68
        user = User.objects.get(pk=self.data['applier'])
69
        peer = user.get_profile().peer
70
        data = self.cleaned_data['destination']
71
        error = None
72
        protected_error = False
73
        if data:
74
            try:
75
                address = IPNetwork(data)
76
                for net in settings.PROTECTED_SUBNETS:
77
                    if address in IPNetwork(net):
78
                        protected_error = True
79
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
80
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
81
                              mail_body, settings.SERVER_EMAIL,
82
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
83
                        raise Exception
84
                if address.prefixlen < settings.PREFIX_LENGTH:
85
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
86
                    raise Exception
87
                return self.cleaned_data["destination"]
88
            except Exception:
89
                error_text = 'Invalid network address format'
90
                if error:
91
                    error_text = error
92
                if protected_error:
93
                    error_text = 'You have no authority on this subnet'
94
                raise forms.ValidationError(error_text)
95
    
96
    def clean_expires(self):
97
        date = self.cleaned_data['expires']
98
        if date:
99
            range_days = (date - datetime.date.today()).days
100
            if range_days > 0 and range_days < 11:
101
                return self.cleaned_data["expires"]
102
            else:
103
                raise forms.ValidationError('Invalid date range')
104

    
105
    def clean(self):
106
        if self.errors:
107
             raise forms.ValidationError('Errors in form. Please review and fix them')
108
        name = self.cleaned_data.get('name', None)
109
        source = self.cleaned_data.get('source', None)
110
        sourceports = self.cleaned_data.get('sourceport', None)
111
        ports = self.cleaned_data.get('port', None)
112
        then = self.cleaned_data.get('then', None)
113
        destination = self.cleaned_data.get('destination', None)
114
        destinationports = self.cleaned_data.get('destinationport', None)
115
        protocols = self.cleaned_data.get('protocol', None)
116
        user = self.cleaned_data.get('applier', None)
117
        try:
118
            issuperuser = self.data['issuperuser']
119
            su = User.objects.get(username=issuperuser)
120
        except:
121
            issuperuser = None
122
        peer = user.get_profile().peer
123
        networks = peer.networks.all()
124
        if issuperuser:
125
            networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
126
        mynetwork = False
127
        route_pk_list = []
128
        if destination:
129
            for network in networks:
130
                net = IPNetwork(network.network)
131
                if IPNetwork(destination) in net:
132
                    mynetwork = True
133
            if not mynetwork:
134
                raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
135
        if (sourceports and ports):
136
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
137
        if (destinationports and ports):
138
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
139
        if sourceports and not source:
140
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
141
        if destinationports and not destination:
142
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
143
        if not (source or sourceports or ports or destination or destinationports):
144
            raise forms.ValidationError('Fill at least a Rule Match Condition')
145
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
146
            raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
147
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
148
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
149
        if source:
150
            source = IPNetwork(source).compressed
151
            existing_routes = existing_routes.filter(source=source)
152
        else:
153
            existing_routes = existing_routes.filter(source=None)
154
        if protocols:
155
            route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
156
            if route_pk_list:
157
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
158
            else:
159
                existing_routes = existing_routes.filter(protocol=None)
160
        else:
161
            existing_routes = existing_routes.filter(protocol=None)
162
        if sourceports:
163
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
164
            if route_pk_list:
165
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
166
        else:
167
            existing_routes = existing_routes.filter(sourceport=None)
168
        if destinationports:
169
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
170
            if route_pk_list:
171
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
172
        else:
173
            existing_routes = existing_routes.filter(destinationport=None)
174
        if ports:
175
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
176
            if route_pk_list:
177
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
178
        else:
179
            existing_routes = existing_routes.filter(port=None)
180
        for route in existing_routes:
181
            if name != route.name:
182
                existing_url = reverse('edit-route', args=[route.name])
183
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
184
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
185
        return self.cleaned_data
186

    
187
class ThenPlainForm(forms.ModelForm):
188
#    action = forms.CharField(initial='rate-limit')
189
    class Meta:
190
        model = ThenAction
191
    
192
    def clean_action_value(self):
193
        action_value = self.cleaned_data['action_value']
194
        if action_value:
195
            try:
196
                assert(int(action_value))
197
                if int(action_value) < 50:
198
                    raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
199
                return "%s" %self.cleaned_data["action_value"]
200
            except:
201
                raise forms.ValidationError('Rate-limiting should be an integer < 50')
202
        else:
203
            raise forms.ValidationError('Cannot be empty')
204

    
205
    def clean_action(self):
206
        action = self.cleaned_data['action']
207
        if action != 'rate-limit':
208
            raise forms.ValidationError('Cannot select something other than rate-limit')
209
        else:
210
            return self.cleaned_data["action"]
211
    
212

    
213
class PortPlainForm(forms.ModelForm):
214
#    action = forms.CharField(initial='rate-limit')
215
    class Meta:
216
        model = MatchPort
217
    
218
    def clean_port(self):
219
        port = self.cleaned_data['port']
220
        if port:
221
            try:
222
                assert(int(port))
223
                return "%s" %self.cleaned_data["port"]
224
            except:
225
                raise forms.ValidationError('Port should be an integer')
226
        else:
227
            raise forms.ValidationError('Cannot be empty')
228

    
229
def value_list_to_list(valuelist):
230
    vl = []
231
    for val in valuelist:
232
        vl.append(val[0])
233
    return vl
234

    
235
def get_matchingport_route_pks(portlist, routes):
236
    route_pk_list = []
237
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
238
    for route in routes:
239
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
240
        if rsp and rsp == ports_value_list:
241
            route_pk_list.append(route.pk)
242
    return route_pk_list
243

    
244
def get_matchingprotocol_route_pks(protocolist, routes):
245
    route_pk_list = []
246
    protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
247
    for route in routes:
248
        rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
249
        if rsp and rsp == protocols_value_list:
250
            route_pk_list.append(route.pk)
251
    return route_pk_list