From 98b30771187d68931d9e556793173c83a9eba5cf Mon Sep 17 00:00:00 2001 From: Leonidas Poulopoulos Date: Thu, 7 Aug 2014 13:41:51 +0300 Subject: [PATCH] Include timeout handling in tasks --- flowspec/tasks.py | 126 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 42 deletions(-) diff --git a/flowspec/tasks.py b/flowspec/tasks.py index 847620c..de6f357 100644 --- a/flowspec/tasks.py +++ b/flowspec/tasks.py @@ -29,6 +29,7 @@ from django.core.mail import send_mail from django.template.loader import render_to_string from django.core.urlresolvers import reverse import os +from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded @@ -47,48 +48,95 @@ logger.addHandler(handler) @task(ignore_result=True) def add(route, callback=None): - applier = PR.Applier(route_object=route) - commit, response = applier.apply() - if commit: - status = "ACTIVE" - else: - status = "ERROR" - route.status = status - route.response = response - route.save() - announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier) + try: + applier = PR.Applier(route_object=route) + commit, response = applier.apply() + if commit: + status = "ACTIVE" + else: + status = "ERROR" + route.status = status + route.response = response + route.save() + announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier) + except TimeLimitExceeded: + route.status = "ERROR" + route.response = "Task timeout" + route.save() + announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier) + except SoftTimeLimitExceeded: + route.status = "ERROR" + route.response = "Task timeout" + route.save() + announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier) + except Exception: + route.status = "ERROR" + route.response = "Error" + route.save() + announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier) @task(ignore_result=True) def edit(route, callback=None): - applier = PR.Applier(route_object=route) - commit, response = applier.apply(operation="replace") - if commit: - status = "ACTIVE" - else: - status = "ERROR" - route.status = status - route.response = response - route.save() - announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier) - + try: + applier = PR.Applier(route_object=route) + commit, response = applier.apply(operation="replace") + if commit: + status = "ACTIVE" + else: + status = "ERROR" + route.status = status + route.response = response + route.save() + announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier) + except TimeLimitExceeded: + route.status = "ERROR" + route.response = "Task timeout" + route.save() + announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier) + except SoftTimeLimitExceeded: + route.status = "ERROR" + route.response = "Task timeout" + route.save() + announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier) + except Exception: + route.status = "ERROR" + route.response = "Error" + route.save() + announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier) @task(ignore_result=True) def delete(route, **kwargs): - applier = PR.Applier(route_object=route) - commit, response = applier.apply(operation="delete") - reason_text = '' - if commit: - status = "INACTIVE" - if "reason" in kwargs and kwargs['reason']=='EXPIRED': - status = 'EXPIRED' - reason_text = " Reason: %s " %status - else: - status = "ERROR" - route.status = status - route.response = response - route.save() - announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier) + try: + applier = PR.Applier(route_object=route) + commit, response = applier.apply(operation="delete") + reason_text = '' + if commit: + status = "INACTIVE" + if "reason" in kwargs and kwargs['reason']=='EXPIRED': + status = 'EXPIRED' + reason_text = " Reason: %s " %status + else: + status = "ERROR" + route.status = status + route.response = response + route.save() + announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier) + except TimeLimitExceeded: + route.status = "ERROR" + route.response = "Task timeout" + route.save() + announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier) + except SoftTimeLimitExceeded: + route.status = "ERROR" + route.response = "Task timeout" + route.save() + announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier) + except Exception: + route.status = "ERROR" + route.response = "Error" + route.save() + announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier) # May not work in the first place... proxy is not aware of Route models @task @@ -122,8 +170,7 @@ def batch_delete(routes, **kwargs): #@task(ignore_result=True) def announce(messg, user): messg = str(messg) -# username = user.username - username = user.get_profile().peer.domain_name + username = user.get_profile().peer.peer_tag b = beanstalkc.Connection() b.use(settings.POLLS_TUBE) tube_message = json.dumps({'message': messg, 'username':username}) @@ -144,11 +191,6 @@ def check_sync(route_name=None, selected_routes = []): if route.status != 'ERROR': logger.info('Expiring %s route %s' %(route.status, route.name)) subtask(delete).delay(route, reason="EXPIRED") -# elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'): -# route.status = 'EXPIRED' -# route.response = 'Rule Expired' -# logger.info('Expiring route %s' %route.name) -# route.save() else: if route.status != 'EXPIRED': route.check_sync() -- 1.7.10.4