Statistics
| Branch: | Tag: | Revision:

root / poller / views.py @ 25d08a62

History | View | Annotate | Download (4.6 kB)

1
from gevent import monkey
2
monkey.patch_all()
3
from gevent.pool import Pool
4
import json
5

    
6
import uuid
7
import simplejson
8
import datetime
9
from django.shortcuts import render_to_response
10
from django.template.loader import render_to_string
11
from django.http import HttpResponse
12
from gevent.event import Event
13
from django.conf import settings
14
from django.views.decorators.csrf import csrf_exempt
15

    
16
from flowspy.utils import beanstalkc
17

    
18
import logging
19

    
20
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
21
logging.basicConfig(format=FORMAT)
22
logger = logging.getLogger(__name__)
23
logger.setLevel(logging.DEBUG)
24

    
25

    
26
def create_message(body, user):
27
    data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
28
    data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
29
    return data
30

    
31

    
32
def json_response(value, **kwargs):
33
    kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8')
34
    return HttpResponse(simplejson.dumps(value), **kwargs)
35

    
36
class Msgs(object):
37
    cache_size = 200
38

    
39
    def __init__(self):
40
        self.user_cache = {}
41
        self.user_cursor = {}
42
        self.cache = []
43
        self.new_message_event = None
44
        self.new_message_user_event = {}
45

    
46
    def main(self, request):
47
        if self.user_cache:
48
            request.session['cursor'] = self.user_cache[-1]['id']
49
        return render_to_response('poll.html', {'messages': self.user_cache})
50
    
51
    @csrf_exempt
52
    def message_existing(self, request):
53
        
54
        try:
55
            user = request.user.username
56
        except:
57
            user = None
58
        self.new_message_user_event[user] = Event()
59
        try:
60
            if self.user_cache[user]:
61
                self.user_cursor[user] = self.user_cache[user][-1]['id']
62
        except:
63
            self.user_cache[user] = []
64
            self.user_cursor[user] = ''
65
        return json_response({'messages': self.user_cache[user]})
66
    
67
    @csrf_exempt
68
    def message_new(self, mesg=None):
69
        if mesg:
70
            message = mesg['message']
71
            user = mesg['username']
72
            now = datetime.datetime.now()
73
            msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user)
74
        try:
75
            isinstance(self.user_cache[user], list)
76
        except:
77
            self.user_cache[user] = []
78
        self.user_cache[user].append(msg)
79
        if self.user_cache[user][-1] == self.user_cache[user][0]: 
80
            self.user_cursor[user] = self.user_cache[user][-1]['id']
81
        else:
82
            self.user_cursor[user] = self.user_cache[user][-2]['id']
83
#        self.cache.append(msg)
84
        if len(self.user_cache[user]) > self.cache_size:
85
            self.user_cache[user] = self.user_cache[user][-self.cache_size:]
86
        self.new_message_user_event[user].set()
87
        self.new_message_user_event[user].clear()
88
        return json_response(msg)
89
    
90
    @csrf_exempt
91
    def message_updates(self, request):
92
        cursor = {}
93
        try:
94
            user = request.user.username
95
        except:
96
            user = None
97

    
98
        cursor[user] = self.user_cursor[user]
99
            
100
        try:
101
            if not isinstance(self.user_cache[user], list):
102
                self.user_cache[user] = []
103
        except:
104
            self.user_cache[user] = []
105
        if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
106
            self.new_message_user_event[user].wait()
107
#            self.new_message_event.wait()
108
#        assert cursor[user] != self.user_cache[user][-1]['id'], cursor[user]
109
        try:
110
            for index, m in enumerate(self.user_cache[user]):
111
                if m['id'] == cursor[user]:
112
                    return json_response({'messages': self.user_cache[user][index + 1:]})
113
            return json_response({'messages': self.user_cache[user]})
114
        finally:
115
            if self.user_cache[user]:
116
                self.user_cursor[user] = self.user_cache[user][-1]['id']
117
#            else:
118
#                request.session.pop('cursor', None)
119

    
120
    def monitor_polls(self, polls=None):
121
        b = beanstalkc.Connection()
122
        b.watch(settings.POLLS_TUBE)
123
        while True:
124
            job = b.reserve()
125
            print job.body
126
            msg = json.loads(job.body)
127
            job.bury()
128
            self.message_new(msg)
129
            
130
    
131
    def start_polling(self):
132
        logger.info("Start Polling")
133
        p = Pool(10)
134
        while True:
135
            p.spawn(self.monitor_polls)
136
            
137
msgs = Msgs()
138

    
139
main = msgs.main
140

    
141
message_new = msgs.message_new
142
message_updates = msgs.message_updates
143
message_existing = msgs.message_existing
144

    
145
poll = msgs.start_polling
146
poll()
147

    
148

    
149

    
150

    
151

    
152

    
153

    
154

    
155

    
156