root / utils / beanstalkc.py @ 9cad4715
History | View | Annotate | Download (11.5 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
"""beanstalkc - A beanstalkd Client Library for Python"""
|
3 |
|
4 |
__license__ = '''
|
5 |
Copyright (C) 2008-2010 Andreas Bolka
|
6 |
|
7 |
Licensed under the Apache License, Version 2.0 (the "License");
|
8 |
you may not use this file except in compliance with the License.
|
9 |
You may obtain a copy of the License at
|
10 |
|
11 |
http://www.apache.org/licenses/LICENSE-2.0
|
12 |
|
13 |
Unless required by applicable law or agreed to in writing, software
|
14 |
distributed under the License is distributed on an "AS IS" BASIS,
|
15 |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
16 |
See the License for the specific language governing permissions and
|
17 |
limitations under the License.
|
18 |
'''
|
19 |
|
20 |
__version__ = '0.2.0'
|
21 |
|
22 |
import logging |
23 |
import socket |
24 |
import re |
25 |
|
26 |
|
27 |
DEFAULT_HOST = 'localhost'
|
28 |
DEFAULT_PORT = 11300
|
29 |
DEFAULT_PRIORITY = 2**31 |
30 |
DEFAULT_TTR = 120
|
31 |
DEFAULT_TIMEOUT = 1
|
32 |
|
33 |
|
34 |
class BeanstalkcException(Exception): pass |
35 |
class UnexpectedResponse(BeanstalkcException): pass |
36 |
class CommandFailed(BeanstalkcException): pass |
37 |
class DeadlineSoon(BeanstalkcException): pass |
38 |
class SocketError(BeanstalkcException): pass |
39 |
|
40 |
|
41 |
class Connection(object): |
42 |
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, |
43 |
connection_timeout=DEFAULT_TIMEOUT): |
44 |
self._socket = None |
45 |
self.host = host
|
46 |
self.port = port
|
47 |
self.connection_timeout = connection_timeout
|
48 |
self.connect()
|
49 |
|
50 |
def connect(self): |
51 |
"""Connect to beanstalkd server, unless already connected."""
|
52 |
if not self.closed: |
53 |
return
|
54 |
try:
|
55 |
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
56 |
self._socket.settimeout(self.connection_timeout) |
57 |
self._socket.connect((self.host, self.port)) |
58 |
self._socket.settimeout(None) |
59 |
self._socket_file = self._socket.makefile('rb') |
60 |
except socket.error, e:
|
61 |
self._socket = None |
62 |
raise SocketError(e)
|
63 |
|
64 |
def close(self): |
65 |
"""Close connection to server, if it is open."""
|
66 |
if self.closed: |
67 |
return
|
68 |
try:
|
69 |
self._socket.sendall('quit\r\n') |
70 |
self._socket.close()
|
71 |
except socket.error:
|
72 |
pass
|
73 |
finally:
|
74 |
self._socket = None |
75 |
|
76 |
@property
|
77 |
def closed(self): |
78 |
return self._socket is None |
79 |
|
80 |
def _interact(self, command, expected_ok, expected_err=[], size_field=None): |
81 |
try:
|
82 |
self._socket.sendall(command)
|
83 |
status, results = self._read_response()
|
84 |
if status in expected_ok: |
85 |
if size_field is not None: |
86 |
results.append(self._read_body(int(results[size_field]))) |
87 |
return results
|
88 |
elif status in expected_err: |
89 |
raise CommandFailed(command.split()[0], status, results) |
90 |
else:
|
91 |
raise UnexpectedResponse(command.split()[0], status, results) |
92 |
except socket.error, e:
|
93 |
self.close()
|
94 |
raise SocketError(e)
|
95 |
|
96 |
def _read_response(self): |
97 |
line = self._socket_file.readline()
|
98 |
if not line: |
99 |
raise socket.error('no data read') |
100 |
response = line.split() |
101 |
return response[0], response[1:] |
102 |
|
103 |
def _read_body(self, size): |
104 |
body = self._socket_file.read(size)
|
105 |
self._socket_file.read(2) # trailing crlf |
106 |
if size > 0 and not body: |
107 |
raise socket.error('no data read') |
108 |
return body
|
109 |
|
110 |
def _interact_value(self, command, expected_ok, expected_err=[]): |
111 |
return self._interact(command, expected_ok, expected_err)[0] |
112 |
|
113 |
def _interact_job(self, command, expected_ok, expected_err, reserved=True): |
114 |
jid, _, body = self._interact(command, expected_ok, expected_err,
|
115 |
size_field=1)
|
116 |
return Job(self, int(jid), body, reserved) |
117 |
|
118 |
def _interact_yaml_dict(self, command, expected_ok, expected_err=[]): |
119 |
_, body, = self._interact(command, expected_ok, expected_err,
|
120 |
size_field=0)
|
121 |
return parse_yaml_dict(body)
|
122 |
|
123 |
def _interact_yaml_list(self, command, expected_ok, expected_err=[]): |
124 |
_, body, = self._interact(command, expected_ok, expected_err,
|
125 |
size_field=0)
|
126 |
return parse_yaml_list(body)
|
127 |
|
128 |
def _interact_peek(self, command): |
129 |
try:
|
130 |
return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False) |
131 |
except CommandFailed, (_, status, results):
|
132 |
return None |
133 |
|
134 |
# -- public interface --
|
135 |
|
136 |
def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR): |
137 |
"""Put a job into the current tube. Returns job id."""
|
138 |
assert isinstance(body, str), 'Job body must be a str instance' |
139 |
jid = self._interact_value(
|
140 |
'put %d %d %d %d\r\n%s\r\n' %
|
141 |
(priority, delay, ttr, len(body), body),
|
142 |
['INSERTED', 'BURIED'], ['JOB_TOO_BIG']) |
143 |
return int(jid) |
144 |
|
145 |
def reserve(self, timeout=None): |
146 |
"""Reserve a job from one of the watched tubes, with optional timeout in
|
147 |
seconds. Returns a Job object, or None if the request times out."""
|
148 |
if timeout is not None: |
149 |
command = 'reserve-with-timeout %d\r\n' % timeout
|
150 |
else:
|
151 |
command = 'reserve\r\n'
|
152 |
try:
|
153 |
return self._interact_job(command, |
154 |
['RESERVED'],
|
155 |
['DEADLINE_SOON', 'TIMED_OUT']) |
156 |
except CommandFailed, (_, status, results):
|
157 |
if status == 'TIMED_OUT': |
158 |
return None |
159 |
elif status == 'DEADLINE_SOON': |
160 |
raise DeadlineSoon(results)
|
161 |
|
162 |
def kick(self, bound=1): |
163 |
"""Kick at most bound jobs into the ready queue."""
|
164 |
return int(self._interact_value('kick %d\r\n' % bound, ['KICKED'])) |
165 |
|
166 |
def peek(self, jid): |
167 |
"""Peek at a job. Returns a Job, or None."""
|
168 |
return self._interact_peek('peek %d\r\n' % jid) |
169 |
|
170 |
def peek_ready(self): |
171 |
"""Peek at next ready job. Returns a Job, or None."""
|
172 |
return self._interact_peek('peek-ready\r\n') |
173 |
|
174 |
def peek_delayed(self): |
175 |
"""Peek at next delayed job. Returns a Job, or None."""
|
176 |
return self._interact_peek('peek-delayed\r\n') |
177 |
|
178 |
def peek_buried(self): |
179 |
"""Peek at next buried job. Returns a Job, or None."""
|
180 |
return self._interact_peek('peek-buried\r\n') |
181 |
|
182 |
def tubes(self): |
183 |
"""Return a list of all existing tubes."""
|
184 |
return self._interact_yaml_list('list-tubes\r\n', ['OK']) |
185 |
|
186 |
def using(self): |
187 |
"""Return a list of all tubes currently being used."""
|
188 |
return self._interact_value('list-tube-used\r\n', ['USING']) |
189 |
|
190 |
def use(self, name): |
191 |
"""Use a given tube."""
|
192 |
return self._interact_value('use %s\r\n' % name, ['USING']) |
193 |
|
194 |
def watching(self): |
195 |
"""Return a list of all tubes being watched."""
|
196 |
return self._interact_yaml_list('list-tubes-watched\r\n', ['OK']) |
197 |
|
198 |
def watch(self, name): |
199 |
"""Watch a given tube."""
|
200 |
return int(self._interact_value('watch %s\r\n' % name, ['WATCHING'])) |
201 |
|
202 |
def ignore(self, name): |
203 |
"""Stop watching a given tube."""
|
204 |
try:
|
205 |
return int(self._interact_value('ignore %s\r\n' % name, |
206 |
['WATCHING'],
|
207 |
['NOT_IGNORED']))
|
208 |
except CommandFailed:
|
209 |
return 1 |
210 |
|
211 |
def stats(self): |
212 |
"""Return a dict of beanstalkd statistics."""
|
213 |
return self._interact_yaml_dict('stats\r\n', ['OK']) |
214 |
|
215 |
def stats_tube(self, name): |
216 |
"""Return a dict of stats about a given tube."""
|
217 |
return self._interact_yaml_dict('stats-tube %s\r\n' % name, |
218 |
['OK'],
|
219 |
['NOT_FOUND'])
|
220 |
|
221 |
def pause_tube(self, name, delay): |
222 |
"""Pause a tube for a given delay time, in seconds."""
|
223 |
self._interact('pause-tube %s %d\r\n' %(name, delay), |
224 |
['PAUSED'],
|
225 |
['NOT_FOUND'])
|
226 |
|
227 |
# -- job interactors --
|
228 |
|
229 |
def delete(self, jid): |
230 |
"""Delete a job, by job id."""
|
231 |
self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND']) |
232 |
|
233 |
def release(self, jid, priority=DEFAULT_PRIORITY, delay=0): |
234 |
"""Release a reserved job back into the ready queue."""
|
235 |
self._interact('release %d %d %d\r\n' % (jid, priority, delay), |
236 |
['RELEASED', 'BURIED'], |
237 |
['NOT_FOUND'])
|
238 |
|
239 |
def bury(self, jid, priority=DEFAULT_PRIORITY): |
240 |
"""Bury a job, by job id."""
|
241 |
self._interact('bury %d %d\r\n' % (jid, priority), |
242 |
['BURIED'],
|
243 |
['NOT_FOUND'])
|
244 |
|
245 |
def touch(self, jid): |
246 |
"""Touch a job, by job id, requesting more time to work on a reserved
|
247 |
job before it expires."""
|
248 |
self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND']) |
249 |
|
250 |
def stats_job(self, jid): |
251 |
"""Return a dict of stats about a job, by job id."""
|
252 |
return self._interact_yaml_dict('stats-job %d\r\n' % jid, |
253 |
['OK'],
|
254 |
['NOT_FOUND'])
|
255 |
|
256 |
|
257 |
class Job(object): |
258 |
def __init__(self, conn, jid, body, reserved=True): |
259 |
self.conn = conn
|
260 |
self.jid = jid
|
261 |
self.body = body
|
262 |
self.reserved = reserved
|
263 |
|
264 |
def _priority(self): |
265 |
stats = self.stats()
|
266 |
if isinstance(stats, dict): |
267 |
return stats['pri'] |
268 |
return DEFAULT_PRIORITY
|
269 |
|
270 |
# -- public interface --
|
271 |
|
272 |
def delete(self): |
273 |
"""Delete this job."""
|
274 |
self.conn.delete(self.jid) |
275 |
self.reserved = False |
276 |
|
277 |
def release(self, priority=None, delay=0): |
278 |
"""Release this job back into the ready queue."""
|
279 |
if self.reserved: |
280 |
self.conn.release(self.jid, priority or self._priority(), delay) |
281 |
self.reserved = False |
282 |
|
283 |
def bury(self, priority=None): |
284 |
"""Bury this job."""
|
285 |
if self.reserved: |
286 |
self.conn.bury(self.jid, priority or self._priority()) |
287 |
self.reserved = False |
288 |
|
289 |
def touch(self): |
290 |
"""Touch this reserved job, requesting more time to work on it before it
|
291 |
expires."""
|
292 |
if self.reserved: |
293 |
self.conn.touch(self.jid) |
294 |
|
295 |
def stats(self): |
296 |
"""Return a dict of stats about this job."""
|
297 |
return self.conn.stats_job(self.jid) |
298 |
|
299 |
def parse_yaml_dict(yaml): |
300 |
"""Parse a YAML dict, in the form returned by beanstalkd."""
|
301 |
dict = {} |
302 |
for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M): |
303 |
key, val = m.group(1), m.group(2) |
304 |
# Check the type of the value, and parse it.
|
305 |
if key == 'name' or key == 'tube' or key == 'version': |
306 |
dict[key] = val # String, even if it looks like a number |
307 |
elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None: |
308 |
dict[key] = int(val) # Integer value |
309 |
elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None: |
310 |
dict[key] = float(val) # Float value |
311 |
else:
|
312 |
dict[key] = val # String value |
313 |
return dict |
314 |
|
315 |
def parse_yaml_list(yaml): |
316 |
"""Parse a YAML list, in the form returned by beanstalkd."""
|
317 |
return re.findall(r'^- (.*)$', yaml, re.M) |
318 |
|
319 |
if __name__ == '__main__': |
320 |
import doctest, os, signal |
321 |
try:
|
322 |
pid = os.spawnlp(os.P_NOWAIT, |
323 |
'beanstalkd',
|
324 |
'beanstalkd', '-l', '127.0.0.1', '-p', '14711') |
325 |
doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
|
326 |
doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
|
327 |
finally:
|
328 |
os.kill(pid, signal.SIGTERM) |