2 """beanstalkc - A beanstalkd Client Library for Python"""
5 Copyright (C) 2008-2010 Andreas Bolka
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
27 DEFAULT_HOST = 'localhost'
29 DEFAULT_PRIORITY = 2**31
class BeanstalkcException(Exception):
    """Common base class for every exception this module defines."""


class UnexpectedResponse(BeanstalkcException):
    """The server replied with a status the issued command did not list as
    either an expected success or an expected error."""


class CommandFailed(BeanstalkcException):
    """The server replied with one of the command's expected error statuses."""


class DeadlineSoon(BeanstalkcException):
    """A reserve request was answered with DEADLINE_SOON."""


# NOTE(review): presumably raised in place of socket.error by the connection
# code; the raise sites are not visible in this excerpt -- confirm.
class SocketError(BeanstalkcException):
    """A socket-level failure while talking to the server."""
41 class Connection(object):
42 def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
43 connection_timeout=DEFAULT_TIMEOUT):
47 self.connection_timeout = connection_timeout
51 """Connect to beanstalkd server, unless already connected."""
55 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
56 self._socket.settimeout(self.connection_timeout)
57 self._socket.connect((self.host, self.port))
58 self._socket.settimeout(None)
59 self._socket_file = self._socket.makefile('rb')
60 except socket.error, e:
65 """Close connection to server, if it is open."""
69 self._socket.sendall('quit\r\n')
78 return self._socket is None
80 def _interact(self, command, expected_ok, expected_err=[], size_field=None):
82 self._socket.sendall(command)
83 status, results = self._read_response()
84 if status in expected_ok:
85 if size_field is not None:
86 results.append(self._read_body(int(results[size_field])))
88 elif status in expected_err:
89 raise CommandFailed(command.split()[0], status, results)
91 raise UnexpectedResponse(command.split()[0], status, results)
92 except socket.error, e:
96 def _read_response(self):
97 line = self._socket_file.readline()
99 raise socket.error('no data read')
100 response = line.split()
101 return response[0], response[1:]
103 def _read_body(self, size):
104 body = self._socket_file.read(size)
105 self._socket_file.read(2) # trailing crlf
106 if size > 0 and not body:
107 raise socket.error('no data read')
def _interact_value(self, command, expected_ok, expected_err=[]):
    """Run `command` through `_interact` and return element 0 of its result.

    `expected_ok` and `expected_err` are forwarded to `_interact` unchanged.
    """
    # The shared default list is only passed along, never modified here.
    fields = self._interact(command, expected_ok, expected_err)
    return fields[0]
113 def _interact_job(self, command, expected_ok, expected_err, reserved=True):
114 jid, _, body = self._interact(command, expected_ok, expected_err,
116 return Job(self, int(jid), body, reserved)
118 def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
119 _, body, = self._interact(command, expected_ok, expected_err,
121 return parse_yaml_dict(body)
123 def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
124 _, body, = self._interact(command, expected_ok, expected_err,
126 return parse_yaml_list(body)
128 def _interact_peek(self, command):
130 return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
131 except CommandFailed, (_, status, results):
134 # -- public interface --
136 def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
137 """Put a job into the current tube. Returns job id."""
138 assert isinstance(body, str), 'Job body must be a str instance'
139 jid = self._interact_value(
140 'put %d %d %d %d\r\n%s\r\n' %
141 (priority, delay, ttr, len(body), body),
142 ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
145 def reserve(self, timeout=None):
146 """Reserve a job from one of the watched tubes, with optional timeout in
147 seconds. Returns a Job object, or None if the request times out."""
148 if timeout is not None:
149 command = 'reserve-with-timeout %d\r\n' % timeout
151 command = 'reserve\r\n'
153 return self._interact_job(command,
155 ['DEADLINE_SOON', 'TIMED_OUT'])
156 except CommandFailed, (_, status, results):
157 if status == 'TIMED_OUT':
159 elif status == 'DEADLINE_SOON':
160 raise DeadlineSoon(results)
def kick(self, bound=1):
    """Kick up to `bound` jobs into the ready queue.

    Returns the value carried by the server's KICKED reply, converted to int.
    """
    command = 'kick %d\r\n' % bound
    kicked = self._interact_value(command, ['KICKED'])
    return int(kicked)
167 """Peek at a job. Returns a Job, or None."""
168 return self._interact_peek('peek %d\r\n' % jid)
def peek_ready(self):
    """Look at the next ready job without reserving it.

    Returns whatever `_interact_peek` yields for the command: a Job, or None.
    """
    command = 'peek-ready\r\n'
    return self._interact_peek(command)
def peek_delayed(self):
    """Look at the next delayed job without reserving it.

    Returns whatever `_interact_peek` yields for the command: a Job, or None.
    """
    command = 'peek-delayed\r\n'
    return self._interact_peek(command)
def peek_buried(self):
    """Look at the next buried job without reserving it.

    Returns whatever `_interact_peek` yields for the command: a Job, or None.
    """
    command = 'peek-buried\r\n'
    return self._interact_peek(command)
183 """Return a list of all existing tubes."""
184 return self._interact_yaml_list('list-tubes\r\n', ['OK'])
187 """Return a list of all tubes currently being used."""
188 return self._interact_value('list-tube-used\r\n', ['USING'])
191 """Use a given tube."""
192 return self._interact_value('use %s\r\n' % name, ['USING'])
195 """Return a list of all tubes being watched."""
196 return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
def watch(self, name):
    """Start watching the tube called `name`.

    Returns the value carried by the server's WATCHING reply, converted to
    int (per the beanstalkd protocol this should be the number of tubes now
    being watched -- confirm against the protocol description).
    """
    command = 'watch %s\r\n' % name
    watched = self._interact_value(command, ['WATCHING'])
    return int(watched)
202 def ignore(self, name):
203 """Stop watching a given tube."""
205 return int(self._interact_value('ignore %s\r\n' % name,
208 except CommandFailed:
212 """Return a dict of beanstalkd statistics."""
213 return self._interact_yaml_dict('stats\r\n', ['OK'])
215 def stats_tube(self, name):
216 """Return a dict of stats about a given tube."""
217 return self._interact_yaml_dict('stats-tube %s\r\n' % name,
221 def pause_tube(self, name, delay):
222 """Pause a tube for a given delay time, in seconds."""
223 self._interact('pause-tube %s %d\r\n' %(name, delay),
227 # -- job interactors --
def delete(self, jid):
    """Remove the job with id `jid` from the server.

    DELETED is the expected success status and NOT_FOUND the expected error
    status; `_interact` raises CommandFailed for an expected error status.
    Returns nothing.
    """
    command = 'delete %d\r\n' % jid
    self._interact(command, ['DELETED'], ['NOT_FOUND'])
233 def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
234 """Release a reserved job back into the ready queue."""
235 self._interact('release %d %d %d\r\n' % (jid, priority, delay),
236 ['RELEASED', 'BURIED'],
239 def bury(self, jid, priority=DEFAULT_PRIORITY):
240 """Bury a job, by job id."""
241 self._interact('bury %d %d\r\n' % (jid, priority),
def touch(self, jid):
    """Ask the server for more time to work on the reserved job `jid`
    before it expires.

    TOUCHED is the expected success status and NOT_FOUND the expected error
    status; `_interact` raises CommandFailed for an expected error status.
    Returns nothing.
    """
    command = 'touch %d\r\n' % jid
    self._interact(command, ['TOUCHED'], ['NOT_FOUND'])
250 def stats_job(self, jid):
251 """Return a dict of stats about a job, by job id."""
252 return self._interact_yaml_dict('stats-job %d\r\n' % jid,
258 def __init__(self, conn, jid, body, reserved=True):
262 self.reserved = reserved
266 if isinstance(stats, dict):
268 return DEFAULT_PRIORITY
270 # -- public interface --
273 """Delete this job."""
274 self.conn.delete(self.jid)
275 self.reserved = False
277 def release(self, priority=None, delay=0):
278 """Release this job back into the ready queue."""
280 self.conn.release(self.jid, priority or self._priority(), delay)
281 self.reserved = False
283 def bury(self, priority=None):
286 self.conn.bury(self.jid, priority or self._priority())
287 self.reserved = False
290 """Touch this reserved job, requesting more time to work on it before it
293 self.conn.touch(self.jid)
296 """Return a dict of stats about this job."""
297 return self.conn.stats_job(self.jid)
299 def parse_yaml_dict(yaml):
300 """Parse a YAML dict, in the form returned by beanstalkd."""
302 for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
303 key, val = m.group(1), m.group(2)
304 # Check the type of the value, and parse it.
305 if key == 'name' or key == 'tube' or key == 'version':
306 dict[key] = val # String, even if it looks like a number
307 elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
308 dict[key] = int(val) # Integer value
309 elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
310 dict[key] = float(val) # Float value
312 dict[key] = val # String value
def parse_yaml_list(yaml):
    """Extract the items of a flat YAML list, in the form sent by beanstalkd.

    Every line of `yaml` that begins with "- " contributes the rest of that
    line to the result; all other lines (for instance a leading "---"
    document marker) are ignored. Returns a list of strings in input order.
    """
    item_matches = re.finditer(r'^- (.*)$', yaml, re.M)
    return [match.group(1) for match in item_matches]
319 if __name__ == '__main__':
320 import doctest, os, signal
322 pid = os.spawnlp(os.P_NOWAIT,
324 'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
325 doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
326 doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
328 os.kill(pid, signal.SIGTERM)