Revision 25d08a62
b/flowspec/models.py | ||
---|---|---|
122 | 122 |
# logger.info("Got save job id: %s" %response) |
123 | 123 |
|
124 | 124 |
def commit_add(self, *args, **kwargs): |
125 |
send_message("Adding route %s. Please wait..." %self.name) |
|
125 |
send_message("Adding route %s. Please wait..." %self.name, self.applier)
|
|
126 | 126 |
response = add.delay(self) |
127 | 127 |
logger.info("Got save job id: %s" %response) |
128 | 128 |
|
... | ... | |
135 | 135 |
# logger.info("Got delete job id: %s" %response) |
136 | 136 |
|
137 | 137 |
def commit_edit(self, *args, **kwargs): |
138 |
send_message("Editing route %s. Please wait..." %self.name) |
|
138 |
send_message("Editing route %s. Please wait..." %self.name, self.applier)
|
|
139 | 139 |
response = edit.delay(self) |
140 | 140 |
logger.info("Got edit job id: %s" %response) |
141 | 141 |
|
142 | 142 |
def commit_delete(self, *args, **kwargs): |
143 |
send_message("Removing route %s. Please wait..." %self.name) |
|
143 |
send_message("Removing route %s. Please wait..." %self.name, self.applier)
|
|
144 | 144 |
response = delete.delay(self) |
145 | 145 |
logger.info("Got edit job id: %s" %response) |
146 | 146 |
# |
... | ... | |
284 | 284 |
get_match.short_description = 'Match statement' |
285 | 285 |
get_match.allow_tags = True |
286 | 286 |
|
287 |
def send_message(msg): |
|
287 |
def send_message(msg, user): |
|
288 |
username = user.username |
|
288 | 289 |
b = beanstalkc.Connection() |
289 | 290 |
b.use(settings.POLLS_TUBE) |
290 |
b.put(str(msg)) |
|
291 |
tube_message = json.dumps({'message': str(msg), 'username':username}) |
|
292 |
b.put(tube_message) |
|
291 | 293 |
b.close() |
b/flowspec/tasks.py | ||
---|---|---|
2 | 2 |
from celery.task import task |
3 | 3 |
from celery.task.sets import subtask |
4 | 4 |
import logging |
5 |
import json |
|
6 |
|
|
5 | 7 |
from celery.task.http import * |
6 | 8 |
from flowspy.utils import beanstalkc |
7 | 9 |
from django.conf import settings |
... | ... | |
24 | 26 |
route.is_online = is_online |
25 | 27 |
route.is_active = is_active |
26 | 28 |
route.response = response |
27 |
subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response)) |
|
29 |
subtask(announce).delay("Route add: %s - Result: %s" %(route.name, response), route.applier)
|
|
28 | 30 |
route.save() |
29 | 31 |
|
30 | 32 |
@task |
... | ... | |
39 | 41 |
route.is_online = is_online |
40 | 42 |
route.response = response |
41 | 43 |
route.save() |
42 |
subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response)) |
|
44 |
subtask(announce).delay("Route edit: %s - Result: %s" %(route.name, response), route.applier)
|
|
43 | 45 |
|
44 | 46 |
|
45 | 47 |
|
... | ... | |
57 | 59 |
route.is_active = is_active |
58 | 60 |
route.response = response |
59 | 61 |
route.save() |
60 |
subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response)) |
|
62 |
subtask(announce).delay("Route delete: %s - Result %s" %(route.name, response), route.applier)
|
|
61 | 63 |
|
62 | 64 |
|
63 | 65 |
|
64 | 66 |
@task |
65 |
def announce(messg): |
|
67 |
def announce(messg, user):
|
|
66 | 68 |
messg = str(messg) |
69 |
username = user.username |
|
67 | 70 |
b = beanstalkc.Connection() |
68 | 71 |
b.use(settings.POLLS_TUBE) |
69 |
b.put(messg) |
|
72 |
tube_message = json.dumps({'message': messg, 'username':username}) |
|
73 |
b.put(tube_message) |
|
70 | 74 |
b.close() |
71 | 75 |
|
72 | 76 |
|
b/poller/views.py | ||
---|---|---|
1 | 1 |
from gevent import monkey |
2 | 2 |
monkey.patch_all() |
3 | 3 |
from gevent.pool import Pool |
4 |
import json |
|
4 | 5 |
|
5 | 6 |
import uuid |
6 | 7 |
import simplejson |
... | ... | |
22 | 23 |
logger.setLevel(logging.DEBUG) |
23 | 24 |
|
24 | 25 |
|
25 |
def create_message(from_, body):
|
|
26 |
data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body}
|
|
26 |
def create_message(body, user):
|
|
27 |
data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
|
|
27 | 28 |
data['html'] = render_to_string('poll_message.html', dictionary={'message': data}) |
28 | 29 |
return data |
29 | 30 |
|
... | ... | |
36 | 37 |
cache_size = 200 |
37 | 38 |
|
38 | 39 |
def __init__(self): |
40 |
self.user_cache = {} |
|
41 |
self.user_cursor = {} |
|
39 | 42 |
self.cache = [] |
40 |
self.new_message_event = Event() |
|
43 |
self.new_message_event = None |
|
44 |
self.new_message_user_event = {} |
|
41 | 45 |
|
42 | 46 |
def main(self, request): |
43 |
if self.cache: |
|
44 |
request.session['cursor'] = self.cache[-1]['id'] |
|
45 |
return render_to_response('poll.html', {'messages': self.cache}) |
|
47 |
if self.user_cache:
|
|
48 |
request.session['cursor'] = self.user_cache[-1]['id']
|
|
49 |
return render_to_response('poll.html', {'messages': self.user_cache})
|
|
46 | 50 |
|
47 | 51 |
@csrf_exempt |
48 | 52 |
def message_existing(self, request): |
49 |
if self.cache: |
|
50 |
request.session['cursor'] = self.cache[-1]['id'] |
|
51 |
return json_response({'messages': self.cache}) |
|
53 |
|
|
54 |
try: |
|
55 |
user = request.user.username |
|
56 |
except: |
|
57 |
user = None |
|
58 |
self.new_message_user_event[user] = Event() |
|
59 |
try: |
|
60 |
if self.user_cache[user]: |
|
61 |
self.user_cursor[user] = self.user_cache[user][-1]['id'] |
|
62 |
except: |
|
63 |
self.user_cache[user] = [] |
|
64 |
self.user_cursor[user] = '' |
|
65 |
return json_response({'messages': self.user_cache[user]}) |
|
52 | 66 |
|
53 | 67 |
@csrf_exempt |
54 |
def message_new(self, request=None, mesg=None): |
|
55 |
if request: |
|
56 |
name = request.META.get('REMOTE_ADDR') or 'Anonymous' |
|
57 |
forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') |
|
58 |
if forwarded_for and name == '127.0.0.1': |
|
59 |
name = forwarded_for |
|
60 |
msg = create_message(name, request.POST['body']) |
|
68 |
def message_new(self, mesg=None): |
|
61 | 69 |
if mesg: |
62 |
message = mesg |
|
70 |
message = mesg['message'] |
|
71 |
user = mesg['username'] |
|
63 | 72 |
now = datetime.datetime.now() |
64 |
msg = create_message("[%s]"%now.strftime("%Y-%m-%d %H:%M:%S"), message) |
|
65 |
self.cache.append(msg) |
|
66 |
if len(self.cache) > self.cache_size: |
|
67 |
self.cache = self.cache[-self.cache_size:] |
|
68 |
self.new_message_event.set() |
|
69 |
self.new_message_event.clear() |
|
73 |
msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user) |
|
74 |
try: |
|
75 |
isinstance(self.user_cache[user], list) |
|
76 |
except: |
|
77 |
self.user_cache[user] = [] |
|
78 |
self.user_cache[user].append(msg) |
|
79 |
if self.user_cache[user][-1] == self.user_cache[user][0]: |
|
80 |
self.user_cursor[user] = self.user_cache[user][-1]['id'] |
|
81 |
else: |
|
82 |
self.user_cursor[user] = self.user_cache[user][-2]['id'] |
|
83 |
# self.cache.append(msg) |
|
84 |
if len(self.user_cache[user]) > self.cache_size: |
|
85 |
self.user_cache[user] = self.user_cache[user][-self.cache_size:] |
|
86 |
self.new_message_user_event[user].set() |
|
87 |
self.new_message_user_event[user].clear() |
|
70 | 88 |
return json_response(msg) |
71 | 89 |
|
72 | 90 |
@csrf_exempt |
73 | 91 |
def message_updates(self, request): |
74 |
cursor = request.session.get('cursor') |
|
75 |
if not self.cache or cursor == self.cache[-1]['id']: |
|
76 |
self.new_message_event.wait() |
|
77 |
assert cursor != self.cache[-1]['id'], cursor |
|
92 |
cursor = {} |
|
93 |
try: |
|
94 |
user = request.user.username |
|
95 |
except: |
|
96 |
user = None |
|
97 |
|
|
98 |
cursor[user] = self.user_cursor[user] |
|
99 |
|
|
100 |
try: |
|
101 |
if not isinstance(self.user_cache[user], list): |
|
102 |
self.user_cache[user] = [] |
|
103 |
except: |
|
104 |
self.user_cache[user] = [] |
|
105 |
if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']: |
|
106 |
self.new_message_user_event[user].wait() |
|
107 |
# self.new_message_event.wait() |
|
108 |
# assert cursor[user] != self.user_cache[user][-1]['id'], cursor[user] |
|
78 | 109 |
try: |
79 |
for index, m in enumerate(self.cache):
|
|
80 |
if m['id'] == cursor: |
|
81 |
return json_response({'messages': self.cache[index + 1:]})
|
|
82 |
return json_response({'messages': self.cache})
|
|
110 |
for index, m in enumerate(self.user_cache[user]):
|
|
111 |
if m['id'] == cursor[user]:
|
|
112 |
return json_response({'messages': self.user_cache[user][index + 1:]})
|
|
113 |
return json_response({'messages': self.user_cache[user]})
|
|
83 | 114 |
finally: |
84 |
if self.cache:
|
|
85 |
request.session['cursor'] = self.cache[-1]['id']
|
|
86 |
else: |
|
87 |
request.session.pop('cursor', None) |
|
115 |
if self.user_cache[user]:
|
|
116 |
self.user_cursor[user] = self.user_cache[user][-1]['id']
|
|
117 |
# else:
|
|
118 |
# request.session.pop('cursor', None)
|
|
88 | 119 |
|
89 | 120 |
def monitor_polls(self, polls=None): |
90 | 121 |
b = beanstalkc.Connection() |
91 | 122 |
b.watch(settings.POLLS_TUBE) |
92 | 123 |
while True: |
93 | 124 |
job = b.reserve() |
94 |
msg = job.body |
|
125 |
print job.body |
|
126 |
msg = json.loads(job.body) |
|
95 | 127 |
job.bury() |
96 |
self.message_new(None, msg)
|
|
128 |
self.message_new(msg) |
|
97 | 129 |
|
98 | 130 |
|
99 | 131 |
def start_polling(self): |
b/static/js/poller.js | ||
---|---|---|
120 | 120 |
try { |
121 | 121 |
updater.existingMessages(eval("(" + response + ")")); |
122 | 122 |
} catch (e) { |
123 |
updater.onError(); |
|
123 |
// updater.onError();
|
|
124 | 124 |
return; |
125 | 125 |
} |
126 | 126 |
}, |
b/templates/poll_message.html | ||
---|---|---|
1 |
<div class="message" id="m{{ message.id }}"><b>{{ message.from }}: </b>{{ message.body }}</div> |
|
1 |
<div class="message" id="m{{ message.id }}">{{ message.body }}</div> |
Also available in: Unified diff