Revision f57f6e68

b/flowspec/models.py
11 11
from flowspec.tasks import *
12 12
from time import sleep
13 13

  
14
from flowspy.utils import beanstalkc
14
import beanstalkc
15 15
from flowspy.utils.randomizer import id_generator as id_gen
16 16

  
17 17

  
b/flowspec/tasks.py
4 4
import logging
5 5
import json
6 6
from celery.task.http import *
7
from flowspy.utils import beanstalkc
7
import beanstalkc
8 8
from django.conf import settings
9 9
import datetime
10 10

  
b/poller/views.py
16 16
from django.core.urlresolvers import reverse
17 17

  
18 18

  
19
from flowspy.utils import beanstalkc
19
import beanstalkc
20 20

  
21 21
import logging
22 22

  
/dev/null
1
#!/usr/bin/env python
2
"""beanstalkc - A beanstalkd Client Library for Python"""
3

  
4
__license__ = '''
5
Copyright (C) 2008-2010 Andreas Bolka
6

  
7
Licensed under the Apache License, Version 2.0 (the "License");
8
you may not use this file except in compliance with the License.
9
You may obtain a copy of the License at
10

  
11
    http://www.apache.org/licenses/LICENSE-2.0
12

  
13
Unless required by applicable law or agreed to in writing, software
14
distributed under the License is distributed on an "AS IS" BASIS,
15
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
See the License for the specific language governing permissions and
17
limitations under the License.
18
'''
19

  
20
__version__ = '0.2.0'
21

  
22
import logging
23
import socket
24
import re
25

  
26

  
27
DEFAULT_HOST = 'localhost'
28
DEFAULT_PORT = 11300
29
DEFAULT_PRIORITY = 2**31
30
DEFAULT_TTR = 120
31
DEFAULT_TIMEOUT = 1
32

  
33

  
34
class BeanstalkcException(Exception): pass
35
class UnexpectedResponse(BeanstalkcException): pass
36
class CommandFailed(BeanstalkcException): pass
37
class DeadlineSoon(BeanstalkcException): pass
38
class SocketError(BeanstalkcException): pass
39

  
40

  
41
class Connection(object):
42
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
43
                 connection_timeout=DEFAULT_TIMEOUT):
44
        self._socket = None
45
        self.host = host
46
        self.port = port
47
        self.connection_timeout = connection_timeout
48
        self.connect()
49

  
50
    def connect(self):
51
        """Connect to beanstalkd server, unless already connected."""
52
        if not self.closed:
53
            return
54
        try:
55
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
56
            self._socket.settimeout(self.connection_timeout)
57
            self._socket.connect((self.host, self.port))
58
            self._socket.settimeout(None)
59
            self._socket_file = self._socket.makefile('rb')
60
        except socket.error, e:
61
            self._socket = None
62
            raise SocketError(e)
63

  
64
    def close(self):
65
        """Close connection to server, if it is open."""
66
        if self.closed:
67
            return
68
        try:
69
            self._socket.sendall('quit\r\n')
70
            self._socket.close()
71
        except socket.error:
72
            pass
73
        finally:
74
            self._socket = None
75

  
76
    @property
77
    def closed(self):
78
        return self._socket is None
79

  
80
    def _interact(self, command, expected_ok, expected_err=[], size_field=None):
81
        try:
82
            self._socket.sendall(command)
83
            status, results = self._read_response()
84
            if status in expected_ok:
85
                if size_field is not None:
86
                    results.append(self._read_body(int(results[size_field])))
87
                return results
88
            elif status in expected_err:
89
                raise CommandFailed(command.split()[0], status, results)
90
            else:
91
                raise UnexpectedResponse(command.split()[0], status, results)
92
        except socket.error, e:
93
            self.close()
94
            raise SocketError(e)
95

  
96
    def _read_response(self):
97
        line = self._socket_file.readline()
98
        if not line:
99
            raise socket.error('no data read')
100
        response = line.split()
101
        return response[0], response[1:]
102

  
103
    def _read_body(self, size):
104
        body = self._socket_file.read(size)
105
        self._socket_file.read(2) # trailing crlf
106
        if size > 0 and not body:
107
            raise socket.error('no data read')
108
        return body
109

  
110
    def _interact_value(self, command, expected_ok, expected_err=[]):
111
        return self._interact(command, expected_ok, expected_err)[0]
112

  
113
    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
114
        jid, _, body = self._interact(command, expected_ok, expected_err,
115
                                      size_field=1)
116
        return Job(self, int(jid), body, reserved)
117

  
118
    def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
119
        _, body, = self._interact(command, expected_ok, expected_err,
120
                                  size_field=0)
121
        return parse_yaml_dict(body)
122

  
123
    def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
124
        _, body, = self._interact(command, expected_ok, expected_err,
125
                                  size_field=0)
126
        return parse_yaml_list(body)
127

  
128
    def _interact_peek(self, command):
129
        try:
130
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
131
        except CommandFailed, (_, status, results):
132
            return None
133

  
134
    # -- public interface --
135

  
136
    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
137
        """Put a job into the current tube. Returns job id."""
138
        assert isinstance(body, str), 'Job body must be a str instance'
139
        jid = self._interact_value(
140
                'put %d %d %d %d\r\n%s\r\n' %
141
                    (priority, delay, ttr, len(body), body),
142
                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
143
        return int(jid)
144

  
145
    def reserve(self, timeout=None):
146
        """Reserve a job from one of the watched tubes, with optional timeout in
147
        seconds. Returns a Job object, or None if the request times out."""
148
        if timeout is not None:
149
            command = 'reserve-with-timeout %d\r\n' % timeout
150
        else:
151
            command = 'reserve\r\n'
152
        try:
153
            return self._interact_job(command,
154
                                      ['RESERVED'],
155
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
156
        except CommandFailed, (_, status, results):
157
            if status == 'TIMED_OUT':
158
                return None
159
            elif status == 'DEADLINE_SOON':
160
                raise DeadlineSoon(results)
161

  
162
    def kick(self, bound=1):
163
        """Kick at most bound jobs into the ready queue."""
164
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
165

  
166
    def peek(self, jid):
167
        """Peek at a job. Returns a Job, or None."""
168
        return self._interact_peek('peek %d\r\n' % jid)
169

  
170
    def peek_ready(self):
171
        """Peek at next ready job. Returns a Job, or None."""
172
        return self._interact_peek('peek-ready\r\n')
173

  
174
    def peek_delayed(self):
175
        """Peek at next delayed job. Returns a Job, or None."""
176
        return self._interact_peek('peek-delayed\r\n')
177

  
178
    def peek_buried(self):
179
        """Peek at next buried job. Returns a Job, or None."""
180
        return self._interact_peek('peek-buried\r\n')
181

  
182
    def tubes(self):
183
        """Return a list of all existing tubes."""
184
        return self._interact_yaml_list('list-tubes\r\n', ['OK'])
185

  
186
    def using(self):
187
        """Return a list of all tubes currently being used."""
188
        return self._interact_value('list-tube-used\r\n', ['USING'])
189

  
190
    def use(self, name):
191
        """Use a given tube."""
192
        return self._interact_value('use %s\r\n' % name, ['USING'])
193

  
194
    def watching(self):
195
        """Return a list of all tubes being watched."""
196
        return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
197

  
198
    def watch(self, name):
199
        """Watch a given tube."""
200
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
201

  
202
    def ignore(self, name):
203
        """Stop watching a given tube."""
204
        try:
205
            return int(self._interact_value('ignore %s\r\n' % name,
206
                                            ['WATCHING'],
207
                                            ['NOT_IGNORED']))
208
        except CommandFailed:
209
            return 1
210

  
211
    def stats(self):
212
        """Return a dict of beanstalkd statistics."""
213
        return self._interact_yaml_dict('stats\r\n', ['OK'])
214

  
215
    def stats_tube(self, name):
216
        """Return a dict of stats about a given tube."""
217
        return self._interact_yaml_dict('stats-tube %s\r\n' % name,
218
                                        ['OK'],
219
                                        ['NOT_FOUND'])
220

  
221
    def pause_tube(self, name, delay):
222
        """Pause a tube for a given delay time, in seconds."""
223
        self._interact('pause-tube %s %d\r\n' %(name, delay),
224
                       ['PAUSED'],
225
                       ['NOT_FOUND'])
226

  
227
    # -- job interactors --
228

  
229
    def delete(self, jid):
230
        """Delete a job, by job id."""
231
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
232

  
233
    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
234
        """Release a reserved job back into the ready queue."""
235
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
236
                       ['RELEASED', 'BURIED'],
237
                       ['NOT_FOUND'])
238

  
239
    def bury(self, jid, priority=DEFAULT_PRIORITY):
240
        """Bury a job, by job id."""
241
        self._interact('bury %d %d\r\n' % (jid, priority),
242
                       ['BURIED'],
243
                       ['NOT_FOUND'])
244

  
245
    def touch(self, jid):
246
        """Touch a job, by job id, requesting more time to work on a reserved
247
        job before it expires."""
248
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
249

  
250
    def stats_job(self, jid):
251
        """Return a dict of stats about a job, by job id."""
252
        return self._interact_yaml_dict('stats-job %d\r\n' % jid,
253
                                        ['OK'],
254
                                        ['NOT_FOUND'])
255

  
256

  
257
class Job(object):
258
    def __init__(self, conn, jid, body, reserved=True):
259
        self.conn = conn
260
        self.jid = jid
261
        self.body = body
262
        self.reserved = reserved
263

  
264
    def _priority(self):
265
        stats = self.stats()
266
        if isinstance(stats, dict):
267
            return stats['pri']
268
        return DEFAULT_PRIORITY
269

  
270
    # -- public interface --
271

  
272
    def delete(self):
273
        """Delete this job."""
274
        self.conn.delete(self.jid)
275
        self.reserved = False
276

  
277
    def release(self, priority=None, delay=0):
278
        """Release this job back into the ready queue."""
279
        if self.reserved:
280
            self.conn.release(self.jid, priority or self._priority(), delay)
281
            self.reserved = False
282

  
283
    def bury(self, priority=None):
284
        """Bury this job."""
285
        if self.reserved:
286
            self.conn.bury(self.jid, priority or self._priority())
287
            self.reserved = False
288

  
289
    def touch(self):
290
        """Touch this reserved job, requesting more time to work on it before it
291
        expires."""
292
        if self.reserved:
293
            self.conn.touch(self.jid)
294

  
295
    def stats(self):
296
        """Return a dict of stats about this job."""
297
        return self.conn.stats_job(self.jid)
298

  
299
def parse_yaml_dict(yaml):
300
    """Parse a YAML dict, in the form returned by beanstalkd."""
301
    dict = {}
302
    for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
303
        key, val = m.group(1), m.group(2)
304
        # Check the type of the value, and parse it.
305
        if key == 'name' or key == 'tube' or key == 'version':
306
            dict[key] = val   # String, even if it looks like a number
307
        elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
308
            dict[key] = int(val) # Integer value
309
        elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
310
            dict[key] = float(val) # Float value
311
        else:
312
            dict[key] = val     # String value
313
    return dict
314

  
315
def parse_yaml_list(yaml):
316
    """Parse a YAML list, in the form returned by beanstalkd."""
317
    return re.findall(r'^- (.*)$', yaml, re.M)
318

  
319
if __name__ == '__main__':
320
    import doctest, os, signal
321
    try:
322
        pid = os.spawnlp(os.P_NOWAIT,
323
                         'beanstalkd',
324
                         'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
325
        doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
326
        doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
327
    finally:
328
        os.kill(pid, signal.SIGTERM)

Also available in: Unified diff