Statistics
| Branch: | Tag: | Revision:

root / utils / beanstalkc.py @ 9cad4715

History | View | Annotate | Download (11.5 kB)

1
#!/usr/bin/env python
2
"""beanstalkc - A beanstalkd Client Library for Python"""
3

    
4
__license__ = '''
5
Copyright (C) 2008-2010 Andreas Bolka
6

7
Licensed under the Apache License, Version 2.0 (the "License");
8
you may not use this file except in compliance with the License.
9
You may obtain a copy of the License at
10

11
    http://www.apache.org/licenses/LICENSE-2.0
12

13
Unless required by applicable law or agreed to in writing, software
14
distributed under the License is distributed on an "AS IS" BASIS,
15
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
See the License for the specific language governing permissions and
17
limitations under the License.
18
'''
19

    
20
__version__ = '0.2.0'
21

    
22
import logging
23
import socket
24
import re
25

    
26

    
27
DEFAULT_HOST = 'localhost'
28
DEFAULT_PORT = 11300
29
DEFAULT_PRIORITY = 2**31
30
DEFAULT_TTR = 120
31
DEFAULT_TIMEOUT = 1
32

    
33

    
34
class BeanstalkcException(Exception): pass
35
class UnexpectedResponse(BeanstalkcException): pass
36
class CommandFailed(BeanstalkcException): pass
37
class DeadlineSoon(BeanstalkcException): pass
38
class SocketError(BeanstalkcException): pass
39

    
40

    
41
class Connection(object):
42
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
43
                 connection_timeout=DEFAULT_TIMEOUT):
44
        self._socket = None
45
        self.host = host
46
        self.port = port
47
        self.connection_timeout = connection_timeout
48
        self.connect()
49

    
50
    def connect(self):
51
        """Connect to beanstalkd server, unless already connected."""
52
        if not self.closed:
53
            return
54
        try:
55
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
56
            self._socket.settimeout(self.connection_timeout)
57
            self._socket.connect((self.host, self.port))
58
            self._socket.settimeout(None)
59
            self._socket_file = self._socket.makefile('rb')
60
        except socket.error, e:
61
            self._socket = None
62
            raise SocketError(e)
63

    
64
    def close(self):
65
        """Close connection to server, if it is open."""
66
        if self.closed:
67
            return
68
        try:
69
            self._socket.sendall('quit\r\n')
70
            self._socket.close()
71
        except socket.error:
72
            pass
73
        finally:
74
            self._socket = None
75

    
76
    @property
77
    def closed(self):
78
        return self._socket is None
79

    
80
    def _interact(self, command, expected_ok, expected_err=[], size_field=None):
81
        try:
82
            self._socket.sendall(command)
83
            status, results = self._read_response()
84
            if status in expected_ok:
85
                if size_field is not None:
86
                    results.append(self._read_body(int(results[size_field])))
87
                return results
88
            elif status in expected_err:
89
                raise CommandFailed(command.split()[0], status, results)
90
            else:
91
                raise UnexpectedResponse(command.split()[0], status, results)
92
        except socket.error, e:
93
            self.close()
94
            raise SocketError(e)
95

    
96
    def _read_response(self):
97
        line = self._socket_file.readline()
98
        if not line:
99
            raise socket.error('no data read')
100
        response = line.split()
101
        return response[0], response[1:]
102

    
103
    def _read_body(self, size):
104
        body = self._socket_file.read(size)
105
        self._socket_file.read(2) # trailing crlf
106
        if size > 0 and not body:
107
            raise socket.error('no data read')
108
        return body
109

    
110
    def _interact_value(self, command, expected_ok, expected_err=[]):
111
        return self._interact(command, expected_ok, expected_err)[0]
112

    
113
    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
114
        jid, _, body = self._interact(command, expected_ok, expected_err,
115
                                      size_field=1)
116
        return Job(self, int(jid), body, reserved)
117

    
118
    def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
119
        _, body, = self._interact(command, expected_ok, expected_err,
120
                                  size_field=0)
121
        return parse_yaml_dict(body)
122

    
123
    def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
124
        _, body, = self._interact(command, expected_ok, expected_err,
125
                                  size_field=0)
126
        return parse_yaml_list(body)
127

    
128
    def _interact_peek(self, command):
129
        try:
130
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
131
        except CommandFailed, (_, status, results):
132
            return None
133

    
134
    # -- public interface --
135

    
136
    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
137
        """Put a job into the current tube. Returns job id."""
138
        assert isinstance(body, str), 'Job body must be a str instance'
139
        jid = self._interact_value(
140
                'put %d %d %d %d\r\n%s\r\n' %
141
                    (priority, delay, ttr, len(body), body),
142
                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
143
        return int(jid)
144

    
145
    def reserve(self, timeout=None):
146
        """Reserve a job from one of the watched tubes, with optional timeout in
147
        seconds. Returns a Job object, or None if the request times out."""
148
        if timeout is not None:
149
            command = 'reserve-with-timeout %d\r\n' % timeout
150
        else:
151
            command = 'reserve\r\n'
152
        try:
153
            return self._interact_job(command,
154
                                      ['RESERVED'],
155
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
156
        except CommandFailed, (_, status, results):
157
            if status == 'TIMED_OUT':
158
                return None
159
            elif status == 'DEADLINE_SOON':
160
                raise DeadlineSoon(results)
161

    
162
    def kick(self, bound=1):
163
        """Kick at most bound jobs into the ready queue."""
164
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
165

    
166
    def peek(self, jid):
167
        """Peek at a job. Returns a Job, or None."""
168
        return self._interact_peek('peek %d\r\n' % jid)
169

    
170
    def peek_ready(self):
171
        """Peek at next ready job. Returns a Job, or None."""
172
        return self._interact_peek('peek-ready\r\n')
173

    
174
    def peek_delayed(self):
175
        """Peek at next delayed job. Returns a Job, or None."""
176
        return self._interact_peek('peek-delayed\r\n')
177

    
178
    def peek_buried(self):
179
        """Peek at next buried job. Returns a Job, or None."""
180
        return self._interact_peek('peek-buried\r\n')
181

    
182
    def tubes(self):
183
        """Return a list of all existing tubes."""
184
        return self._interact_yaml_list('list-tubes\r\n', ['OK'])
185

    
186
    def using(self):
187
        """Return a list of all tubes currently being used."""
188
        return self._interact_value('list-tube-used\r\n', ['USING'])
189

    
190
    def use(self, name):
191
        """Use a given tube."""
192
        return self._interact_value('use %s\r\n' % name, ['USING'])
193

    
194
    def watching(self):
195
        """Return a list of all tubes being watched."""
196
        return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
197

    
198
    def watch(self, name):
199
        """Watch a given tube."""
200
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
201

    
202
    def ignore(self, name):
203
        """Stop watching a given tube."""
204
        try:
205
            return int(self._interact_value('ignore %s\r\n' % name,
206
                                            ['WATCHING'],
207
                                            ['NOT_IGNORED']))
208
        except CommandFailed:
209
            return 1
210

    
211
    def stats(self):
212
        """Return a dict of beanstalkd statistics."""
213
        return self._interact_yaml_dict('stats\r\n', ['OK'])
214

    
215
    def stats_tube(self, name):
216
        """Return a dict of stats about a given tube."""
217
        return self._interact_yaml_dict('stats-tube %s\r\n' % name,
218
                                        ['OK'],
219
                                        ['NOT_FOUND'])
220

    
221
    def pause_tube(self, name, delay):
222
        """Pause a tube for a given delay time, in seconds."""
223
        self._interact('pause-tube %s %d\r\n' %(name, delay),
224
                       ['PAUSED'],
225
                       ['NOT_FOUND'])
226

    
227
    # -- job interactors --
228

    
229
    def delete(self, jid):
230
        """Delete a job, by job id."""
231
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
232

    
233
    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
234
        """Release a reserved job back into the ready queue."""
235
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
236
                       ['RELEASED', 'BURIED'],
237
                       ['NOT_FOUND'])
238

    
239
    def bury(self, jid, priority=DEFAULT_PRIORITY):
240
        """Bury a job, by job id."""
241
        self._interact('bury %d %d\r\n' % (jid, priority),
242
                       ['BURIED'],
243
                       ['NOT_FOUND'])
244

    
245
    def touch(self, jid):
246
        """Touch a job, by job id, requesting more time to work on a reserved
247
        job before it expires."""
248
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
249

    
250
    def stats_job(self, jid):
251
        """Return a dict of stats about a job, by job id."""
252
        return self._interact_yaml_dict('stats-job %d\r\n' % jid,
253
                                        ['OK'],
254
                                        ['NOT_FOUND'])
255

    
256

    
257
class Job(object):
258
    def __init__(self, conn, jid, body, reserved=True):
259
        self.conn = conn
260
        self.jid = jid
261
        self.body = body
262
        self.reserved = reserved
263

    
264
    def _priority(self):
265
        stats = self.stats()
266
        if isinstance(stats, dict):
267
            return stats['pri']
268
        return DEFAULT_PRIORITY
269

    
270
    # -- public interface --
271

    
272
    def delete(self):
273
        """Delete this job."""
274
        self.conn.delete(self.jid)
275
        self.reserved = False
276

    
277
    def release(self, priority=None, delay=0):
278
        """Release this job back into the ready queue."""
279
        if self.reserved:
280
            self.conn.release(self.jid, priority or self._priority(), delay)
281
            self.reserved = False
282

    
283
    def bury(self, priority=None):
284
        """Bury this job."""
285
        if self.reserved:
286
            self.conn.bury(self.jid, priority or self._priority())
287
            self.reserved = False
288

    
289
    def touch(self):
290
        """Touch this reserved job, requesting more time to work on it before it
291
        expires."""
292
        if self.reserved:
293
            self.conn.touch(self.jid)
294

    
295
    def stats(self):
296
        """Return a dict of stats about this job."""
297
        return self.conn.stats_job(self.jid)
298

    
299
def parse_yaml_dict(yaml):
300
    """Parse a YAML dict, in the form returned by beanstalkd."""
301
    dict = {}
302
    for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
303
        key, val = m.group(1), m.group(2)
304
        # Check the type of the value, and parse it.
305
        if key == 'name' or key == 'tube' or key == 'version':
306
            dict[key] = val   # String, even if it looks like a number
307
        elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
308
            dict[key] = int(val) # Integer value
309
        elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
310
            dict[key] = float(val) # Float value
311
        else:
312
            dict[key] = val     # String value
313
    return dict
314

    
315
def parse_yaml_list(yaml):
316
    """Parse a YAML list, in the form returned by beanstalkd."""
317
    return re.findall(r'^- (.*)$', yaml, re.M)
318

    
319
if __name__ == '__main__':
320
    import doctest, os, signal
321
    try:
322
        pid = os.spawnlp(os.P_NOWAIT,
323
                         'beanstalkd',
324
                         'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
325
        doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
326
        doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
327
    finally:
328
        os.kill(pid, signal.SIGTERM)