Switch to GPLv3
[flowspy] / flowspec / tasks.py
1 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2 # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
3
4 # Copyright (C) 2010-2014 GRNET S.A.
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 from utils import proxy as PR
21 from celery.task import task
22 from celery.task.sets import subtask
23 import logging
24 import json
25 from celery.task.http import *
26 import beanstalkc
27 from django.conf import settings
28 import datetime
29 from django.core.mail import send_mail
30 from django.template.loader import render_to_string
31 from django.core.urlresolvers import reverse
32 import os
33 from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
34
35
36
37 LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')
38
39 #FORMAT = '%(asctime)s %(levelname)s: %(message)s'
40 #logging.basicConfig(format=FORMAT)
41 formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
42
43 logger = logging.getLogger(__name__)
44 logger.setLevel(logging.DEBUG)
45 handler = logging.FileHandler(LOG_FILENAME)
46 handler.setFormatter(formatter)
47 logger.addHandler(handler)
48
49
50 @task(ignore_result=True)
51 def add(route, callback=None):
52     try:
53         applier = PR.Applier(route_object=route)
54         commit, response = applier.apply()
55         if commit:
56             status = "ACTIVE"
57         else:
58             status = "ERROR"
59         route.status = status
60         route.response = response
61         route.save()
62         announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
63     except TimeLimitExceeded:
64         route.status = "ERROR"
65         route.response = "Task timeout"
66         route.save()
67         announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
68     except SoftTimeLimitExceeded:
69         route.status = "ERROR"
70         route.response = "Task timeout"
71         route.save()
72         announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
73     except Exception:
74         route.status = "ERROR"
75         route.response = "Error"
76         route.save()
77         announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)   
78
79 @task(ignore_result=True)
80 def edit(route, callback=None):
81     try:
82         applier = PR.Applier(route_object=route)
83         commit, response = applier.apply(operation="replace")
84         if commit:
85             status = "ACTIVE"
86         else:
87             status = "ERROR"
88         route.status = status
89         route.response = response
90         route.save()
91         announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
92     except TimeLimitExceeded:
93         route.status = "ERROR"
94         route.response = "Task timeout"
95         route.save()
96         announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
97     except SoftTimeLimitExceeded:
98         route.status = "ERROR"
99         route.response = "Task timeout"
100         route.save()
101         announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
102     except Exception:
103         route.status = "ERROR"
104         route.response = "Error"
105         route.save()
106         announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)        
107
108
109 @task(ignore_result=True)
110 def delete(route, **kwargs):
111     try:
112         applier = PR.Applier(route_object=route)
113         commit, response = applier.apply(operation="delete")
114         reason_text = ''
115         if commit:
116             status = "INACTIVE"
117             if "reason" in kwargs and kwargs['reason']=='EXPIRED':
118                 status = 'EXPIRED'
119                 reason_text = " Reason: %s " %status
120         else:
121             status = "ERROR"
122         route.status = status
123         route.response = response
124         route.save()
125         announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
126     except TimeLimitExceeded:
127         route.status = "ERROR"
128         route.response = "Task timeout"
129         route.save()
130         announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
131     except SoftTimeLimitExceeded:
132         route.status = "ERROR"
133         route.response = "Task timeout"
134         route.save()
135         announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
136     except Exception:
137         route.status = "ERROR"
138         route.response = "Error"
139         route.save()
140         announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)   
141
142 # May not work in the first place... proxy is not aware of Route models
143 @task
144 def batch_delete(routes, **kwargs):
145     if routes:
146         for route in routes:
147             route.status='PENDING';route.save()
148         applier = PR.Applier(route_objects=routes)
149         conf = applier.delete_routes()
150         commit, response = applier.apply(configuration = conf)
151         reason_text = ''
152         if commit:
153             status = "INACTIVE"
154             if "reason" in kwargs and kwargs['reason']=='EXPIRED':
155                 status = 'EXPIRED'
156                 reason_text = " Reason: %s " %status
157             elif "reason" in kwargs and kwargs['reason']!='EXPIRED':
158                 status = kwargs['reason']
159                 reason_text = " Reason: %s " %status
160         else:
161             status = "ERROR"
162         for route in routes:
163             route.status = status
164             route.response = response
165             route.expires = datetime.date.today()
166             route.save()
167             announce("[%s] Rule removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
168     else:
169         return False
170
171 #@task(ignore_result=True)
172 def announce(messg, user):
173     messg = str(messg)
174     username = user.get_profile().peer.peer_tag
175     b = beanstalkc.Connection()
176     b.use(settings.POLLS_TUBE)
177     tube_message = json.dumps({'message': messg, 'username':username})
178     b.put(tube_message)
179     b.close()
180
181 @task
182 def check_sync(route_name=None, selected_routes = []):
183     from flowspec.models import Route, MatchPort, MatchDscp, ThenAction
184     if not selected_routes:
185         routes = Route.objects.all()
186     else:
187         routes = selected_routes
188     if route_name:
189         routes = routes.filter(name=route_name)
190     for route in routes:
191         if route.has_expired() and (route.status != 'EXPIRED' and route.status != 'ADMININACTIVE' and route.status != 'INACTIVE'):
192             if route.status != 'ERROR':
193                 logger.info('Expiring %s route %s' %(route.status, route.name))
194                 subtask(delete).delay(route, reason="EXPIRED")
195         else:
196             if route.status != 'EXPIRED':
197                 route.check_sync()
198
199 @task(ignore_result=True)
200 def notify_expired():
201     from flowspec.models import *
202     from django.contrib.sites.models import Site
203     logger.info('Initializing expiration notification')
204     routes = Route.objects.all()
205     for route in routes:
206         if route.status not in ['EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'ERROR']:
207             expiration_days = (route.expires - datetime.date.today()).days
208             if expiration_days < settings.EXPIRATION_NOTIFY_DAYS:
209                 try:
210                     fqdn = Site.objects.get_current().domain
211                     admin_url = "https://%s%s" % \
212                     (fqdn,
213                      "/edit/%s"%route.name)
214                     mail_body = render_to_string("rule_action.txt",
215                                              {"route": route, 'expiration_days':expiration_days, 'action':'expires', 'url':admin_url})
216                     days_num = ' days'
217                     expiration_days_text = "%s %s" %('in',expiration_days)
218                     if expiration_days == 0:
219                         days_num = ' today'
220                         expiration_days_text = ''
221                     if expiration_days == 1:
222                         days_num = ' day'
223                     logger.info('Route %s expires %s%s. Notifying %s (%s)' %(route.name, expiration_days_text, days_num, route.applier.username, route.applier.email))
224                     send_mail(settings.EMAIL_SUBJECT_PREFIX + "Rule %s expires %s%s" %
225                               (route.name,expiration_days_text, days_num),
226                               mail_body, settings.SERVER_EMAIL,
227                               [route.applier.email])
228                 except Exception as e:
229                     logger.info("Exception: %s"%e)
230                     pass
231     logger.info('Expiration notification process finished')
232
233 #def delete(route):
234 #    
235 #    applier = PR.Applier(route_object=route)
236 #    commit, response = applier.apply(configuration=applier.delete_routes())
237 #    if commit:
238 #            rows = queryset.update(is_online=False, is_active=False)
239 #            queryset.update(response="Successfully removed route from network")
240 #            self.message_user(request, "Successfully removed %s routes from network" % rows)
241 #        else:
242 #            self.message_user(request, "Could not remove routes from network")
243 #    if commit:
244 #        is_online = False
245 #        is_active = False
246 #        response = "Successfully removed route from network"
247 #    else:
248 #        is_online = False
249 #        is_active = True
250 #    route.is_online = is_online
251 #    route.is_active = is_active
252 #    route.response = response
253 #    route.save()