Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ b4401a0c

History | View | Annotate | Download (9.1 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12

    
13

    
14

    
15
class RouteForm(forms.ModelForm):
16
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
17
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
18
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
19
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
20
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
21
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
22
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
23
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
24
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25
    class Meta:
26
        model = Route
27
    
28
    def clean_source(self):
29
        data = self.cleaned_data['source']
30
        private_error = False
31
        if data:
32
            try:
33
                address = IPNetwork(data)
34
                if address.is_private:
35
                    private_error = True
36
                    raise forms.ValidationError('Private addresses not allowed')
37
                else:
38
                    return self.cleaned_data["source"]
39
            except Exception:
40
                error_text = 'Invalid network address format'
41
                if private_error:
42
                    error_text = 'Private addresses not allowed'
43
                raise forms.ValidationError(error_text)
44

    
45
    def clean_destination(self):
46
        data = self.cleaned_data['destination']
47
        error = None
48
        if data:
49
            try:
50
                address = IPNetwork(data)
51
                if address.prefixlen < settings.PREFIX_LENGTH:
52
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
53
                    raise forms.ValidationError('error')
54
                return self.cleaned_data["destination"]
55
            except Exception:
56
                if error:
57
                    error_text = error
58
                else:
59
                    error_text = 'Invalid network address format'
60
                raise forms.ValidationError(error_text)
61
    
62
    def clean_expires(self):
63
        date = self.cleaned_data['expires']
64
        if date:
65
            range_days = (date - datetime.date.today()).days
66
            if range_days > 0 and range_days < 11:
67
                return self.cleaned_data["expires"]
68
            else:
69
                raise forms.ValidationError('Invalid date range')
70

    
71
    def clean(self):
72
        name = self.cleaned_data.get('name', None)
73
        source = self.cleaned_data.get('source', None)
74
        sourceports = self.cleaned_data.get('sourceport', None)
75
        ports = self.cleaned_data.get('port', None)
76
        then = self.cleaned_data.get('then', None)
77
        destination = self.cleaned_data.get('destination', None)
78
        destinationports = self.cleaned_data.get('destinationport', None)
79
        user = self.cleaned_data.get('applier', None)
80
        peer = user.get_profile().peer
81
        networks = peer.networks.all()
82
        mynetwork = False
83
        route_pk_list = []
84
        
85
        if destination:
86
            for network in networks:
87
                net = IPNetwork(network.network)
88
                if IPNetwork(destination) in net:
89
                    mynetwork = True
90
            if not mynetwork:
91
                 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
92
        if (sourceports and ports):
93
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
94
        if (destinationports and ports):
95
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
96
        if sourceports and not source:
97
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
98
        if destinationports and not destination:
99
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
100
        if not (source or sourceports or ports or destination or destinationports):
101
            raise forms.ValidationError('Fill at least a Route Match Condition')
102
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='PENDING').exclude(status='ERROR').exclude(status='ADMININACTIVE')
103
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
104
        if source:
105
            source = IPNetwork(source).compressed
106
            existing_routes = existing_routes.filter(source=source)
107
        else:
108
            existing_routes = existing_routes.filter(source=None)
109
        if sourceports:
110
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
111
            if route_pk_list:
112
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
113
        else:
114
            existing_routes = existing_routes.filter(sourceport=None)
115
        if destinationports:
116
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
117
            if route_pk_list:
118
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
119
        else:
120
            existing_routes = existing_routes.filter(destinationport=None)
121
        if ports:
122
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
123
            if route_pk_list:
124
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
125
        else:
126
            existing_routes = existing_routes.filter(port=None)
127
        
128
        for route in existing_routes:
129
            if name != route.name:
130
                existing_url = reverse('edit-route', args=[route.name])
131
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
132
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
133
        return self.cleaned_data
134

    
135
class ThenPlainForm(forms.ModelForm):
136
#    action = forms.CharField(initial='rate-limit')
137
    class Meta:
138
        model = ThenAction
139
    
140
    def clean_action_value(self):
141
        action_value = self.cleaned_data['action_value']
142
        if action_value:
143
            try:
144
                assert(int(action_value))
145
                return "%s" %self.cleaned_data["action_value"]
146
            except:
147
                raise forms.ValidationError('Rate-limiting should be an integer')
148
            if int(action_value) < 50:
149
                raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
150
        else:
151
            raise forms.ValidationError('Cannot be empty')
152

    
153
    def clean_action(self):
154
        action = self.cleaned_data['action']
155
        if action != 'rate-limit':
156
            raise forms.ValidationError('Cannot select something other than rate-limit')
157
        else:
158
            return self.cleaned_data["action"]
159

    
160
class PortPlainForm(forms.ModelForm):
161
#    action = forms.CharField(initial='rate-limit')
162
    class Meta:
163
        model = MatchPort
164
    
165
    def clean_port(self):
166
        port = self.cleaned_data['port']
167
        if port:
168
            try:
169
                assert(int(port))
170
                return "%s" %self.cleaned_data["port"]
171
            except:
172
                raise forms.ValidationError('Port should be an integer')
173
        else:
174
            raise forms.ValidationError('Cannot be empty')
175

    
176
def value_list_to_list(valuelist):
177
    vl = []
178
    for val in valuelist:
179
        vl.append(val[0])
180
    return vl
181

    
182
def get_matchingport_route_pks(portlist, routes):
183
    route_pk_list = []
184
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
185
    for route in routes:
186
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
187
        if rsp and rsp == ports_value_list:
188
            route_pk_list.append(route.pk)
189
    return route_pk_list