Fixed file logging mechanism bug. Added prefix_length restriction
[flowspy] / flowspec / tasks.py
index 18a3033..35a52bc 100644 (file)
@@ -2,29 +2,39 @@ from utils import proxy as PR
 from celery.task import task
 from celery.task.sets import subtask
 import logging
+import json
 from celery.task.http import *
 from flowspy.utils import beanstalkc
 from django.conf import settings
 
-FORMAT = '%(asctime)s %(levelname)s: %(message)s'
-logging.basicConfig(format=FORMAT)
+import os
+
+
+
+LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')
+
+#FORMAT = '%(asctime)s %(levelname)s: %(message)s'
+#logging.basicConfig(format=FORMAT)
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
+handler = logging.FileHandler(LOG_FILENAME)
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
 
 @task
 def add(route, callback=None):
     applier = PR.Applier(route_object=route)
     commit, response = applier.apply()
     if commit:
-        is_online = True
-        is_active = True
+        status = "ACTIVE"
     else:
-        is_online = False
-        is_active = True
-    route.is_online = is_online
-    route.is_active = is_active
+        status = "ERROR"
+    route.status = status
     route.response = response
-    subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response))
+    subtask(announce).delay("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
     route.save()
 
 @task
@@ -32,43 +42,94 @@ def edit(route, callback=None):
     applier = PR.Applier(route_object=route)
     commit, response = applier.apply(operation="replace")
     if commit:
-        is_online = True
+        status = "ACTIVE"
     else:
-        is_online = False
-    route.is_active = True
-    route.is_online = is_online
+        status = "ERROR"
+    route.status = status
     route.response = response
     route.save()
-    subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response))
+    subtask(announce).delay("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
 
 
 
 @task
-def delete(route, callback=None):
+def delete(route, **kwargs):
     applier = PR.Applier(route_object=route)
     commit, response = applier.apply(operation="delete")
+    reason_text = ''
     if commit:
-        is_online = False
-        is_active = False
+        status = "INACTIVE"
+        if "reason" in kwargs and kwargs['reason']=='EXPIRED':
+            status = 'EXPIRED'
+            reason_text = " Reason: %s " %status
     else:
-        is_online = route.is_online
-        is_active = route.is_active
-    route.is_online = is_online
-    route.is_active = is_active
+        status = "ERROR"
+    route.status = status
     route.response = response
     route.save()
-    subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response))
-
+    subtask(announce).delay("[%s] Rule removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
 
+# May not work in the first place... proxy is not aware of Route models
+@task
+def batch_delete(routes, **kwargs):
+    if routes:
+        for route in routes:
+            route.status='PENDING';route.save()
+        applier = PR.Applier(route_objects=routes)
+        conf = applier.delete_routes()
+        commit, response = applier.apply(configuration = conf)
+        reason_text = ''
+        if commit:
+            status = "INACTIVE"
+            if "reason" in kwargs and kwargs['reason']=='EXPIRED':
+                status = 'EXPIRED'
+                reason_text = " Reason: %s " %status
+            elif "reason" in kwargs and kwargs['reason']!='EXPIRED':
+                status = kwargs['reason']
+                reason_text = " Reason: %s " %status
+        else:
+            status = "ERROR"
+        for route in routes:
+            route.status = status
+            route.response = response
+            route.save()
+            subtask(announce).delay("[%s] Rule removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
+    else:
+        return False
 
 @task
-def announce(messg):
+def announce(messg, user):
     messg = str(messg)
+#    username = user.username
+    username = user.get_profile().peer.domain_name
     b = beanstalkc.Connection()
     b.use(settings.POLLS_TUBE)
-    b.put(messg)
+    tube_message = json.dumps({'message': messg, 'username':username})
+    b.put(tube_message)
     b.close()
 
+@task
+def check_sync(route_name=None, selected_routes = []):
+    from flowspy.flowspec.models import Route, MatchPort, MatchDscp, ThenAction
+    if not selected_routes:
+        routes = Route.objects.all()
+    else:
+        routes = selected_routes
+    if route_name:
+        routes = routes.filter(name=route_name)
+    for route in routes:
+        if route.has_expired() and (route.status != 'EXPIRED' and route.status != 'ADMININACTIVE' and route.status != 'INACTIVE'):
+            logger.info('Expiring route %s' %route.name)
+            subtask(delete).delay(route, reason="EXPIRED")
+        elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'):
+            route.status = 'EXPIRED'
+            route.response = 'Rule Expired'
+            logger.info('Expiring route %s' %route.name)
+            route.save()
+        else:
+            if route.status != 'EXPIRED':
+                route.check_sync()
+
 
 #def delete(route):
 #