Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ 0667633c

History | View | Annotate | Download (11.1 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12
from django.core.mail import mail_admins, mail_managers, send_mail
13

    
14

    
15
class RouteForm(forms.ModelForm):
16
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
17
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
18
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
19
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
20
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
21
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
22
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
23
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
24
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25

    
26
    class Meta:
27
        model = Route
28
    
29
    def clean_source(self):
30
        user = User.objects.get(pk=self.data['applier'])
31
        peer = user.get_profile().peer
32
        data = self.cleaned_data['source']
33
        private_error = False
34
        protected_error = False
35
        if data:
36
            try:
37
                address = IPNetwork(data)
38
                for net in settings.PROTECTED_SUBNETS:
39
                    if address in IPNetwork(net):
40
                        protected_error = True
41
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
43
                              mail_body, settings.SERVER_EMAIL,
44
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
45
                        raise forms.ValidationError("Not allowed")
46
                if address.is_private:
47
                    private_error = True
48
                    raise forms.ValidationError('Private addresses not allowed')
49
                else:
50
                    return self.cleaned_data["source"]
51
            except Exception:
52
                error_text = 'Invalid network address format'
53
                if private_error:
54
                    error_text = 'Private addresses not allowed'
55
                if protected_error:
56
                    error_text = 'You have no authority on this subnet'
57
                raise forms.ValidationError(error_text)
58

    
59
    def clean_destination(self):
60
        user = User.objects.get(pk=self.data['applier'])
61
        peer = user.get_profile().peer        
62
        data = self.cleaned_data['destination']
63
        error = None
64
        protected_error = False
65
        if data:
66
            try:
67
                address = IPNetwork(data)
68
                for net in settings.PROTECTED_SUBNETS:
69
                    if address in IPNetwork(net):
70
                        protected_error = True
71
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
73
                              mail_body, settings.SERVER_EMAIL,
74
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
75
                        raise forms.ValidationError("Not allowed")
76
                if address.prefixlen < settings.PREFIX_LENGTH:
77
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
78
                    raise forms.ValidationError('error')
79
                return self.cleaned_data["destination"]
80
            except Exception:
81
                error_text = 'Invalid network address format'
82
                if error:
83
                    error_text = error
84
                if protected_error:
85
                    error_text = 'You have no authority on this subnet'
86
                raise forms.ValidationError(error_text)
87
    
88
    def clean_expires(self):
89
        date = self.cleaned_data['expires']
90
        if date:
91
            range_days = (date - datetime.date.today()).days
92
            if range_days > 0 and range_days < 11:
93
                return self.cleaned_data["expires"]
94
            else:
95
                raise forms.ValidationError('Invalid date range')
96

    
97
    def clean(self):
98
        name = self.cleaned_data.get('name', None)
99
        source = self.cleaned_data.get('source', None)
100
        sourceports = self.cleaned_data.get('sourceport', None)
101
        ports = self.cleaned_data.get('port', None)
102
        then = self.cleaned_data.get('then', None)
103
        destination = self.cleaned_data.get('destination', None)
104
        destinationports = self.cleaned_data.get('destinationport', None)
105
        user = self.cleaned_data.get('applier', None)
106
        peer = user.get_profile().peer
107
        networks = peer.networks.all()
108
        mynetwork = False
109
        route_pk_list = []
110
        if destination:
111
            for network in networks:
112
                net = IPNetwork(network.network)
113
                if IPNetwork(destination) in net:
114
                    mynetwork = True
115
            if not mynetwork:
116
                 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
117
        if (sourceports and ports):
118
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
119
        if (destinationports and ports):
120
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
121
        if sourceports and not source:
122
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
123
        if destinationports and not destination:
124
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
125
        if not (source or sourceports or ports or destination or destinationports):
126
            raise forms.ValidationError('Fill at least a Route Match Condition')
127
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
128
            raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
129
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='PENDING').exclude(status='ERROR').exclude(status='ADMININACTIVE')
130
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
131
        if source:
132
            source = IPNetwork(source).compressed
133
            existing_routes = existing_routes.filter(source=source)
134
        else:
135
            existing_routes = existing_routes.filter(source=None)
136
        if sourceports:
137
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
138
            if route_pk_list:
139
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
140
        else:
141
            existing_routes = existing_routes.filter(sourceport=None)
142
        if destinationports:
143
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
144
            if route_pk_list:
145
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
146
        else:
147
            existing_routes = existing_routes.filter(destinationport=None)
148
        if ports:
149
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
150
            if route_pk_list:
151
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
152
        else:
153
            existing_routes = existing_routes.filter(port=None)
154
        
155
        for route in existing_routes:
156
            if name != route.name:
157
                existing_url = reverse('edit-route', args=[route.name])
158
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
159
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
160
        return self.cleaned_data
161

    
162
class ThenPlainForm(forms.ModelForm):
163
#    action = forms.CharField(initial='rate-limit')
164
    class Meta:
165
        model = ThenAction
166
    
167
    def clean_action_value(self):
168
        action_value = self.cleaned_data['action_value']
169
        if action_value:
170
            try:
171
                assert(int(action_value))
172
                if int(action_value) < 50:
173
                    raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
174
                return "%s" %self.cleaned_data["action_value"]
175
            except:
176
                raise forms.ValidationError('Rate-limiting should be an integer < 50')
177
        else:
178
            raise forms.ValidationError('Cannot be empty')
179

    
180
    def clean_action(self):
181
        action = self.cleaned_data['action']
182
        if action != 'rate-limit':
183
            raise forms.ValidationError('Cannot select something other than rate-limit')
184
        else:
185
            return self.cleaned_data["action"]
186
    
187

    
188
class PortPlainForm(forms.ModelForm):
189
#    action = forms.CharField(initial='rate-limit')
190
    class Meta:
191
        model = MatchPort
192
    
193
    def clean_port(self):
194
        port = self.cleaned_data['port']
195
        if port:
196
            try:
197
                assert(int(port))
198
                return "%s" %self.cleaned_data["port"]
199
            except:
200
                raise forms.ValidationError('Port should be an integer')
201
        else:
202
            raise forms.ValidationError('Cannot be empty')
203

    
204
def value_list_to_list(valuelist):
205
    vl = []
206
    for val in valuelist:
207
        vl.append(val[0])
208
    return vl
209

    
210
def get_matchingport_route_pks(portlist, routes):
211
    route_pk_list = []
212
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
213
    for route in routes:
214
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
215
        if rsp and rsp == ports_value_list:
216
            route_pk_list.append(route.pk)
217
    return route_pk_list