Include timeout handling in tasks
authorLeonidas Poulopoulos <leopoul@noc.grnet.gr>
Thu, 7 Aug 2014 10:41:51 +0000 (13:41 +0300)
committerLeonidas Poulopoulos <leopoul@noc.grnet.gr>
Thu, 7 Aug 2014 10:41:51 +0000 (13:41 +0300)
flowspec/tasks.py

index 847620c..de6f357 100644 (file)
@@ -29,6 +29,7 @@ from django.core.mail import send_mail
 from django.template.loader import render_to_string
 from django.core.urlresolvers import reverse
 import os
+from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
 
 
 
@@ -47,48 +48,95 @@ logger.addHandler(handler)
 
 @task(ignore_result=True)
 def add(route, callback=None):
-    applier = PR.Applier(route_object=route)
-    commit, response = applier.apply()
-    if commit:
-        status = "ACTIVE"
-    else:
-        status = "ERROR"
-    route.status = status
-    route.response = response
-    route.save()
-    announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
+    try:
+        applier = PR.Applier(route_object=route)
+        commit, response = applier.apply()
+        if commit:
+            status = "ACTIVE"
+        else:
+            status = "ERROR"
+        route.status = status
+        route.response = response
+        route.save()
+        announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
+    except TimeLimitExceeded:
+        route.status = "ERROR"
+        route.response = "Task timeout"
+        route.save()
+        announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
+    except SoftTimeLimitExceeded:
+        route.status = "ERROR"
+        route.response = "Task timeout"
+        route.save()
+        announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
+    except Exception:
+        route.status = "ERROR"
+        route.response = "Error"
+        route.save()
+        announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)   
 
 @task(ignore_result=True)
 def edit(route, callback=None):
-    applier = PR.Applier(route_object=route)
-    commit, response = applier.apply(operation="replace")
-    if commit:
-        status = "ACTIVE"
-    else:
-        status = "ERROR"
-    route.status = status
-    route.response = response
-    route.save()
-    announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
-
+    try:
+        applier = PR.Applier(route_object=route)
+        commit, response = applier.apply(operation="replace")
+        if commit:
+            status = "ACTIVE"
+        else:
+            status = "ERROR"
+        route.status = status
+        route.response = response
+        route.save()
+        announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
+    except TimeLimitExceeded:
+        route.status = "ERROR"
+        route.response = "Task timeout"
+        route.save()
+        announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
+    except SoftTimeLimitExceeded:
+        route.status = "ERROR"
+        route.response = "Task timeout"
+        route.save()
+        announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
+    except Exception:
+        route.status = "ERROR"
+        route.response = "Error"
+        route.save()
+        announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)        
 
 
 @task(ignore_result=True)
 def delete(route, **kwargs):
-    applier = PR.Applier(route_object=route)
-    commit, response = applier.apply(operation="delete")
-    reason_text = ''
-    if commit:
-        status = "INACTIVE"
-        if "reason" in kwargs and kwargs['reason']=='EXPIRED':
-            status = 'EXPIRED'
-            reason_text = " Reason: %s " %status
-    else:
-        status = "ERROR"
-    route.status = status
-    route.response = response
-    route.save()
-    announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
+    try:
+        applier = PR.Applier(route_object=route)
+        commit, response = applier.apply(operation="delete")
+        reason_text = ''
+        if commit:
+            status = "INACTIVE"
+            if "reason" in kwargs and kwargs['reason']=='EXPIRED':
+                status = 'EXPIRED'
+                reason_text = " Reason: %s " %status
+        else:
+            status = "ERROR"
+        route.status = status
+        route.response = response
+        route.save()
+        announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
+    except TimeLimitExceeded:
+        route.status = "ERROR"
+        route.response = "Task timeout"
+        route.save()
+        announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
+    except SoftTimeLimitExceeded:
+        route.status = "ERROR"
+        route.response = "Task timeout"
+        route.save()
+        announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
+    except Exception:
+        route.status = "ERROR"
+        route.response = "Error"
+        route.save()
+        announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)   
 
 # May not work in the first place... proxy is not aware of Route models
 @task
@@ -122,8 +170,7 @@ def batch_delete(routes, **kwargs):
 #@task(ignore_result=True)
 def announce(messg, user):
     messg = str(messg)
-#    username = user.username
-    username = user.get_profile().peer.domain_name
+    username = user.get_profile().peer.peer_tag
     b = beanstalkc.Connection()
     b.use(settings.POLLS_TUBE)
     tube_message = json.dumps({'message': messg, 'username':username})
@@ -144,11 +191,6 @@ def check_sync(route_name=None, selected_routes = []):
             if route.status != 'ERROR':
                 logger.info('Expiring %s route %s' %(route.status, route.name))
                 subtask(delete).delay(route, reason="EXPIRED")
-#        elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'):
-#            route.status = 'EXPIRED'
-#            route.response = 'Rule Expired'
-#            logger.info('Expiring route %s' %route.name)
-#            route.save()
         else:
             if route.status != 'EXPIRED':
                 route.check_sync()