Statistics
| Branch: | Tag: | Revision:

root / beanstalkc.py @ 0bf16f7f

History | View | Annotate | Download (10.3 kB)

1 01850d03 Leonidas Poulopoulos
#!/usr/bin/env python
2 01850d03 Leonidas Poulopoulos
"""beanstalkc - A beanstalkd Client Library for Python"""
3 01850d03 Leonidas Poulopoulos
4 01850d03 Leonidas Poulopoulos
__license__ = '''
5 01850d03 Leonidas Poulopoulos
Copyright (C) 2008-2012 Andreas Bolka
6 01850d03 Leonidas Poulopoulos

7 01850d03 Leonidas Poulopoulos
Licensed under the Apache License, Version 2.0 (the "License");
8 01850d03 Leonidas Poulopoulos
you may not use this file except in compliance with the License.
9 01850d03 Leonidas Poulopoulos
You may obtain a copy of the License at
10 01850d03 Leonidas Poulopoulos

11 01850d03 Leonidas Poulopoulos
    http://www.apache.org/licenses/LICENSE-2.0
12 01850d03 Leonidas Poulopoulos

13 01850d03 Leonidas Poulopoulos
Unless required by applicable law or agreed to in writing, software
14 01850d03 Leonidas Poulopoulos
distributed under the License is distributed on an "AS IS" BASIS,
15 01850d03 Leonidas Poulopoulos
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 01850d03 Leonidas Poulopoulos
See the License for the specific language governing permissions and
17 01850d03 Leonidas Poulopoulos
limitations under the License.
18 01850d03 Leonidas Poulopoulos
'''
19 01850d03 Leonidas Poulopoulos
20 01850d03 Leonidas Poulopoulos
__version__ = '0.3.0'
21 01850d03 Leonidas Poulopoulos
22 01850d03 Leonidas Poulopoulos
import logging
23 01850d03 Leonidas Poulopoulos
import socket
24 01850d03 Leonidas Poulopoulos
25 01850d03 Leonidas Poulopoulos
26 01850d03 Leonidas Poulopoulos
DEFAULT_HOST = 'localhost'
27 01850d03 Leonidas Poulopoulos
DEFAULT_PORT = 11300
28 01850d03 Leonidas Poulopoulos
DEFAULT_PRIORITY = 2 ** 31
29 01850d03 Leonidas Poulopoulos
DEFAULT_TTR = 120
30 01850d03 Leonidas Poulopoulos
31 01850d03 Leonidas Poulopoulos
32 01850d03 Leonidas Poulopoulos
class BeanstalkcException(Exception): pass
33 01850d03 Leonidas Poulopoulos
class UnexpectedResponse(BeanstalkcException): pass
34 01850d03 Leonidas Poulopoulos
class CommandFailed(BeanstalkcException): pass
35 01850d03 Leonidas Poulopoulos
class DeadlineSoon(BeanstalkcException): pass
36 01850d03 Leonidas Poulopoulos
37 01850d03 Leonidas Poulopoulos
class SocketError(BeanstalkcException):
38 01850d03 Leonidas Poulopoulos
    @staticmethod
39 01850d03 Leonidas Poulopoulos
    def wrap(wrapped_function, *args, **kwargs):
40 01850d03 Leonidas Poulopoulos
        try:
41 01850d03 Leonidas Poulopoulos
            return wrapped_function(*args, **kwargs)
42 01850d03 Leonidas Poulopoulos
        except socket.error, err:
43 01850d03 Leonidas Poulopoulos
            raise SocketError(err)
44 01850d03 Leonidas Poulopoulos
45 01850d03 Leonidas Poulopoulos
46 01850d03 Leonidas Poulopoulos
class Connection(object):
47 01850d03 Leonidas Poulopoulos
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, parse_yaml=True,
48 01850d03 Leonidas Poulopoulos
                 connect_timeout=socket.getdefaulttimeout()):
49 01850d03 Leonidas Poulopoulos
        if parse_yaml is True:
50 01850d03 Leonidas Poulopoulos
            try:
51 01850d03 Leonidas Poulopoulos
                parse_yaml = __import__('yaml').load
52 01850d03 Leonidas Poulopoulos
            except ImportError:
53 01850d03 Leonidas Poulopoulos
                logging.error('Failed to load PyYAML, will not parse YAML')
54 01850d03 Leonidas Poulopoulos
                parse_yaml = False
55 01850d03 Leonidas Poulopoulos
        self._connect_timeout = connect_timeout
56 01850d03 Leonidas Poulopoulos
        self._parse_yaml = parse_yaml or (lambda x: x)
57 01850d03 Leonidas Poulopoulos
        self.host = host
58 01850d03 Leonidas Poulopoulos
        self.port = port
59 01850d03 Leonidas Poulopoulos
        self.connect()
60 01850d03 Leonidas Poulopoulos
61 01850d03 Leonidas Poulopoulos
    def connect(self):
62 01850d03 Leonidas Poulopoulos
        """Connect to beanstalkd server."""
63 01850d03 Leonidas Poulopoulos
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
64 01850d03 Leonidas Poulopoulos
        self._socket.settimeout(self._connect_timeout)
65 01850d03 Leonidas Poulopoulos
        SocketError.wrap(self._socket.connect, (self.host, self.port))
66 01850d03 Leonidas Poulopoulos
        self._socket.settimeout(None)
67 01850d03 Leonidas Poulopoulos
        self._socket_file = self._socket.makefile('rb')
68 01850d03 Leonidas Poulopoulos
69 01850d03 Leonidas Poulopoulos
    def close(self):
70 01850d03 Leonidas Poulopoulos
        """Close connection to server."""
71 01850d03 Leonidas Poulopoulos
        try:
72 01850d03 Leonidas Poulopoulos
            self._socket.sendall('quit\r\n')
73 01850d03 Leonidas Poulopoulos
        except socket.error:
74 01850d03 Leonidas Poulopoulos
            pass
75 01850d03 Leonidas Poulopoulos
        try:
76 01850d03 Leonidas Poulopoulos
            self._socket.close()
77 01850d03 Leonidas Poulopoulos
        except socket.error:
78 01850d03 Leonidas Poulopoulos
            pass
79 01850d03 Leonidas Poulopoulos
80 01850d03 Leonidas Poulopoulos
    def reconnect(self):
81 01850d03 Leonidas Poulopoulos
        """Re-connect to server."""
82 01850d03 Leonidas Poulopoulos
        self.close()
83 01850d03 Leonidas Poulopoulos
        self.connect()
84 01850d03 Leonidas Poulopoulos
85 01850d03 Leonidas Poulopoulos
    def _interact(self, command, expected_ok, expected_err=[]):
86 01850d03 Leonidas Poulopoulos
        SocketError.wrap(self._socket.sendall, command)
87 01850d03 Leonidas Poulopoulos
        status, results = self._read_response()
88 01850d03 Leonidas Poulopoulos
        if status in expected_ok:
89 01850d03 Leonidas Poulopoulos
            return results
90 01850d03 Leonidas Poulopoulos
        elif status in expected_err:
91 01850d03 Leonidas Poulopoulos
            raise CommandFailed(command.split()[0], status, results)
92 01850d03 Leonidas Poulopoulos
        else:
93 01850d03 Leonidas Poulopoulos
            raise UnexpectedResponse(command.split()[0], status, results)
94 01850d03 Leonidas Poulopoulos
95 01850d03 Leonidas Poulopoulos
    def _read_response(self):
96 01850d03 Leonidas Poulopoulos
        line = SocketError.wrap(self._socket_file.readline)
97 01850d03 Leonidas Poulopoulos
        if not line:
98 01850d03 Leonidas Poulopoulos
            raise SocketError()
99 01850d03 Leonidas Poulopoulos
        response = line.split()
100 01850d03 Leonidas Poulopoulos
        return response[0], response[1:]
101 01850d03 Leonidas Poulopoulos
102 01850d03 Leonidas Poulopoulos
    def _read_body(self, size):
103 01850d03 Leonidas Poulopoulos
        body = SocketError.wrap(self._socket_file.read, size)
104 01850d03 Leonidas Poulopoulos
        SocketError.wrap(self._socket_file.read, 2)  # trailing crlf
105 01850d03 Leonidas Poulopoulos
        if size > 0 and not body:
106 01850d03 Leonidas Poulopoulos
            raise SocketError()
107 01850d03 Leonidas Poulopoulos
        return body
108 01850d03 Leonidas Poulopoulos
109 01850d03 Leonidas Poulopoulos
    def _interact_value(self, command, expected_ok, expected_err=[]):
110 01850d03 Leonidas Poulopoulos
        return self._interact(command, expected_ok, expected_err)[0]
111 01850d03 Leonidas Poulopoulos
112 01850d03 Leonidas Poulopoulos
    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
113 01850d03 Leonidas Poulopoulos
        jid, size = self._interact(command, expected_ok, expected_err)
114 01850d03 Leonidas Poulopoulos
        body = self._read_body(int(size))
115 01850d03 Leonidas Poulopoulos
        return Job(self, int(jid), body, reserved)
116 01850d03 Leonidas Poulopoulos
117 01850d03 Leonidas Poulopoulos
    def _interact_yaml(self, command, expected_ok, expected_err=[]):
118 01850d03 Leonidas Poulopoulos
        size, = self._interact(command, expected_ok, expected_err)
119 01850d03 Leonidas Poulopoulos
        body = self._read_body(int(size))
120 01850d03 Leonidas Poulopoulos
        return self._parse_yaml(body)
121 01850d03 Leonidas Poulopoulos
122 01850d03 Leonidas Poulopoulos
    def _interact_peek(self, command):
123 01850d03 Leonidas Poulopoulos
        try:
124 01850d03 Leonidas Poulopoulos
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
125 01850d03 Leonidas Poulopoulos
        except CommandFailed, (_, _status, _results):
126 01850d03 Leonidas Poulopoulos
            return None
127 01850d03 Leonidas Poulopoulos
128 01850d03 Leonidas Poulopoulos
    # -- public interface --
129 01850d03 Leonidas Poulopoulos
130 01850d03 Leonidas Poulopoulos
    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
131 01850d03 Leonidas Poulopoulos
        """Put a job into the current tube. Returns job id."""
132 01850d03 Leonidas Poulopoulos
        assert isinstance(body, str), 'Job body must be a str instance'
133 01850d03 Leonidas Poulopoulos
        jid = self._interact_value(
134 01850d03 Leonidas Poulopoulos
                'put %d %d %d %d\r\n%s\r\n' %
135 01850d03 Leonidas Poulopoulos
                    (priority, delay, ttr, len(body), body),
136 01850d03 Leonidas Poulopoulos
                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
137 01850d03 Leonidas Poulopoulos
        return int(jid)
138 01850d03 Leonidas Poulopoulos
139 01850d03 Leonidas Poulopoulos
    def reserve(self, timeout=None):
140 01850d03 Leonidas Poulopoulos
        """Reserve a job from one of the watched tubes, with optional timeout
141 01850d03 Leonidas Poulopoulos
        in seconds. Returns a Job object, or None if the request times out."""
142 01850d03 Leonidas Poulopoulos
        if timeout is not None:
143 01850d03 Leonidas Poulopoulos
            command = 'reserve-with-timeout %d\r\n' % timeout
144 01850d03 Leonidas Poulopoulos
        else:
145 01850d03 Leonidas Poulopoulos
            command = 'reserve\r\n'
146 01850d03 Leonidas Poulopoulos
        try:
147 01850d03 Leonidas Poulopoulos
            return self._interact_job(command,
148 01850d03 Leonidas Poulopoulos
                                      ['RESERVED'],
149 01850d03 Leonidas Poulopoulos
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
150 01850d03 Leonidas Poulopoulos
        except CommandFailed, (_, status, results):
151 01850d03 Leonidas Poulopoulos
            if status == 'TIMED_OUT':
152 01850d03 Leonidas Poulopoulos
                return None
153 01850d03 Leonidas Poulopoulos
            elif status == 'DEADLINE_SOON':
154 01850d03 Leonidas Poulopoulos
                raise DeadlineSoon(results)
155 01850d03 Leonidas Poulopoulos
156 01850d03 Leonidas Poulopoulos
    def kick(self, bound=1):
157 01850d03 Leonidas Poulopoulos
        """Kick at most bound jobs into the ready queue."""
158 01850d03 Leonidas Poulopoulos
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
159 01850d03 Leonidas Poulopoulos
160 01850d03 Leonidas Poulopoulos
    def kick_job(self, jid):
161 01850d03 Leonidas Poulopoulos
        """Kick a specific job into the ready queue."""
162 01850d03 Leonidas Poulopoulos
        self._interact('kick-job %d\r\n' % jid, ['KICKED'], ['NOT_FOUND'])
163 01850d03 Leonidas Poulopoulos
164 01850d03 Leonidas Poulopoulos
    def peek(self, jid):
165 01850d03 Leonidas Poulopoulos
        """Peek at a job. Returns a Job, or None."""
166 01850d03 Leonidas Poulopoulos
        return self._interact_peek('peek %d\r\n' % jid)
167 01850d03 Leonidas Poulopoulos
168 01850d03 Leonidas Poulopoulos
    def peek_ready(self):
169 01850d03 Leonidas Poulopoulos
        """Peek at next ready job. Returns a Job, or None."""
170 01850d03 Leonidas Poulopoulos
        return self._interact_peek('peek-ready\r\n')
171 01850d03 Leonidas Poulopoulos
172 01850d03 Leonidas Poulopoulos
    def peek_delayed(self):
173 01850d03 Leonidas Poulopoulos
        """Peek at next delayed job. Returns a Job, or None."""
174 01850d03 Leonidas Poulopoulos
        return self._interact_peek('peek-delayed\r\n')
175 01850d03 Leonidas Poulopoulos
176 01850d03 Leonidas Poulopoulos
    def peek_buried(self):
177 01850d03 Leonidas Poulopoulos
        """Peek at next buried job. Returns a Job, or None."""
178 01850d03 Leonidas Poulopoulos
        return self._interact_peek('peek-buried\r\n')
179 01850d03 Leonidas Poulopoulos
180 01850d03 Leonidas Poulopoulos
    def tubes(self):
181 01850d03 Leonidas Poulopoulos
        """Return a list of all existing tubes."""
182 01850d03 Leonidas Poulopoulos
        return self._interact_yaml('list-tubes\r\n', ['OK'])
183 01850d03 Leonidas Poulopoulos
184 01850d03 Leonidas Poulopoulos
    def using(self):
185 01850d03 Leonidas Poulopoulos
        """Return the tube currently being used."""
186 01850d03 Leonidas Poulopoulos
        return self._interact_value('list-tube-used\r\n', ['USING'])
187 01850d03 Leonidas Poulopoulos
188 01850d03 Leonidas Poulopoulos
    def use(self, name):
189 01850d03 Leonidas Poulopoulos
        """Use a given tube."""
190 01850d03 Leonidas Poulopoulos
        return self._interact_value('use %s\r\n' % name, ['USING'])
191 01850d03 Leonidas Poulopoulos
192 01850d03 Leonidas Poulopoulos
    def watching(self):
193 01850d03 Leonidas Poulopoulos
        """Return a list of all tubes being watched."""
194 01850d03 Leonidas Poulopoulos
        return self._interact_yaml('list-tubes-watched\r\n', ['OK'])
195 01850d03 Leonidas Poulopoulos
196 01850d03 Leonidas Poulopoulos
    def watch(self, name):
197 01850d03 Leonidas Poulopoulos
        """Watch a given tube."""
198 01850d03 Leonidas Poulopoulos
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
199 01850d03 Leonidas Poulopoulos
200 01850d03 Leonidas Poulopoulos
    def ignore(self, name):
201 01850d03 Leonidas Poulopoulos
        """Stop watching a given tube."""
202 01850d03 Leonidas Poulopoulos
        try:
203 01850d03 Leonidas Poulopoulos
            return int(self._interact_value('ignore %s\r\n' % name,
204 01850d03 Leonidas Poulopoulos
                                            ['WATCHING'],
205 01850d03 Leonidas Poulopoulos
                                            ['NOT_IGNORED']))
206 01850d03 Leonidas Poulopoulos
        except CommandFailed:
207 01850d03 Leonidas Poulopoulos
            return 1
208 01850d03 Leonidas Poulopoulos
209 01850d03 Leonidas Poulopoulos
    def stats(self):
210 01850d03 Leonidas Poulopoulos
        """Return a dict of beanstalkd statistics."""
211 01850d03 Leonidas Poulopoulos
        return self._interact_yaml('stats\r\n', ['OK'])
212 01850d03 Leonidas Poulopoulos
213 01850d03 Leonidas Poulopoulos
    def stats_tube(self, name):
214 01850d03 Leonidas Poulopoulos
        """Return a dict of stats about a given tube."""
215 01850d03 Leonidas Poulopoulos
        return self._interact_yaml('stats-tube %s\r\n' % name,
216 01850d03 Leonidas Poulopoulos
                                   ['OK'],
217 01850d03 Leonidas Poulopoulos
                                   ['NOT_FOUND'])
218 01850d03 Leonidas Poulopoulos
219 01850d03 Leonidas Poulopoulos
    def pause_tube(self, name, delay):
220 01850d03 Leonidas Poulopoulos
        """Pause a tube for a given delay time, in seconds."""
221 01850d03 Leonidas Poulopoulos
        self._interact('pause-tube %s %d\r\n' % (name, delay),
222 01850d03 Leonidas Poulopoulos
                       ['PAUSED'],
223 01850d03 Leonidas Poulopoulos
                       ['NOT_FOUND'])
224 01850d03 Leonidas Poulopoulos
225 01850d03 Leonidas Poulopoulos
    # -- job interactors --
226 01850d03 Leonidas Poulopoulos
227 01850d03 Leonidas Poulopoulos
    def delete(self, jid):
228 01850d03 Leonidas Poulopoulos
        """Delete a job, by job id."""
229 01850d03 Leonidas Poulopoulos
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
230 01850d03 Leonidas Poulopoulos
231 01850d03 Leonidas Poulopoulos
    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
232 01850d03 Leonidas Poulopoulos
        """Release a reserved job back into the ready queue."""
233 01850d03 Leonidas Poulopoulos
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
234 01850d03 Leonidas Poulopoulos
                       ['RELEASED', 'BURIED'],
235 01850d03 Leonidas Poulopoulos
                       ['NOT_FOUND'])
236 01850d03 Leonidas Poulopoulos
237 01850d03 Leonidas Poulopoulos
    def bury(self, jid, priority=DEFAULT_PRIORITY):
238 01850d03 Leonidas Poulopoulos
        """Bury a job, by job id."""
239 01850d03 Leonidas Poulopoulos
        self._interact('bury %d %d\r\n' % (jid, priority),
240 01850d03 Leonidas Poulopoulos
                       ['BURIED'],
241 01850d03 Leonidas Poulopoulos
                       ['NOT_FOUND'])
242 01850d03 Leonidas Poulopoulos
243 01850d03 Leonidas Poulopoulos
    def touch(self, jid):
244 01850d03 Leonidas Poulopoulos
        """Touch a job, by job id, requesting more time to work on a reserved
245 01850d03 Leonidas Poulopoulos
        job before it expires."""
246 01850d03 Leonidas Poulopoulos
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
247 01850d03 Leonidas Poulopoulos
248 01850d03 Leonidas Poulopoulos
    def stats_job(self, jid):
249 01850d03 Leonidas Poulopoulos
        """Return a dict of stats about a job, by job id."""
250 01850d03 Leonidas Poulopoulos
        return self._interact_yaml('stats-job %d\r\n' % jid,
251 01850d03 Leonidas Poulopoulos
                                   ['OK'],
252 01850d03 Leonidas Poulopoulos
                                   ['NOT_FOUND'])
253 01850d03 Leonidas Poulopoulos
254 01850d03 Leonidas Poulopoulos
255 01850d03 Leonidas Poulopoulos
class Job(object):
256 01850d03 Leonidas Poulopoulos
    def __init__(self, conn, jid, body, reserved=True):
257 01850d03 Leonidas Poulopoulos
        self.conn = conn
258 01850d03 Leonidas Poulopoulos
        self.jid = jid
259 01850d03 Leonidas Poulopoulos
        self.body = body
260 01850d03 Leonidas Poulopoulos
        self.reserved = reserved
261 01850d03 Leonidas Poulopoulos
262 01850d03 Leonidas Poulopoulos
    def _priority(self):
263 01850d03 Leonidas Poulopoulos
        stats = self.stats()
264 01850d03 Leonidas Poulopoulos
        if isinstance(stats, dict):
265 01850d03 Leonidas Poulopoulos
            return stats['pri']
266 01850d03 Leonidas Poulopoulos
        return DEFAULT_PRIORITY
267 01850d03 Leonidas Poulopoulos
268 01850d03 Leonidas Poulopoulos
    # -- public interface --
269 01850d03 Leonidas Poulopoulos
270 01850d03 Leonidas Poulopoulos
    def delete(self):
271 01850d03 Leonidas Poulopoulos
        """Delete this job."""
272 01850d03 Leonidas Poulopoulos
        self.conn.delete(self.jid)
273 01850d03 Leonidas Poulopoulos
        self.reserved = False
274 01850d03 Leonidas Poulopoulos
275 01850d03 Leonidas Poulopoulos
    def release(self, priority=None, delay=0):
276 01850d03 Leonidas Poulopoulos
        """Release this job back into the ready queue."""
277 01850d03 Leonidas Poulopoulos
        if self.reserved:
278 01850d03 Leonidas Poulopoulos
            self.conn.release(self.jid, priority or self._priority(), delay)
279 01850d03 Leonidas Poulopoulos
            self.reserved = False
280 01850d03 Leonidas Poulopoulos
281 01850d03 Leonidas Poulopoulos
    def bury(self, priority=None):
282 01850d03 Leonidas Poulopoulos
        """Bury this job."""
283 01850d03 Leonidas Poulopoulos
        if self.reserved:
284 01850d03 Leonidas Poulopoulos
            self.conn.bury(self.jid, priority or self._priority())
285 01850d03 Leonidas Poulopoulos
            self.reserved = False
286 01850d03 Leonidas Poulopoulos
287 01850d03 Leonidas Poulopoulos
    def kick(self):
288 01850d03 Leonidas Poulopoulos
        """Kick this job alive."""
289 01850d03 Leonidas Poulopoulos
        self.conn.kick_job(self.jid)
290 01850d03 Leonidas Poulopoulos
291 01850d03 Leonidas Poulopoulos
    def touch(self):
292 01850d03 Leonidas Poulopoulos
        """Touch this reserved job, requesting more time to work on it before
293 01850d03 Leonidas Poulopoulos
        it expires."""
294 01850d03 Leonidas Poulopoulos
        if self.reserved:
295 01850d03 Leonidas Poulopoulos
            self.conn.touch(self.jid)
296 01850d03 Leonidas Poulopoulos
297 01850d03 Leonidas Poulopoulos
    def stats(self):
298 01850d03 Leonidas Poulopoulos
        """Return a dict of stats about this job."""
299 01850d03 Leonidas Poulopoulos
        return self.conn.stats_job(self.jid)
300 01850d03 Leonidas Poulopoulos
301 01850d03 Leonidas Poulopoulos
302 01850d03 Leonidas Poulopoulos
if __name__ == '__main__':
303 01850d03 Leonidas Poulopoulos
    import nose
304 01850d03 Leonidas Poulopoulos
    nose.main(argv=['nosetests', '-c', '.nose.cfg'])