Commit 25d08a62 authored by Leonidas Poulopoulos's avatar Leonidas Poulopoulos

Added user seperation into long-polling

parent 3e99e2d1
......@@ -122,7 +122,7 @@ class Route(models.Model):
# logger.info("Got save job id: %s" %response)
def commit_add(self, *args, **kwargs):
send_message("Adding route %s. Please wait..." %self.name)
send_message("Adding route %s. Please wait..." %self.name, self.applier)
response = add.delay(self)
logger.info("Got save job id: %s" %response)
......@@ -135,12 +135,12 @@ class Route(models.Model):
# logger.info("Got delete job id: %s" %response)
def commit_edit(self, *args, **kwargs):
send_message("Editing route %s. Please wait..." %self.name)
send_message("Editing route %s. Please wait..." %self.name, self.applier)
response = edit.delay(self)
logger.info("Got edit job id: %s" %response)
def commit_delete(self, *args, **kwargs):
send_message("Removing route %s. Please wait..." %self.name)
send_message("Removing route %s. Please wait..." %self.name, self.applier)
response = delete.delay(self)
logger.info("Got edit job id: %s" %response)
#
......@@ -284,8 +284,10 @@ class Route(models.Model):
get_match.short_description = 'Match statement'
get_match.allow_tags = True
def send_message(msg):
def send_message(msg, user):
username = user.username
b = beanstalkc.Connection()
b.use(settings.POLLS_TUBE)
b.put(str(msg))
tube_message = json.dumps({'message': str(msg), 'username':username})
b.put(tube_message)
b.close()
......@@ -2,6 +2,8 @@ from utils import proxy as PR
from celery.task import task
from celery.task.sets import subtask
import logging
import json
from celery.task.http import *
from flowspy.utils import beanstalkc
from django.conf import settings
......@@ -24,7 +26,7 @@ def add(route, callback=None):
route.is_online = is_online
route.is_active = is_active
route.response = response
subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response))
subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response), route.applier)
route.save()
@task
......@@ -39,7 +41,7 @@ def edit(route, callback=None):
route.is_online = is_online
route.response = response
route.save()
subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response))
subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response), route.applier)
......@@ -57,16 +59,18 @@ def delete(route, callback=None):
route.is_active = is_active
route.response = response
route.save()
subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response))
subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response), route.applier)
@task
def announce(messg):
def announce(messg, user):
messg = str(messg)
username = user.username
b = beanstalkc.Connection()
b.use(settings.POLLS_TUBE)
b.put(messg)
tube_message = json.dumps({'message': messg, 'username':username})
b.put(tube_message)
b.close()
......
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
import json
import uuid
import simplejson
......@@ -22,8 +23,8 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def create_message(from_, body):
data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body}
def create_message(body, user):
data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
return data
......@@ -36,64 +37,95 @@ class Msgs(object):
cache_size = 200
def __init__(self):
self.user_cache = {}
self.user_cursor = {}
self.cache = []
self.new_message_event = Event()
self.new_message_event = None
self.new_message_user_event = {}
def main(self, request):
if self.cache:
request.session['cursor'] = self.cache[-1]['id']
return render_to_response('poll.html', {'messages': self.cache})
if self.user_cache:
request.session['cursor'] = self.user_cache[-1]['id']
return render_to_response('poll.html', {'messages': self.user_cache})
@csrf_exempt
def message_existing(self, request):
if self.cache:
request.session['cursor'] = self.cache[-1]['id']
return json_response({'messages': self.cache})
try:
user = request.user.username
except:
user = None
self.new_message_user_event[user] = Event()
try:
if self.user_cache[user]:
self.user_cursor[user] = self.user_cache[user][-1]['id']
except:
self.user_cache[user] = []
self.user_cursor[user] = ''
return json_response({'messages': self.user_cache[user]})
@csrf_exempt
def message_new(self, request=None, mesg=None):
if request:
name = request.META.get('REMOTE_ADDR') or 'Anonymous'
forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if forwarded_for and name == '127.0.0.1':
name = forwarded_for
msg = create_message(name, request.POST['body'])
def message_new(self, mesg=None):
if mesg:
message = mesg
message = mesg['message']
user = mesg['username']
now = datetime.datetime.now()
msg = create_message("[%s]"%now.strftime("%Y-%m-%d %H:%M:%S"), message)
self.cache.append(msg)
if len(self.cache) > self.cache_size:
self.cache = self.cache[-self.cache_size:]
self.new_message_event.set()
self.new_message_event.clear()
msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user)
try:
isinstance(self.user_cache[user], list)
except:
self.user_cache[user] = []
self.user_cache[user].append(msg)
if self.user_cache[user][-1] == self.user_cache[user][0]:
self.user_cursor[user] = self.user_cache[user][-1]['id']
else:
self.user_cursor[user] = self.user_cache[user][-2]['id']
# self.cache.append(msg)
if len(self.user_cache[user]) > self.cache_size:
self.user_cache[user] = self.user_cache[user][-self.cache_size:]
self.new_message_user_event[user].set()
self.new_message_user_event[user].clear()
return json_response(msg)
@csrf_exempt
def message_updates(self, request):
cursor = request.session.get('cursor')
if not self.cache or cursor == self.cache[-1]['id']:
self.new_message_event.wait()
assert cursor != self.cache[-1]['id'], cursor
cursor = {}
try:
user = request.user.username
except:
user = None
cursor[user] = self.user_cursor[user]
try:
if not isinstance(self.user_cache[user], list):
self.user_cache[user] = []
except:
self.user_cache[user] = []
if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
self.new_message_user_event[user].wait()
# self.new_message_event.wait()
# assert cursor[user] != self.user_cache[user][-1]['id'], cursor[user]
try:
for index, m in enumerate(self.cache):
if m['id'] == cursor:
return json_response({'messages': self.cache[index + 1:]})
return json_response({'messages': self.cache})
for index, m in enumerate(self.user_cache[user]):
if m['id'] == cursor[user]:
return json_response({'messages': self.user_cache[user][index + 1:]})
return json_response({'messages': self.user_cache[user]})
finally:
if self.cache:
request.session['cursor'] = self.cache[-1]['id']
else:
request.session.pop('cursor', None)
if self.user_cache[user]:
self.user_cursor[user] = self.user_cache[user][-1]['id']
# else:
# request.session.pop('cursor', None)
def monitor_polls(self, polls=None):
b = beanstalkc.Connection()
b.watch(settings.POLLS_TUBE)
while True:
job = b.reserve()
msg = job.body
print job.body
msg = json.loads(job.body)
job.bury()
self.message_new(None, msg)
self.message_new(msg)
def start_polling(self):
......
......@@ -120,7 +120,7 @@ var updater = {
try {
updater.existingMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
// updater.onError();
return;
}
},
......
<div class="message" id="m{{ message.id }}"><b>{{ message.from }}: </b>{{ message.body }}</div>
<div class="message" id="m{{ message.id }}">{{ message.body }}</div>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment