Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ dbdc30ec

History | View | Annotate | Download (11.9 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12
from django.core.mail import mail_admins, mail_managers, send_mail
13

    
14

    
15
class RouteForm(forms.ModelForm):
16
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
17
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
18
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
19
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
20
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
21
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
22
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
23
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
24
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25

    
26
    class Meta:
27
        model = Route
28
    
29
    def clean_source(self):
30
        user = User.objects.get(pk=self.data['applier'])
31
        peer = user.get_profile().peer
32
        data = self.cleaned_data['source']
33
        private_error = False
34
        protected_error = False
35
        if data:
36
            try:
37
                address = IPNetwork(data)
38
                for net in settings.PROTECTED_SUBNETS:
39
                    if address in IPNetwork(net):
40
                        protected_error = True
41
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
43
                              mail_body, settings.SERVER_EMAIL,
44
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
45
                        raise forms.ValidationError("Not allowed")
46
                if address.is_private:
47
                    private_error = True
48
                    raise forms.ValidationError('Private addresses not allowed')
49
                else:
50
                    return self.cleaned_data["source"]
51
            except Exception:
52
                error_text = 'Invalid network address format'
53
                if private_error:
54
                    error_text = 'Private addresses not allowed'
55
                if protected_error:
56
                    error_text = 'You have no authority on this subnet'
57
                raise forms.ValidationError(error_text)
58

    
59
    def clean_destination(self):
60
        user = User.objects.get(pk=self.data['applier'])
61
        peer = user.get_profile().peer        
62
        data = self.cleaned_data['destination']
63
        error = None
64
        protected_error = False
65
        if data:
66
            try:
67
                address = IPNetwork(data)
68
                for net in settings.PROTECTED_SUBNETS:
69
                    if address in IPNetwork(net):
70
                        protected_error = True
71
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
73
                              mail_body, settings.SERVER_EMAIL,
74
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
75
                        raise forms.ValidationError("Not allowed")
76
                if address.prefixlen < settings.PREFIX_LENGTH:
77
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
78
                    raise forms.ValidationError('error')
79
                return self.cleaned_data["destination"]
80
            except Exception:
81
                error_text = 'Invalid network address format'
82
                if error:
83
                    error_text = error
84
                if protected_error:
85
                    error_text = 'You have no authority on this subnet'
86
                raise forms.ValidationError(error_text)
87
    
88
    def clean_expires(self):
89
        date = self.cleaned_data['expires']
90
        if date:
91
            range_days = (date - datetime.date.today()).days
92
            if range_days > 0 and range_days < 11:
93
                return self.cleaned_data["expires"]
94
            else:
95
                raise forms.ValidationError('Invalid date range')
96

    
97
    def clean(self):
98
        name = self.cleaned_data.get('name', None)
99
        source = self.cleaned_data.get('source', None)
100
        sourceports = self.cleaned_data.get('sourceport', None)
101
        ports = self.cleaned_data.get('port', None)
102
        then = self.cleaned_data.get('then', None)
103
        destination = self.cleaned_data.get('destination', None)
104
        destinationports = self.cleaned_data.get('destinationport', None)
105
        protocols = self.cleaned_data.get('protocol', None)
106
        user = self.cleaned_data.get('applier', None)
107
        peer = user.get_profile().peer
108
        networks = peer.networks.all()
109
        mynetwork = False
110
        route_pk_list = []
111
        if destination:
112
            for network in networks:
113
                net = IPNetwork(network.network)
114
                if IPNetwork(destination) in net:
115
                    mynetwork = True
116
            if not mynetwork:
117
                 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
118
        if (sourceports and ports):
119
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
120
        if (destinationports and ports):
121
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
122
        if sourceports and not source:
123
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
124
        if destinationports and not destination:
125
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
126
        if not (source or sourceports or ports or destination or destinationports):
127
            raise forms.ValidationError('Fill at least a Rule Match Condition')
128
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
129
            raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
130
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
131
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
132
        if source:
133
            source = IPNetwork(source).compressed
134
            existing_routes = existing_routes.filter(source=source)
135
        else:
136
            existing_routes = existing_routes.filter(source=None)
137
        if protocols:
138
            route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
139
            if route_pk_list:
140
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
141
            else:
142
                existing_routes = existing_routes.filter(protocol=None)
143
        else:
144
            existing_routes = existing_routes.filter(protocol=None)
145
        if sourceports:
146
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
147
            if route_pk_list:
148
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
149
        else:
150
            existing_routes = existing_routes.filter(sourceport=None)
151
        if destinationports:
152
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
153
            if route_pk_list:
154
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
155
        else:
156
            existing_routes = existing_routes.filter(destinationport=None)
157
        if ports:
158
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
159
            if route_pk_list:
160
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
161
        else:
162
            existing_routes = existing_routes.filter(port=None)
163
        for route in existing_routes:
164
            if name != route.name:
165
                existing_url = reverse('edit-route', args=[route.name])
166
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
167
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
168
        return self.cleaned_data
169

    
170
class ThenPlainForm(forms.ModelForm):
171
#    action = forms.CharField(initial='rate-limit')
172
    class Meta:
173
        model = ThenAction
174
    
175
    def clean_action_value(self):
176
        action_value = self.cleaned_data['action_value']
177
        if action_value:
178
            try:
179
                assert(int(action_value))
180
                if int(action_value) < 50:
181
                    raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
182
                return "%s" %self.cleaned_data["action_value"]
183
            except:
184
                raise forms.ValidationError('Rate-limiting should be an integer < 50')
185
        else:
186
            raise forms.ValidationError('Cannot be empty')
187

    
188
    def clean_action(self):
189
        action = self.cleaned_data['action']
190
        if action != 'rate-limit':
191
            raise forms.ValidationError('Cannot select something other than rate-limit')
192
        else:
193
            return self.cleaned_data["action"]
194
    
195

    
196
class PortPlainForm(forms.ModelForm):
197
#    action = forms.CharField(initial='rate-limit')
198
    class Meta:
199
        model = MatchPort
200
    
201
    def clean_port(self):
202
        port = self.cleaned_data['port']
203
        if port:
204
            try:
205
                assert(int(port))
206
                return "%s" %self.cleaned_data["port"]
207
            except:
208
                raise forms.ValidationError('Port should be an integer')
209
        else:
210
            raise forms.ValidationError('Cannot be empty')
211

    
212
def value_list_to_list(valuelist):
213
    vl = []
214
    for val in valuelist:
215
        vl.append(val[0])
216
    return vl
217

    
218
def get_matchingport_route_pks(portlist, routes):
219
    route_pk_list = []
220
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
221
    for route in routes:
222
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
223
        if rsp and rsp == ports_value_list:
224
            route_pk_list.append(route.pk)
225
    return route_pk_list
226

    
227
def get_matchingprotocol_route_pks(protocolist, routes):
228
    route_pk_list = []
229
    protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
230
    for route in routes:
231
        rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
232
        if rsp and rsp == protocols_value_list:
233
            route_pk_list.append(route.pk)
234
    return route_pk_list