1 from gevent import monkey
3 from gevent.pool import Pool
9 from django.shortcuts import render_to_response
10 from django.template.loader import render_to_string
11 from django.http import HttpResponse
12 from gevent.event import Event
13 from django.conf import settings
14 #from django.views.decorators.csrf import csrf_exempt
15 from django.http import HttpResponseRedirect
16 from django.core.urlresolvers import reverse
19 from flowspy.utils import beanstalkc
23 FORMAT = '%(asctime)s %(levelname)s: %(message)s'
24 logging.basicConfig(format=FORMAT)
25 logger = logging.getLogger(__name__)
26 logger.setLevel(logging.DEBUG)
29 def create_message(body, user):
30 data = {'id': str(uuid.uuid4()), 'body': body, 'user':user}
31 data['html'] = render_to_string('poll_message.html', dictionary={'message': data})
35 def json_response(value, **kwargs):
36 kwargs.setdefault('content_type', 'text/javascript; charset=UTF-8')
37 return HttpResponse(simplejson.dumps(value), **kwargs)
43 logger.info("initializing")
48 self.new_message_event = None
49 self.new_message_user_event = {}
51 def main(self, request):
53 request.session['cursor'] = self.user_cache[-1]['id']
54 return render_to_response('poll.html', {'messages': self.user_cache})
56 def message_existing(self, request):
59 user = request.user.get_profile().peer.domain_name
64 assert(self.new_message_user_event[user])
66 self.new_message_user_event[user] = Event()
68 if self.user_cache[user]:
69 self.user_cursor[user] = self.user_cache[user][-1]['id']
71 self.user_cache[user] = []
72 self.user_cursor[user] = ''
73 return json_response({'messages': self.user_cache[user]})
74 return HttpResponseRedirect(reverse('group-routes'))
76 def message_new(self, mesg=None):
78 message = mesg['message']
79 user = mesg['username']
80 now = datetime.datetime.now()
81 msg = create_message("[%s]: %s"%(now.strftime("%Y-%m-%d %H:%M:%S"),message), user)
83 isinstance(self.user_cache[user], list)
85 self.user_cache[user] = []
86 self.user_cache[user].append(msg)
87 if self.user_cache[user][-1] == self.user_cache[user][0]:
88 self.user_cursor[user] = self.user_cache[user][-1]['id']
90 self.user_cursor[user] = self.user_cache[user][-2]['id']
91 if len(self.user_cache[user]) > self.cache_size:
92 self.user_cache[user] = self.user_cache[user][-self.cache_size:]
93 self.new_message_user_event[user].set()
94 self.new_message_user_event[user].clear()
95 return json_response(msg)
97 def message_updates(self, request):
101 user = request.user.get_profile().peer.domain_name
106 cursor[user] = self.user_cursor[user]
108 return HttpResponse(content='', mimetype=None, status=400)
111 if not isinstance(self.user_cache[user], list):
112 self.user_cache[user] = []
114 self.user_cache[user] = []
115 if not self.user_cache[user] or cursor[user] == self.user_cache[user][-1]['id']:
116 self.new_message_user_event[user].wait(settings.POLL_SESSION_UPDATE)
118 for index, m in enumerate(self.user_cache[user]):
119 if m['id'] == cursor[user]:
120 return json_response({'messages': self.user_cache[user][index + 1:]})
121 return json_response({'messages': self.user_cache[user]})
123 if self.user_cache[user]:
124 self.user_cursor[user] = self.user_cache[user][-1]['id']
125 return HttpResponseRedirect(reverse('group-routes'))
127 def monitor_polls(self, polls=None):
128 b = beanstalkc.Connection()
129 b.watch(settings.POLLS_TUBE)
132 msg = json.loads(job.body)
134 self.message_new(msg)
137 def start_polling(self):
138 logger.info("Start Polling")
141 p.spawn(self.monitor_polls)
146 message_new = msgs.message_new
147 message_updates = msgs.message_updates
148 message_existing = msgs.message_existing
150 poll = msgs.start_polling