Statistics
| Branch: | Tag: | Revision:

root / flowspec / tasks.py @ 2a2ea58f

History | View | Annotate | Download (7.1 kB)

1
from utils import proxy as PR
2
from celery.task import task
3
from celery.task.sets import subtask
4
import logging
5
import json
6
from celery.task.http import *
7
import beanstalkc
8
from django.conf import settings
9
import datetime
10
from django.core.mail import send_mail
11
from django.template.loader import render_to_string
12
from django.core.urlresolvers import reverse
13
import os
14

    
15

    
16

    
17
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')
18

    
19
#FORMAT = '%(asctime)s %(levelname)s: %(message)s'
20
#logging.basicConfig(format=FORMAT)
21
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
22

    
23
logger = logging.getLogger(__name__)
24
logger.setLevel(logging.DEBUG)
25
handler = logging.FileHandler(LOG_FILENAME)
26
handler.setFormatter(formatter)
27
logger.addHandler(handler)
28

    
29

    
30
@task(ignore_result=True)
31
def add(route, callback=None):
32
    applier = PR.Applier(route_object=route)
33
    commit, response = applier.apply()
34
    if commit:
35
        status = "ACTIVE"
36
    else:
37
        status = "ERROR"
38
    route.status = status
39
    route.response = response
40
    route.save()
41
    announce("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
42

    
43
@task(ignore_result=True)
44
def edit(route, callback=None):
45
    applier = PR.Applier(route_object=route)
46
    commit, response = applier.apply(operation="replace")
47
    if commit:
48
        status = "ACTIVE"
49
    else:
50
        status = "ERROR"
51
    route.status = status
52
    route.response = response
53
    route.save()
54
    announce("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
55

    
56

    
57

    
58
@task(ignore_result=True)
59
def delete(route, **kwargs):
60
    applier = PR.Applier(route_object=route)
61
    commit, response = applier.apply(operation="delete")
62
    reason_text = ''
63
    if commit:
64
        status = "INACTIVE"
65
        if "reason" in kwargs and kwargs['reason']=='EXPIRED':
66
            status = 'EXPIRED'
67
            reason_text = " Reason: %s " %status
68
    else:
69
        status = "ERROR"
70
    route.status = status
71
    route.response = response
72
    route.save()
73
    announce("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
74

    
75
# May not work in the first place... proxy is not aware of Route models
76
@task
77
def batch_delete(routes, **kwargs):
78
    if routes:
79
        for route in routes:
80
            route.status='PENDING';route.save()
81
        applier = PR.Applier(route_objects=routes)
82
        conf = applier.delete_routes()
83
        commit, response = applier.apply(configuration = conf)
84
        reason_text = ''
85
        if commit:
86
            status = "INACTIVE"
87
            if "reason" in kwargs and kwargs['reason']=='EXPIRED':
88
                status = 'EXPIRED'
89
                reason_text = " Reason: %s " %status
90
            elif "reason" in kwargs and kwargs['reason']!='EXPIRED':
91
                status = kwargs['reason']
92
                reason_text = " Reason: %s " %status
93
        else:
94
            status = "ERROR"
95
        for route in routes:
96
            route.status = status
97
            route.response = response
98
            route.expires = datetime.date.today()
99
            route.save()
100
            announce("[%s] Rule removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
101
    else:
102
        return False
103

    
104
#@task(ignore_result=True)
105
def announce(messg, user):
106
    messg = str(messg)
107
#    username = user.username
108
    username = user.get_profile().peer.domain_name
109
    b = beanstalkc.Connection()
110
    b.use(settings.POLLS_TUBE)
111
    tube_message = json.dumps({'message': messg, 'username':username})
112
    b.put(tube_message)
113
    b.close()
114

    
115
@task
116
def check_sync(route_name=None, selected_routes = []):
117
    from flowspy.flowspec.models import Route, MatchPort, MatchDscp, ThenAction
118
    if not selected_routes:
119
        routes = Route.objects.all()
120
    else:
121
        routes = selected_routes
122
    if route_name:
123
        routes = routes.filter(name=route_name)
124
    for route in routes:
125
        if route.has_expired() and (route.status != 'EXPIRED' and route.status != 'ADMININACTIVE' and route.status != 'INACTIVE'):
126
            logger.info('Expiring route %s' %route.name)
127
            subtask(delete).delay(route, reason="EXPIRED")
128
#        elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'):
129
#            route.status = 'EXPIRED'
130
#            route.response = 'Rule Expired'
131
#            logger.info('Expiring route %s' %route.name)
132
#            route.save()
133
        else:
134
            if route.status != 'EXPIRED':
135
                route.check_sync()
136

    
137
@task(ignore_result=True)
138
def notify_expired():
139
    from flowspy.flowspec.models import *
140
    from django.contrib.sites.models import Site
141
    logger.info('Initializing expiration notification')
142
    routes = Route.objects.all()
143
    for route in routes:
144
        if route.status not in ['EXPIRED', 'ADMININACTIVE', 'INACTIVE', 'ERROR']:
145
            expiration_days = (route.expires - datetime.date.today()).days
146
            if expiration_days < settings.EXPIRATION_NOTIFY_DAYS:
147
                try:
148
                    fqdn = Site.objects.get_current().domain
149
                    admin_url = "https://%s%s" % \
150
                    (fqdn,
151
                     "/fod/edit/%s"%route.name)
152
                    mail_body = render_to_string("rule_expiration.txt",
153
                                             {"route": route, 'expiration_days':expiration_days, 'url':admin_url})
154
                    days_num = ' days'
155
                    expiration_days_text = "%s %s" %('in',expiration_days)
156
                    if expiration_days == 0:
157
                        days_num = ' today'
158
                        expiration_days_text = ''
159
                    if expiration_days == 1:
160
                        days_num = ' day'
161
                    logger.info('Route %s expires %s%s. Notifying %s (%s)' %(route.name, expiration_days_text, days_num, route.applier.username, route.applier.email))
162
                    send_mail(settings.EMAIL_SUBJECT_PREFIX + "Rule %s expires %s%s" %
163
                              (route.name,expiration_days_text, days_num),
164
                              mail_body, settings.SERVER_EMAIL,
165
                              [route.applier.email])
166
                except Exception as e:
167
                    logger.info("Exception: %s"%e)
168
                    pass
169
    logger.info('Expiration notification process finished')
170

    
171
#def delete(route):
172
#    
173
#    applier = PR.Applier(route_object=route)
174
#    commit, response = applier.apply(configuration=applier.delete_routes())
175
#    if commit:
176
#            rows = queryset.update(is_online=False, is_active=False)
177
#            queryset.update(response="Successfully removed route from network")
178
#            self.message_user(request, "Successfully removed %s routes from network" % rows)
179
#        else:
180
#            self.message_user(request, "Could not remove routes from network")
181
#    if commit:
182
#        is_online = False
183
#        is_active = False
184
#        response = "Successfully removed route from network"
185
#    else:
186
#        is_online = False
187
#        is_active = True
188
#    route.is_online = is_online
189
#    route.is_active = is_active
190
#    route.response = response
191
#    route.save()