Statistics
| Branch: | Tag: | Revision:

root / poller / views.py @ 3ff6f95b

History | View | Annotate | Download (6.3 kB)

1
# -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2
# vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
3

    
4
# Copyright (C) 2010-2014 GRNET S.A.
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
#
19

    
20
from gevent.pool import Pool
21
import gevent
22
import json
23

    
24
import uuid
25
import datetime
26
from django.shortcuts import render_to_response
27
from django.template.loader import render_to_string
28
from django.http import HttpResponse
29
from gevent.event import Event
30
from django.conf import settings
31
#from django.views.decorators.csrf import csrf_exempt
32
from django.http import HttpResponseRedirect
33
from django.core.urlresolvers import reverse
34
from django.conf import settings
35

    
36
import beanstalkc
37

    
38
import logging
39
import os
40

    
41
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'poller.log')
42
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
43
logger = logging.getLogger(__name__)
44
logger.setLevel(logging.DEBUG)
45
handler = logging.FileHandler(LOG_FILENAME)
46
handler.setFormatter(formatter)
47
logger.addHandler(handler)
48

    
49
def create_message(message, user, time):
50
    data = {'id': str(uuid.uuid4()), 'body': message, 'user':user, 'time':time}
51
    data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
52
    return data
53

    
54

    
55
def json_response(value, **kwargs):
56
    kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8')
57
    return HttpResponse(json.dumps(value), **kwargs)
58

    
59
class Msgs(object):
60
    cache_size = 500
61
    
62
    _instance = None
63
    def __new__(cls, *args, **kwargs):
64
        if not cls._instance:
65
            cls._instance = super(Msgs, cls).__new__(cls, *args, **kwargs)
66
        return cls._instance
67

    
68
    def __init__(self):
69
        logger.info("initializing")
70
        self.user = None
71
        self.user_cache = {}
72
        self.user_cursor = {}
73
        self.cache = []
74
        self.new_message_event = None
75
        self.new_message_user_event = {}
76

    
77
    def main(self, request):
78
        if self.user_cache:
79
            request.session['cursor'] = self.user_cache[-1]['id']
80
        return render_to_response('poll.html', {'messages': self.user_cache})
81

    
82
    def message_existing(self, request):
83
        if request.is_ajax():
84
            try:
85
                user = request.user.get_profile().peer.peer_tag
86
            except:
87
                user = None
88
                return False
89
            try:
90
                assert(self.new_message_user_event[user])
91
            except:
92
                self.new_message_user_event[user] = Event()
93
            try:
94
                if self.user_cache[user]:
95
                    self.user_cursor[user] = self.user_cache[user][-1]['id']
96
            except:
97
                self.user_cache[user] = []
98
                self.user_cursor[user] = ''
99
            return json_response({'messages': self.user_cache[user]})
100
        return HttpResponseRedirect(reverse('group-routes'))
101
    
102
    def message_new(self, mesg=None):
103
        if mesg:
104
            message = mesg['message']
105
            user = mesg['username']
106
            logger.info("from %s" %user)
107
            now = datetime.datetime.now()
108
            msg = create_message(message, user, now.strftime("%Y-%m-%d %H:%M:%S"))
109
        try:
110
            isinstance(self.user_cache[user], list)
111
        except:
112
            self.user_cache[user] = []
113
        self.user_cache[user].append(msg)
114
        if self.user_cache[user][-1] == self.user_cache[user][0]: 
115
            self.user_cursor[user] = self.user_cache[user][-1]['id']
116
        else:
117
            self.user_cursor[user] = self.user_cache[user][-2]['id']
118
        if len(self.user_cache[user]) > self.cache_size:
119
            self.user_cache[user] = self.user_cache[user][-self.cache_size:]
120
        try:
121
            assert(self.new_message_user_event[user])
122
        except:
123
            self.new_message_user_event[user] = Event()            
124
        self.new_message_user_event[user].set()
125
        self.new_message_user_event[user].clear()
126
        return json_response(msg)
127
    
128
    def message_updates(self, request):
129
        if request.is_ajax():
130
            cursor = {}
131
            try:
132
                user = request.user.get_profile().peer.peer_tag
133
            except:
134
                user = None
135
                return False
136
            try:
137
                cursor[user] = self.user_cursor[user]
138
            except:
139
                return HttpResponse(content='', mimetype=None, status=400)
140
                
141
            try:
142
                if not isinstance(self.user_cache[user], list):
143
                    self.user_cache[user] = []
144
            except:
145
                self.user_cache[user] = []
146
            if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
147
                self.new_message_user_event[user].wait(settings.POLL_SESSION_UPDATE)
148
            try:
149
                for index, m in enumerate(self.user_cache[user]):
150
                    if m['id'] == cursor[user]:
151
                        return json_response({'messages': self.user_cache[user][index + 1:]})
152
                return json_response({'messages': self.user_cache[user]})
153
            finally:
154
                if self.user_cache[user]:
155
                    self.user_cursor[user] = self.user_cache[user][-1]['id']
156
        return HttpResponseRedirect(reverse('group-routes'))
157

    
158
    def monitor_polls(self):
159
        b = beanstalkc.Connection()
160
        b.watch(settings.POLLS_TUBE)
161
        while True:
162
            job = b.reserve()
163
            msg = json.loads(job.body)
164
            job.bury()
165
            logger.info("Got New message")
166
            self.message_new(msg)
167
            
168
    
169
    def start_polling(self):
170
        logger.info("Start Polling")
171
        gevent.spawn(self.monitor_polls)
172

    
173
            
174
msgs = Msgs()
175
main = msgs.main
176

    
177
message_updates = msgs.message_updates
178
message_existing = msgs.message_existing
179

    
180

    
181
poll = msgs.start_polling
182
poll()