forms.py 12.4 KB
Newer Older
1
2
3
4
5
from django import forms
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_lazy
from django.template.defaultfilters import filesizeformat
6
from flowspy.flowspec.models import *
7
from flowspy.peers.models import *
8
from ipaddr import *
9
from django.core.urlresolvers import reverse
10
from django.contrib.auth.models import User
11
from django.conf import settings
12
import datetime
13
from django.core.mail import mail_admins, mail_managers, send_mail
14

15
16
17
18
19
20
21
22
23
24
25

class RouteForm(forms.ModelForm):
    """Validate a user-submitted firewall ``Route`` rule.

    The per-field cleaners vet the applier, the source and destination
    networks and the expiry date.  ``clean`` then applies the cross-field
    rules and rejects a rule whose destination overlaps an existing route
    of the same peer with the same match conditions.
    """
    # Legacy explicit field declarations, left commented out as in the
    # original file.
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)

    class Meta:
        model = Route

    def clean_applier(self):
        """Require an applier; return it unchanged."""
        applier = self.cleaned_data['applier']
        if not applier:
            raise forms.ValidationError('This field is required.')
        return applier

    def _describe_applier(self):
        """Return "username email (peer name)" for the submitted applier.

        The text is only used in notification mails, so a missing or
        invalid applier yields a placeholder instead of aborting validation.
        """
        from django.core.exceptions import ObjectDoesNotExist
        try:
            # The raw submitted value is used because the applier field may
            # not have been cleaned yet when the address cleaners run.
            user = User.objects.get(pk=self.data['applier'])
            peer = user.get_profile().peer
        except (KeyError, ValueError, ObjectDoesNotExist):
            return "<unknown applier>"
        return "%s %s (%s)" % (user.username, user.email, peer.peer_name)

    def _parse_unprotected_network(self, field_name, subject_role):
        """Parse the cleaned value of ``field_name`` as an IP network.

        ``field_name`` is 'source' or 'destination'; ``subject_role`` is the
        wording used in the notification subject ('a source' or
        'the destination').

        Returns the parsed network, or None when the field is empty.
        Raises ValidationError when the value is not a valid network or
        falls inside one of settings.PROTECTED_SUBNETS; in the latter case
        a notification mail is sent to settings.NOTIFY_ADMIN_MAILS first.
        """
        data = self.cleaned_data[field_name]
        if not data:
            return None
        try:
            address = IPNetwork(data)
            protected = any(address in IPNetwork(net)
                            for net in settings.PROTECTED_SUBNETS)
        except ValueError:
            # Only parsing failures are reported as a format problem;
            # unrelated errors are no longer disguised as one.
            raise forms.ValidationError(_('Invalid network address format'))
        if protected:
            mail_body = "User %s attempted to set %s as the %s address in a firewall rule" % (
                self._describe_applier(), data, field_name)
            send_mail(settings.EMAIL_SUBJECT_PREFIX + "Caught an attempt to set a protected IP/network as %s address" % subject_role,
                      mail_body, settings.SERVER_EMAIL,
                      settings.NOTIFY_ADMIN_MAILS, fail_silently=True)
            raise forms.ValidationError(_('You have no authority on this subnet'))
        return address

    def clean_source(self):
        """Accept a source network that is valid, unprotected and not private.

        Returns the cleaned value, or None when no source was given.
        """
        address = self._parse_unprotected_network('source', 'a source')
        if address is None:
            return None
        if address.is_private:
            raise forms.ValidationError(_('Private addresses not allowed'))
        return self.cleaned_data['source']

    def clean_destination(self):
        """Accept a destination network that is valid, unprotected and whose
        prefix length is at least settings.PREFIX_LENGTH.

        Returns the cleaned value, or None when no destination was given.
        """
        address = self._parse_unprotected_network('destination', 'the destination')
        if address is None:
            return None
        if address.prefixlen < settings.PREFIX_LENGTH:
            raise forms.ValidationError(
                _("Currently no prefix lengths < %s are allowed") % settings.PREFIX_LENGTH)
        return self.cleaned_data['destination']

    def clean_expires(self):
        """Accept an expiry date between 1 and 10 days from today.

        Returns the date, or None when no date was given.
        """
        date = self.cleaned_data['expires']
        if date:
            range_days = (date - datetime.date.today()).days
            if 0 < range_days < 11:
                return date
            raise forms.ValidationError('Invalid date range')

    def clean(self):
        """Apply the cross-field rules and refuse overlapping rules.

        Returns ``self.cleaned_data``.  Raises ValidationError when any field
        already failed, when the destination lies outside the applier's
        networks, when the match conditions are inconsistent or empty, when
        the action is not permitted for the user, or when an existing route
        with the same match conditions overlaps the destination.
        """
        if self.errors:
            raise forms.ValidationError(_('Errors in form. Please review and fix them'))
        cleaned = self.cleaned_data
        name = cleaned.get('name', None)
        source = cleaned.get('source', None)
        sourceports = cleaned.get('sourceport', None)
        ports = cleaned.get('port', None)
        then = cleaned.get('then', None)
        destination = cleaned.get('destination', None)
        destinationports = cleaned.get('destinationport', None)
        protocols = cleaned.get('protocol', None)
        user = cleaned.get('applier', None)

        # NOTE(review): this only checks that the submitted username exists,
        # not that the account is a superuser; presumably the view controls
        # this key - confirm against the caller.
        try:
            issuperuser = self.data['issuperuser']
            User.objects.get(username=issuperuser)
        except (KeyError, User.DoesNotExist):
            issuperuser = None

        peer = user.get_profile().peer
        networks = peer.networks.all()
        if issuperuser:
            networks = PeerRange.objects.filter(peer__in=Peer.objects.all()).distinct()

        # Parsed once and reused by the ownership and overlap checks.
        dest_net = IPNetwork(destination) if destination else None
        if dest_net is not None:
            if not any(dest_net in IPNetwork(network.network) for network in networks):
                raise forms.ValidationError(_('Destination address/network should belong to your administrative address space. Check My Profile to review your networks'))
        if sourceports and ports:
            raise forms.ValidationError(_('Cannot create rule for source ports and ports at the same time. Select either ports or source ports'))
        if destinationports and ports:
            raise forms.ValidationError(_('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports'))
        if sourceports and not source:
            raise forms.ValidationError(_('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address'))
        if destinationports and not destination:
            raise forms.ValidationError(_('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address'))
        if not (source or sourceports or ports or destination or destinationports):
            raise forms.ValidationError(_('Fill at least a Rule Match Condition'))
        if not user.is_superuser and then[0].action not in settings.UI_USER_THEN_ACTIONS:
            raise forms.ValidationError(_('This action "%s" is not permitted') % (then[0].action))

        # Narrow the peer's routes down to those with the same match
        # conditions as the submitted rule.
        existing_routes = Route.objects.all()
        existing_routes = existing_routes.filter(applier__userprofile__peer=peer)
        if source:
            source = IPNetwork(source).compressed
            existing_routes = existing_routes.filter(source=source)
        else:
            existing_routes = existing_routes.filter(source=None)

        route_pk_list = []
        if protocols:
            route_pk_list = get_matchingprotocol_route_pks(protocols, existing_routes)
        if route_pk_list:
            existing_routes = existing_routes.filter(pk__in=route_pk_list)
        else:
            existing_routes = existing_routes.filter(protocol=None)

        # NOTE(review): get_matchingport_route_pks receives only the selected
        # ports and the candidate routes, not the relation being matched, so
        # all three checks rely on whichever relation that helper inspects -
        # confirm this is intended.
        port_checks = (
            (sourceports, 'sourceport'),
            (destinationports, 'destinationport'),
            (ports, 'port'),
        )
        for selected, relation in port_checks:
            if selected:
                route_pk_list = get_matchingport_route_pks(selected, existing_routes)
                if route_pk_list:
                    existing_routes = existing_routes.filter(pk__in=route_pk_list)
            else:
                existing_routes = existing_routes.filter(**{relation: None})

        # Without a destination there is nothing to compare, so the overlap
        # check is skipped instead of failing on an empty address.
        if dest_net is not None:
            for route in existing_routes:
                if name == route.name:
                    continue
                route_net = IPNetwork(route.destination)
                if dest_net in route_net or route_net in dest_net:
                    existing_url = reverse('edit-route', args=[route.name])
                    raise forms.ValidationError('Found an exact %s rule, %s with destination prefix %s<br>To avoid overlapping try editing rule <a href=\'%s\'>%s</a>' % (route.status, route.name, route.destination, existing_url, route.name))
        return self.cleaned_data

class ThenPlainForm(forms.ModelForm):
    """Validate a ``ThenAction`` that must be a rate-limit of at least 50."""
#    action = forms.CharField(initial='rate-limit')
    class Meta:
        model = ThenAction

    def clean_action_value(self):
        """Require an integer rate of at least 50 and return it as a string.

        Raises ValidationError when the value is empty, is not an integer,
        or is below 50.
        """
        action_value = self.cleaned_data['action_value']
        if not action_value:
            raise forms.ValidationError(_('Cannot be empty'))
        # Convert explicitly: an assert would be stripped under -O, and a
        # blanket except would swallow the more specific error raised below.
        try:
            rate = int(action_value)
        except (TypeError, ValueError):
            # The message text is kept unchanged because it is a translation key.
            raise forms.ValidationError(_('Rate-limiting should be an integer < 50'))
        if rate < 50:
            raise forms.ValidationError(_('Rate-limiting cannot be < 50kbps'))
        return "%s" % action_value

    def clean_action(self):
        """Accept only the 'rate-limit' action and return it."""
        action = self.cleaned_data['action']
        if action != 'rate-limit':
            raise forms.ValidationError(_('Cannot select something other than rate-limit'))
        return action
211
    
212
213
214
215
216
217
218
219
220
221
222
223
224

class PortPlainForm(forms.ModelForm):
    """Validate a ``MatchPort`` whose port must be a non-zero integer."""
#    action = forms.CharField(initial='rate-limit')
    class Meta:
        model = MatchPort

    def clean_port(self):
        """Require a non-zero integer port and return it as a string.

        Raises ValidationError when the value is empty, is not an integer,
        or is zero.
        """
        port = self.cleaned_data['port']
        if not port:
            raise forms.ValidationError(_('Cannot be empty'))
        # Explicit conversion replaces an assert, which would be stripped
        # under -O.  The assert also rejected zero through its truthiness
        # test, so that rejection is kept here.
        try:
            is_nonzero_int = int(port) != 0
        except (TypeError, ValueError):
            is_nonzero_int = False
        if not is_nonzero_int:
            raise forms.ValidationError(_('Port should be an integer'))
        return "%s" % port
228
229
230
231
232
233
234
235
236
237
238
239
240
241

def value_list_to_list(valuelist):
    """Return a list holding the first element of every row in *valuelist*."""
    return [row[0] for row in valuelist]

def get_matchingport_route_pks(portlist, routes, relation='destinationport'):
    """Return the pks of the routes whose port set equals *portlist*.

    portlist -- queryset-like collection of ports; it must support
                ``values_list('port').order_by('port')``.
    routes   -- iterable of routes to inspect.
    relation -- name of the route attribute holding the ports to compare.
                It defaults to 'destinationport', the relation this helper
                has always inspected, so existing callers are unaffected.
                Pass 'sourceport' or 'port' to compare the other relations.

    A route with no ports on the chosen relation never matches.
    """
    wanted = [row[0] for row in portlist.values_list('port').order_by('port')]
    matching_pks = []
    for route in routes:
        related = getattr(route, relation).all()
        route_ports = [row[0] for row in related.values_list('port').order_by('port')]
        # Both lists are ordered by port, so equality means the same set.
        if route_ports and route_ports == wanted:
            matching_pks.append(route.pk)
    return matching_pks

def get_matchingprotocol_route_pks(protocolist, routes):
    """Return the pks of the routes whose protocol set equals *protocolist*.

    Both sides are read ordered by protocol and compared as lists.  A route
    with no protocols never matches.
    """
    wanted = value_list_to_list(
        protocolist.values_list('protocol').order_by('protocol'))
    matching_pks = []
    for candidate in routes:
        ordered = candidate.protocol.all().values_list('protocol').order_by('protocol')
        found = value_list_to_list(ordered)
        if not found or found != wanted:
            continue
        matching_pks.append(candidate.pk)
    return matching_pks