forms.py 11.1 KB
Newer Older
1
2
3
4
5
from django import forms
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_lazy
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from ipaddr import *
8
from django.core.urlresolvers import reverse
9
from django.contrib.auth.models import User
10
from django.conf import settings
11
import datetime
12
from django.core.mail import mail_admins, mail_managers, send_mail
13

14
15
16
17
18
19
20
21
22
23
24

class RouteForm(forms.ModelForm):
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
25

26
27
28
29
    class Meta:
        model = Route
    
    def clean_source(self):
30
31
        user = User.objects.get(pk=self.data['applier'])
        peer = user.get_profile().peer
32
        data = self.cleaned_data['source']
33
        private_error = False
34
        protected_error = False
35
36
37
        if data:
            try:
                address = IPNetwork(data)
38
39
40
                for net in settings.PROTECTED_SUBNETS:
                    if address in IPNetwork(net):
                        protected_error = True
41
                        mail_body = "User %s %s (%s) attempted to set %s as the source address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
42
43
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as a source address",
                              mail_body, settings.SERVER_EMAIL,
44
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
45
                        raise forms.ValidationError("Not allowed")
46
47
48
49
50
                if address.is_private:
                    private_error = True
                    raise forms.ValidationError('Private addresses not allowed')
                else:
                    return self.cleaned_data["source"]
51
            except Exception:
52
53
54
                error_text = 'Invalid network address format'
                if private_error:
                    error_text = 'Private addresses not allowed'
55
56
                if protected_error:
                    error_text = 'You have no authority on this subnet'
57
                raise forms.ValidationError(error_text)
58
59

    def clean_destination(self):
60
61
        user = User.objects.get(pk=self.data['applier'])
        peer = user.get_profile().peer        
62
        data = self.cleaned_data['destination']
63
        error = None
64
        protected_error = False
65
66
67
        if data:
            try:
                address = IPNetwork(data)
68
69
70
                for net in settings.PROTECTED_SUBNETS:
                    if address in IPNetwork(net):
                        protected_error = True
71
                        mail_body = "User %s %s (%s) attempted to set %s as the destination address in a firewall rule" %(user.username, user.email, peer.peer_name, data)
72
73
                        send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as the destination address",
                              mail_body, settings.SERVER_EMAIL,
74
                              settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
75
                        raise forms.ValidationError("Not allowed")
76
77
78
                if address.prefixlen < settings.PREFIX_LENGTH:
                    error = "Currently no prefix lengths < %s are allowed" %settings.PREFIX_LENGTH
                    raise forms.ValidationError('error')
79
80
                return self.cleaned_data["destination"]
            except Exception:
81
                error_text = 'Invalid network address format'
82
83
                if error:
                    error_text = error
84
85
                if protected_error:
                    error_text = 'You have no authority on this subnet'
86
                raise forms.ValidationError(error_text)
87
88
89
90
91
92
93
94
95
    
    def clean_expires(self):
        date = self.cleaned_data['expires']
        if date:
            range_days = (date - datetime.date.today()).days
            if range_days > 0 and range_days < 11:
                return self.cleaned_data["expires"]
            else:
                raise forms.ValidationError('Invalid date range')
96
97

    def clean(self):
98
        name = self.cleaned_data.get('name', None)
99
100
101
        source = self.cleaned_data.get('source', None)
        sourceports = self.cleaned_data.get('sourceport', None)
        ports = self.cleaned_data.get('port', None)
102
        then = self.cleaned_data.get('then', None)
103
104
        destination = self.cleaned_data.get('destination', None)
        destinationports = self.cleaned_data.get('destinationport', None)
105
        user = self.cleaned_data.get('applier', None)
106
107
        peer = user.get_profile().peer
        networks = peer.networks.all()
108
        mynetwork = False
109
        route_pk_list = []
110
111
112
113
114
115
116
        if destination:
            for network in networks:
                net = IPNetwork(network.network)
                if IPNetwork(destination) in net:
                    mynetwork = True
            if not mynetwork:
                 raise forms.ValidationError('Destination address/network should belong to your administrative address space. Check My Profile to review your networks')
117
118
119
120
121
122
123
124
125
        if (sourceports and ports):
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
        if (destinationports and ports):
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
        if sourceports and not source:
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
        if destinationports and not destination:
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
        if not (source or sourceports or ports or destination or destinationports):
126
            raise forms.ValidationError('Fill at least a Rule Match Condition')
127
128
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
            raise forms.ValidationError('This action "%s" is not permitted' %(then[0].action))
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
        existing_routes = Route.objects.exclude(status='EXPIRED').exclude(status='PENDING').exclude(status='ERROR').exclude(status='ADMININACTIVE')
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)
        if sourceports:
            route_pk_list=get_matchingport_route_pks(sourceports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(sourceport=None)
        if destinationports:
            route_pk_list=get_matchingport_route_pks(destinationports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(destinationport=None)
        if ports:
            route_pk_list=get_matchingport_route_pks(ports, existing_routes)
            if route_pk_list:
                existing_routes = existing_routes.filter(pk__in=route_pk_list)              
        else:
            existing_routes = existing_routes.filter(port=None)
        
        for route in existing_routes:
            if name != route.name:
                existing_url = reverse('edit-route', args=[route.name])
158
159
                if IPNetwork(destination) in IPNetwork(route.destination) or IPNetwork(route.destination) in IPNetwork(destination):
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' %(route.status, route.name, route.destination, existing_url, route.name))
160
161
162
163
164
165
166
167
168
169
170
171
        return self.cleaned_data

class ThenPlainForm(forms.ModelForm):
#    action = forms.CharField(initial='rate-limit')
    class Meta:
        model = ThenAction
    
    def clean_action_value(self):
        action_value = self.cleaned_data['action_value']
        if action_value:
            try:
                assert(int(action_value))
172
173
                if int(action_value) < 50:
                    raise forms.ValidationError('Rate-limiting cannot be < 50kbps')
174
175
                return "%s" %self.cleaned_data["action_value"]
            except:
176
                raise forms.ValidationError('Rate-limiting should be an integer < 50')
177
178
179
180
181
182
183
184
185
        else:
            raise forms.ValidationError('Cannot be empty')

    def clean_action(self):
        action = self.cleaned_data['action']
        if action != 'rate-limit':
            raise forms.ValidationError('Cannot select something other than rate-limit')
        else:
            return self.cleaned_data["action"]
186
    
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202

class PortPlainForm(forms.ModelForm):
#    action = forms.CharField(initial='rate-limit')
    class Meta:
        model = MatchPort
    
    def clean_port(self):
        port = self.cleaned_data['port']
        if port:
            try:
                assert(int(port))
                return "%s" %self.cleaned_data["port"]
            except:
                raise forms.ValidationError('Port should be an integer')
        else:
            raise forms.ValidationError('Cannot be empty')
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217

def value_list_to_list(valuelist):
    vl = []
    for val in valuelist:
        vl.append(val[0])
    return vl

def get_matchingport_route_pks(portlist, routes):
    route_pk_list = []
    ports_value_list = value_list_to_list(portlist.values_list('port').order_by('port'))
    for route in routes:
        rsp = value_list_to_list(route.destinationport.all().values_list('port').order_by('port'))
        if rsp and rsp == ports_value_list:
            route_pk_list.append(route.pk)
    return route_pk_list