Revision 25d08a62

b/flowspec/models.py
122 122
#            logger.info("Got save job id: %s" %response)
123 123
    
124 124
    def commit_add(self, *args, **kwargs):
125
        send_message("Adding route %s. Please wait..." %self.name)
125
        send_message("Adding route %s. Please wait..." %self.name, self.applier)
126 126
        response = add.delay(self)
127 127
        logger.info("Got save job id: %s" %response)
128 128

  
......
135 135
#        logger.info("Got delete job id: %s" %response)
136 136
        
137 137
    def commit_edit(self, *args, **kwargs):
138
        send_message("Editing route %s. Please wait..." %self.name)
138
        send_message("Editing route %s. Please wait..." %self.name, self.applier)
139 139
        response = edit.delay(self)
140 140
        logger.info("Got edit job id: %s" %response)
141 141

  
142 142
    def commit_delete(self, *args, **kwargs):
143
        send_message("Removing route %s. Please wait..." %self.name)
143
        send_message("Removing route %s. Please wait..." %self.name, self.applier)
144 144
        response = delete.delay(self)
145 145
        logger.info("Got edit job id: %s" %response)
146 146
#    
......
284 284
    get_match.short_description = 'Match statement'
285 285
    get_match.allow_tags = True
286 286

  
287
def send_message(msg):
287
def send_message(msg, user):
288
    username = user.username
288 289
    b = beanstalkc.Connection()
289 290
    b.use(settings.POLLS_TUBE)
290
    b.put(str(msg))
291
    tube_message = json.dumps({'message': str(msg), 'username':username})
292
    b.put(tube_message)
291 293
    b.close()
b/flowspec/tasks.py
2 2
from celery.task import task
3 3
from celery.task.sets import subtask
4 4
import logging
5
import json
6

  
5 7
from celery.task.http import *
6 8
from flowspy.utils import beanstalkc
7 9
from django.conf import settings
......
24 26
    route.is_online = is_online
25 27
    route.is_active = is_active
26 28
    route.response = response
27
    subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response))
29
    subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response), route.applier)
28 30
    route.save()
29 31

  
30 32
@task
......
39 41
    route.is_online = is_online
40 42
    route.response = response
41 43
    route.save()
42
    subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response))
44
    subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response), route.applier)
43 45

  
44 46

  
45 47

  
......
57 59
    route.is_active = is_active
58 60
    route.response = response
59 61
    route.save()
60
    subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response))
62
    subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response), route.applier)
61 63

  
62 64

  
63 65

  
64 66
@task
65
def announce(messg):
67
def announce(messg, user):
66 68
    messg = str(messg)
69
    username = user.username
67 70
    b = beanstalkc.Connection()
68 71
    b.use(settings.POLLS_TUBE)
69
    b.put(messg)
72
    tube_message = json.dumps({'message': messg, 'username':username})
73
    b.put(tube_message)
70 74
    b.close()
71 75

  
72 76

  
b/poller/views.py
1 1
from gevent import monkey
2 2
monkey.patch_all()
3 3
from gevent.pool import Pool
4
import json
4 5

  
5 6
import uuid
6 7
import simplejson
......
22 23
logger.setLevel(logging.DEBUG)
23 24

  
24 25

  
25
def create_message(from_, body):
26
    data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body}
26
def create_message(body, user):
27
    data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
27 28
    data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
28 29
    return data
29 30

  
......
36 37
    cache_size = 200
37 38

  
38 39
    def __init__(self):
40
        self.user_cache = {}
41
        self.user_cursor = {}
39 42
        self.cache = []
40
        self.new_message_event = Event()
43
        self.new_message_event = None
44
        self.new_message_user_event = {}
41 45

  
42 46
    def main(self, request):
43
        if self.cache:
44
            request.session['cursor'] = self.cache[-1]['id']
45
        return render_to_response('poll.html', {'messages': self.cache})
47
        if self.user_cache:
48
            request.session['cursor'] = self.user_cache[-1]['id']
49
        return render_to_response('poll.html', {'messages': self.user_cache})
46 50
    
47 51
    @csrf_exempt
48 52
    def message_existing(self, request):
49
        if self.cache:
50
            request.session['cursor'] = self.cache[-1]['id']
51
        return json_response({'messages': self.cache})
53
        
54
        try:
55
            user = request.user.username
56
        except:
57
            user = None
58
        self.new_message_user_event[user] = Event()
59
        try:
60
            if self.user_cache[user]:
61
                self.user_cursor[user] = self.user_cache[user][-1]['id']
62
        except:
63
            self.user_cache[user] = []
64
            self.user_cursor[user] = ''
65
        return json_response({'messages': self.user_cache[user]})
52 66
    
53 67
    @csrf_exempt
54
    def message_new(self, request=None, mesg=None):
55
        if request:
56
            name = request.META.get('REMOTE_ADDR') or 'Anonymous'
57
            forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
58
            if forwarded_for and name == '127.0.0.1':
59
                name = forwarded_for
60
            msg = create_message(name, request.POST['body'])
68
    def message_new(self, mesg=None):
61 69
        if mesg:
62
            message = mesg
70
            message = mesg['message']
71
            user = mesg['username']
63 72
            now = datetime.datetime.now()
64
            msg = create_message("[%s]"%now.strftime("%Y-%m-%d %H:%M:%S"), message)
65
        self.cache.append(msg)
66
        if len(self.cache) > self.cache_size:
67
            self.cache = self.cache[-self.cache_size:]
68
        self.new_message_event.set()
69
        self.new_message_event.clear()
73
            msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user)
74
        try:
75
            isinstance(self.user_cache[user], list)
76
        except:
77
            self.user_cache[user] = []
78
        self.user_cache[user].append(msg)
79
        if self.user_cache[user][-1] == self.user_cache[user][0]: 
80
            self.user_cursor[user] = self.user_cache[user][-1]['id']
81
        else:
82
            self.user_cursor[user] = self.user_cache[user][-2]['id']
83
#        self.cache.append(msg)
84
        if len(self.user_cache[user]) > self.cache_size:
85
            self.user_cache[user] = self.user_cache[user][-self.cache_size:]
86
        self.new_message_user_event[user].set()
87
        self.new_message_user_event[user].clear()
70 88
        return json_response(msg)
71 89
    
72 90
    @csrf_exempt
73 91
    def message_updates(self, request):
74
        cursor = request.session.get('cursor')
75
        if not self.cache or cursor == self.cache[-1]['id']:
76
            self.new_message_event.wait()
77
        assert cursor != self.cache[-1]['id'], cursor
92
        cursor = {}
93
        try:
94
            user = request.user.username
95
        except:
96
            user = None
97

  
98
        cursor[user] = self.user_cursor[user]
99
            
100
        try:
101
            if not isinstance(self.user_cache[user], list):
102
                self.user_cache[user] = []
103
        except:
104
            self.user_cache[user] = []
105
        if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
106
            self.new_message_user_event[user].wait()
107
#            self.new_message_event.wait()
108
#        assert cursor[user] != self.user_cache[user][-1]['id'], cursor[user]
78 109
        try:
79
            for index, m in enumerate(self.cache):
80
                if m['id'] == cursor:
81
                    return json_response({'messages': self.cache[index + 1:]})
82
            return json_response({'messages': self.cache})
110
            for index, m in enumerate(self.user_cache[user]):
111
                if m['id'] == cursor[user]:
112
                    return json_response({'messages': self.user_cache[user][index + 1:]})
113
            return json_response({'messages': self.user_cache[user]})
83 114
        finally:
84
            if self.cache:
85
                request.session['cursor'] = self.cache[-1]['id']
86
            else:
87
                request.session.pop('cursor', None)
115
            if self.user_cache[user]:
116
                self.user_cursor[user] = self.user_cache[user][-1]['id']
117
#            else:
118
#                request.session.pop('cursor', None)
88 119

  
89 120
    def monitor_polls(self, polls=None):
90 121
        b = beanstalkc.Connection()
91 122
        b.watch(settings.POLLS_TUBE)
92 123
        while True:
93 124
            job = b.reserve()
94
            msg = job.body
125
            print job.body
126
            msg = json.loads(job.body)
95 127
            job.bury()
96
            self.message_new(None, msg)
128
            self.message_new(msg)
97 129
            
98 130
    
99 131
    def start_polling(self):
b/static/js/poller.js
120 120
    	try {
121 121
    	    updater.existingMessages(eval("(" + response + ")"));
122 122
    	} catch (e) {
123
    	    updater.onError();
123
//    	    updater.onError();
124 124
    	    return;
125 125
    	}
126 126
        },
b/templates/poll_message.html
1
<div class="message" id="m{{ message.id }}"><b>{{ message.from }}: </b>{{ message.body }}</div>
1
<div class="message" id="m{{ message.id }}">{{ message.body }}</div>

Also available in: Unified diff