Statistics
| Branch: | Tag: | Revision:

root / flowspec / forms.py @ 1698da77

History | View | Annotate | Download (12.5 kB)

1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from flowspy.peers.models import *
8
from flowspy.accounts.models import *
9
from ipaddr import *
10
from django.core.urlresolvers import reverse
11
from django.contrib.auth.models import User
12
from django.conf import settings
13
import datetime
14
from django.core.mail import mail_admins, mail_managers, send_mail
15

    
16
class UserProfileForm(forms.ModelForm):
17
    class Meta:
18
        model = UserProfile
19

    
20
class RouteForm(forms.ModelForm):
21
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
22
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
23
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
24
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
25
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
26
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
27
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
28
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
29
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
30

    
31
    class Meta:
32
        model = Route
33

    
34
    def clean_applier(self):
35
        applier = self.cleaned_data['applier']
36
        if applier:
37
            return self.cleaned_data["applier"]
38
        else:
39
            raise forms.ValidationError('This field is required.')
40

    
41
    def clean_source(self):
42
        user = User.objects.get(pk=self.data['applier'])
43
        peer = user.get_profile().peer
44
        data = self.cleaned_data['source']
45
        private_error = False
46
        protected_error = False
47
        if data:
48
            try:
49
                address = IPNetwork(data)
50
                for net in settings.PROTECTED_SUBNETS:
51
                    if address in IPNetwork(net):
52
                        protected_error = True
53
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
54
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
55
                              mail_body, settings.SERVER_EMAIL,
56
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
57
                        raise Exception
58
                if address.is_private:
59
                    private_error = True
60
                    raise Exception
61
                else:
62
                    return self.cleaned_data["source"]
63
            except Exception:
64
                error_text = _('Invalid network address format')
65
                if private_error:
66
                    error_text = _('Private addresses not allowed')
67
                if protected_error:
68
                    error_text = _('You have no authority on this subnet')
69
                raise forms.ValidationError(error_text)
70

    
71
    def clean_destination(self):
72
        user = User.objects.get(pk=self.data['applier'])
73
        peer = user.get_profile().peer
74
        data = self.cleaned_data['destination']
75
        error = None
76
        protected_error = False
77
        if data:
78
            try:
79
                address = IPNetwork(data)
80
                for net in settings.PROTECTED_SUBNETS:
81
                    if address in IPNetwork(net):
82
                        protected_error = True
83
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
84
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
85
                              mail_body, settings.SERVER_EMAIL,
86
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
87
                        raise Exception
88
                if address.prefixlen < settings.PREFIX_LENGTH:
89
                    error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
90
                    raise Exception
91
                return self.cleaned_data["destination"]
92
            except Exception:
93
                error_text = _('Invalid network address format')
94
                if error:
95
                    error_text = error
96
                if protected_error:
97
                    error_text = _('You have no authority on this subnet')
98
                raise forms.ValidationError(error_text)
99
    
100
    def clean_expires(self):
101
        date = self.cleaned_data['expires']
102
        if date:
103
            range_days = (date - datetime.date.today()).days
104
            if range_days > 0 and range_days < 11:
105
                return self.cleaned_data["expires"]
106
            else:
107
                raise forms.ValidationError('Invalid date range')
108

    
109
    def clean(self):
110
        if self.errors:
111
             raise forms.ValidationError(_('Errors in form. Please review and fix them'))
112
        name = self.cleaned_data.get('name', None)
113
        source = self.cleaned_data.get('source', None)
114
        sourceports = self.cleaned_data.get('sourceport', None)
115
        ports = self.cleaned_data.get('port', None)
116
        then = self.cleaned_data.get('then', None)
117
        destination = self.cleaned_data.get('destination', None)
118
        destinationports = self.cleaned_data.get('destinationport', None)
119
        protocols = self.cleaned_data.get('protocol', None)
120
        user = self.cleaned_data.get('applier', None)
121
        try:
122
            issuperuser = self.data['issuperuser']
123
            su = User.objects.get(username=issuperuser)
124
        except:
125
            issuperuser = None
126
        peer = user.get_profile().peer
127
        networks = peer.networks.all()
128
        if issuperuser:
129
            networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()
130
        mynetwork = False
131
        route_pk_list = []
132
        if destination:
133
            for network in networks:
134
                net = IPNetwork(network.network)
135
                if IPNetwork(destination) in net:
136
                    mynetwork = True
137
            if not mynetwork:
138
                raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
139
        if (sourceports and ports):
140
            raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
141
        if (destinationports and ports):
142
            raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
143
        if sourceports and not source:
144
            raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
145
        if destinationports and not destination:
146
            raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
147
        if not (source or sourceports or ports or destination or destinationports):
148
            raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
149
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
150
            raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action))
151
        existing_routes = Route.objects.all()
152
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
153
        if source:
154
            source = IPNetwork(source).compressed
155
            existing_routes = existing_routes.filter(source=source)
156
        else:
157
            existing_routes = existing_routes.filter(source=None)
158
        if protocols:
159
            route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
160
            if route_pk_list:
161
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
162
            else:
163
                existing_routes = existing_routes.filter(protocol=None)
164
        else:
165
            existing_routes = existing_routes.filter(protocol=None)
166
        if sourceports:
167
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
168
            if route_pk_list:
169
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
170
        else:
171
            existing_routes = existing_routes.filter(sourceport=None)
172
        if destinationports:
173
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
174
            if route_pk_list:
175
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
176
        else:
177
            existing_routes = existing_routes.filter(destinationport=None)
178
        if ports:
179
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
180
            if route_pk_list:
181
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
182
        else:
183
            existing_routes = existing_routes.filter(port=None)
184
        for route in existing_routes:
185
            if name != route.name:
186
                existing_url = reverse('edit-route', args=[route.name])
187
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
188
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
189
        return self.cleaned_data
190

    
191
class ThenPlainForm(forms.ModelForm):
192
#    action = forms.CharField(initial='rate-limit')
193
    class Meta:
194
        model = ThenAction
195
    
196
    def clean_action_value(self):
197
        action_value = self.cleaned_data['action_value']
198
        if action_value:
199
            try:
200
                assert(int(action_value))
201
                if int(action_value) < 50:
202
                    raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
203
                return "%s" %self.cleaned_data["action_value"]
204
            except:
205
                raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
206
        else:
207
            raise forms.ValidationError(_('Cannot be empty'))
208

    
209
    def clean_action(self):
210
        action = self.cleaned_data['action']
211
        if action != 'rate-limit':
212
            raise forms.ValidationError(_('Cannot select something other than rate-limit'))
213
        else:
214
            return self.cleaned_data["action"]
215
    
216

    
217
class PortPlainForm(forms.ModelForm):
218
#    action = forms.CharField(initial='rate-limit')
219
    class Meta:
220
        model = MatchPort
221
    
222
    def clean_port(self):
223
        port = self.cleaned_data['port']
224
        if port:
225
            try:
226
                assert(int(port))
227
                return "%s" %self.cleaned_data["port"]
228
            except:
229
                raise forms.ValidationError(_('Port should be an integer'))
230
        else:
231
            raise forms.ValidationError(_('Cannot be empty'))
232

    
233
def value_list_to_list(valuelist):
234
    vl = []
235
    for val in valuelist:
236
        vl.append(val[0])
237
    return vl
238

    
239
def get_matchingport_route_pks(portlist, routes):
240
    route_pk_list = []
241
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
242
    for route in routes:
243
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
244
        if rsp and rsp == ports_value_list:
245
            route_pk_list.append(route.pk)
246
    return route_pk_list
247

    
248
def get_matchingprotocol_route_pks(protocolist, routes):
249
    route_pk_list = []
250
    protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
251
    for route in routes:
252
        rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
253
        if rsp and rsp == protocols_value_list:
254
            route_pk_list.append(route.pk)
255
    return route_pk_list