Commit 3e99e2d1 authored by Leonidas Poulopoulos's avatar Leonidas Poulopoulos

Massive changes. Added long-polling support

parent e9d46ce1
......@@ -11,6 +11,9 @@ import logging
from flowspec.tasks import *
from time import sleep
from flowspy.utils import beanstalkc
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)
......@@ -119,14 +122,31 @@ class Route(models.Model):
# logger.info("Got save job id: %s" %response)
def commit_add(self, *args, **kwargs):
send_message("Adding route %s. Please wait..." %self.name)
response = add.delay(self)
logger.info("Got save job id: %s" %response)
#
def deactivate(self):
self.is_online = False
self.is_active = False
self.save()
# def delete(self, *args, **kwargs):
# response = delete.delay(self)
# logger.info("Got delete job id: %s" %response)
def commit_edit(self, *args, **kwargs):
send_message("Editing route %s. Please wait..." %self.name)
response = edit.delay(self)
logger.info("Got edit job id: %s" %response)
def commit_delete(self, *args, **kwargs):
send_message("Removing route %s. Please wait..." %self.name)
response = delete.delay(self)
logger.info("Got edit job id: %s" %response)
#
# def delete(self, *args, **kwargs):
# response = delete.delay(self)
# logger.info("Got delete job id: %s" %response)
def is_synced(self):
found = False
......@@ -232,35 +252,40 @@ class Route(models.Model):
def get_match(self):
ret = ''
if self.destination:
ret = ret = '%s Destination Address:<strong>%s</strong><br/>' %(ret, self.destination)
ret = '%s Destination Address:<strong>%s</strong><br/>' %(ret, self.destination)
if self.fragmenttype:
ret = ret = "%s Fragment Type:<strong>%s</strong><br/>" %(ret, self.fragmenttype)
ret = "%s Fragment Type:<strong>%s</strong><br/>" %(ret, self.fragmenttype)
if self.icmpcode:
ret = ret = "%s ICMP code:<strong>%s</strong><br/>" %(ret, self.icmpcode)
ret = "%s ICMP code:<strong>%s</strong><br/>" %(ret, self.icmpcode)
if self.icmptype:
ret = ret = "%s ICMP Type:<strong>%s</strong><br/>" %(ret, self.icmptype)
ret = "%s ICMP Type:<strong>%s</strong><br/>" %(ret, self.icmptype)
if self.packetlength:
ret = ret = "%s Packet Length:<strong>%s</strong><br/>" %(ret, self.packetlength)
ret = "%s Packet Length:<strong>%s</strong><br/>" %(ret, self.packetlength)
if self.protocol:
ret = ret = "%s Protocol:<strong>%s</strong><br/>" %(ret, self.protocol)
ret = "%s Protocol:<strong>%s</strong><br/>" %(ret, self.protocol)
if self.source:
ret = ret = "%s Source Address:<strong>%s</strong><br/>" %(ret, self.source)
ret = "%s Source Address:<strong>%s</strong><br/>" %(ret, self.source)
if self.tcpflag:
ret = ret = "%s TCP flag:<strong>%s</strong><br/>" %(ret, self.tcpflag)
ret = "%s TCP flag:<strong>%s</strong><br/>" %(ret, self.tcpflag)
if self.port:
for port in self.port.all():
ret = "%s Port:<strong>%s</strong><br/>" %(ret, port)
ret = ret + "Port:<strong>%s</strong><br/>" %(port)
if self.destinationport:
for port in self.destinationport.all():
ret = "%s Port:<strong>%s</strong><br/>" %(ret, port)
ret = ret + "Destination Port:<strong>%s</strong><br/>" %(port)
if self.sourceport:
for port in self.sourceport.all():
ret = "%s Port:<strong>%s</strong><br/>" %(ret, port)
ret = ret +"Source Port:<strong>%s</strong><br/>" %(port)
if self.dscp:
for dscp in self.dscp.all():
ret = "%s Port:<strong>%s</strong><br/>" %(ret, dscp)
ret = ret + "%s Port:<strong>%s</strong><br/>" %(ret, dscp)
return ret.rstrip('<br/>')
get_match.short_description = 'Match statement'
get_match.allow_tags = True
def send_message(msg):
b = beanstalkc.Connection()
b.use(settings.POLLS_TUBE)
b.put(str(msg))
b.close()
from utils import proxy as PR
from celery.task import task
from celery.task.sets import subtask
import logging
from celery.task.http import *
from flowspy.utils import beanstalkc
from django.conf import settings
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@task
def add(route):
def add(route, callback=None):
applier = PR.Applier(route_object=route)
commit, response = applier.apply()
if commit:
......@@ -14,13 +24,52 @@ def add(route):
route.is_online = is_online
route.is_active = is_active
route.response = response
subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response))
route.save()
@task
def edit(route, callback=None):
applier = PR.Applier(route_object=route)
commit, response = applier.apply(operation="replace")
if commit:
is_online = True
else:
is_online = False
route.is_active = True
route.is_online = is_online
route.response = response
route.save()
subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response))
@task
def multi(x,y):
return x*y
#
#@task
def delete(route, callback=None):
applier = PR.Applier(route_object=route)
commit, response = applier.apply(operation="delete")
if commit:
is_online = False
is_active = False
else:
is_online = route.is_online
is_active = route.is_active
route.is_online = is_online
route.is_active = is_active
route.response = response
route.save()
subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response))
@task
def announce(messg):
messg = str(messg)
b = beanstalkc.Connection()
b.use(settings.POLLS_TUBE)
b.put(messg)
b.close()
#def delete(route):
#
# applier = PR.Applier(route_object=route)
......
......@@ -15,11 +15,16 @@ from django.utils import simplejson
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.forms.models import model_to_dict
from flowspy.flowspec.forms import *
from flowspy.flowspec.models import *
from copy import deepcopy
def days_offset(): return datetime.now() + timedelta(days = settings.EXPIRATION_DAYS_OFFSET)
@login_required
def user_routes(request):
if request.user.is_anonymous():
return HttpResponseRedirect(reverse('login'))
......@@ -27,7 +32,7 @@ def user_routes(request):
return render_to_response('user_routes.html', {'routes': user_routes},
context_instance=RequestContext(request))
@login_required
def add_route(request):
if request.method == "GET":
form = RouteForm()
......@@ -47,3 +52,36 @@ def add_route(request):
else:
return render_to_response('apply.html', {'form': form},
context_instance=RequestContext(request))
@login_required
def edit_route(request, route_slug):
route_edit = get_object_or_404(Route, name=route_slug)
route_original = deepcopy(route_edit)
if request.POST:
form = RouteForm(request.POST, instance = route_edit)
if form.is_valid():
route=form.save(commit=False)
route.name = route_original.name
route.applier = route_original.applier
route.expires = route_original.expires
route.is_active = route_original.is_active
route.save()
form.save_m2m()
route.commit_edit()
return HttpResponseRedirect(urlresolvers.reverse("user-routes"))
else:
return render_to_response('apply.html', {'form': form, 'edit':True},
context_instance=RequestContext(request))
else:
dictionary = model_to_dict(route_edit, fields=[], exclude=[])
form = RouteForm(dictionary)
return render_to_response('apply.html', {'form': form, 'edit':True},
context_instance=RequestContext(request))
@login_required
def delete_route(request, route_slug):
if request.is_ajax():
route = get_object_or_404(Route, name=route_slug)
if route.applier == request.user:
route.deactivate()
route.commit_delete()
return HttpResponseRedirect(urlresolvers.reverse("user-routes"))
An example of AJAX chat taken from Tornado demos and converted to use django and gevent.
To start the server, run
$ python run.py
#!/usr/bin/python
from gevent import monkey; monkey.patch_all()
import os
import traceback
from django.core.handlers.wsgi import WSGIHandler
from django.core.signals import got_request_exception
from django.core.management import call_command
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
def exception_printer(sender, **kwargs):
traceback.print_exc()
got_request_exception.connect(exception_printer)
call_command('syncdb')
application = WSGIHandler()
#!/bin/sh
# see http://projects.unbit.it/uwsgi and http://projects.unbit.it/uwsgi/wiki/Gevent
exec uwsgi --loop gevent --http-socket :8000 --module application --async 1000
File added
from django.conf.urls.defaults import *
from django.conf import settings
urlpatterns = patterns('flowspy.poller.views',
('^$', 'main'),
('^a/message/existing$', 'message_existing'),
('^a/message/new$', 'message_new'),
('^a/message/updates$', 'message_updates'))
urlpatterns += patterns('',
(r'^static/(?P<path>.*)', 'django.views.static.serve',\
{'document_root': settings.STATIC_URL}),
)
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
import uuid
import simplejson
import datetime
from django.shortcuts import render_to_response
from django.template.loader import render_to_string
from django.http import HttpResponse
from gevent.event import Event
from django.conf import settings
from django.views.decorators.csrf import csrf_exempt
from flowspy.utils import beanstalkc
import logging
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def create_message(from_, body):
data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body}
data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
return data
def json_response(value, **kwargs):
kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8')
return HttpResponse(simplejson.dumps(value), **kwargs)
class Msgs(object):
cache_size = 200
def __init__(self):
self.cache = []
self.new_message_event = Event()
def main(self, request):
if self.cache:
request.session['cursor'] = self.cache[-1]['id']
return render_to_response('poll.html', {'messages': self.cache})
@csrf_exempt
def message_existing(self, request):
if self.cache:
request.session['cursor'] = self.cache[-1]['id']
return json_response({'messages': self.cache})
@csrf_exempt
def message_new(self, request=None, mesg=None):
if request:
name = request.META.get('REMOTE_ADDR') or 'Anonymous'
forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if forwarded_for and name == '127.0.0.1':
name = forwarded_for
msg = create_message(name, request.POST['body'])
if mesg:
message = mesg
now = datetime.datetime.now()
msg = create_message("[%s]"%now.strftime("%Y-%m-%d %H:%M:%S"), message)
self.cache.append(msg)
if len(self.cache) > self.cache_size:
self.cache = self.cache[-self.cache_size:]
self.new_message_event.set()
self.new_message_event.clear()
return json_response(msg)
@csrf_exempt
def message_updates(self, request):
cursor = request.session.get('cursor')
if not self.cache or cursor == self.cache[-1]['id']:
self.new_message_event.wait()
assert cursor != self.cache[-1]['id'], cursor
try:
for index, m in enumerate(self.cache):
if m['id'] == cursor:
return json_response({'messages': self.cache[index + 1:]})
return json_response({'messages': self.cache})
finally:
if self.cache:
request.session['cursor'] = self.cache[-1]['id']
else:
request.session.pop('cursor', None)
def monitor_polls(self, polls=None):
b = beanstalkc.Connection()
b.watch(settings.POLLS_TUBE)
while True:
job = b.reserve()
msg = job.body
job.bury()
self.message_new(None, msg)
def start_polling(self):
logger.info("Start Polling")
p = Pool(10)
while True:
p.spawn(self.monitor_polls)
msgs = Msgs()
main = msgs.main
message_new = msgs.message_new
message_updates = msgs.message_updates
message_existing = msgs.message_existing
poll = msgs.start_polling
poll()
#!/usr/bin/python
from gevent.wsgi import WSGIServer
from poller.application import application
print 'Serving on 8000...'
WSGIServer(('netdev.grnet.gr', 9090), application).serve_forever()
/*
* Copyright 2009 FriendFeed
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
body {
background: white;
margin: 10px;
}
body,
input {
font-family: sans-serif;
font-size: 10pt;
color: black;
}
table {
border-collapse: collapse;
border: 0;
}
td {
border: 0;
padding: 0;
}
#body {
position: absolute;
bottom: 10px;
left: 10px;
right: 100px;
}
#input {
margin-top: 0.5em;
}
#inbox .message {
padding-top: 0.25em;
}
#nav {
text-align: right;
float: right;
z-index: 99;
}
// Copyright 2009 FriendFeed
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
$(document).ready(function() {
if (!window.console) window.console = {};
if (!window.console.log) window.console.log = function() {};
$("#messageform").live("submit", function() {
newMessage($(this));
return false;
});
$("#messageform").live("keypress", function(e) {
if (e.keyCode == 13) {
newMessage($(this));
return false;
}
});
$("#message").select();
updater.start();
updater.poll();
});
function newMessage(form) {
var message = form.formToDict();
var disabled = form.find("input[type=submit]");
disabled.disable();
$.postJSON("/poll/a/message/new", message, function(response) {
updater.showMessage(response);
if (message.id) {
form.parent().remove();
} else {
form.find("input[type=text]").val("").select();
disabled.enable();
}
});
}
function getCookie(name) {
var r = document.cookie.match("\\b" + name + "=([^;]*)\\b");
return r ? r[1] : undefined;
}
jQuery.postJSON = function(url, args, callback) {
args._xsrf = getCookie("_xsrf");
$.ajax({url: url, data: $.param(args), dataType: "text", type: "POST",
success: function(response) {
if (callback) callback(eval("(" + response + ")"));
}, error: function(response) {
console.log("ERROR:", response)
}});
};
jQuery.fn.formToDict = function() {
var fields = this.serializeArray();
var json = {}
for (var i = 0; i < fields.length; i++) {
json[fields[i].name] = fields[i].value;
}
if (json.next) delete json.next;
return json;
};
jQuery.fn.disable = function() {
this.enable(false);
return this;
};
jQuery.fn.enable = function(opt_enable) {
if (arguments.length && !opt_enable) {
this.attr("disabled", "disabled");
} else {
this.removeAttr("disabled");
}
return this;
};
var updater = {
errorSleepTime: 500,
cursor: null,
start: function() {
var args = {"_xsrf": getCookie("_xsrf")};
if (updater.cursor) args.cursor = updater.cursor;
$.ajax({url: "/poll/a/message/existing", type: "POST", dataType: "text",
data: $.param(args), success: updater.onFetchExisting,
error: updater.onError});
},
poll: function() {
var args = {"_xsrf": getCookie("_xsrf")};
if (updater.cursor) args.cursor = updater.cursor;
$.ajax({url: "/poll/a/message/updates", type: "POST", dataType: "text",
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
}
updater.errorSleepTime = 500;
window.setTimeout(updater.poll, 0);
},
onFetchExisting: function(response) {
try {
updater.existingMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
}
},
onError: function(response) {
updater.errorSleepTime *= 2;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
updater.cursor = response.cursor;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;