forms.py 11.9 KB
Newer Older
1
2
3
4
5
from django import forms
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_lazy
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12
from django.core.mail import mail_admins, mail_managers, send_mail
13

14
15
16
17
18
19
20
21
22
23
24

class RouteForm(forms.ModelForm):
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25

26
27
28
29
    class Meta:
        model = Route
    
    def clean_source(self):
30
31
        user = User.objects.get(pk=self.data['applier'])
        peer = user.get_profile().peer
32
        data = self.cleaned_data['source']
33
        private_error = False
34
        protected_error = False
35
36
37
        if data:
            try:
                address = IPNetwork(data)
38
39
40
                for net in settings.PROTECTED_SUBNETS:
                    if address in IPNetwork(net):
                        protected_error = True
41
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42
43
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
                              mail_body, settings.SERVER_EMAIL,
44
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
45
                        raise Exception
46
47
                if address.is_private:
                    private_error = True
48
                    raise Exception
49
50
                else:
                    return self.cleaned_data["source"]
51
            except Exception:
52
                error_text = _('Invalid network address format')
53
                if private_error:
54
                    error_text = _('Private addresses not allowed')
55
                if protected_error:
56
                    error_text = _('You have no authority on this subnet')
57
                raise forms.ValidationError(error_text)
58
59

    def clean_destination(self):
60
        user = User.objects.get(pk=self.data['applier'])
61
        peer = user.get_profile().peer
62
        data = self.cleaned_data['destination']
63
        error = None
64
        protected_error = False
65
66
67
        if data:
            try:
                address = IPNetwork(data)
68
69
70
                for net in settings.PROTECTED_SUBNETS:
                    if address in IPNetwork(net):
                        protected_error = True
71
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72
73
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
                              mail_body, settings.SERVER_EMAIL,
74
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
75
                        raise Exception
76
                if address.prefixlen < settings.PREFIX_LENGTH:
77
                    error = _("Currently no prefix lengths < %s are allowed") %settings.PREFIX_LENGTH
78
                    raise Exception
79
80
                return self.cleaned_data["destination"]
            except Exception:
81
                error_text = _('Invalid network address format')
82
83
                if error:
                    error_text = error
84
                if protected_error:
85
                    error_text = _('You have no authority on this subnet')
86
                raise forms.ValidationError(error_text)
87
88
89
90
91
92
93
94
95
    
    def clean_expires(self):
        date = self.cleaned_data['expires']
        if date:
            range_days = (date - datetime.date.today()).days
            if range_days > 0 and range_days < 11:
                return self.cleaned_data["expires"]
            else:
                raise forms.ValidationError('Invalid date range')
96
97

    def clean(self):
98
        if self.errors:
99
             raise forms.ValidationError(_('Errors in form. Please review and fix them'))
100
        name = self.cleaned_data.get('name', None)
101
102
103
        source = self.cleaned_data.get('source', None)
        sourceports = self.cleaned_data.get('sourceport', None)
        ports = self.cleaned_data.get('port', None)
104
        then = self.cleaned_data.get('then', None)
105
106
        destination = self.cleaned_data.get('destination', None)
        destinationports = self.cleaned_data.get('destinationport', None)
107
        protocols = self.cleaned_data.get('protocol', None)
108
        user = self.cleaned_data.get('applier', None)
109
110
        peer = user.get_profile().peer
        networks = peer.networks.all()
111
        mynetwork = False
112
        route_pk_list = []
113
114
115
116
117
118
        if destination:
            for network in networks:
                net = IPNetwork(network.network)
                if IPNetwork(destination) in net:
                    mynetwork = True
            if not mynetwork:
119
                 raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
120
        if (sourceports and ports):
121
            raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
122
        if (destinationports and ports):
123
            raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
124
        if sourceports and not source:
125
            raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
126
        if destinationports and not destination:
127
            raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
128
        if not (source or sourceports or ports or destination or destinationports):
129
            raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
130
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
131
            raise forms.ValidationError(_('This action "%s" is not permitted') %(then[0].action))
132
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='ERROR').exclude(status='ADMININACTIVE')
133
134
135
136
137
138
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)
139
140
141
142
143
144
145
146
        if protocols:
            route_pk_list=get_matchingprotocol_route_pks(protocols, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
            else:
                existing_routes = existing_routes.filter(protocol=None)
        else:
            existing_routes = existing_routes.filter(protocol=None)
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
        if sourceports:
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(sourceport=None)
        if destinationports:
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(destinationport=None)
        if ports:
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
        else:
            existing_routes = existing_routes.filter(port=None)
        for route in existing_routes:
            if name != route.name:
                existing_url = reverse('edit-route', args=[route.name])
168
169
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
170
171
172
173
174
175
176
177
178
179
180
181
        return self.cleaned_data

class ThenPlainForm(forms.ModelForm):
#    action = forms.CharField(initial='rate-limit')
    class Meta:
        model = ThenAction
    
    def clean_action_value(self):
        action_value = self.cleaned_data['action_value']
        if action_value:
            try:
                assert(int(action_value))
182
                if int(action_value) < 50:
183
                    raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
184
185
                return "%s" %self.cleaned_data["action_value"]
            except:
186
                raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
187
        else:
188
            raise forms.ValidationError(_('Cannot be empty'))
189
190
191
192

    def clean_action(self):
        action = self.cleaned_data['action']
        if action != 'rate-limit':
193
            raise forms.ValidationError(_('Cannot select something other than rate-limit'))
194
195
        else:
            return self.cleaned_data["action"]
196
    
197
198
199
200
201
202
203
204
205
206
207
208
209

class PortPlainForm(forms.ModelForm):
#    action = forms.CharField(initial='rate-limit')
    class Meta:
        model = MatchPort
    
    def clean_port(self):
        port = self.cleaned_data['port']
        if port:
            try:
                assert(int(port))
                return "%s" %self.cleaned_data["port"]
            except:
210
                raise forms.ValidationError(_('Port should be an integer'))
211
        else:
212
            raise forms.ValidationError(_('Cannot be empty'))
213
214
215
216
217
218
219
220
221
222
223
224
225
226

def value_list_to_list(valuelist):
    vl = []
    for val in valuelist:
        vl.append(val[0])
    return vl

def get_matchingport_route_pks(portlist, routes):
    route_pk_list = []
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
    for route in routes:
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
        if rsp and rsp == ports_value_list:
            route_pk_list.append(route.pk)
227
228
229
230
231
232
233
234
235
    return route_pk_list

def get_matchingprotocol_route_pks(protocolist, routes):
    route_pk_list = []
    protocols_value_list = value_list_to_list(protocolist.values_list('protocol').order_by('protocol'))
    for route in routes:
        rsp = value_list_to_list(route.protocol.all().values_list('protocol').order_by('protocol'))
        if rsp and rsp == protocols_value_list:
            route_pk_list.append(route.pk)
236
    return route_pk_list