1 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2 # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
4 # Copyright (C) 2010-2014 GRNET S.A.
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from utils import proxy as PR
21 from celery.task import task
22 from celery.task.sets import subtask
25 from celery.task.http import *
27 from django.conf import settings
29 from django.core.mail import send_mail
30 from django.template.loader import render_to_string
31 from django.core.urlresolvers import reverse
33 from celery.exceptions import TimeLimitExceeded, SoftTimeLimitExceeded
37 LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')
39 #FORMAT = '%(asctime)s %(levelname)s: %(message)s'
40 #logging.basicConfig(format=FORMAT)
41 formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
43 logger = logging.getLogger(__name__)
44 logger.setLevel(logging.DEBUG)
45 handler = logging.FileHandler(LOG_FILENAME)
46 handler.setFormatter(formatter)
47 logger.addHandler(handler)
50 @task(ignore_result=True)
51 def add(route, callback=None):
53 applier = PR.Applier(route_object=route)
54 commit, response = applier.apply()
60 route.response = response
62 announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
63 except TimeLimitExceeded:
64 route.status = "ERROR"
65 route.response = "Task timeout"
67 announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
68 except SoftTimeLimitExceeded:
69 route.status = "ERROR"
70 route.response = "Task timeout"
72 announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
74 route.status = "ERROR"
75 route.response = "Error"
77 announce("[%s] Rule add: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
79 @task(ignore_result=True)
80 def edit(route, callback=None):
82 applier = PR.Applier(route_object=route)
83 commit, response = applier.apply(operation="replace")
89 route.response = response
91 announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
92 except TimeLimitExceeded:
93 route.status = "ERROR"
94 route.response = "Task timeout"
96 announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
97 except SoftTimeLimitExceeded:
98 route.status = "ERROR"
99 route.response = "Task timeout"
101 announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
103 route.status = "ERROR"
104 route.response = "Error"
106 announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
109 @task(ignore_result=True)
110 def delete(route, **kwargs):
112 applier = PR.Applier(route_object=route)
113 commit, response = applier.apply(operation="delete")
117 if "reason" in kwargs and kwargs['reason']=='EXPIRED':
119 reason_text = " Reason: %s " %status
122 route.status = status
123 route.response = response
125 announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
126 except TimeLimitExceeded:
127 route.status = "ERROR"
128 route.response = "Task timeout"
130 announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
131 except SoftTimeLimitExceeded:
132 route.status = "ERROR"
133 route.response = "Task timeout"
135 announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
137 route.status = "ERROR"
138 route.response = "Error"
140 announce("[%s] Suspending rule : %s - Result: %s"%(route.applier, route.name, route.response), route.applier)
142 # May not work in the first place... proxy is not aware of Route models
144 def batch_delete(routes, **kwargs):
147 route.status='PENDING';route.save()
148 applier = PR.Applier(route_objects=routes)
149 conf = applier.delete_routes()
150 commit, response = applier.apply(configuration = conf)
154 if "reason" in kwargs and kwargs['reason']=='EXPIRED':
156 reason_text = " Reason: %s " %status
157 elif "reason" in kwargs and kwargs['reason']!='EXPIRED':
158 status = kwargs['reason']
159 reason_text = " Reason: %s " %status
163 route.status = status
164 route.response = response
165 route.expires = datetime.date.today()
167 announce("[%s] Rule removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
171 #@task(ignore_result=True)
172 def announce(messg, user):
174 username = user.get_profile().peer.peer_tag
175 b = beanstalkc.Connection()
176 b.use(settings.POLLS_TUBE)
177 tube_message = json.dumps({'message': messg, 'username':username})
182 def check_sync(route_name=None, selected_routes = []):
183 from flowspec.models import Route, MatchPort, MatchDscp, ThenAction
184 if not selected_routes:
185 routes = Route.objects.all()
187 routes = selected_routes
189 routes = routes.filter(name=route_name)
191 if route.has_expired() and (route.status != 'EXPIRED' and route.status != 'ADMININACTIVE' and route.status != 'INACTIVE'):
192 if route.status != 'ERROR':
193 logger.info('Expiring %s route %s' %(route.status, route.name))
194 subtask(delete).delay(route, reason="EXPIRED")
196 if route.status != 'EXPIRED':
199 @task(ignore_result=True)
200 def notify_expired():
201 from flowspec.models import *
202 from django.contrib.sites.models import Site
203 logger.info('Initializing expiration notification')
204 routes = Route.objects.all()
206 if route.status not in ['EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'ERROR']:
207 expiration_days = (route.expires - datetime.date.today()).days
208 if expiration_days < settings.EXPIRATION_NOTIFY_DAYS:
210 fqdn = Site.objects.get_current().domain
211 admin_url = "https://%s%s" % \
213 "/edit/%s"%route.name)
214 mail_body = render_to_string("rule_action.txt",
215 {"route": route, 'expiration_days':expiration_days, 'action':'expires', 'url':admin_url})
217 expiration_days_text = "%s %s" %('in',expiration_days)
218 if expiration_days == 0:
220 expiration_days_text = ''
221 if expiration_days == 1:
223 logger.info('Route %s expires %s%s. Notifying %s (%s)' %(route.name, expiration_days_text, days_num, route.applier.username, route.applier.email))
224 send_mail(settings.EMAIL_SUBJECT_PREFIX + "Rule %s expires %s%s" %
225 (route.name,expiration_days_text, days_num),
226 mail_body, settings.SERVER_EMAIL,
227 [route.applier.email])
228 except Exception as e:
229 logger.info("Exception: %s"%e)
231 logger.info('Expiration notification process finished')
235 # applier = PR.Applier(route_object=route)
236 # commit, response = applier.apply(configuration=applier.delete_routes())
238 # rows = queryset.update(is_online=False, is_active=False)
239 # queryset.update(response="Successfully removed route from network")
240 # self.message_user(request, "Successfully removed %s routes from network" % rows)
242 # self.message_user(request, "Could not remove routes from network")
246 # response = "Successfully removed route from network"
250 # route.is_online = is_online
251 # route.is_active = is_active
252 # route.response = response