Statistics
| Branch: | Tag: | Revision:

root / beanstalkc.py @ master

History | View | Annotate | Download (10.3 kB)

1
#!/usr/bin/env python
2
"""beanstalkc - A beanstalkd Client Library for Python"""
3

    
4
__license__ = '''
5
Copyright (C) 2008-2012 Andreas Bolka
6

7
Licensed under the Apache License, Version 2.0 (the "License");
8
you may not use this file except in compliance with the License.
9
You may obtain a copy of the License at
10

11
    http://www.apache.org/licenses/LICENSE-2.0
12

13
Unless required by applicable law or agreed to in writing, software
14
distributed under the License is distributed on an "AS IS" BASIS,
15
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
See the License for the specific language governing permissions and
17
limitations under the License.
18
'''
19

    
20
__version__ = '0.3.0'
21

    
22
import logging
23
import socket
24

    
25

    
26
DEFAULT_HOST = 'localhost'
27
DEFAULT_PORT = 11300
28
DEFAULT_PRIORITY = 2 ** 31
29
DEFAULT_TTR = 120
30

    
31

    
32
class BeanstalkcException(Exception): pass
33
class UnexpectedResponse(BeanstalkcException): pass
34
class CommandFailed(BeanstalkcException): pass
35
class DeadlineSoon(BeanstalkcException): pass
36

    
37
class SocketError(BeanstalkcException):
38
    @staticmethod
39
    def wrap(wrapped_function, *args, **kwargs):
40
        try:
41
            return wrapped_function(*args, **kwargs)
42
        except socket.error, err:
43
            raise SocketError(err)
44

    
45

    
46
class Connection(object):
47
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, parse_yaml=True,
48
                 connect_timeout=socket.getdefaulttimeout()):
49
        if parse_yaml is True:
50
            try:
51
                parse_yaml = __import__('yaml').load
52
            except ImportError:
53
                logging.error('Failed to load PyYAML, will not parse YAML')
54
                parse_yaml = False
55
        self._connect_timeout = connect_timeout
56
        self._parse_yaml = parse_yaml or (lambda x: x)
57
        self.host = host
58
        self.port = port
59
        self.connect()
60

    
61
    def connect(self):
62
        """Connect to beanstalkd server."""
63
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
64
        self._socket.settimeout(self._connect_timeout)
65
        SocketError.wrap(self._socket.connect, (self.host, self.port))
66
        self._socket.settimeout(None)
67
        self._socket_file = self._socket.makefile('rb')
68

    
69
    def close(self):
70
        """Close connection to server."""
71
        try:
72
            self._socket.sendall('quit\r\n')
73
        except socket.error:
74
            pass
75
        try:
76
            self._socket.close()
77
        except socket.error:
78
            pass
79

    
80
    def reconnect(self):
81
        """Re-connect to server."""
82
        self.close()
83
        self.connect()
84

    
85
    def _interact(self, command, expected_ok, expected_err=[]):
86
        SocketError.wrap(self._socket.sendall, command)
87
        status, results = self._read_response()
88
        if status in expected_ok:
89
            return results
90
        elif status in expected_err:
91
            raise CommandFailed(command.split()[0], status, results)
92
        else:
93
            raise UnexpectedResponse(command.split()[0], status, results)
94

    
95
    def _read_response(self):
96
        line = SocketError.wrap(self._socket_file.readline)
97
        if not line:
98
            raise SocketError()
99
        response = line.split()
100
        return response[0], response[1:]
101

    
102
    def _read_body(self, size):
103
        body = SocketError.wrap(self._socket_file.read, size)
104
        SocketError.wrap(self._socket_file.read, 2)  # trailing crlf
105
        if size > 0 and not body:
106
            raise SocketError()
107
        return body
108

    
109
    def _interact_value(self, command, expected_ok, expected_err=[]):
110
        return self._interact(command, expected_ok, expected_err)[0]
111

    
112
    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
113
        jid, size = self._interact(command, expected_ok, expected_err)
114
        body = self._read_body(int(size))
115
        return Job(self, int(jid), body, reserved)
116

    
117
    def _interact_yaml(self, command, expected_ok, expected_err=[]):
118
        size, = self._interact(command, expected_ok, expected_err)
119
        body = self._read_body(int(size))
120
        return self._parse_yaml(body)
121

    
122
    def _interact_peek(self, command):
123
        try:
124
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
125
        except CommandFailed, (_, _status, _results):
126
            return None
127

    
128
    # -- public interface --
129

    
130
    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
131
        """Put a job into the current tube. Returns job id."""
132
        assert isinstance(body, str), 'Job body must be a str instance'
133
        jid = self._interact_value(
134
                'put %d %d %d %d\r\n%s\r\n' %
135
                    (priority, delay, ttr, len(body), body),
136
                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
137
        return int(jid)
138

    
139
    def reserve(self, timeout=None):
140
        """Reserve a job from one of the watched tubes, with optional timeout
141
        in seconds. Returns a Job object, or None if the request times out."""
142
        if timeout is not None:
143
            command = 'reserve-with-timeout %d\r\n' % timeout
144
        else:
145
            command = 'reserve\r\n'
146
        try:
147
            return self._interact_job(command,
148
                                      ['RESERVED'],
149
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
150
        except CommandFailed, (_, status, results):
151
            if status == 'TIMED_OUT':
152
                return None
153
            elif status == 'DEADLINE_SOON':
154
                raise DeadlineSoon(results)
155

    
156
    def kick(self, bound=1):
157
        """Kick at most bound jobs into the ready queue."""
158
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
159

    
160
    def kick_job(self, jid):
161
        """Kick a specific job into the ready queue."""
162
        self._interact('kick-job %d\r\n' % jid, ['KICKED'], ['NOT_FOUND'])
163

    
164
    def peek(self, jid):
165
        """Peek at a job. Returns a Job, or None."""
166
        return self._interact_peek('peek %d\r\n' % jid)
167

    
168
    def peek_ready(self):
169
        """Peek at next ready job. Returns a Job, or None."""
170
        return self._interact_peek('peek-ready\r\n')
171

    
172
    def peek_delayed(self):
173
        """Peek at next delayed job. Returns a Job, or None."""
174
        return self._interact_peek('peek-delayed\r\n')
175

    
176
    def peek_buried(self):
177
        """Peek at next buried job. Returns a Job, or None."""
178
        return self._interact_peek('peek-buried\r\n')
179

    
180
    def tubes(self):
181
        """Return a list of all existing tubes."""
182
        return self._interact_yaml('list-tubes\r\n', ['OK'])
183

    
184
    def using(self):
185
        """Return the tube currently being used."""
186
        return self._interact_value('list-tube-used\r\n', ['USING'])
187

    
188
    def use(self, name):
189
        """Use a given tube."""
190
        return self._interact_value('use %s\r\n' % name, ['USING'])
191

    
192
    def watching(self):
193
        """Return a list of all tubes being watched."""
194
        return self._interact_yaml('list-tubes-watched\r\n', ['OK'])
195

    
196
    def watch(self, name):
197
        """Watch a given tube."""
198
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
199

    
200
    def ignore(self, name):
201
        """Stop watching a given tube."""
202
        try:
203
            return int(self._interact_value('ignore %s\r\n' % name,
204
                                            ['WATCHING'],
205
                                            ['NOT_IGNORED']))
206
        except CommandFailed:
207
            return 1
208

    
209
    def stats(self):
210
        """Return a dict of beanstalkd statistics."""
211
        return self._interact_yaml('stats\r\n', ['OK'])
212

    
213
    def stats_tube(self, name):
214
        """Return a dict of stats about a given tube."""
215
        return self._interact_yaml('stats-tube %s\r\n' % name,
216
                                   ['OK'],
217
                                   ['NOT_FOUND'])
218

    
219
    def pause_tube(self, name, delay):
220
        """Pause a tube for a given delay time, in seconds."""
221
        self._interact('pause-tube %s %d\r\n' % (name, delay),
222
                       ['PAUSED'],
223
                       ['NOT_FOUND'])
224

    
225
    # -- job interactors --
226

    
227
    def delete(self, jid):
228
        """Delete a job, by job id."""
229
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
230

    
231
    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
232
        """Release a reserved job back into the ready queue."""
233
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
234
                       ['RELEASED', 'BURIED'],
235
                       ['NOT_FOUND'])
236

    
237
    def bury(self, jid, priority=DEFAULT_PRIORITY):
238
        """Bury a job, by job id."""
239
        self._interact('bury %d %d\r\n' % (jid, priority),
240
                       ['BURIED'],
241
                       ['NOT_FOUND'])
242

    
243
    def touch(self, jid):
244
        """Touch a job, by job id, requesting more time to work on a reserved
245
        job before it expires."""
246
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
247

    
248
    def stats_job(self, jid):
249
        """Return a dict of stats about a job, by job id."""
250
        return self._interact_yaml('stats-job %d\r\n' % jid,
251
                                   ['OK'],
252
                                   ['NOT_FOUND'])
253

    
254

    
255
class Job(object):
256
    def __init__(self, conn, jid, body, reserved=True):
257
        self.conn = conn
258
        self.jid = jid
259
        self.body = body
260
        self.reserved = reserved
261

    
262
    def _priority(self):
263
        stats = self.stats()
264
        if isinstance(stats, dict):
265
            return stats['pri']
266
        return DEFAULT_PRIORITY
267

    
268
    # -- public interface --
269

    
270
    def delete(self):
271
        """Delete this job."""
272
        self.conn.delete(self.jid)
273
        self.reserved = False
274

    
275
    def release(self, priority=None, delay=0):
276
        """Release this job back into the ready queue."""
277
        if self.reserved:
278
            self.conn.release(self.jid, priority or self._priority(), delay)
279
            self.reserved = False
280

    
281
    def bury(self, priority=None):
282
        """Bury this job."""
283
        if self.reserved:
284
            self.conn.bury(self.jid, priority or self._priority())
285
            self.reserved = False
286

    
287
    def kick(self):
288
        """Kick this job alive."""
289
        self.conn.kick_job(self.jid)
290

    
291
    def touch(self):
292
        """Touch this reserved job, requesting more time to work on it before
293
        it expires."""
294
        if self.reserved:
295
            self.conn.touch(self.jid)
296

    
297
    def stats(self):
298
        """Return a dict of stats about this job."""
299
        return self.conn.stats_job(self.jid)
300

    
301

    
302
if __name__ == '__main__':
303
    import nose
304
    nose.main(argv=['nosetests', '-c', '.nose.cfg'])