Better wording in settings.py.dist
[flowspy] / utils / beanstalkc.py
1 #!/usr/bin/env python
2 """beanstalkc - A beanstalkd Client Library for Python"""
3
4 __license__ = '''
5 Copyright (C) 2008-2010 Andreas Bolka
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11     http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 '''
19
20 __version__ = '0.2.0'
21
22 import logging
23 import socket
24 import re
25
26
27 DEFAULT_HOST = 'localhost'
28 DEFAULT_PORT = 11300
29 DEFAULT_PRIORITY = 2**31
30 DEFAULT_TTR = 120
31 DEFAULT_TIMEOUT = 1
32
33
34 class BeanstalkcException(Exception): pass
35 class UnexpectedResponse(BeanstalkcException): pass
36 class CommandFailed(BeanstalkcException): pass
37 class DeadlineSoon(BeanstalkcException): pass
38 class SocketError(BeanstalkcException): pass
39
40
41 class Connection(object):
42     def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
43                  connection_timeout=DEFAULT_TIMEOUT):
44         self._socket = None
45         self.host = host
46         self.port = port
47         self.connection_timeout = connection_timeout
48         self.connect()
49
50     def connect(self):
51         """Connect to beanstalkd server, unless already connected."""
52         if not self.closed:
53             return
54         try:
55             self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
56             self._socket.settimeout(self.connection_timeout)
57             self._socket.connect((self.host, self.port))
58             self._socket.settimeout(None)
59             self._socket_file = self._socket.makefile('rb')
60         except socket.error, e:
61             self._socket = None
62             raise SocketError(e)
63
64     def close(self):
65         """Close connection to server, if it is open."""
66         if self.closed:
67             return
68         try:
69             self._socket.sendall('quit\r\n')
70             self._socket.close()
71         except socket.error:
72             pass
73         finally:
74             self._socket = None
75
76     @property
77     def closed(self):
78         return self._socket is None
79
80     def _interact(self, command, expected_ok, expected_err=[], size_field=None):
81         try:
82             self._socket.sendall(command)
83             status, results = self._read_response()
84             if status in expected_ok:
85                 if size_field is not None:
86                     results.append(self._read_body(int(results[size_field])))
87                 return results
88             elif status in expected_err:
89                 raise CommandFailed(command.split()[0], status, results)
90             else:
91                 raise UnexpectedResponse(command.split()[0], status, results)
92         except socket.error, e:
93             self.close()
94             raise SocketError(e)
95
96     def _read_response(self):
97         line = self._socket_file.readline()
98         if not line:
99             raise socket.error('no data read')
100         response = line.split()
101         return response[0], response[1:]
102
103     def _read_body(self, size):
104         body = self._socket_file.read(size)
105         self._socket_file.read(2) # trailing crlf
106         if size > 0 and not body:
107             raise socket.error('no data read')
108         return body
109
110     def _interact_value(self, command, expected_ok, expected_err=[]):
111         return self._interact(command, expected_ok, expected_err)[0]
112
113     def _interact_job(self, command, expected_ok, expected_err, reserved=True):
114         jid, _, body = self._interact(command, expected_ok, expected_err,
115                                       size_field=1)
116         return Job(self, int(jid), body, reserved)
117
118     def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
119         _, body, = self._interact(command, expected_ok, expected_err,
120                                   size_field=0)
121         return parse_yaml_dict(body)
122
123     def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
124         _, body, = self._interact(command, expected_ok, expected_err,
125                                   size_field=0)
126         return parse_yaml_list(body)
127
128     def _interact_peek(self, command):
129         try:
130             return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
131         except CommandFailed, (_, status, results):
132             return None
133
134     # -- public interface --
135
136     def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
137         """Put a job into the current tube. Returns job id."""
138         assert isinstance(body, str), 'Job body must be a str instance'
139         jid = self._interact_value(
140                 'put %d %d %d %d\r\n%s\r\n' %
141                     (priority, delay, ttr, len(body), body),
142                 ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
143         return int(jid)
144
145     def reserve(self, timeout=None):
146         """Reserve a job from one of the watched tubes, with optional timeout in
147         seconds. Returns a Job object, or None if the request times out."""
148         if timeout is not None:
149             command = 'reserve-with-timeout %d\r\n' % timeout
150         else:
151             command = 'reserve\r\n'
152         try:
153             return self._interact_job(command,
154                                       ['RESERVED'],
155                                       ['DEADLINE_SOON', 'TIMED_OUT'])
156         except CommandFailed, (_, status, results):
157             if status == 'TIMED_OUT':
158                 return None
159             elif status == 'DEADLINE_SOON':
160                 raise DeadlineSoon(results)
161
162     def kick(self, bound=1):
163         """Kick at most bound jobs into the ready queue."""
164         return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
165
166     def peek(self, jid):
167         """Peek at a job. Returns a Job, or None."""
168         return self._interact_peek('peek %d\r\n' % jid)
169
170     def peek_ready(self):
171         """Peek at next ready job. Returns a Job, or None."""
172         return self._interact_peek('peek-ready\r\n')
173
174     def peek_delayed(self):
175         """Peek at next delayed job. Returns a Job, or None."""
176         return self._interact_peek('peek-delayed\r\n')
177
178     def peek_buried(self):
179         """Peek at next buried job. Returns a Job, or None."""
180         return self._interact_peek('peek-buried\r\n')
181
182     def tubes(self):
183         """Return a list of all existing tubes."""
184         return self._interact_yaml_list('list-tubes\r\n', ['OK'])
185
186     def using(self):
187         """Return a list of all tubes currently being used."""
188         return self._interact_value('list-tube-used\r\n', ['USING'])
189
190     def use(self, name):
191         """Use a given tube."""
192         return self._interact_value('use %s\r\n' % name, ['USING'])
193
194     def watching(self):
195         """Return a list of all tubes being watched."""
196         return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
197
198     def watch(self, name):
199         """Watch a given tube."""
200         return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
201
202     def ignore(self, name):
203         """Stop watching a given tube."""
204         try:
205             return int(self._interact_value('ignore %s\r\n' % name,
206                                             ['WATCHING'],
207                                             ['NOT_IGNORED']))
208         except CommandFailed:
209             return 1
210
211     def stats(self):
212         """Return a dict of beanstalkd statistics."""
213         return self._interact_yaml_dict('stats\r\n', ['OK'])
214
215     def stats_tube(self, name):
216         """Return a dict of stats about a given tube."""
217         return self._interact_yaml_dict('stats-tube %s\r\n' % name,
218                                         ['OK'],
219                                         ['NOT_FOUND'])
220
221     def pause_tube(self, name, delay):
222         """Pause a tube for a given delay time, in seconds."""
223         self._interact('pause-tube %s %d\r\n' %(name, delay),
224                        ['PAUSED'],
225                        ['NOT_FOUND'])
226
227     # -- job interactors --
228
229     def delete(self, jid):
230         """Delete a job, by job id."""
231         self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
232
233     def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
234         """Release a reserved job back into the ready queue."""
235         self._interact('release %d %d %d\r\n' % (jid, priority, delay),
236                        ['RELEASED', 'BURIED'],
237                        ['NOT_FOUND'])
238
239     def bury(self, jid, priority=DEFAULT_PRIORITY):
240         """Bury a job, by job id."""
241         self._interact('bury %d %d\r\n' % (jid, priority),
242                        ['BURIED'],
243                        ['NOT_FOUND'])
244
245     def touch(self, jid):
246         """Touch a job, by job id, requesting more time to work on a reserved
247         job before it expires."""
248         self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
249
250     def stats_job(self, jid):
251         """Return a dict of stats about a job, by job id."""
252         return self._interact_yaml_dict('stats-job %d\r\n' % jid,
253                                         ['OK'],
254                                         ['NOT_FOUND'])
255
256
257 class Job(object):
258     def __init__(self, conn, jid, body, reserved=True):
259         self.conn = conn
260         self.jid = jid
261         self.body = body
262         self.reserved = reserved
263
264     def _priority(self):
265         stats = self.stats()
266         if isinstance(stats, dict):
267             return stats['pri']
268         return DEFAULT_PRIORITY
269
270     # -- public interface --
271
272     def delete(self):
273         """Delete this job."""
274         self.conn.delete(self.jid)
275         self.reserved = False
276
277     def release(self, priority=None, delay=0):
278         """Release this job back into the ready queue."""
279         if self.reserved:
280             self.conn.release(self.jid, priority or self._priority(), delay)
281             self.reserved = False
282
283     def bury(self, priority=None):
284         """Bury this job."""
285         if self.reserved:
286             self.conn.bury(self.jid, priority or self._priority())
287             self.reserved = False
288
289     def touch(self):
290         """Touch this reserved job, requesting more time to work on it before it
291         expires."""
292         if self.reserved:
293             self.conn.touch(self.jid)
294
295     def stats(self):
296         """Return a dict of stats about this job."""
297         return self.conn.stats_job(self.jid)
298
299 def parse_yaml_dict(yaml):
300     """Parse a YAML dict, in the form returned by beanstalkd."""
301     dict = {}
302     for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
303         key, val = m.group(1), m.group(2)
304         # Check the type of the value, and parse it.
305         if key == 'name' or key == 'tube' or key == 'version':
306             dict[key] = val   # String, even if it looks like a number
307         elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
308             dict[key] = int(val) # Integer value
309         elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
310             dict[key] = float(val) # Float value
311         else:
312             dict[key] = val     # String value
313     return dict
314
315 def parse_yaml_list(yaml):
316     """Parse a YAML list, in the form returned by beanstalkd."""
317     return re.findall(r'^- (.*)$', yaml, re.M)
318
319 if __name__ == '__main__':
320     import doctest, os, signal
321     try:
322         pid = os.spawnlp(os.P_NOWAIT,
323                          'beanstalkd',
324                          'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
325         doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
326         doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
327     finally:
328         os.kill(pid, signal.SIGTERM)