Revision 25d08a62 poller/views.py
b/poller/views.py | ||
---|---|---|
1 | 1 |
from gevent import monkey |
2 | 2 |
monkey.patch_all() |
3 | 3 |
from gevent.pool import Pool |
4 |
import json |
|
4 | 5 |
|
5 | 6 |
import uuid |
6 | 7 |
import simplejson |
... | ... | |
22 | 23 |
logger.setLevel(logging.DEBUG) |
23 | 24 |
|
24 | 25 |
|
25 |
def create_message(from_, body):
|
|
26 |
data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body}
|
|
26 |
def create_message(body, user):
|
|
27 |
data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
|
|
27 | 28 |
data['html'] = render_to_string('poll_message.html', dictionary={'message': data}) |
28 | 29 |
return data |
29 | 30 |
|
... | ... | |
36 | 37 |
cache_size = 200 |
37 | 38 |
|
38 | 39 |
def __init__(self): |
40 |
self.user_cache = {} |
|
41 |
self.user_cursor = {} |
|
39 | 42 |
self.cache = [] |
40 |
self.new_message_event = Event() |
|
43 |
self.new_message_event = None |
|
44 |
self.new_message_user_event = {} |
|
41 | 45 |
|
42 | 46 |
def main(self, request): |
43 |
if self.cache: |
|
44 |
request.session['cursor'] = self.cache[-1]['id'] |
|
45 |
return render_to_response('poll.html', {'messages': self.cache}) |
|
47 |
if self.user_cache:
|
|
48 |
request.session['cursor'] = self.user_cache[-1]['id']
|
|
49 |
return render_to_response('poll.html', {'messages': self.user_cache})
|
|
46 | 50 |
|
47 | 51 |
@csrf_exempt |
48 | 52 |
def message_existing(self, request): |
49 |
if self.cache: |
|
50 |
request.session['cursor'] = self.cache[-1]['id'] |
|
51 |
return json_response({'messages': self.cache}) |
|
53 |
|
|
54 |
try: |
|
55 |
user = request.user.username |
|
56 |
except: |
|
57 |
user = None |
|
58 |
self.new_message_user_event[user] = Event() |
|
59 |
try: |
|
60 |
if self.user_cache[user]: |
|
61 |
self.user_cursor[user] = self.user_cache[user][-1]['id'] |
|
62 |
except: |
|
63 |
self.user_cache[user] = [] |
|
64 |
self.user_cursor[user] = '' |
|
65 |
return json_response({'messages': self.user_cache[user]}) |
|
52 | 66 |
|
53 | 67 |
@csrf_exempt |
54 |
def message_new(self, request=None, mesg=None): |
|
55 |
if request: |
|
56 |
name = request.META.get('REMOTE_ADDR') or 'Anonymous' |
|
57 |
forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') |
|
58 |
if forwarded_for and name == '127.0.0.1': |
|
59 |
name = forwarded_for |
|
60 |
msg = create_message(name, request.POST['body']) |
|
68 |
def message_new(self, mesg=None): |
|
61 | 69 |
if mesg: |
62 |
message = mesg |
|
70 |
message = mesg['message'] |
|
71 |
user = mesg['username'] |
|
63 | 72 |
now = datetime.datetime.now() |
64 |
msg = create_message("[%s]"%now.strftime("%Y-%m-%d %H:%M:%S"), message) |
|
65 |
self.cache.append(msg) |
|
66 |
if len(self.cache) > self.cache_size: |
|
67 |
self.cache = self.cache[-self.cache_size:] |
|
68 |
self.new_message_event.set() |
|
69 |
self.new_message_event.clear() |
|
73 |
msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user) |
|
74 |
try: |
|
75 |
isinstance(self.user_cache[user], list) |
|
76 |
except: |
|
77 |
self.user_cache[user] = [] |
|
78 |
self.user_cache[user].append(msg) |
|
79 |
if self.user_cache[user][-1] == self.user_cache[user][0]: |
|
80 |
self.user_cursor[user] = self.user_cache[user][-1]['id'] |
|
81 |
else: |
|
82 |
self.user_cursor[user] = self.user_cache[user][-2]['id'] |
|
83 |
# self.cache.append(msg) |
|
84 |
if len(self.user_cache[user]) > self.cache_size: |
|
85 |
self.user_cache[user] = self.user_cache[user][-self.cache_size:] |
|
86 |
self.new_message_user_event[user].set() |
|
87 |
self.new_message_user_event[user].clear() |
|
70 | 88 |
return json_response(msg) |
71 | 89 |
|
72 | 90 |
@csrf_exempt |
73 | 91 |
def message_updates(self, request): |
74 |
cursor = request.session.get('cursor') |
|
75 |
if not self.cache or cursor == self.cache[-1]['id']: |
|
76 |
self.new_message_event.wait() |
|
77 |
assert cursor != self.cache[-1]['id'], cursor |
|
92 |
cursor = {} |
|
93 |
try: |
|
94 |
user = request.user.username |
|
95 |
except: |
|
96 |
user = None |
|
97 |
|
|
98 |
cursor[user] = self.user_cursor[user] |
|
99 |
|
|
100 |
try: |
|
101 |
if not isinstance(self.user_cache[user], list): |
|
102 |
self.user_cache[user] = [] |
|
103 |
except: |
|
104 |
self.user_cache[user] = [] |
|
105 |
if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']: |
|
106 |
self.new_message_user_event[user].wait() |
|
107 |
# self.new_message_event.wait() |
|
108 |
# assert cursor[user] != self.user_cache[user][-1]['id'], cursor[user] |
|
78 | 109 |
try: |
79 |
for index, m in enumerate(self.cache):
|
|
80 |
if m['id'] == cursor: |
|
81 |
return json_response({'messages': self.cache[index + 1:]})
|
|
82 |
return json_response({'messages': self.cache})
|
|
110 |
for index, m in enumerate(self.user_cache[user]):
|
|
111 |
if m['id'] == cursor[user]:
|
|
112 |
return json_response({'messages': self.user_cache[user][index + 1:]})
|
|
113 |
return json_response({'messages': self.user_cache[user]})
|
|
83 | 114 |
finally: |
84 |
if self.cache:
|
|
85 |
request.session['cursor'] = self.cache[-1]['id']
|
|
86 |
else: |
|
87 |
request.session.pop('cursor', None) |
|
115 |
if self.user_cache[user]:
|
|
116 |
self.user_cursor[user] = self.user_cache[user][-1]['id']
|
|
117 |
# else:
|
|
118 |
# request.session.pop('cursor', None)
|
|
88 | 119 |
|
89 | 120 |
def monitor_polls(self, polls=None): |
90 | 121 |
b = beanstalkc.Connection() |
91 | 122 |
b.watch(settings.POLLS_TUBE) |
92 | 123 |
while True: |
93 | 124 |
job = b.reserve() |
94 |
msg = job.body |
|
125 |
print job.body |
|
126 |
msg = json.loads(job.body) |
|
95 | 127 |
job.bury() |
96 |
self.message_new(None, msg)
|
|
128 |
self.message_new(msg) |
|
97 | 129 |
|
98 | 130 |
|
99 | 131 |
def start_polling(self): |
Also available in: Unified diff