Minor fixes. Added expiration date to admin interface. Distinguish between add and...
[flowspy] / flowspec / tasks.py
1 from utils import proxy as PR
2 from celery.task import task
3 from celery.task.sets import subtask
4 import logging
5 import json
6 from celery.task.http import *
7 from flowspy.utils import beanstalkc
8 from django.conf import settings
9 import datetime
10
11 import os
12
13
14
# Celery job activity is written to a dedicated file under the configured
# log directory.
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')

# Build the file handler first, then attach it to this module's logger.
# (logging.basicConfig is deliberately not used; the handler is configured
# explicitly instead.)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
handler = logging.FileHandler(LOG_FILENAME)
handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
26
27
@task
def add(route, callback=None):
    """Apply a new route through the proxy and record the outcome.

    The route's ``status`` is set to ``"ACTIVE"`` when the applier reports a
    successful commit and to ``"ERROR"`` otherwise; the applier's response is
    stored on ``route.response``.

    :param route: route object handed to ``PR.Applier``; it must expose
        ``status``, ``response``, ``applier``, ``name`` and ``save()``.
    :param callback: accepted for interface compatibility; not used here.
    """
    applier = PR.Applier(route_object=route)
    commit, response = applier.apply()
    route.status = "ACTIVE" if commit else "ERROR"
    route.response = response
    # Persist the result before queueing the announcement, as edit() and
    # delete() do, so a failing save never leaves a notification behind for
    # a state that was not stored.
    route.save()
    subtask(announce).delay(
        "[%s] Rule add: %s - Result: %s" % (route.applier, route.name, response),
        route.applier)
40
@task
def edit(route, callback=None):
    """Push a modified route through the proxy with a "replace" operation.

    Stores the resulting status ("ACTIVE" on commit, "ERROR" otherwise) and
    the applier response on the route, saves it, then queues an announcement
    addressed to ``route.applier``.

    :param route: route object handed to ``PR.Applier``.
    :param callback: accepted for interface compatibility; not used here.
    """
    commit, response = PR.Applier(route_object=route).apply(operation="replace")
    if commit:
        route.status = "ACTIVE"
    else:
        route.status = "ERROR"
    route.response = response
    route.save()
    message = "[%s] Rule edit: %s - Result: %s" % (
        route.applier, route.name, response)
    subtask(announce).delay(message, route.applier)
53
54
55
@task
def delete(route, **kwargs):
    """Remove a route through the proxy with a "delete" operation.

    Resulting status:
      * "ERROR"    - the applier did not commit;
      * "EXPIRED"  - committed and ``reason='EXPIRED'`` was passed;
      * "INACTIVE" - committed for any other (or no) reason.

    The route is saved and an announcement is queued for ``route.applier``.

    :param route: route object handed to ``PR.Applier``.
    :param kwargs: only the optional ``reason`` key is inspected.
    """
    commit, response = PR.Applier(route_object=route).apply(operation="delete")
    reason_text = ''
    if not commit:
        status = "ERROR"
    elif "reason" in kwargs and kwargs['reason'] == 'EXPIRED':
        status = 'EXPIRED'
        # The reason is only mentioned in the announcement for expirations.
        reason_text = " Reason: %s " % status
    else:
        status = "INACTIVE"
    route.status = status
    route.response = response
    route.save()
    message = "[%s] Suspending rule : %s%s- Result %s" % (
        route.applier, route.name, reason_text, response)
    subtask(announce).delay(message, route.applier)
72
73 # May not work in the first place... proxy is not aware of Route models
@task
def batch_delete(routes, **kwargs):
    """Remove several routes with a single applier commit.

    Every route is first saved as "PENDING". After the commit, each route
    receives the same final status and response, has ``expires`` set to
    today's date, is saved, and gets its own announcement.

    Final status:
      * "ERROR"              - the applier did not commit;
      * the given ``reason`` - committed and a ``reason`` kwarg was passed
                               ("EXPIRED" or any other value);
      * "INACTIVE"           - committed without a ``reason``.

    :param routes: iterable of route objects; an empty/falsy value is a no-op.
    :param kwargs: only the optional ``reason`` key is inspected.
    :returns: ``False`` when ``routes`` is empty, otherwise ``None``.
    """
    if not routes:
        return False

    # Flag everything as in-flight before the configuration is pushed.
    for pending in routes:
        pending.status = 'PENDING'
        pending.save()

    applier = PR.Applier(route_objects=routes)
    commit, response = applier.apply(configuration=applier.delete_routes())

    reason_text = ''
    if not commit:
        status = "ERROR"
    elif "reason" in kwargs:
        reason = kwargs['reason']
        status = 'EXPIRED' if reason == 'EXPIRED' else reason
        reason_text = " Reason: %s " % status
    else:
        status = "INACTIVE"

    for route in routes:
        route.status = status
        route.response = response
        route.expires = datetime.date.today()
        route.save()
        message = "[%s] Rule removal: %s%s- Result %s" % (
            route.applier, route.name, reason_text, response)
        subtask(announce).delay(message, route.applier)
101
@task
def announce(messg, user):
    """Publish a notification message on the beanstalk polls tube.

    The payload is a JSON object with two keys: ``message`` (the stringified
    ``messg``) and ``username`` (the domain name of the peer attached to the
    user's profile, not the user's own login name).

    :param messg: message to publish; converted with ``str()``.
    :param user: object exposing ``get_profile().peer.domain_name``.
    """
    messg = str(messg)
    username = user.get_profile().peer.domain_name
    # Serialise before connecting so the connection is only held for the put.
    tube_message = json.dumps({'message': messg, 'username': username})
    b = beanstalkc.Connection()
    try:
        b.use(settings.POLLS_TUBE)
        b.put(tube_message)
    finally:
        # Always release the socket, even when use()/put() raises.
        b.close()
112
@task
def check_sync(route_name=None, selected_routes=None):
    """Expire overdue routes and trigger a sync check on the remaining ones.

    For every candidate route:
      * if ``has_expired()`` is true and the status is not already one of
        "EXPIRED", "ADMININACTIVE" or "INACTIVE", a ``delete`` task is queued
        with ``reason="EXPIRED"``;
      * otherwise, unless the status is "EXPIRED", ``route.check_sync()`` is
        called.

    :param route_name: optional name; when given, only routes with this name
        are considered.
    :param selected_routes: optional collection of routes (queryset or plain
        iterable). When empty or omitted, all ``Route`` objects are used.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import with the models module).
    from flowspy.flowspec.models import Route
    if not selected_routes:
        routes = Route.objects.all()
    else:
        routes = selected_routes
    if route_name:
        if hasattr(routes, 'filter'):
            routes = routes.filter(name=route_name)
        else:
            # Plain lists/tuples of routes have no .filter(); match by name.
            routes = [route for route in routes if route.name == route_name]
    for route in routes:
        if route.has_expired() and route.status not in (
                'EXPIRED', 'ADMININACTIVE', 'INACTIVE'):
            logger.info('Expiring route %s', route.name)
            subtask(delete).delay(route, reason="EXPIRED")
        elif route.status != 'EXPIRED':
            # Includes expired routes that are already (ADMIN)INACTIVE: they
            # are not flipped to EXPIRED here, only sync-checked.
            route.check_sync()
134
135
136 #def delete(route):
137 #    
138 #    applier = PR.Applier(route_object=route)
139 #    commit, response = applier.apply(configuration=applier.delete_routes())
140 #    if commit:
141 #            rows = queryset.update(is_online=False, is_active=False)
142 #            queryset.update(response="Successfully removed route from network")
143 #            self.message_user(request, "Successfully removed %s routes from network" % rows)
144 #        else:
145 #            self.message_user(request, "Could not remove routes from network")
146 #    if commit:
147 #        is_online = False
148 #        is_active = False
149 #        response = "Successfully removed route from network"
150 #    else:
151 #        is_online = False
152 #        is_active = True
153 #    route.is_online = is_online
154 #    route.is_active = is_active
155 #    route.response = response
156 #    route.save()