Statistics
| Branch: | Tag: | Revision:

root / poller / views.py @ c00eba1c

History | View | Annotate | Download (5 kB)

1
from gevent import monkey
2
monkey.patch_all()
3
from gevent.pool import Pool
4
import json
5

    
6
import uuid
7
import datetime
8
from django.shortcuts import render_to_response
9
from django.template.loader import render_to_string
10
from django.http import HttpResponse
11
from gevent.event import Event
12
from django.conf import settings
13
#from django.views.decorators.csrf import csrf_exempt
14
from django.http import HttpResponseRedirect
15
from django.core.urlresolvers import reverse
16

    
17

    
18
import beanstalkc
19

    
20
import logging
21

    
22
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
23
logging.basicConfig(format=FORMAT)
24
logger = logging.getLogger(__name__)
25
logger.setLevel(logging.DEBUG)
26

    
27

    
28
def create_message(body, user):
29
    data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
30
    data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
31
    return data
32

    
33

    
34
def json_response(value, **kwargs):
35
    kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8')
36
    return HttpResponse(json.dumps(value), **kwargs)
37

    
38
class Msgs(object):
39
    cache_size = 500
40

    
41
    def __init__(self):
42
        logger.info("initializing")
43
        self.user = None
44
        self.user_cache = {}
45
        self.user_cursor = {}
46
        self.cache = []
47
        self.new_message_event = None
48
        self.new_message_user_event = {}
49

    
50
    def main(self, request):
51
        if self.user_cache:
52
            request.session['cursor'] = self.user_cache[-1]['id']
53
        return render_to_response('poll.html', {'messages': self.user_cache})
54

    
55
    def message_existing(self, request):
56
        if request.is_ajax():
57
            try:
58
                user = request.user.get_profile().peer.domain_name
59
            except:
60
                user = None
61
                return False
62
            try:
63
                assert(self.new_message_user_event[user])
64
            except:
65
                self.new_message_user_event[user] = Event()
66
            try:
67
                if self.user_cache[user]:
68
                    self.user_cursor[user] = self.user_cache[user][-1]['id']
69
            except:
70
                self.user_cache[user] = []
71
                self.user_cursor[user] = ''
72
            return json_response({'messages': self.user_cache[user]})
73
        return HttpResponseRedirect(reverse('group-routes'))
74
    
75
    def message_new(self, mesg=None):
76
        if mesg:
77
            message = mesg['message']
78
            user = mesg['username']
79
            now = datetime.datetime.now()
80
            msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user)
81
        try:
82
            isinstance(self.user_cache[user], list)
83
        except:
84
            self.user_cache[user] = []
85
        self.user_cache[user].append(msg)
86
        if self.user_cache[user][-1] == self.user_cache[user][0]: 
87
            self.user_cursor[user] = self.user_cache[user][-1]['id']
88
        else:
89
            self.user_cursor[user] = self.user_cache[user][-2]['id']
90
        if len(self.user_cache[user]) > self.cache_size:
91
            self.user_cache[user] = self.user_cache[user][-self.cache_size:]
92
        self.new_message_user_event[user].set()
93
        self.new_message_user_event[user].clear()
94
        return json_response(msg)
95
    
96
    def message_updates(self, request):
97
        if request.is_ajax():
98
            cursor = {}
99
            try:
100
                user = request.user.get_profile().peer.domain_name
101
            except:
102
                user = None
103
                return False
104
            try:
105
                cursor[user] = self.user_cursor[user]
106
            except:
107
                return HttpResponse(content='', mimetype=None, status=400)
108
                
109
            try:
110
                if not isinstance(self.user_cache[user], list):
111
                    self.user_cache[user] = []
112
            except:
113
                self.user_cache[user] = []
114
            if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
115
                self.new_message_user_event[user].wait(settings.POLL_SESSION_UPDATE)
116
            try:
117
                for index, m in enumerate(self.user_cache[user]):
118
                    if m['id'] == cursor[user]:
119
                        return json_response({'messages': self.user_cache[user][index + 1:]})
120
                return json_response({'messages': self.user_cache[user]})
121
            finally:
122
                if self.user_cache[user]:
123
                    self.user_cursor[user] = self.user_cache[user][-1]['id']
124
        return HttpResponseRedirect(reverse('group-routes'))
125

    
126
    def monitor_polls(self, polls=None):
127
        b = beanstalkc.Connection()
128
        b.watch(settings.POLLS_TUBE)
129
        while True:
130
            job = b.reserve()
131
            msg = json.loads(job.body)
132
            job.bury()
133
            self.message_new(msg)
134
            
135
    
136
    def start_polling(self):
137
        logger.info("Start Polling")
138
        p = Pool(10)
139
        while True:
140
            p.spawn(self.monitor_polls)
141
            
142
msgs = Msgs()
143
main = msgs.main
144

    
145
message_new = msgs.message_new
146
message_updates = msgs.message_updates
147
message_existing = msgs.message_existing
148

    
149
poll = msgs.start_polling
150
poll()
151