Added copyright info plus updated the README file
[flowspy] / poller / views.py
1 #
2 # -*- coding: utf-8 -*- vim:fileencoding=utf-8:
3 #Copyright © 2011-2013 Greek Research and Technology Network (GRNET S.A.)
4
5 #Developed by Leonidas Poulopoulos (leopoul-at-noc-dot-grnet-dot-gr),
6 #GRNET NOC
7 #
8 #Permission to use, copy, modify, and/or distribute this software for any
9 #purpose with or without fee is hereby granted, provided that the above
10 #copyright notice and this permission notice appear in all copies.
11 #
12 #THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH REGARD
13 #TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
14 #FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
15 #CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
16 #DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 #ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
18 #SOFTWARE.
19 #
20 from gevent import monkey
21 monkey.patch_all()
22 from gevent.pool import Pool
23 import json
24
25 import uuid
26 import datetime
27 from django.shortcuts import render_to_response
28 from django.template.loader import render_to_string
29 from django.http import HttpResponse
30 from gevent.event import Event
31 from django.conf import settings
32 #from django.views.decorators.csrf import csrf_exempt
33 from django.http import HttpResponseRedirect
34 from django.core.urlresolvers import reverse
35
36
37 import beanstalkc
38
39 import logging
40
# Module-wide logging: every record is prefixed with a timestamp and its
# level name, and this module's logger emits DEBUG and above.
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.basicConfig(format=FORMAT)
45
46
def create_message(body, user):
    """Build one message record for the polling feed.

    :param body: text of the message.
    :param user: identifier of the user the message belongs to.
    :returns: dict with the keys ``id`` (a fresh UUID4 string), ``body``,
        ``user`` and ``html`` (the message rendered through the
        ``poll_message.html`` template).
    """
    message = {
        'id': str(uuid.uuid4()),
        'body': body,
        'user': user,
    }
    # The template is rendered from the record itself, before 'html' is added.
    rendered = render_to_string('poll_message.html', dictionary={'message': message})
    message['html'] = rendered
    return message
51
52
def json_response(value, **kwargs):
    """Serialize *value* to JSON and wrap it in an ``HttpResponse``.

    :param value: any object ``json.dumps`` can serialize.
    :param kwargs: passed through to ``HttpResponse``; ``content_type``
        falls back to ``'text/javascript; charset=UTF-8'`` when not supplied.
    :returns: the ``HttpResponse`` carrying the JSON payload.
    """
    if 'content_type' not in kwargs:
        kwargs['content_type'] = 'text/javascript; charset=UTF-8'
    payload = json.dumps(value)
    return HttpResponse(payload, **kwargs)
56
class Msgs(object):
    """In-memory, per-user message store behind the long-polling views.

    Messages enter through :meth:`message_new` (fed by :meth:`monitor_polls`
    from a beanstalk tube) and are handed to browsers by
    :meth:`message_existing` (current backlog) and :meth:`message_updates`
    (long poll for anything newer than the user's cursor).

    In the views the "user" key is ``request.user.get_profile().peer.domain_name``;
    in :meth:`message_new` it is ``mesg['username']``.
    """

    # Maximum number of messages retained per user; older ones are dropped.
    cache_size = 500

    def __init__(self):
        logger.info("initializing")
        # ``user``, ``cache`` and ``new_message_event`` are not read by any
        # method of this class; they are kept so the instance keeps its shape.
        self.user = None
        self.user_cache = {}              # user -> list of message dicts, oldest first
        self.user_cursor = {}             # user -> id of a cached message (or '')
        self.cache = []
        self.new_message_event = None
        self.new_message_user_event = {}  # user -> Event used to wake pollers

    def _user_event(self, user):
        """Return the Event that wakes pollers of *user*, creating it on first use."""
        event = self.new_message_user_event.get(user)
        if event is None:
            event = Event()
            self.new_message_user_event[user] = event
        return event

    def _remember(self, user, msg):
        """Append *msg* to *user*'s cache, move the cursor and enforce ``cache_size``."""
        cache = self.user_cache.setdefault(user, [])
        cache.append(msg)
        if len(cache) == 1:
            # First message for this user: the cursor points at it.
            self.user_cursor[user] = cache[-1]['id']
        else:
            # Otherwise the cursor points at the message just before the new
            # one, so a poll starting from the cursor returns the new message.
            self.user_cursor[user] = cache[-2]['id']
        if len(cache) > self.cache_size:
            self.user_cache[user] = cache[-self.cache_size:]

    def main(self, request):
        """Render ``poll.html`` with the requesting user's cached messages.

        ``user_cache`` is keyed by user, so the backlog is looked up for the
        requesting user's peer; a request without a resolvable peer gets an
        empty list. When there is a backlog, the id of its last message is
        stored in ``request.session['cursor']``.
        """
        try:
            user = request.user.get_profile().peer.domain_name
        except Exception:
            messages = []
        else:
            messages = self.user_cache.get(user, [])
        if messages:
            request.session['cursor'] = messages[-1]['id']
        return render_to_response('poll.html', {'messages': messages})

    def message_existing(self, request):
        """Return the user's cached messages as JSON (AJAX only).

        Also makes sure the per-user event, cache and cursor exist so that a
        following :meth:`message_updates` call can be served.

        :returns: JSON ``{'messages': [...]}`` for AJAX requests, an empty
            403 response when the user's peer cannot be resolved, or a
            redirect to ``group-routes`` for non-AJAX requests.
        """
        if not request.is_ajax():
            return HttpResponseRedirect(reverse('group-routes'))
        try:
            user = request.user.get_profile().peer.domain_name
        except Exception:
            # A view has to answer with a response object, never a bare False.
            return HttpResponse(content='', status=403)
        self._user_event(user)
        cached = self.user_cache.get(user)
        if cached is None:
            self.user_cache[user] = []
            self.user_cursor[user] = ''
        elif cached:
            self.user_cursor[user] = cached[-1]['id']
        return json_response({'messages': self.user_cache[user]})

    def message_new(self, mesg=None):
        """Store a new message and wake the pollers of its user.

        :param mesg: mapping with the keys ``'message'`` (text) and
            ``'username'`` (the user the message is for).
        :returns: the stored message as a JSON response, or an empty 400
            response when *mesg* is missing or empty.
        :raises KeyError: if *mesg* lacks ``'message'`` or ``'username'``.
        """
        if not mesg:
            logger.warning("message_new called without a message; ignoring")
            return HttpResponse(content='', status=400)
        message = mesg['message']
        user = mesg['username']
        now = datetime.datetime.now()
        msg = create_message("[%s]: %s" % (now.strftime("%Y-%m-%d %H:%M:%S"), message), user)
        self._remember(user, msg)
        # The event may not exist yet if this user has never polled.
        event = self._user_event(user)
        event.set()
        event.clear()
        return json_response(msg)

    def message_updates(self, request):
        """Long-poll for messages newer than the user's cursor (AJAX only).

        If nothing newer than the cursor is cached, waits on the user's event
        for up to ``settings.POLL_SESSION_UPDATE`` before answering.

        :returns: JSON ``{'messages': [...]}`` with the messages after the
            cursor (or the whole cache when the cursor is not found in it),
            an empty 403 response when the user's peer cannot be resolved,
            an empty 400 response when no cursor exists for the user, or a
            redirect to ``group-routes`` for non-AJAX requests.
        """
        if not request.is_ajax():
            return HttpResponseRedirect(reverse('group-routes'))
        try:
            user = request.user.get_profile().peer.domain_name
        except Exception:
            return HttpResponse(content='', status=403)
        if user not in self.user_cursor:
            return HttpResponse(content='', status=400)
        # Snapshot the cursor now: message_new may move the shared one
        # while this request is waiting.
        cursor = self.user_cursor[user]
        if not isinstance(self.user_cache.get(user), list):
            self.user_cache[user] = []
        cache = self.user_cache[user]
        if not cache or cursor == cache[-1]['id']:
            self._user_event(user).wait(settings.POLL_SESSION_UPDATE)
        try:
            # Re-read the cache: trimming in message_new rebinds the list.
            cache = self.user_cache[user]
            for index, m in enumerate(cache):
                if m['id'] == cursor:
                    return json_response({'messages': cache[index + 1:]})
            return json_response({'messages': cache})
        finally:
            latest = self.user_cache[user]
            if latest:
                self.user_cursor[user] = latest[-1]['id']

    def monitor_polls(self, polls=None):
        """Consume jobs from the ``settings.POLLS_TUBE`` beanstalk tube forever.

        Each job body is decoded as JSON, the job is buried, and the decoded
        message is handed to :meth:`message_new`. *polls* is unused.
        """
        b = beanstalkc.Connection()
        b.watch(settings.POLLS_TUBE)
        while True:
            job = b.reserve()
            try:
                msg = json.loads(job.body)
            except ValueError:
                # A malformed body must not stop the consumer before the job
                # is buried; message_new ignores the resulting None.
                logger.exception("discarding poll job with invalid JSON body")
                msg = None
            job.bury()
            self.message_new(msg)

    def start_polling(self):
        """Keep :meth:`monitor_polls` consumers running in a pool of 10; never returns."""
        logger.info("Start Polling")
        p = Pool(10)
        # NOTE(review): presumably relies on Pool.spawn blocking while all 10
        # slots are taken, so this loop only respawns consumers that have
        # exited -- confirm against the gevent version in use.
        while True:
            p.spawn(self.monitor_polls)
160             
# One shared Msgs instance backs every view exported by this module.
msgs = Msgs()

# Module-level aliases bound to the shared instance's methods.
main = msgs.main
message_existing = msgs.message_existing
message_updates = msgs.message_updates
message_new = msgs.message_new

# Start the pollers at import time. start_polling loops unconditionally,
# so this call has to stay the last statement of the module.
poll = msgs.start_polling
poll()
170