root / beanstalkc.py @ master
History | View | Annotate | Download (10.3 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
"""beanstalkc - A beanstalkd Client Library for Python"""
|
3 |
|
4 |
__license__ = '''
|
5 |
Copyright (C) 2008-2012 Andreas Bolka
|
6 |
|
7 |
Licensed under the Apache License, Version 2.0 (the "License");
|
8 |
you may not use this file except in compliance with the License.
|
9 |
You may obtain a copy of the License at
|
10 |
|
11 |
http://www.apache.org/licenses/LICENSE-2.0
|
12 |
|
13 |
Unless required by applicable law or agreed to in writing, software
|
14 |
distributed under the License is distributed on an "AS IS" BASIS,
|
15 |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
16 |
See the License for the specific language governing permissions and
|
17 |
limitations under the License.
|
18 |
'''
|
19 |
|
20 |
__version__ = '0.3.0'
|
21 |
|
22 |
import logging |
23 |
import socket |
24 |
|
25 |
|
26 |
DEFAULT_HOST = 'localhost'
|
27 |
DEFAULT_PORT = 11300
|
28 |
DEFAULT_PRIORITY = 2 ** 31 |
29 |
DEFAULT_TTR = 120
|
30 |
|
31 |
|
32 |
class BeanstalkcException(Exception): pass |
33 |
class UnexpectedResponse(BeanstalkcException): pass |
34 |
class CommandFailed(BeanstalkcException): pass |
35 |
class DeadlineSoon(BeanstalkcException): pass |
36 |
|
37 |
class SocketError(BeanstalkcException): |
38 |
@staticmethod
|
39 |
def wrap(wrapped_function, *args, **kwargs): |
40 |
try:
|
41 |
return wrapped_function(*args, **kwargs)
|
42 |
except socket.error, err:
|
43 |
raise SocketError(err)
|
44 |
|
45 |
|
46 |
class Connection(object): |
47 |
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, parse_yaml=True, |
48 |
connect_timeout=socket.getdefaulttimeout()): |
49 |
if parse_yaml is True: |
50 |
try:
|
51 |
parse_yaml = __import__('yaml').load |
52 |
except ImportError: |
53 |
logging.error('Failed to load PyYAML, will not parse YAML')
|
54 |
parse_yaml = False
|
55 |
self._connect_timeout = connect_timeout
|
56 |
self._parse_yaml = parse_yaml or (lambda x: x) |
57 |
self.host = host
|
58 |
self.port = port
|
59 |
self.connect()
|
60 |
|
61 |
def connect(self): |
62 |
"""Connect to beanstalkd server."""
|
63 |
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
64 |
self._socket.settimeout(self._connect_timeout) |
65 |
SocketError.wrap(self._socket.connect, (self.host, self.port)) |
66 |
self._socket.settimeout(None) |
67 |
self._socket_file = self._socket.makefile('rb') |
68 |
|
69 |
def close(self): |
70 |
"""Close connection to server."""
|
71 |
try:
|
72 |
self._socket.sendall('quit\r\n') |
73 |
except socket.error:
|
74 |
pass
|
75 |
try:
|
76 |
self._socket.close()
|
77 |
except socket.error:
|
78 |
pass
|
79 |
|
80 |
def reconnect(self): |
81 |
"""Re-connect to server."""
|
82 |
self.close()
|
83 |
self.connect()
|
84 |
|
85 |
def _interact(self, command, expected_ok, expected_err=[]): |
86 |
SocketError.wrap(self._socket.sendall, command)
|
87 |
status, results = self._read_response()
|
88 |
if status in expected_ok: |
89 |
return results
|
90 |
elif status in expected_err: |
91 |
raise CommandFailed(command.split()[0], status, results) |
92 |
else:
|
93 |
raise UnexpectedResponse(command.split()[0], status, results) |
94 |
|
95 |
def _read_response(self): |
96 |
line = SocketError.wrap(self._socket_file.readline)
|
97 |
if not line: |
98 |
raise SocketError()
|
99 |
response = line.split() |
100 |
return response[0], response[1:] |
101 |
|
102 |
def _read_body(self, size): |
103 |
body = SocketError.wrap(self._socket_file.read, size)
|
104 |
SocketError.wrap(self._socket_file.read, 2) # trailing crlf |
105 |
if size > 0 and not body: |
106 |
raise SocketError()
|
107 |
return body
|
108 |
|
109 |
def _interact_value(self, command, expected_ok, expected_err=[]): |
110 |
return self._interact(command, expected_ok, expected_err)[0] |
111 |
|
112 |
def _interact_job(self, command, expected_ok, expected_err, reserved=True): |
113 |
jid, size = self._interact(command, expected_ok, expected_err)
|
114 |
body = self._read_body(int(size)) |
115 |
return Job(self, int(jid), body, reserved) |
116 |
|
117 |
def _interact_yaml(self, command, expected_ok, expected_err=[]): |
118 |
size, = self._interact(command, expected_ok, expected_err)
|
119 |
body = self._read_body(int(size)) |
120 |
return self._parse_yaml(body) |
121 |
|
122 |
def _interact_peek(self, command): |
123 |
try:
|
124 |
return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False) |
125 |
except CommandFailed, (_, _status, _results):
|
126 |
return None |
127 |
|
128 |
# -- public interface --
|
129 |
|
130 |
def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR): |
131 |
"""Put a job into the current tube. Returns job id."""
|
132 |
assert isinstance(body, str), 'Job body must be a str instance' |
133 |
jid = self._interact_value(
|
134 |
'put %d %d %d %d\r\n%s\r\n' %
|
135 |
(priority, delay, ttr, len(body), body),
|
136 |
['INSERTED', 'BURIED'], ['JOB_TOO_BIG']) |
137 |
return int(jid) |
138 |
|
139 |
def reserve(self, timeout=None): |
140 |
"""Reserve a job from one of the watched tubes, with optional timeout
|
141 |
in seconds. Returns a Job object, or None if the request times out."""
|
142 |
if timeout is not None: |
143 |
command = 'reserve-with-timeout %d\r\n' % timeout
|
144 |
else:
|
145 |
command = 'reserve\r\n'
|
146 |
try:
|
147 |
return self._interact_job(command, |
148 |
['RESERVED'],
|
149 |
['DEADLINE_SOON', 'TIMED_OUT']) |
150 |
except CommandFailed, (_, status, results):
|
151 |
if status == 'TIMED_OUT': |
152 |
return None |
153 |
elif status == 'DEADLINE_SOON': |
154 |
raise DeadlineSoon(results)
|
155 |
|
156 |
def kick(self, bound=1): |
157 |
"""Kick at most bound jobs into the ready queue."""
|
158 |
return int(self._interact_value('kick %d\r\n' % bound, ['KICKED'])) |
159 |
|
160 |
def kick_job(self, jid): |
161 |
"""Kick a specific job into the ready queue."""
|
162 |
self._interact('kick-job %d\r\n' % jid, ['KICKED'], ['NOT_FOUND']) |
163 |
|
164 |
def peek(self, jid): |
165 |
"""Peek at a job. Returns a Job, or None."""
|
166 |
return self._interact_peek('peek %d\r\n' % jid) |
167 |
|
168 |
def peek_ready(self): |
169 |
"""Peek at next ready job. Returns a Job, or None."""
|
170 |
return self._interact_peek('peek-ready\r\n') |
171 |
|
172 |
def peek_delayed(self): |
173 |
"""Peek at next delayed job. Returns a Job, or None."""
|
174 |
return self._interact_peek('peek-delayed\r\n') |
175 |
|
176 |
def peek_buried(self): |
177 |
"""Peek at next buried job. Returns a Job, or None."""
|
178 |
return self._interact_peek('peek-buried\r\n') |
179 |
|
180 |
def tubes(self): |
181 |
"""Return a list of all existing tubes."""
|
182 |
return self._interact_yaml('list-tubes\r\n', ['OK']) |
183 |
|
184 |
def using(self): |
185 |
"""Return the tube currently being used."""
|
186 |
return self._interact_value('list-tube-used\r\n', ['USING']) |
187 |
|
188 |
def use(self, name): |
189 |
"""Use a given tube."""
|
190 |
return self._interact_value('use %s\r\n' % name, ['USING']) |
191 |
|
192 |
def watching(self): |
193 |
"""Return a list of all tubes being watched."""
|
194 |
return self._interact_yaml('list-tubes-watched\r\n', ['OK']) |
195 |
|
196 |
def watch(self, name): |
197 |
"""Watch a given tube."""
|
198 |
return int(self._interact_value('watch %s\r\n' % name, ['WATCHING'])) |
199 |
|
200 |
def ignore(self, name): |
201 |
"""Stop watching a given tube."""
|
202 |
try:
|
203 |
return int(self._interact_value('ignore %s\r\n' % name, |
204 |
['WATCHING'],
|
205 |
['NOT_IGNORED']))
|
206 |
except CommandFailed:
|
207 |
return 1 |
208 |
|
209 |
def stats(self): |
210 |
"""Return a dict of beanstalkd statistics."""
|
211 |
return self._interact_yaml('stats\r\n', ['OK']) |
212 |
|
213 |
def stats_tube(self, name): |
214 |
"""Return a dict of stats about a given tube."""
|
215 |
return self._interact_yaml('stats-tube %s\r\n' % name, |
216 |
['OK'],
|
217 |
['NOT_FOUND'])
|
218 |
|
219 |
def pause_tube(self, name, delay): |
220 |
"""Pause a tube for a given delay time, in seconds."""
|
221 |
self._interact('pause-tube %s %d\r\n' % (name, delay), |
222 |
['PAUSED'],
|
223 |
['NOT_FOUND'])
|
224 |
|
225 |
# -- job interactors --
|
226 |
|
227 |
def delete(self, jid): |
228 |
"""Delete a job, by job id."""
|
229 |
self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND']) |
230 |
|
231 |
def release(self, jid, priority=DEFAULT_PRIORITY, delay=0): |
232 |
"""Release a reserved job back into the ready queue."""
|
233 |
self._interact('release %d %d %d\r\n' % (jid, priority, delay), |
234 |
['RELEASED', 'BURIED'], |
235 |
['NOT_FOUND'])
|
236 |
|
237 |
def bury(self, jid, priority=DEFAULT_PRIORITY): |
238 |
"""Bury a job, by job id."""
|
239 |
self._interact('bury %d %d\r\n' % (jid, priority), |
240 |
['BURIED'],
|
241 |
['NOT_FOUND'])
|
242 |
|
243 |
def touch(self, jid): |
244 |
"""Touch a job, by job id, requesting more time to work on a reserved
|
245 |
job before it expires."""
|
246 |
self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND']) |
247 |
|
248 |
def stats_job(self, jid): |
249 |
"""Return a dict of stats about a job, by job id."""
|
250 |
return self._interact_yaml('stats-job %d\r\n' % jid, |
251 |
['OK'],
|
252 |
['NOT_FOUND'])
|
253 |
|
254 |
|
255 |
class Job(object): |
256 |
def __init__(self, conn, jid, body, reserved=True): |
257 |
self.conn = conn
|
258 |
self.jid = jid
|
259 |
self.body = body
|
260 |
self.reserved = reserved
|
261 |
|
262 |
def _priority(self): |
263 |
stats = self.stats()
|
264 |
if isinstance(stats, dict): |
265 |
return stats['pri'] |
266 |
return DEFAULT_PRIORITY
|
267 |
|
268 |
# -- public interface --
|
269 |
|
270 |
def delete(self): |
271 |
"""Delete this job."""
|
272 |
self.conn.delete(self.jid) |
273 |
self.reserved = False |
274 |
|
275 |
def release(self, priority=None, delay=0): |
276 |
"""Release this job back into the ready queue."""
|
277 |
if self.reserved: |
278 |
self.conn.release(self.jid, priority or self._priority(), delay) |
279 |
self.reserved = False |
280 |
|
281 |
def bury(self, priority=None): |
282 |
"""Bury this job."""
|
283 |
if self.reserved: |
284 |
self.conn.bury(self.jid, priority or self._priority()) |
285 |
self.reserved = False |
286 |
|
287 |
def kick(self): |
288 |
"""Kick this job alive."""
|
289 |
self.conn.kick_job(self.jid) |
290 |
|
291 |
def touch(self): |
292 |
"""Touch this reserved job, requesting more time to work on it before
|
293 |
it expires."""
|
294 |
if self.reserved: |
295 |
self.conn.touch(self.jid) |
296 |
|
297 |
def stats(self): |
298 |
"""Return a dict of stats about this job."""
|
299 |
return self.conn.stats_job(self.jid) |
300 |
|
301 |
|
302 |
if __name__ == '__main__': |
303 |
import nose |
304 |
nose.main(argv=['nosetests', '-c', '.nose.cfg']) |