Add new theme plus new icons
[flowspy] / beanstalkc.py
1 #!/usr/bin/env python
2 """beanstalkc - A beanstalkd Client Library for Python"""
3
4 __license__ = '''
5 Copyright (C) 2008-2012 Andreas Bolka
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11     http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 '''
19
20 __version__ = '0.3.0'
21
22 import logging
23 import socket
24
25
# Fallback connection target used by Connection.__init__.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 11300

# Fallback job priority for put/release/bury (equals 2147483648).
DEFAULT_PRIORITY = 1 << 31

# Fallback "ttr" value sent with put.
# NOTE(review): presumably the time-to-run in seconds -- confirm against the
# beanstalkd protocol description.
DEFAULT_TTR = 120
30
31
class BeanstalkcException(Exception):
    """Base class of every exception defined by this module."""


class UnexpectedResponse(BeanstalkcException):
    """The server replied with a status listed as neither success nor failure.

    Raised by ``Connection._interact`` with the arguments
    ``(command_name, status, results)``.
    """


class CommandFailed(BeanstalkcException):
    """The server replied with one of the command's expected error statuses.

    Raised by ``Connection._interact`` with the arguments
    ``(command_name, status, results)``.
    """


class DeadlineSoon(BeanstalkcException):
    """``Connection.reserve`` received a ``DEADLINE_SOON`` reply."""


class SocketError(BeanstalkcException):
    """A low-level socket failure, or an unexpected end of the stream."""

    @staticmethod
    def wrap(wrapped_function, *args, **kwargs):
        """Call ``wrapped_function(*args, **kwargs)`` and return its result.

        Any ``socket.error`` raised by the call is converted into a
        ``SocketError`` carrying the original exception as its argument, so
        callers only have to handle this module's exception hierarchy.
        """
        try:
            return wrapped_function(*args, **kwargs)
        # "except ... as" is accepted by Python 2.6+ and Python 3 alike; the
        # older "except X, err" spelling is a SyntaxError on Python 3.
        except socket.error as err:
            raise SocketError(err)
44
45
class Connection(object):
    """Client connection to a single beanstalkd server.

    Commands are written to a TCP socket as text lines; replies are read back
    through a buffered file object and turned into plain values, ``Job``
    objects or parsed YAML documents.
    """

    # NOTE: the default for ``connect_timeout`` is evaluated once, when this
    # class body is executed, not on every call.
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, parse_yaml=True,
                 connect_timeout=socket.getdefaulttimeout()):
        """Remember the settings and connect to the server right away.

        host, port      -- address of the beanstalkd server.
        parse_yaml      -- ``True`` to parse YAML replies with PyYAML (falls
                           back to returning the raw body when PyYAML cannot
                           be imported), a callable to use as the parser, or
                           a false value to return raw bodies.
        connect_timeout -- socket timeout applied while connecting only.

        Raises ``SocketError`` if the connection cannot be established.
        """
        if parse_yaml is True:
            try:
                # NOTE(review): ``yaml.load`` can construct arbitrary Python
                # objects and newer PyYAML releases expect an explicit
                # Loader; consider ``yaml.safe_load`` if the server is not
                # fully trusted.
                parse_yaml = __import__('yaml').load
            except ImportError:
                logging.error('Failed to load PyYAML, will not parse YAML')
                parse_yaml = False
        self._connect_timeout = connect_timeout
        # With parsing disabled the body is handed back unchanged.
        self._parse_yaml = parse_yaml or (lambda x: x)
        self.host = host
        self.port = port
        self.connect()

    def connect(self):
        """Connect to beanstalkd server.

        Raises ``SocketError`` if the TCP connection fails.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # The timeout only guards the connect() call itself; afterwards the
        # socket is switched back to blocking mode.
        self._socket.settimeout(self._connect_timeout)
        SocketError.wrap(self._socket.connect, (self.host, self.port))
        self._socket.settimeout(None)
        self._socket_file = self._socket.makefile('rb')

    def close(self):
        """Close connection to server.

        Best effort: socket errors while sending ``quit`` or while closing
        are ignored.
        """
        try:
            self._socket.sendall('quit\r\n')
        except socket.error:
            pass
        # The object returned by makefile() keeps the underlying OS socket
        # alive, so it has to be closed as well.  It may be missing when
        # connect() failed before makefile() was reached.
        socket_file = getattr(self, '_socket_file', None)
        if socket_file is not None:
            try:
                socket_file.close()
            except socket.error:
                pass
        try:
            self._socket.close()
        except socket.error:
            pass

    def reconnect(self):
        """Re-connect to server."""
        self.close()
        self.connect()

    def _interact(self, command, expected_ok, expected_err=()):
        """Send ``command`` and return the words following the reply status.

        Raises ``CommandFailed`` when the status is in ``expected_err`` and
        ``UnexpectedResponse`` when it is in neither collection; both carry
        ``(command_name, status, results)``.  Raises ``SocketError`` on
        transport failures.
        """
        SocketError.wrap(self._socket.sendall, command)
        status, results = self._read_response()
        if status in expected_ok:
            return results
        elif status in expected_err:
            raise CommandFailed(command.split()[0], status, results)
        else:
            raise UnexpectedResponse(command.split()[0], status, results)

    def _read_response(self):
        """Read one reply line; return ``(status, remaining_words)``.

        Raises ``SocketError`` when the stream has ended.
        """
        line = SocketError.wrap(self._socket_file.readline)
        if not line:
            # An empty read means the peer closed the connection.
            raise SocketError()
        response = line.split()
        return response[0], response[1:]

    def _read_body(self, size):
        """Read a ``size``-byte body followed by its trailing CRLF.

        Raises ``SocketError`` when fewer than ``size`` bytes arrive, i.e.
        the stream ended before the whole body was received.
        """
        body = SocketError.wrap(self._socket_file.read, size)
        SocketError.wrap(self._socket_file.read, 2)  # trailing crlf
        # A short read (not just an empty one) means the body is truncated.
        if len(body) < size:
            raise SocketError()
        return body

    def _interact_value(self, command, expected_ok, expected_err=()):
        """Run ``command`` and return the first word after the status."""
        return self._interact(command, expected_ok, expected_err)[0]

    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
        """Run ``command`` whose reply is ``<id> <size>`` plus a job body.

        Returns a ``Job`` bound to this connection.
        """
        jid, size = self._interact(command, expected_ok, expected_err)
        body = self._read_body(int(size))
        return Job(self, int(jid), body, reserved)

    def _interact_yaml(self, command, expected_ok, expected_err=()):
        """Run ``command`` whose reply is ``<size>`` plus a YAML body.

        Returns the body passed through the configured YAML parser.
        """
        size, = self._interact(command, expected_ok, expected_err)
        body = self._read_body(int(size))
        return self._parse_yaml(body)

    def _interact_peek(self, command):
        """Run a peek command; return a non-reserved ``Job`` or ``None``."""
        try:
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
        except CommandFailed:
            # NOT_FOUND is the only expected error status for peek commands.
            return None

    # -- public interface --

    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
        """Put a job into the current tube. Returns job id.

        ``body`` must be a ``str``.  Raises ``CommandFailed`` when the
        server answers ``JOB_TOO_BIG``.
        """
        assert isinstance(body, str), 'Job body must be a str instance'
        jid = self._interact_value(
                'put %d %d %d %d\r\n%s\r\n' %
                    (priority, delay, ttr, len(body), body),
                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
        return int(jid)

    def reserve(self, timeout=None):
        """Reserve a job from one of the watched tubes, with optional timeout
        in seconds. Returns a Job object, or None if the request times out.

        Raises ``DeadlineSoon`` when the server answers ``DEADLINE_SOON``.
        """
        if timeout is not None:
            command = 'reserve-with-timeout %d\r\n' % timeout
        else:
            command = 'reserve\r\n'
        try:
            return self._interact_job(command,
                                      ['RESERVED'],
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
        except CommandFailed as err:
            # CommandFailed carries (command_name, status, results).
            _, status, results = err.args
            if status == 'TIMED_OUT':
                return None
            elif status == 'DEADLINE_SOON':
                raise DeadlineSoon(results)

    def kick(self, bound=1):
        """Kick at most bound jobs into the ready queue.

        Returns the integer reported in the server's ``KICKED`` reply.
        """
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))

    def kick_job(self, jid):
        """Kick a specific job into the ready queue.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        self._interact('kick-job %d\r\n' % jid, ['KICKED'], ['NOT_FOUND'])

    def peek(self, jid):
        """Peek at a job. Returns a Job, or None."""
        return self._interact_peek('peek %d\r\n' % jid)

    def peek_ready(self):
        """Peek at next ready job. Returns a Job, or None."""
        return self._interact_peek('peek-ready\r\n')

    def peek_delayed(self):
        """Peek at next delayed job. Returns a Job, or None."""
        return self._interact_peek('peek-delayed\r\n')

    def peek_buried(self):
        """Peek at next buried job. Returns a Job, or None."""
        return self._interact_peek('peek-buried\r\n')

    def tubes(self):
        """Return a list of all existing tubes."""
        return self._interact_yaml('list-tubes\r\n', ['OK'])

    def using(self):
        """Return the tube currently being used."""
        return self._interact_value('list-tube-used\r\n', ['USING'])

    def use(self, name):
        """Use a given tube.

        Returns the first word of the server's ``USING`` reply.
        """
        return self._interact_value('use %s\r\n' % name, ['USING'])

    def watching(self):
        """Return a list of all tubes being watched."""
        return self._interact_yaml('list-tubes-watched\r\n', ['OK'])

    def watch(self, name):
        """Watch a given tube.

        Returns the integer reported in the server's ``WATCHING`` reply.
        """
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))

    def ignore(self, name):
        """Stop watching a given tube.

        Returns the integer reported in the server's ``WATCHING`` reply, or
        ``1`` when the server answers ``NOT_IGNORED``.
        """
        try:
            return int(self._interact_value('ignore %s\r\n' % name,
                                            ['WATCHING'],
                                            ['NOT_IGNORED']))
        except CommandFailed:
            return 1

    def stats(self):
        """Return a dict of beanstalkd statistics."""
        return self._interact_yaml('stats\r\n', ['OK'])

    def stats_tube(self, name):
        """Return a dict of stats about a given tube.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        return self._interact_yaml('stats-tube %s\r\n' % name,
                                   ['OK'],
                                   ['NOT_FOUND'])

    def pause_tube(self, name, delay):
        """Pause a tube for a given delay time, in seconds.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        self._interact('pause-tube %s %d\r\n' % (name, delay),
                       ['PAUSED'],
                       ['NOT_FOUND'])

    # -- job interactors --

    def delete(self, jid):
        """Delete a job, by job id.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])

    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
        """Release a reserved job back into the ready queue.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
                       ['RELEASED', 'BURIED'],
                       ['NOT_FOUND'])

    def bury(self, jid, priority=DEFAULT_PRIORITY):
        """Bury a job, by job id.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        self._interact('bury %d %d\r\n' % (jid, priority),
                       ['BURIED'],
                       ['NOT_FOUND'])

    def touch(self, jid):
        """Touch a job, by job id, requesting more time to work on a reserved
        job before it expires.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])

    def stats_job(self, jid):
        """Return a dict of stats about a job, by job id.

        Raises ``CommandFailed`` when the server answers ``NOT_FOUND``.
        """
        return self._interact_yaml('stats-job %d\r\n' % jid,
                                   ['OK'],
                                   ['NOT_FOUND'])
253
254
class Job(object):
    """A job obtained from a beanstalkd connection.

    Bundles the job id and body with the connection it came from, so the
    job can be deleted, released, buried, kicked, touched or inspected
    without passing the id around.
    """

    def __init__(self, conn, jid, body, reserved=True):
        """Create a job handle.

        conn     -- connection object the job commands are sent through.
        jid      -- job id.
        body     -- job body.
        reserved -- whether the job is currently reserved; ``release``,
                    ``bury`` and ``touch`` do nothing when this is false.
        """
        self.conn = conn
        self.jid = jid
        self.body = body
        self.reserved = reserved

    def _priority(self):
        """Return the job's priority as reported by its stats.

        Falls back to ``DEFAULT_PRIORITY`` when the stats are not a dict
        (for example when the connection returns unparsed YAML).
        """
        stats = self.stats()
        if isinstance(stats, dict):
            return stats['pri']
        return DEFAULT_PRIORITY

    # -- public interface --

    def delete(self):
        """Delete this job."""
        self.conn.delete(self.jid)
        self.reserved = False

    def release(self, priority=None, delay=0):
        """Release this job back into the ready queue.

        ``priority=None`` keeps the job's current priority.  Does nothing
        when the job is not reserved.
        """
        if self.reserved:
            # Compare against None explicitly: 0 is a legitimate priority
            # and must not be mistaken for "not given".
            if priority is None:
                priority = self._priority()
            self.conn.release(self.jid, priority, delay)
            self.reserved = False

    def bury(self, priority=None):
        """Bury this job.

        ``priority=None`` keeps the job's current priority.  Does nothing
        when the job is not reserved.
        """
        if self.reserved:
            # Same explicit None check as in release(): 0 must be honoured.
            if priority is None:
                priority = self._priority()
            self.conn.bury(self.jid, priority)
            self.reserved = False

    def kick(self):
        """Kick this job alive."""
        self.conn.kick_job(self.jid)

    def touch(self):
        """Touch this reserved job, requesting more time to work on it before
        it expires."""
        if self.reserved:
            self.conn.touch(self.jid)

    def stats(self):
        """Return a dict of stats about this job."""
        return self.conn.stats_job(self.jid)
300
301
if __name__ == '__main__':
    # Executed as a script: hand control to nose, using the configuration
    # file '.nose.cfg'.
    import nose
    nose_argv = ['nosetests', '-c', '.nose.cfg']
    nose.main(argv=nose_argv)