Statistics
| Branch: | Tag: | Revision:

root / flowspec / tasks.py @ 6e9a9419

History | View | Annotate | Download (5.1 kB)

1
from utils import proxy as PR
2
from celery.task import task
3
from celery.task.sets import subtask
4
import logging
5
import json
6
from celery.task.http import *
7
from flowspy.utils import beanstalkc
8
from django.conf import settings
9
import datetime
10

    
11
import os
12

    
13

    
14

    
15
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'celery_jobs.log')
16

    
17
#FORMAT = '%(asctime)s %(levelname)s: %(message)s'
18
#logging.basicConfig(format=FORMAT)
19
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
20

    
21
logger = logging.getLogger(__name__)
22
logger.setLevel(logging.DEBUG)
23
handler = logging.FileHandler(LOG_FILENAME)
24
handler.setFormatter(formatter)
25
logger.addHandler(handler)
26

    
27

    
28
@task
29
def add(route, callback=None):
30
    applier = PR.Applier(route_object=route)
31
    commit, response = applier.apply()
32
    if commit:
33
        status = "ACTIVE"
34
    else:
35
        status = "ERROR"
36
    route.status = status
37
    route.response = response
38
    subtask(announce).delay("[%s] Rule add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
39
    route.save()
40

    
41
@task
42
def edit(route, callback=None):
43
    applier = PR.Applier(route_object=route)
44
    commit, response = applier.apply(operation="replace")
45
    if commit:
46
        status = "ACTIVE"
47
    else:
48
        status = "ERROR"
49
    route.status = status
50
    route.response = response
51
    route.save()
52
    subtask(announce).delay("[%s] Rule edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
53

    
54

    
55

    
56
@task
57
def delete(route, **kwargs):
58
    applier = PR.Applier(route_object=route)
59
    commit, response = applier.apply(operation="delete")
60
    reason_text = ''
61
    if commit:
62
        status = "INACTIVE"
63
        if "reason" in kwargs and kwargs['reason']=='EXPIRED':
64
            status = 'EXPIRED'
65
            reason_text = " Reason: %s " %status
66
    else:
67
        status = "ERROR"
68
    route.status = status
69
    route.response = response
70
    route.save()
71
    subtask(announce).delay("[%s] Suspending rule : %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
72

    
73
# May not work in the first place... proxy is not aware of Route models
74
@task
75
def batch_delete(routes, **kwargs):
76
    if routes:
77
        for route in routes:
78
            route.status='PENDING';route.save()
79
        applier = PR.Applier(route_objects=routes)
80
        conf = applier.delete_routes()
81
        commit, response = applier.apply(configuration = conf)
82
        reason_text = ''
83
        if commit:
84
            status = "INACTIVE"
85
            if "reason" in kwargs and kwargs['reason']=='EXPIRED':
86
                status = 'EXPIRED'
87
                reason_text = " Reason: %s " %status
88
            elif "reason" in kwargs and kwargs['reason']!='EXPIRED':
89
                status = kwargs['reason']
90
                reason_text = " Reason: %s " %status
91
        else:
92
            status = "ERROR"
93
        for route in routes:
94
            route.status = status
95
            route.response = response
96
            route.expires = datetime.date.today()
97
            route.save()
98
            subtask(announce).delay("[%s] Rule removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
99
    else:
100
        return False
101

    
102
@task
103
def announce(messg, user):
104
    messg = str(messg)
105
#    username = user.username
106
    username = user.get_profile().peer.domain_name
107
    b = beanstalkc.Connection()
108
    b.use(settings.POLLS_TUBE)
109
    tube_message = json.dumps({'message': messg, 'username':username})
110
    b.put(tube_message)
111
    b.close()
112

    
113
@task
114
def check_sync(route_name=None, selected_routes = []):
115
    from flowspy.flowspec.models import Route, MatchPort, MatchDscp, ThenAction
116
    if not selected_routes:
117
        routes = Route.objects.all()
118
    else:
119
        routes = selected_routes
120
    if route_name:
121
        routes = routes.filter(name=route_name)
122
    for route in routes:
123
        if route.has_expired() and (route.status != 'EXPIRED' and route.status != 'ADMININACTIVE' and route.status != 'INACTIVE'):
124
            logger.info('Expiring route %s' %route.name)
125
            subtask(delete).delay(route, reason="EXPIRED")
126
#        elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'):
127
#            route.status = 'EXPIRED'
128
#            route.response = 'Rule Expired'
129
#            logger.info('Expiring route %s' %route.name)
130
#            route.save()
131
        else:
132
            if route.status != 'EXPIRED':
133
                route.check_sync()
134

    
135

    
136
#def delete(route):
137
#    
138
#    applier = PR.Applier(route_object=route)
139
#    commit, response = applier.apply(configuration=applier.delete_routes())
140
#    if commit:
141
#            rows = queryset.update(is_online=False, is_active=False)
142
#            queryset.update(response="Successfully removed route from network")
143
#            self.message_user(request, "Successfully removed %s routes from network" % rows)
144
#        else:
145
#            self.message_user(request, "Could not remove routes from network")
146
#    if commit:
147
#        is_online = False
148
#        is_active = False
149
#        response = "Successfully removed route from network"
150
#    else:
151
#        is_online = False
152
#        is_active = True
153
#    route.is_online = is_online
154
#    route.is_active = is_active
155
#    route.response = response
156
#    route.save()