root / poller / views.py @ 3e99e2d1
History | View | Annotate | Download (3.5 kB)
1 |
from gevent import monkey |
---|---|
2 |
monkey.patch_all() |
3 |
from gevent.pool import Pool |
4 |
|
5 |
import uuid |
6 |
import simplejson |
7 |
import datetime |
8 |
from django.shortcuts import render_to_response |
9 |
from django.template.loader import render_to_string |
10 |
from django.http import HttpResponse |
11 |
from gevent.event import Event |
12 |
from django.conf import settings |
13 |
from django.views.decorators.csrf import csrf_exempt |
14 |
|
15 |
from flowspy.utils import beanstalkc |
16 |
|
17 |
import logging |
18 |
|
19 |
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
|
20 |
logging.basicConfig(format=FORMAT) |
21 |
logger = logging.getLogger(__name__) |
22 |
logger.setLevel(logging.DEBUG) |
23 |
|
24 |
|
25 |
def create_message(from_, body): |
26 |
data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body} |
27 |
data['html'] = render_to_string('poll_message.html', dictionary={'message': data}) |
28 |
return data
|
29 |
|
30 |
|
31 |
def json_response(value, **kwargs): |
32 |
kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8') |
33 |
return HttpResponse(simplejson.dumps(value), **kwargs)
|
34 |
|
35 |
class Msgs(object): |
36 |
cache_size = 200
|
37 |
|
38 |
def __init__(self): |
39 |
self.cache = []
|
40 |
self.new_message_event = Event()
|
41 |
|
42 |
def main(self, request): |
43 |
if self.cache: |
44 |
request.session['cursor'] = self.cache[-1]['id'] |
45 |
return render_to_response('poll.html', {'messages': self.cache}) |
46 |
|
47 |
@csrf_exempt
|
48 |
def message_existing(self, request): |
49 |
if self.cache: |
50 |
request.session['cursor'] = self.cache[-1]['id'] |
51 |
return json_response({'messages': self.cache}) |
52 |
|
53 |
@csrf_exempt
|
54 |
def message_new(self, request=None, mesg=None): |
55 |
if request:
|
56 |
name = request.META.get('REMOTE_ADDR') or 'Anonymous' |
57 |
forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
58 |
if forwarded_for and name == '127.0.0.1': |
59 |
name = forwarded_for |
60 |
msg = create_message(name, request.POST['body'])
|
61 |
if mesg:
|
62 |
message = mesg |
63 |
now = datetime.datetime.now() |
64 |
msg = create_message("[%s]"%now.strftime("%Y-%m-%d %H:%M:%S"), message) |
65 |
self.cache.append(msg)
|
66 |
if len(self.cache) > self.cache_size: |
67 |
self.cache = self.cache[-self.cache_size:] |
68 |
self.new_message_event.set()
|
69 |
self.new_message_event.clear()
|
70 |
return json_response(msg)
|
71 |
|
72 |
@csrf_exempt
|
73 |
def message_updates(self, request): |
74 |
cursor = request.session.get('cursor')
|
75 |
if not self.cache or cursor == self.cache[-1]['id']: |
76 |
self.new_message_event.wait()
|
77 |
assert cursor != self.cache[-1]['id'], cursor |
78 |
try:
|
79 |
for index, m in enumerate(self.cache): |
80 |
if m['id'] == cursor: |
81 |
return json_response({'messages': self.cache[index + 1:]}) |
82 |
return json_response({'messages': self.cache}) |
83 |
finally:
|
84 |
if self.cache: |
85 |
request.session['cursor'] = self.cache[-1]['id'] |
86 |
else:
|
87 |
request.session.pop('cursor', None) |
88 |
|
89 |
def monitor_polls(self, polls=None): |
90 |
b = beanstalkc.Connection() |
91 |
b.watch(settings.POLLS_TUBE) |
92 |
while True: |
93 |
job = b.reserve() |
94 |
msg = job.body |
95 |
job.bury() |
96 |
self.message_new(None, msg) |
97 |
|
98 |
|
99 |
def start_polling(self): |
100 |
logger.info("Start Polling")
|
101 |
p = Pool(10)
|
102 |
while True: |
103 |
p.spawn(self.monitor_polls)
|
104 |
|
105 |
msgs = Msgs() |
106 |
|
107 |
main = msgs.main |
108 |
|
109 |
message_new = msgs.message_new |
110 |
message_updates = msgs.message_updates |
111 |
message_existing = msgs.message_existing |
112 |
|
113 |
poll = msgs.start_polling |
114 |
poll() |
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 |
|
124 |
|