Handled a key error that could be triggered by service flaps
[flowspy] / poller / views.py
1 from gevent import monkey
2 monkey.patch_all()
3 from gevent.pool import Pool
4 import json
5
6 import uuid
7 import simplejson
8 import datetime
9 from django.shortcuts import render_to_response
10 from django.template.loader import render_to_string
11 from django.http import HttpResponse
12 from gevent.event import Event
13 from django.conf import settings
14 #from django.views.decorators.csrf import csrf_exempt
15 from django.http import HttpResponseRedirect
16 from django.core.urlresolvers import reverse
17
18
19 from flowspy.utils import beanstalkc
20
21 import logging
22
# Layout of every log line: timestamp, level name, then the message text.
FORMAT = '%(asctime)s %(levelname)s: %(message)s'

# Module-level logger; its threshold is DEBUG, so nothing is filtered here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Apply FORMAT through the logging module's basic (root) configuration.
logging.basicConfig(format=FORMAT)
27
28
def create_message(body, user):
    """Build a message record with a fresh id and its rendered HTML.

    Args:
        body: Text stored under the 'body' key.
        user: Value stored under the 'user' key.

    Returns:
        A dict with the keys 'id' (a random UUID4 rendered as a string),
        'body', 'user' and 'html' (the 'poll_message.html' template rendered
        with this record as ``message``).
    """
    record = {
        'id': str(uuid.uuid4()),
        'body': body,
        'user': user,
    }
    # The record is handed to the template before its own 'html' key exists.
    record['html'] = render_to_string('poll_message.html', dictionary={'message': record})
    return record
33
34
def json_response(value, **kwargs):
    """Serialize *value* with simplejson and wrap it in an HttpResponse.

    Args:
        value: Object to serialize.
        **kwargs: Extra keyword arguments passed on to HttpResponse. When the
            caller gives no 'content_type', 'text/javascript; charset=UTF-8'
            is used.

    Returns:
        The HttpResponse carrying the serialized payload.
    """
    if 'content_type' not in kwargs:
        kwargs['content_type'] = 'text/javascript; charset=UTF-8'
    payload = simplejson.dumps(value)
    return HttpResponse(payload, **kwargs)
38
class Msgs(object):
    """In-memory, per-user message store with polling views.

    State is kept in dicts keyed by a "user" value: in the request-driven
    methods this is ``request.user.get_profile().peer.domain_name``; in
    ``message_new`` it is ``mesg['username']``.

    * ``user_cache[user]``  -- list of message dicts (see ``create_message``).
    * ``user_cursor[user]`` -- id of the message up to which updates have
      been handed out ('' when nothing has been stored yet).
    * ``new_message_user_event[user]`` -- gevent Event that is set and then
      immediately cleared whenever a new message is stored for that user.
    """

    # Maximum number of messages retained per user.
    cache_size = 500

    def __init__(self):
        logger.info("initializing")
        # user, cache and new_message_event are not used by the methods of
        # this class; they are kept so the instance attributes stay unchanged.
        self.user = None
        self.user_cache = {}
        self.user_cursor = {}
        self.cache = []
        self.new_message_event = None
        self.new_message_user_event = {}

    def _event_for(self, user):
        """Return the Event registered for *user*, creating it on first use."""
        try:
            return self.new_message_user_event[user]
        except KeyError:
            event = self.new_message_user_event[user] = Event()
            return event

    def _store(self, user, msg):
        """Append *msg* to *user*'s cache, move the cursor and trim the cache.

        The cursor is set to the id of the message *before* the new one, so
        that a later ``message_updates`` call returns the new message. When
        the new message is the only one, the cursor is its own id.
        """
        messages = self.user_cache.setdefault(user, [])
        messages.append(msg)
        if len(messages) == 1:
            self.user_cursor[user] = messages[-1]['id']
        else:
            self.user_cursor[user] = messages[-2]['id']
        if len(messages) > self.cache_size:
            # Rebinds the cache entry to a new, shorter list.
            self.user_cache[user] = messages[-self.cache_size:]

    @staticmethod
    def _messages_after(messages, cursor):
        """Return the messages following the one whose id equals *cursor*.

        When no message carries that id, the whole *messages* list is
        returned.
        """
        for index, message in enumerate(messages):
            if message['id'] == cursor:
                return messages[index + 1:]
        return messages

    def main(self, request):
        """Render 'poll.html' with the messages cached for the request's user.

        If the user cannot be determined from the request, or nothing is
        cached for that user, the template receives an empty list. When
        messages exist, the id of the newest one is stored in
        ``request.session['cursor']``.
        """
        try:
            user = request.user.get_profile().peer.domain_name
        except Exception:
            messages = []
        else:
            # user_cache is keyed by user; indexing it with -1 raised
            # KeyError as soon as any user had a cached message.
            messages = self.user_cache.get(user, [])
        if messages:
            request.session['cursor'] = messages[-1]['id']
        return render_to_response('poll.html', {'messages': messages})

    def message_existing(self, request):
        """Return all cached messages of the request's user as JSON.

        Non-AJAX requests are redirected to the 'group-routes' URL. If the
        user cannot be determined, an empty 400 response is returned. As a
        side effect the user's Event is created if missing, and the cursor is
        moved to the newest cached message (or the cache and cursor are
        initialised to ``[]`` and ``''`` for a user seen for the first time).
        """
        if not request.is_ajax():
            return HttpResponseRedirect(reverse('group-routes'))
        try:
            user = request.user.get_profile().peer.domain_name
        except Exception:
            # A view has to return a response object, not False.
            return HttpResponse(content='', status=400)
        self._event_for(user)
        if user not in self.user_cache:
            self.user_cache[user] = []
            self.user_cursor[user] = ''
        elif self.user_cache[user]:
            self.user_cursor[user] = self.user_cache[user][-1]['id']
        return json_response({'messages': self.user_cache[user]})

    def message_new(self, mesg=None):
        """Store a new message and signal the Event of the target user.

        Args:
            mesg: Mapping with the keys 'message' (text) and 'username'
                (the user key the message is filed under).

        Returns:
            A JSON response with the stored message record, or an empty 400
            response when *mesg* is missing or empty.

        Raises:
            KeyError: if *mesg* lacks 'message' or 'username'.
        """
        if not mesg:
            # Without a payload there is neither a user nor a message.
            return HttpResponse(content='', status=400)
        message = mesg['message']
        user = mesg['username']
        now = datetime.datetime.now()
        msg = create_message(
            "[%s]: %s" % (now.strftime("%Y-%m-%d %H:%M:%S"), message), user)
        self._store(user, msg)
        # The Event may not exist yet if a message arrives for a user that
        # has never called message_existing; create it instead of failing.
        event = self._event_for(user)
        event.set()
        event.clear()
        return json_response(msg)

    def message_updates(self, request):
        """Return the messages stored after the user's cursor as JSON.

        Non-AJAX requests are redirected to the 'group-routes' URL. An empty
        400 response is returned when the user cannot be determined or has
        no cursor yet. If there is nothing newer than the cursor, the call
        first waits on the user's Event for up to
        ``settings.POLL_SESSION_UPDATE``. Afterwards the cursor is moved to
        the newest cached message.
        """
        if not request.is_ajax():
            return HttpResponseRedirect(reverse('group-routes'))
        try:
            user = request.user.get_profile().peer.domain_name
        except Exception:
            # A view has to return a response object, not False.
            return HttpResponse(content='', status=400)
        try:
            cursor = self.user_cursor[user]
        except (KeyError, TypeError):
            return HttpResponse(content='', status=400)

        if not isinstance(self.user_cache.get(user), list):
            self.user_cache[user] = []
        messages = self.user_cache[user]
        if not messages or cursor == messages[-1]['id']:
            self._event_for(user).wait(settings.POLL_SESSION_UPDATE)
        try:
            # Re-read the cache: it may have been rebound while waiting.
            return json_response(
                {'messages': self._messages_after(self.user_cache[user], cursor)})
        finally:
            if self.user_cache[user]:
                self.user_cursor[user] = self.user_cache[user][-1]['id']

    def monitor_polls(self, polls=None):
        """Consume jobs from the ``settings.POLLS_TUBE`` beanstalk tube forever.

        Each job body is decoded as JSON, the job is buried, and the decoded
        payload is passed to ``message_new``. A job that cannot be decoded is
        buried as well, and failures while handling a payload are logged, so
        that a single bad job does not end the loop. *polls* is unused.
        """
        b = beanstalkc.Connection()
        b.watch(settings.POLLS_TUBE)
        while True:
            job = b.reserve()
            try:
                msg = json.loads(job.body)
            except (ValueError, TypeError):
                logger.exception("Discarding poll job with undecodable body")
                job.bury()
                continue
            job.bury()
            try:
                self.message_new(msg)
            except Exception:
                logger.exception("Failed to process poll job")

    def start_polling(self):
        """Keep spawning ``monitor_polls`` into a pool of size 10; never returns.

        NOTE(review): this relies on gevent's Pool.spawn blocking while the
        pool is full, so at most 10 monitors run at once -- confirm against
        the gevent version in use.
        """
        logger.info("Start Polling")
        p = Pool(10)
        while True:
            p.spawn(self.monitor_polls)
142             
# One Msgs instance is created at import time; the names below are its bound
# methods published at module level.
msgs = Msgs()

main, message_new = msgs.main, msgs.message_new
message_updates, message_existing = msgs.message_updates, msgs.message_existing

# start_polling loops forever, so this call does not return.
poll = msgs.start_polling
poll()
152