Commit 98b30771 authored by Leonidas Poulopoulos's avatar Leonidas Poulopoulos

Include timeout handling in tasks

parent f929870f
...@@ -29,6 +29,7 @@ from django.core.mail import send_mail ...@@ -29,6 +29,7 @@ from django.core.mail import send_mail
from django.template.loader import render_to_string from django.template.loader import render_to_string
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
import os import os
from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
...@@ -47,48 +48,95 @@ logger.addHandler(handler) ...@@ -47,48 +48,95 @@ logger.addHandler(handler)
@task(ignore_result=True) @task(ignore_result=True)
def add(route, callback=None): def add(route, callback=None):
applier = PR.Applier(route_object=route) try:
commit, response = applier.apply() applier = PR.Applier(route_object=route)
if commit: commit, response = applier.apply()
status = "ACTIVE" if commit:
else: status = "ACTIVE"
status = "ERROR" else:
route.status = status status = "ERROR"
route.response = response route.status = status
route.save() route.response = response
announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier) route.save()
announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
except TimeLimitExceeded:
route.status = "ERROR"
route.response = "Task timeout"
route.save()
announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
except SoftTimeLimitExceeded:
route.status = "ERROR"
route.response = "Task timeout"
route.save()
announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
except Exception:
route.status = "ERROR"
route.response = "Error"
route.save()
announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
@task(ignore_result=True) @task(ignore_result=True)
def edit(route, callback=None): def edit(route, callback=None):
applier = PR.Applier(route_object=route) try:
commit, response = applier.apply(operation="replace") applier = PR.Applier(route_object=route)
if commit: commit, response = applier.apply(operation="replace")
status = "ACTIVE" if commit:
else: status = "ACTIVE"
status = "ERROR" else:
route.status = status status = "ERROR"
route.response = response route.status = status
route.save() route.response = response
announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier) route.save()
announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
except TimeLimitExceeded:
route.status = "ERROR"
route.response = "Task timeout"
route.save()
announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
except SoftTimeLimitExceeded:
route.status = "ERROR"
route.response = "Task timeout"
route.save()
announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
except Exception:
route.status = "ERROR"
route.response = "Error"
route.save()
announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
@task(ignore_result=True) @task(ignore_result=True)
def delete(route, **kwargs): def delete(route, **kwargs):
applier = PR.Applier(route_object=route) try:
commit, response = applier.apply(operation="delete") applier = PR.Applier(route_object=route)
reason_text = '' commit, response = applier.apply(operation="delete")
if commit: reason_text = ''
status = "INACTIVE" if commit:
if "reason" in kwargs and kwargs['reason']=='EXPIRED': status = "INACTIVE"
status = 'EXPIRED' if "reason" in kwargs and kwargs['reason']=='EXPIRED':
reason_text = " Reason: %s " %status status = 'EXPIRED'
else: reason_text = " Reason: %s " %status
status = "ERROR" else:
route.status = status status = "ERROR"
route.response = response route.status = status
route.save() route.response = response
announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier) route.save()
announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
except TimeLimitExceeded:
route.status = "ERROR"
route.response = "Task timeout"
route.save()
announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
except SoftTimeLimitExceeded:
route.status = "ERROR"
route.response = "Task timeout"
route.save()
announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
except Exception:
route.status = "ERROR"
route.response = "Error"
route.save()
announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
# May not work in the first place... proxy is not aware of Route models # May not work in the first place... proxy is not aware of Route models
@task @task
...@@ -122,8 +170,7 @@ def batch_delete(routes, **kwargs): ...@@ -122,8 +170,7 @@ def batch_delete(routes, **kwargs):
#@task(ignore_result=True) #@task(ignore_result=True)
def announce(messg, user): def announce(messg, user):
messg = str(messg) messg = str(messg)
# username = user.username username = user.get_profile().peer.peer_tag
username = user.get_profile().peer.domain_name
b = beanstalkc.Connection() b = beanstalkc.Connection()
b.use(settings.POLLS_TUBE) b.use(settings.POLLS_TUBE)
tube_message = json.dumps({'message': messg, 'username':username}) tube_message = json.dumps({'message': messg, 'username':username})
...@@ -144,11 +191,6 @@ def check_sync(route_name=None, selected_routes = []): ...@@ -144,11 +191,6 @@ def check_sync(route_name=None, selected_routes = []):
if route.status != 'ERROR': if route.status != 'ERROR':
logger.info('Expiring %s route %s' %(route.status, route.name)) logger.info('Expiring %s route %s' %(route.status, route.name))
subtask(delete).delay(route, reason="EXPIRED") subtask(delete).delay(route, reason="EXPIRED")
# elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'):
# route.status = 'EXPIRED'
# route.response = 'Rule Expired'
# logger.info('Expiring route %s' %route.name)
# route.save()
else: else:
if route.status != 'EXPIRED': if route.status != 'EXPIRED':
route.check_sync() route.check_sync()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment