9a79211e6ad7379a0b06e33a7914f3eb2e9fcb92
[flowspy] / poller / views.py
1 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
2 # vim: tabstop=4:shiftwidth=4:softtabstop=4:expandtab
3
4 # Copyright © 2011-2014 Greek Research and Technology Network (GRNET S.A.)
5 # Copyright © 2011-2014 Leonidas Poulopoulos (@leopoul)
6
7 # Permission to use, copy, modify, and/or distribute this software for any
8 # purpose with or without fee is hereby granted, provided that the above
9 # copyright notice and this permission notice appear in all copies.
10
11 # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD
12 # TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
13 # FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
14 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
15 # DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
16 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
17 # SOFTWARE.
18
19 from gevent import monkey
20 monkey.patch_all()
21 from gevent.pool import Pool
22 import gevent
23 import json
24
25 import uuid
26 import datetime
27 from django.shortcuts import render_to_response
28 from django.template.loader import render_to_string
29 from django.http import HttpResponse
30 from gevent.event import Event
31 from django.conf import settings
32 #from django.views.decorators.csrf import csrf_exempt
33 from django.http import HttpResponseRedirect
34 from django.core.urlresolvers import reverse
35 from django.conf import settings
36
37 import beanstalkc
38
39 import logging
40 import os
41
# Destination of this module's log file, inside settings.LOG_FILE_LOCATION.
LOG_FILENAME = os.path.join(settings.LOG_FILE_LOCATION, 'poller.log')

# File handler that prefixes every record with its timestamp and level name.
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
handler = logging.FileHandler(LOG_FILENAME)
handler.setFormatter(formatter)

# Module logger: level set to DEBUG, records go to the handler above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
49
def create_message(message, user, time):
    """Build a message record and attach its rendered HTML fragment.

    :param message: message body, stored under ``'body'``.
    :param user: value stored under ``'user'``.
    :param time: value stored under ``'time'``; stored as given.
    :returns: dict with the keys ``id`` (a fresh UUID4 string), ``body``,
        ``user``, ``time`` and ``html`` (the ``poll_message.html`` template
        rendered with the record available as ``message``).
    """
    data = {
        'id': str(uuid.uuid4()),
        'body': message,
        'user': user,
        'time': time,
    }
    # The context is passed positionally: the ``dictionary=`` keyword was
    # deprecated in Django 1.8 and removed in 1.10, whereas the second
    # positional argument is the template context in every release.
    data['html'] = render_to_string('poll_message.html', {'message': data})
    return data
54
55
def json_response(value, **kwargs):
    """Serialise *value* to JSON and wrap it in an ``HttpResponse``.

    Extra keyword arguments are forwarded to ``HttpResponse``.  When the
    caller does not supply ``content_type`` it is set to
    ``'text/javascript; charset=UTF-8'``.
    """
    if 'content_type' not in kwargs:
        kwargs['content_type'] = 'text/javascript; charset=UTF-8'
    payload = json.dumps(value)
    return HttpResponse(payload, **kwargs)
59
class Msgs(object):
    """Per-peer long-polling message hub (one shared instance per process).

    Messages are read from a beanstalk tube by :meth:`monitor_polls`, cached
    per peer tag in ``user_cache`` and handed to AJAX clients through
    :meth:`message_existing` and :meth:`message_updates`.  Clients waiting
    for news block on a per-peer gevent ``Event``.

    NOTE(review): the read cursor (``user_cursor``) is keyed by peer tag, not
    by session, so every client of the same peer shares it -- confirm that
    this is intended.
    """

    # Maximum number of messages kept per peer.
    cache_size = 500

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        # Arguments are accepted for signature compatibility but are not
        # forwarded: object.__new__ raises TypeError when given any.
        if cls._instance is None:
            cls._instance = super(Msgs, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every ``Msgs()`` call even though __new__ returns
        # the same object; bail out so existing caches are not wiped.
        if getattr(self, '_initialized', False):
            return
        logger.info("initializing")
        self.user = None
        self.user_cache = {}              # peer tag -> list of message dicts
        self.user_cursor = {}             # peer tag -> id of a cached message
        self.cache = []
        self.new_message_event = None
        self.new_message_user_event = {}  # peer tag -> gevent Event
        self._initialized = True

    @staticmethod
    def _peer_tag(request):
        """Return the peer tag of the requesting user, or None if it cannot
        be resolved (e.g. the user has no profile or no peer)."""
        try:
            return request.user.get_profile().peer.peer_tag
        except Exception:
            # Any failure in the lookup chain means "no usable peer".
            return None

    def _event_for(self, user):
        """Return the wake-up Event of *user*, creating it on first use."""
        event = self.new_message_user_event.get(user)
        if event is None:
            event = self.new_message_user_event[user] = Event()
        return event

    def main(self, request):
        """Render ``poll.html`` with the requesting peer's cached messages.

        When messages exist, the id of the newest one is stored in the
        session under ``'cursor'``.
        """
        # ``user_cache`` is a dict keyed by peer tag; indexing it with -1 (as
        # the previous code did) raised KeyError as soon as it was non-empty.
        # NOTE(review): assumes the template expects a list of message dicts.
        messages = self.user_cache.get(self._peer_tag(request)) or []
        if messages:
            request.session['cursor'] = messages[-1]['id']
        return render_to_response('poll.html', {'messages': messages})

    def message_existing(self, request):
        """Return every cached message of the requesting peer as JSON.

        Also creates the peer's cache, cursor and wake-up event if they do
        not exist yet and moves the cursor to the newest cached message.
        Non-AJAX requests are redirected to ``group-routes``; requests whose
        peer cannot be resolved get an empty 400 response.
        """
        if not request.is_ajax():
            return HttpResponseRedirect(reverse('group-routes'))
        user = self._peer_tag(request)
        if user is None:
            # A view must return a response object; ``False`` made Django
            # fail with a server error.
            return HttpResponse(content='', status=400)
        self._event_for(user)
        messages = self.user_cache.setdefault(user, [])
        if messages:
            self.user_cursor[user] = messages[-1]['id']
        else:
            # Keep an existing cursor untouched; only seed a missing one.
            self.user_cursor.setdefault(user, '')
        return json_response({'messages': messages})

    def message_new(self, mesg=None):
        """Cache a new message and wake the clients waiting for its peer.

        :param mesg: dict with the keys ``'message'`` and ``'username'``.
        :returns: JSON response holding the created message, or ``None`` when
            *mesg* is empty or missing (nothing is stored in that case).
        :raises KeyError: if *mesg* lacks one of the required keys.
        """
        if not mesg:
            # Previously this path fell through to unbound local variables.
            return None
        body = mesg['message']
        user = mesg['username']
        logger.info("from %s", user)
        stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        msg = create_message(body, user, stamp)

        messages = self.user_cache.setdefault(user, [])
        messages.append(msg)
        # Point the cursor at the message preceding the new one (or at the
        # new one itself when it is the only entry).
        if len(messages) == 1:
            self.user_cursor[user] = messages[-1]['id']
        else:
            self.user_cursor[user] = messages[-2]['id']
        if len(messages) > self.cache_size:
            self.user_cache[user] = messages[-self.cache_size:]

        # Pulse the event: wake current waiters, then re-arm for the next
        # message.
        event = self._event_for(user)
        event.set()
        event.clear()
        return json_response(msg)

    def message_updates(self, request):
        """Long-poll endpoint: return the messages cached after the cursor.

        If the peer has nothing newer than its cursor, the request waits up
        to ``settings.POLL_SESSION_UPDATE`` on the peer's event first.  The
        cursor is moved to the newest cached message before returning.
        Non-AJAX requests are redirected to ``group-routes``; an unresolved
        peer or a peer without a cursor gets an empty 400 response.
        """
        if not request.is_ajax():
            return HttpResponseRedirect(reverse('group-routes'))
        user = self._peer_tag(request)
        if user is None or user not in self.user_cursor:
            # ``mimetype`` was dropped from HttpResponse in Django 1.7; the
            # default content type is used instead.
            return HttpResponse(content='', status=400)
        cursor = self.user_cursor[user]

        if not isinstance(self.user_cache.get(user), list):
            self.user_cache[user] = []
        cached = self.user_cache[user]
        if not cached or cursor == cached[-1]['id']:
            self._event_for(user).wait(settings.POLL_SESSION_UPDATE)
        try:
            # Re-read the cache: message_new may have rebound the list while
            # this request was waiting.
            messages = self.user_cache[user]
            for index, m in enumerate(messages):
                if m['id'] == cursor:
                    return json_response({'messages': messages[index + 1:]})
            # Cursor not found in the cache: send everything.
            return json_response({'messages': messages})
        finally:
            if self.user_cache[user]:
                self.user_cursor[user] = self.user_cache[user][-1]['id']

    def monitor_polls(self):
        """Consume jobs from ``settings.POLLS_TUBE`` forever and dispatch
        each decoded body to :meth:`message_new`."""
        b = beanstalkc.Connection()
        b.watch(settings.POLLS_TUBE)
        while True:
            job = b.reserve()
            try:
                msg = json.loads(job.body)
            except ValueError:
                logger.exception("Discarding unparsable poll job")
                msg = None
            # NOTE(review): bury() keeps the job in beanstalkd until it is
            # kicked or deleted -- confirm something cleans buried jobs up.
            job.bury()
            if msg is None:
                continue
            logger.info("Got New message")
            try:
                self.message_new(msg)
            except Exception:
                # One malformed job must not stop the consumer greenlet.
                logger.exception("Failed to dispatch poll message")

    def start_polling(self):
        """Spawn :meth:`monitor_polls` in a background greenlet."""
        logger.info("Start Polling")
        gevent.spawn(self.monitor_polls)
173
174             
# Shared hub instance plus module-level aliases of its request handlers
# (presumably what the URLconf points at -- confirm against urls.py).
msgs = Msgs()
main, message_updates, message_existing = (
    msgs.main,
    msgs.message_updates,
    msgs.message_existing,
)

# Importing this module starts the background beanstalk consumer.
poll = msgs.start_polling
poll()