Statistics
| Branch: | Tag: | Revision:

root / poller / views.py @ d0777394

History | View | Annotate | Download (5 kB)

1
from gevent import monkey
2
monkey.patch_all()
3
from gevent.pool import Pool
4
import json
5

    
6
import uuid
7
import simplejson
8
import datetime
9
from django.shortcuts import render_to_response
10
from django.template.loader import render_to_string
11
from django.http import HttpResponse
12
from gevent.event import Event
13
from django.conf import settings
14
#from django.views.decorators.csrf import csrf_exempt
15
from django.http import HttpResponseRedirect
16
from django.core.urlresolvers import reverse
17

    
18

    
19
from flowspy.utils import beanstalkc
20

    
21
import logging
22

    
23
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
24
logging.basicConfig(format=FORMAT)
25
logger = logging.getLogger(__name__)
26
logger.setLevel(logging.DEBUG)
27

    
28

    
29
def create_message(body, user):
30
    data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
31
    data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
32
    return data
33

    
34

    
35
def json_response(value, **kwargs):
36
    kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8')
37
    return HttpResponse(simplejson.dumps(value), **kwargs)
38

    
39
class Msgs(object):
40
    cache_size = 500
41

    
42
    def __init__(self):
43
        logger.info("initializing")
44
        self.user = None
45
        self.user_cache = {}
46
        self.user_cursor = {}
47
        self.cache = []
48
        self.new_message_event = None
49
        self.new_message_user_event = {}
50

    
51
    def main(self, request):
52
        if self.user_cache:
53
            request.session['cursor'] = self.user_cache[-1]['id']
54
        return render_to_response('poll.html', {'messages': self.user_cache})
55

    
56
    def message_existing(self, request):
57
        if request.is_ajax():
58
            try:
59
                user = request.user.get_profile().peer.domain_name
60
            except:
61
                user = None
62
                return False
63
            try:
64
                assert(self.new_message_user_event[user])
65
            except:
66
                self.new_message_user_event[user] = Event()
67
            try:
68
                if self.user_cache[user]:
69
                    self.user_cursor[user] = self.user_cache[user][-1]['id']
70
            except:
71
                self.user_cache[user] = []
72
                self.user_cursor[user] = ''
73
            return json_response({'messages': self.user_cache[user]})
74
        return HttpResponseRedirect(reverse('group-routes'))
75
    
76
    def message_new(self, mesg=None):
77
        if mesg:
78
            message = mesg['message']
79
            user = mesg['username']
80
            now = datetime.datetime.now()
81
            msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user)
82
        try:
83
            isinstance(self.user_cache[user], list)
84
        except:
85
            self.user_cache[user] = []
86
        self.user_cache[user].append(msg)
87
        if self.user_cache[user][-1] == self.user_cache[user][0]: 
88
            self.user_cursor[user] = self.user_cache[user][-1]['id']
89
        else:
90
            self.user_cursor[user] = self.user_cache[user][-2]['id']
91
        if len(self.user_cache[user]) > self.cache_size:
92
            self.user_cache[user] = self.user_cache[user][-self.cache_size:]
93
        self.new_message_user_event[user].set()
94
        self.new_message_user_event[user].clear()
95
        return json_response(msg)
96
    
97
    def message_updates(self, request):
98
        if request.is_ajax():
99
            cursor = {}
100
            try:
101
                user = request.user.get_profile().peer.domain_name
102
            except:
103
                user = None
104
                return False
105
            try:
106
                cursor[user] = self.user_cursor[user]
107
            except:
108
                return HttpResponse(content='', mimetype=None, status=400)
109
                
110
            try:
111
                if not isinstance(self.user_cache[user], list):
112
                    self.user_cache[user] = []
113
            except:
114
                self.user_cache[user] = []
115
            if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
116
                self.new_message_user_event[user].wait(settings.POLL_SESSION_UPDATE)
117
            try:
118
                for index, m in enumerate(self.user_cache[user]):
119
                    if m['id'] == cursor[user]:
120
                        return json_response({'messages': self.user_cache[user][index + 1:]})
121
                return json_response({'messages': self.user_cache[user]})
122
            finally:
123
                if self.user_cache[user]:
124
                    self.user_cursor[user] = self.user_cache[user][-1]['id']
125
        return HttpResponseRedirect(reverse('group-routes'))
126

    
127
    def monitor_polls(self, polls=None):
128
        b = beanstalkc.Connection()
129
        b.watch(settings.POLLS_TUBE)
130
        while True:
131
            job = b.reserve()
132
            msg = json.loads(job.body)
133
            job.bury()
134
            self.message_new(msg)
135
            
136
    
137
    def start_polling(self):
138
        logger.info("Start Polling")
139
        p = Pool(10)
140
        while True:
141
            p.spawn(self.monitor_polls)
142
            
143
msgs = Msgs()
144
main = msgs.main
145

    
146
message_new = msgs.message_new
147
message_updates = msgs.message_updates
148
message_existing = msgs.message_existing
149

    
150
poll = msgs.start_polling
151
poll()
152