Statistics
| Branch: | Tag: | Revision:

root / flowspec / tasks.py @ 6d153302

History | View | Annotate | Download (4.7 kB)

1
from utils import proxy as PR
2
from celery.task import task
3
from celery.task.sets import subtask
4
import logging
5
import json
6
from celery.task.http import *
7
from flowspy.utils import beanstalkc
8
from django.conf import settings
9

    
10
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
11
logging.basicConfig(format=FORMAT)
12
logger = logging.getLogger(__name__)
13
logger.setLevel(logging.DEBUG)
14

    
15
@task
16
def add(route, callback=None):
17
    applier = PR.Applier(route_object=route)
18
    commit, response = applier.apply()
19
    if commit:
20
        status = "ACTIVE"
21
    else:
22
        status = "ERROR"
23
    route.status = status
24
    route.response = response
25
    subtask(announce).delay("[%s] Route add: %s - Result: %s" %(route.applier, route.name, response), route.applier)
26
    route.save()
27

    
28
@task
29
def edit(route, callback=None):
30
    applier = PR.Applier(route_object=route)
31
    commit, response = applier.apply(operation="replace")
32
    if commit:
33
        status = "ACTIVE"
34
    else:
35
        status = "ERROR"
36
    route.status = status
37
    route.response = response
38
    route.save()
39
    subtask(announce).delay("[%s] Route edit: %s - Result: %s"%(route.applier, route.name, response), route.applier)
40

    
41

    
42

    
43
@task
44
def delete(route, **kwargs):
45
    applier = PR.Applier(route_object=route)
46
    commit, response = applier.apply(operation="delete")
47
    reason_text = ''
48
    if commit:
49
        status = "INACTIVE"
50
        if "reason" in kwargs and kwargs['reason']=='EXPIRED':
51
            status = 'EXPIRED'
52
            reason_text = " Reason: %s " %status
53
    else:
54
        status = "ERROR"
55
    route.status = status
56
    route.response = response
57
    route.save()
58
    subtask(announce).delay("[%s] Route removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
59

    
60
# May not work in the first place... proxy is not aware of Route models
61
@task
62
def batch_delete(routes, **kwargs):
63
    if routes:
64
        applier = PR.Applier(route_objects=routes)
65
        conf = applier.delete_routes()
66
        commit, response = applier.apply(configuration = conf)
67
        reason_text = ''
68
        if commit:
69
            status = "INACTIVE"
70
            if "reason" in kwargs and kwargs['reason']=='EXPIRED':
71
                status = 'EXPIRED'
72
                reason_text = " Reason: %s " %status
73
            elif "reason" in kwargs and kwargs['reason']!='EXPIRED':
74
                status = kwargs['reason']
75
                reason_text = " Reason: %s " %status
76
        else:
77
            status = "ERROR"
78
        for route in routes:
79
            route.status = status
80
            route.response = response
81
            route.save()
82
            subtask(announce).delay("[%s] Route removal: %s%s- Result %s" %(route.applier, route.name, reason_text, response), route.applier)
83
    else:
84
        return False
85

    
86
@task
87
def announce(messg, user):
88
    messg = str(messg)
89
#    username = user.username
90
    username = user.get_profile().peer.domain_name
91
    b = beanstalkc.Connection()
92
    b.use(settings.POLLS_TUBE)
93
    tube_message = json.dumps({'message': messg, 'username':username})
94
    b.put(tube_message)
95
    b.close()
96

    
97
@task
98
def check_sync(route_name=None, selected_routes = []):
99
    from flowspy.flowspec.models import Route, MatchPort, MatchDscp, ThenAction
100
    if not selected_routes:
101
        routes = Route.objects.all()
102
    else:
103
        routes = selected_routes
104
    if route_name:
105
        routes = routes.filter(name=route_name)
106
    for route in routes:
107
        if route.has_expired() and (route.status != 'EXPIRED' and route.status != 'ADMININACTIVE' and route.status != 'INACTIVE'):
108
            logger.info('Expiring route %s' %route.name)
109
            subtask(delete).delay(route, reason="EXPIRED")
110
        elif route.has_expired() and (route.status == 'ADMININACTIVE' or route.status == 'INACTIVE'):
111
            route.status = 'EXPIRED'
112
            route.response = 'Route Expired'
113
            route.save()
114
        else:
115
            if route.status != 'EXPIRED':
116
                route.check_sync()
117

    
118

    
119
#def delete(route):
120
#    
121
#    applier = PR.Applier(route_object=route)
122
#    commit, response = applier.apply(configuration=applier.delete_routes())
123
#    if commit:
124
#            rows = queryset.update(is_online=False, is_active=False)
125
#            queryset.update(response="Successfully removed route from network")
126
#            self.message_user(request, "Successfully removed %s routes from network" % rows)
127
#        else:
128
#            self.message_user(request, "Could not remove routes from network")
129
#    if commit:
130
#        is_online = False
131
#        is_active = False
132
#        response = "Successfully removed route from network"
133
#    else:
134
#        is_online = False
135
#        is_active = True
136
#    route.is_online = is_online
137
#    route.is_active = is_active
138
#    route.response = response
139
#    route.save()