2 """beanstalkc - A beanstalkd Client Library for Python"""
5 Copyright (C) 2008-2012 Andreas Bolka
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
# Host that Connection() connects to when the caller does not pass one.
DEFAULT_HOST = 'localhost'
# Job priority used when the caller does not pass one (2 ** 31 == 2147483648).
DEFAULT_PRIORITY = 2 ** 31
class BeanstalkcException(Exception):
    """Base class of every exception raised by this library."""


class UnexpectedResponse(BeanstalkcException):
    """The server answered with a status the command did not list at all."""


class CommandFailed(BeanstalkcException):
    """The server answered with one of the command's expected error statuses."""


class DeadlineSoon(BeanstalkcException):
    """Raised by reserve() when the server answers DEADLINE_SOON."""


class SocketError(BeanstalkcException):
    """A socket-level failure, re-raised as a library exception."""

    @staticmethod
    def wrap(wrapped_function, *args, **kwargs):
        """Call wrapped_function(*args, **kwargs) and return its result.

        Any socket.error raised by the call is converted into a SocketError
        carrying the original error as its only argument. Other exception
        types propagate unchanged.
        """
        try:
            return wrapped_function(*args, **kwargs)
        # `except X as e` is accepted by Python 2.6+ and Python 3; the
        # older `except X, e` spelling is a syntax error on Python 3.
        except socket.error as err:
            raise SocketError(err)
class Connection(object):
    """A client connection to one beanstalkd server.

    Commands are written to the socket as text lines terminated by CRLF and
    the reply is read back through a buffered file object wrapped around the
    same socket. Socket failures surface as SocketError, expected error
    statuses as CommandFailed, anything else as UnexpectedResponse.

    NOTE(review): commands are sent as native ``str`` objects, as in the
    Python 2 original; running on Python 3 would additionally need explicit
    bytes encoding/decoding at the socket boundary.
    """

    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, parse_yaml=True,
                 connect_timeout=socket.getdefaulttimeout()):
        """Store the connection settings and connect to the server.

        parse_yaml controls decoding of YAML response bodies: True means
        "use PyYAML if it can be imported", a callable is used as given, and
        any false value leaves the body unparsed.

        connect_timeout is applied only while the TCP connection is being
        established. Its default is evaluated once, when this class is
        defined, not on every call.
        """
        if parse_yaml is True:
            try:
                # NOTE(review): yaml.load can construct arbitrary Python
                # objects and, on recent PyYAML releases, requires an
                # explicit Loader argument. Consider yaml.safe_load if the
                # server is not fully trusted.
                parse_yaml = __import__('yaml').load
            except ImportError:
                logging.error('Failed to load PyYAML, will not parse YAML')
                parse_yaml = False
        self._connect_timeout = connect_timeout
        # Fall back to the identity function so callers get the raw body.
        self._parse_yaml = parse_yaml or (lambda x: x)
        self.host = host
        self.port = port
        self.connect()

    def connect(self):
        """Connect to beanstalkd server."""
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.settimeout(self._connect_timeout)
        SocketError.wrap(self._socket.connect, (self.host, self.port))
        # The timeout only guards connection setup; afterwards the socket is
        # switched back to blocking mode (reserve may wait indefinitely).
        self._socket.settimeout(None)
        self._socket_file = self._socket.makefile('rb')

    def close(self):
        """Close connection to server."""
        # Both steps are best-effort: the peer may already be gone, and a
        # failure to say goodbye must not prevent closing the socket.
        try:
            self._socket.sendall('quit\r\n')
        except socket.error:
            pass
        try:
            self._socket.close()
        except socket.error:
            pass

    def reconnect(self):
        """Re-connect to server."""
        self.close()
        self.connect()

    def _interact(self, command, expected_ok, expected_err=()):
        """Send command and return the words following the reply status.

        Raises CommandFailed when the status is listed in expected_err and
        UnexpectedResponse when it is listed in neither collection. Both
        exceptions carry (command name, status, remaining words).
        """
        SocketError.wrap(self._socket.sendall, command)
        status, results = self._read_response()
        if status in expected_ok:
            return results
        elif status in expected_err:
            raise CommandFailed(command.split()[0], status, results)
        else:
            raise UnexpectedResponse(command.split()[0], status, results)

    def _read_response(self):
        """Read one reply line and return (status, remaining words)."""
        line = SocketError.wrap(self._socket_file.readline)
        if not line:
            # An empty read means the server closed the connection.
            raise SocketError()
        response = line.split()
        return response[0], response[1:]

    def _read_body(self, size):
        """Read a body of size bytes plus the CRLF that follows it."""
        body = SocketError.wrap(self._socket_file.read, size)
        SocketError.wrap(self._socket_file.read, 2)  # trailing crlf
        if size > 0 and not body:
            raise SocketError()
        return body

    def _interact_value(self, command, expected_ok, expected_err=()):
        """Send command and return the first word after the reply status."""
        return self._interact(command, expected_ok, expected_err)[0]

    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
        """Send command, read the job body that follows and return a Job."""
        jid, size = self._interact(command, expected_ok, expected_err)
        body = self._read_body(int(size))
        return Job(self, int(jid), body, reserved)

    def _interact_yaml(self, command, expected_ok, expected_err=()):
        """Send command and return its body passed through the YAML parser."""
        size, = self._interact(command, expected_ok, expected_err)
        body = self._read_body(int(size))
        return self._parse_yaml(body)

    def _interact_peek(self, command):
        """Run a peek-style command. Returns a Job, or None on NOT_FOUND."""
        try:
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
        except CommandFailed:
            return None

    # -- public interface --

    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
        """Put a job into the current tube. Returns job id."""
        assert isinstance(body, str), 'Job body must be a str instance'
        jid = self._interact_value(
            'put %d %d %d %d\r\n%s\r\n' %
            (priority, delay, ttr, len(body), body),
            ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
        return int(jid)

    def reserve(self, timeout=None):
        """Reserve a job from one of the watched tubes, with optional timeout
        in seconds. Returns a Job object, or None if the request times out."""
        if timeout is not None:
            command = 'reserve-with-timeout %d\r\n' % timeout
        else:
            command = 'reserve\r\n'
        try:
            return self._interact_job(command,
                                      ['RESERVED'],
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
        except CommandFailed as err:
            # CommandFailed is always raised with exactly three arguments:
            # (command name, status, remaining reply words).
            _, status, results = err.args
            if status == 'TIMED_OUT':
                return None
            elif status == 'DEADLINE_SOON':
                raise DeadlineSoon(results)

    def kick(self, bound=1):
        """Kick at most bound jobs into the ready queue."""
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))

    def kick_job(self, jid):
        """Kick a specific job into the ready queue."""
        self._interact('kick-job %d\r\n' % jid, ['KICKED'], ['NOT_FOUND'])

    def peek(self, jid):
        """Peek at a job. Returns a Job, or None."""
        return self._interact_peek('peek %d\r\n' % jid)

    def peek_ready(self):
        """Peek at next ready job. Returns a Job, or None."""
        return self._interact_peek('peek-ready\r\n')

    def peek_delayed(self):
        """Peek at next delayed job. Returns a Job, or None."""
        return self._interact_peek('peek-delayed\r\n')

    def peek_buried(self):
        """Peek at next buried job. Returns a Job, or None."""
        return self._interact_peek('peek-buried\r\n')

    def tubes(self):
        """Return a list of all existing tubes."""
        return self._interact_yaml('list-tubes\r\n', ['OK'])

    def using(self):
        """Return the tube currently being used."""
        return self._interact_value('list-tube-used\r\n', ['USING'])

    def use(self, name):
        """Use a given tube."""
        return self._interact_value('use %s\r\n' % name, ['USING'])

    def watching(self):
        """Return a list of all tubes being watched."""
        return self._interact_yaml('list-tubes-watched\r\n', ['OK'])

    def watch(self, name):
        """Watch a given tube."""
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))

    def ignore(self, name):
        """Stop watching a given tube."""
        try:
            return int(self._interact_value('ignore %s\r\n' % name,
                                            ['WATCHING'],
                                            ['NOT_IGNORED']))
        except CommandFailed:
            # The server refuses to ignore the last watched tube, so the
            # watch list still holds exactly one entry.
            return 1

    def stats(self):
        """Return a dict of beanstalkd statistics."""
        return self._interact_yaml('stats\r\n', ['OK'])

    def stats_tube(self, name):
        """Return a dict of stats about a given tube."""
        return self._interact_yaml('stats-tube %s\r\n' % name,
                                   ['OK'],
                                   ['NOT_FOUND'])

    def pause_tube(self, name, delay):
        """Pause a tube for a given delay time, in seconds."""
        self._interact('pause-tube %s %d\r\n' % (name, delay),
                       ['PAUSED'],
                       ['NOT_FOUND'])

    # -- job interactors --

    def delete(self, jid):
        """Delete a job, by job id."""
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])

    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
        """Release a reserved job back into the ready queue."""
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
                       ['RELEASED', 'BURIED'],
                       ['NOT_FOUND'])

    def bury(self, jid, priority=DEFAULT_PRIORITY):
        """Bury a job, by job id."""
        self._interact('bury %d %d\r\n' % (jid, priority),
                       ['BURIED'],
                       ['NOT_FOUND'])

    def touch(self, jid):
        """Touch a job, by job id, requesting more time to work on a reserved
        job before it expires."""
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])

    def stats_job(self, jid):
        """Return a dict of stats about a job, by job id."""
        return self._interact_yaml('stats-job %d\r\n' % jid,
                                   ['OK'],
                                   ['NOT_FOUND'])
class Job(object):
    """A job handed out by a Connection, with shortcuts to act on it.

    Every operation is forwarded to the owning connection using this job's
    id. The reserved flag records whether this client currently holds the
    reservation; release, bury and touch do nothing when it is False.
    """

    def __init__(self, conn, jid, body, reserved=True):
        """Remember the owning connection, the job id, its body and whether
        the job is currently reserved by this client."""
        self.conn = conn
        self.jid = jid
        self.body = body
        self.reserved = reserved

    def _priority(self):
        """Return the job's current priority as reported by the server.

        Falls back to DEFAULT_PRIORITY when the stats are not a dict, which
        happens when the connection does not parse YAML bodies.
        """
        stats = self.stats()
        if isinstance(stats, dict):
            return stats['pri']
        return DEFAULT_PRIORITY

    # -- public interface --

    def delete(self):
        """Delete this job."""
        self.conn.delete(self.jid)
        self.reserved = False

    def release(self, priority=None, delay=0):
        """Release this job back into the ready queue."""
        if self.reserved:
            # Test against None rather than truthiness: 0 is a valid (and
            # the most urgent) priority and must not be replaced.
            if priority is None:
                priority = self._priority()
            self.conn.release(self.jid, priority, delay)
            self.reserved = False

    def bury(self, priority=None):
        """Bury this job."""
        if self.reserved:
            # Same None check as in release(): keep an explicit priority 0.
            if priority is None:
                priority = self._priority()
            self.conn.bury(self.jid, priority)
            self.reserved = False

    def kick(self):
        """Kick this job alive."""
        self.conn.kick_job(self.jid)

    def touch(self):
        """Touch this reserved job, requesting more time to work on it before
        it expires."""
        if self.reserved:
            self.conn.touch(self.jid)

    def stats(self):
        """Return a dict of stats about this job."""
        return self.conn.stats_job(self.jid)
if __name__ == '__main__':
    # Running the module directly executes the test suite through nose,
    # configured by the .nose.cfg file in the current directory.
    import nose
    nose.main(argv=['nosetests', '-c', '.nose.cfg'])