Revision 9cad4715

b/flowspec/admin.py
5 5
class RouteAdmin(admin.ModelAdmin):
6 6
    
7 7
    actions = ['deactivate']
8

  
8
    
9 9
    def deactivate(self, request, queryset):
10 10
        applier = PR.Applier(route_objects=queryset)
11 11
        commit, response = applier.apply(configuration=applier.delete_routes())
......
19 19

  
20 20
    list_display = ('name', 'is_online', 'applier', 'get_match', 'get_then', 'response')
21 21
    fieldsets = [
22
        (None,               {'fields': ['name',]}),
22
        (None,               {'fields': ['name','applier']}),
23 23
        ("Match",               {'fields': ['source', 'sourceport', 'destination', 'destinationport', 'port']}),
24 24
        ('Advanced Match Statements', {'fields': ['dscp', 'fragmenttype', 'icmpcode', 'icmptype', 'packetlength', 'protocol', 'tcpflag'], 'classes': ['collapse']}),
25 25
        ("Then",               {'fields': ['then' ]}),
b/flowspec/forms.py
1
from django import forms
2
from django.utils.safestring import mark_safe
3
from django.utils.translation import ugettext as _
4
from django.utils.translation import ugettext_lazy
5
from django.template.defaultfilters import filesizeformat
6

  
7
from flowspy.flowspec.models import * 
8
from ipaddr import *
9

  
10
class RouteForm(forms.ModelForm):
11
#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
12
#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
13
#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
14
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
15
#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
16
#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
17
#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
18
#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
19
#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
20
    class Meta:
21
        model = Route
22
    
23
    def clean_source(self):
24
        data = self.cleaned_data['source']
25
        if data:
26
            try:
27
                address = IPNetwork(data)
28
                return self.cleaned_data["source"]
29
            except Exception:
30
                raise forms.ValidationError('Invalid network address format')
31

  
32
    def clean_destination(self):
33
        data = self.cleaned_data['destination']
34
        if data:
35
            try:
36
                address = IPNetwork(data)
37
                return self.cleaned_data["destination"]
38
            except Exception:
39
                raise forms.ValidationError('Invalid network address format')
40

  
41
    def clean(self):
42
        source = self.cleaned_data.get('source', None)
43
        sourceports = self.cleaned_data.get('sourceport', None)
44
        ports = self.cleaned_data.get('port', None)
45
        destination = self.cleaned_data.get('destination', None)
46
        destinationports = self.cleaned_data.get('destinationport', None)
47
        if (sourceports and ports):
48
            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
49
        if (destinationports and ports):
50
            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
51
        if sourceports and not source:
52
            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
53
        if destinationports and not destination:
54
            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
55
        if not (source or sourceports or ports or destination or destinationports):
56
            raise forms.ValidationError('Fill at least a Route Match Condition')
57
        return self.cleaned_data
b/flowspec/models.py
8 8
from ipaddr import *
9 9
from datetime import *
10 10
import logging
11
from flowspec.tasks import *
12
from time import sleep
11 13

  
12 14
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
13 15
logging.basicConfig(format=FORMAT)
......
61 63

  
62 64
class Route(models.Model):
63 65
    name = models.CharField(max_length=128)
64
    applier = models.ForeignKey(User)
66
    applier = models.ForeignKey(User, blank=True, null=True)
65 67
    source = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Source Address")
66 68
    sourceport = models.ManyToManyField(MatchPort, blank=True, null=True, related_name="matchSourcePort", verbose_name="Source Port")
67 69
    destination = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Destination Address")
......
79 81
    last_updated = models.DateTimeField(auto_now=True)
80 82
    is_online = models.BooleanField(default=False)
81 83
    is_active = models.BooleanField(default=False)
82
    expires = models.DateField(default=days_offset)
84
    expires = models.DateField(default=days_offset, blank=True, null=True,)
83 85
    response = models.CharField(max_length=512, blank=True, null=True)
84 86
    comments = models.TextField(null=True, blank=True, verbose_name="Comments")
85 87

  
......
106 108
            except Exception:
107 109
                raise ValidationError('Invalid network address format at Source Field')
108 110
    
109
    def save(self, *args, **kwargs):
110
        applier = PR.Applier(route_object=self)
111
        commit, response = applier.apply()
112
        if commit:
113
            self.is_online = True
114
            self.is_active = True
115
            self.response = response
116
        else:
117
            self.is_online = False
118
            self.response = response
119
        super(Route, self).save(*args, **kwargs)
111
#    def save(self, *args, **kwargs):
112
#        edit = False
113
#        if self.pk:
114
#            #This is an edit
115
#            edit = True
116
#        super(Route, self).save(*args, **kwargs)
117
#        if not edit:
118
#            response = add.delay(self)
119
#            logger.info("Got save job id: %s" %response)
120
    
121
    def commit_add(self, *args, **kwargs):
122
        response = add.delay(self)
123
        logger.info("Got save job id: %s" %response)
124
#    
125
#    def delete(self, *args, **kwargs):
126
#        response = delete.delay(self)
127
#        logger.info("Got delete job id: %s" %response)
128
        
120 129

  
121 130
    def is_synced(self):
122 131
        
b/flowspec/tasks.py
1
from utils import proxy as PR
2
from celery.task import task
3

  
4
@task
5
def add(route):
6
    applier = PR.Applier(route_object=route)
7
    commit, response = applier.apply()
8
    if commit:
9
        is_online = True
10
        is_active = True
11
    else:
12
        is_online = False
13
        is_active = True
14
    route.is_online = is_online
15
    route.is_active = is_active
16
    route.response = response
17
    route.save()
18
#
19
#@task
20
#def delete(route):
21
#    
22
#    applier = PR.Applier(route_object=route)
23
#    commit, response = applier.apply(configuration=applier.delete_routes())
24
#    if commit:
25
#            rows = queryset.update(is_online=False, is_active=False)
26
#            queryset.update(response="Successfully removed route from network")
27
#            self.message_user(request, "Successfully removed %s routes from network" % rows)
28
#        else:
29
#            self.message_user(request, "Could not remove routes from network")
30
#    if commit:
31
#        is_online = False
32
#        is_active = False
33
#        response = "Successfully removed route from network"
34
#    else:
35
#        is_online = False
36
#        is_active = True
37
#    route.is_online = is_online
38
#    route.is_active = is_active
39
#    route.response = response
40
#    route.save()
/dev/null
1
"""
2
This file demonstrates two different styles of tests (one doctest and one
3
unittest). These will both pass when you run "manage.py test".
4

  
5
Replace these with more appropriate tests for your application.
6
"""
7

  
8
from django.test import TestCase
9

  
10
class SimpleTest(TestCase):
11
    def test_basic_addition(self):
12
        """
13
        Tests that 1 + 1 always equals 2.
14
        """
15
        self.failUnlessEqual(1 + 1, 2)
16

  
17
__test__ = {"doctest": """
18
Another way to test that 1 + 1 is equal to 2.
19

  
20
>>> 1 + 1 == 2
21
True
22
"""}
23

  
b/flowspec/views.py
3 3
import re
4 4
import socket
5 5
from django import forms
6
from django.core.cache import cache
7 6
from django.views.decorators.csrf import csrf_exempt
7
from django.core import urlresolvers
8 8
from django.contrib.auth.decorators import login_required
9 9
from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse
10 10
from django.shortcuts import get_object_or_404, render_to_response
......
15 15
from django.core.urlresolvers import reverse
16 16
from django.contrib import messages
17 17

  
18
from flowspy.flowspec.forms import * 
18 19
from flowspy.flowspec.models import *
19 20

  
21
def days_offset(): return datetime.now() + timedelta(days = settings.EXPIRATION_DAYS_OFFSET)
22

  
20 23
def user_routes(request):
21 24
    if request.user.is_anonymous():
22 25
        return HttpResponseRedirect(reverse('login'))
23 26
    user_routes = Route.objects.filter(applier=request.user)
24
    print user_routes
25 27
    return render_to_response('user_routes.html', {'routes': user_routes},
26 28
                              context_instance=RequestContext(request))
27 29

  
28 30

  
31
def add_route(request):
32
    if request.method == "GET":
33
        form = RouteForm()
34
        return render_to_response('apply.html', {'form': form},
35
                                  context_instance=RequestContext(request))
36

  
37
    else:
38
        form = RouteForm(request.POST)
39
        if form.is_valid():
40
            route=form.save(commit=False)
41
            route.applier = request.user
42
            route.expires = days_offset()
43
            route.save()
44
            form.save_m2m()
45
            route.commit_add()
46
            return HttpResponseRedirect(urlresolvers.reverse("user-routes"))
47
        else:
48
            return render_to_response('apply.html', {'form': form},
49
                                      context_instance=RequestContext(request))
b/templates/apply.html
1 1
{% extends "base.html" %}
2 2
{% load i18n %}
3
{% block title %}{% trans "Create new Virtual Machine" %}{% endblock %}
4
{% block breadcrumbs %}:: {% trans "Create Instance" %}{% endblock %}
3
{% block title %}{% trans "Create new Route" %}{% endblock %}
4
{% block breadcrumbs %}:: {% trans "Create Route" %}{% endblock %}
5 5
{% block content %}
6 6
<style type="text/css">
7 7
th {
......
17 17
</style>
18 18

  
19 19
<div align="center">
20
<h3>{% trans "Apply for a new instance" %}</h3>
20
<h3>{% trans "Apply for a new route" %}</h3>
21 21
<form method="POST">
22 22
{% csrf_token %}
23
{% if form.non_field_errors %}
24
<p class="error">{{ form.non_field_errors|join:", "}}</p>
25
{% endif %}
23 26
<fieldset>
24
<legend>{% trans "Instance information" %}</legend>
27
	<legend>{% trans "Route Basic Info" %}</legend>
25 28
<table>
26
<tr><th>{{ form.hostname.label_tag }}</th><td>{{ form.hostname }}<span class="error">{{ form.hostname.errors|join:", " }}</span></td></tr>
27
<tr class="help"><td></td><td>{{ form.hostname.help_text }}</td></tr>
28
<tr><th>{{ form.memory.label_tag }}</th><td>{{ form.memory }}<span class="error">{{ form.memory.errors|join:", " }}</span></td></tr>
29
<tr><th>{{ form.vcpus.label_tag }}</th><td>{{ form.vcpus }}<span class="error">{{ form.vcpus.errors|join:", " }}</span></td></tr>
30
<tr><th>{{ form.disk_size.label_tag }}</th><td>{{ form.disk_size }}<span class="error">{{ form.disk_size.errors|join:", " }}</span></td></tr>
31
<tr class="help"><td></td><td>{{ form.disk_size.help_text }}</td></tr>
32
<tr><th>{{ form.hosts_mail_server.label_tag }}</th><td>{{ form.hosts_mail_server }}<span class="error">{{ form.hosts_mail_server.errors|join:", " }}</span></td></tr>
33
<tr class="help"><td></td><td>{{ form.hosts_mail_server.help_text }}</td></tr>
34
<tr><th>{{ form.operating_system.label_tag }}</th><td>{{ form.operating_system }}<span class="error">{{ form.operating_system.errors|join:", " }}</span></td></tr>
35
{% if form.network %}
36
<tr><th>{{ form.network.label_tag }}</th><td>{{ form.network }}<span class="error">{{ form.network.errors|join:", " }}</span></td></tr>
37
<tr class="help"><td></td><td>{{ form.network.help_text|linebreaks }}</td></tr>
38
{% endif %}
29
<tr><th>{{ form.name.label_tag }}</th><td>{{ form.name }}<span class="error">{{ form.name.errors|join:", " }}</span></td></tr>
30
<tr class="help"><td></td><td>{{ form.name.help_text }}</td></tr>
39 31
</table>
40 32
</fieldset>
41 33

  
42 34
<fieldset>
43
<legend>{% trans "Use/Comments" %}</legend>
44
{% blocktrans %}
45
<p>Give a short description of the intended use of this virtual machine, that justifies the parameter selection above. Feel free to include any additional comments.</p>
46
{% endblocktrans %}
47
<p>{{ form.comments }}
48
{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %}
49
</p>
35
<legend>{% trans "Route Match Conditions" %}</legend>
36
<table>
37
<tr><th>{{ form.source.label_tag }}</th><td>{{ form.source }}<span class="error">{{ form.source.errors|join:", " }}</span></td></tr>
38
<tr class="help"><td></td><td>{{ form.source.help_text }}</td></tr>
39
<tr><th>{{ form.sourceport.label_tag }}</th><td>{{ form.sourceport }}<span class="error">{{ form.sourceport.errors|join:", " }}</span></td></tr>
40
<tr class="help"><td></td><td>{{ form.sourceport.help_text }}</td></tr>
41
<tr><th>{{ form.destination.label_tag }}</th><td>{{ form.destination }}<span class="error">{{ form.destination.errors|join:", " }}</span></td></tr>
42
<tr class="help"><td></td><td>{{ form.destination.help_text }}</td></tr>
43
<tr><th>{{ form.destinationport.label_tag }}</th><td>{{ form.destinationport }}<span class="error">{{ form.destinationport.errors|join:", " }}</span></td></tr>
44
<tr class="help"><td></td><td>{{ form.destinationport.help_text }}</td></tr>
45
<tr><th>{{ form.port.label_tag }}</th><td>{{ form.port }}<span class="error">{{ form.port.errors|join:", " }}</span></td></tr>
46
<tr class="help"><td></td><td>{{ form.port.help_text }}</td></tr>
47
</table>
50 48
</fieldset>
51

  
52 49
<fieldset>
53
<legend>{% trans "Administrative contact" %}</legend>
54
{% blocktrans %}
55
<p>If you are applying on behalf of a NOC under GRNET's constituency, please select the appropriate organization. Otherwise, fill-in the admin contact information below.</p>
56
{% endblocktrans %}
57

  
58
{% if form.non_field_errors %}
59
<p class="error">{{ form.non_field_errors|join:", "}}</p>
60
{% endif %}
61

  
50
<legend>{% trans "Route Actions" %}</legend>
62 51
<table>
63
<tr><th>{{ form.organization.label_tag }}</th><td>{{ form.organization }}<span class="error">{{ form.organization.errors|join:", " }}</span></td></tr>
64

  
65

  
66
<tr><td colspan="3"><div align="center">{% trans "OR" %}</div></td></tr>
67

  
68

  
69
<tr><th colspan="3"><div align="center">{% trans "Administrative contact" %}</div></th></tr>
70
<tr><th>{% trans "Name" %}</th><td>{{ form.admin_contact_name }}<span class="error">{{ form.admin_contact_name.errors|join:", " }}</span></td></tr>
71
<tr><th>E-mail</th><td>{{ form.admin_contact_email }}<span class="error">{{ form.admin_contact_email.errors|join:", " }}</span></td></tr>
72
<tr><th>{% trans "Phone" %}</th><td>{{ form.admin_contact_phone }}<span class="error">{{ form.admin_contact_phone.errors|join:", " }}</span></td></tr>
52
<tr><th>{{ form.then.label_tag }}</th><td>{{ form.then }}<span class="error">{{ form.then.errors|join:", " }}</span></td></tr>
53
<tr class="help"><td></td><td>{{ form.then.help_text }}</td></tr>
73 54
</table>
74 55
</fieldset>
75

  
76 56
<fieldset>
77
<legend>{% trans "Miscellaneous" %}</legend>
57
<legend>{% trans "Use/Comments" %}</legend>
78 58
{% blocktrans %}
79
<p>We kindly remind you of the following:</p>
80
<ul align="left">
81
<li>You are solely responsible for the data on your VM. You have to take care of back-ups etc.</li>
82
<li>We reserve the right to temporarily suspend the operation of your VM in case it causes malfunctions to our infrastructure</li>
83
</ul>
59
<p>Give a short description of the intended use of this route, that justifies the parameter selection above. Feel free to include any additional comments.</p>
84 60
{% endblocktrans %}
85
<p>{{ form.accept_tos }} {% trans "I have read the above and accept them, along with the" %} <a href="/about/terms-of-service/" target="_blank">{% trans "Terms of Service" %}</a></p>
86
{% if form.accept_tos.errors %}
87
<p class="error">
88
{% trans "You must accept the terms of service before continuing." %}
61
<p>{{ form.comments }}
62
{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %}
89 63
</p>
90
{% endif %}
91 64
</fieldset>
92 65

  
93 66
<p><input type="submit" value="{% trans "Apply" %}" /></p>
94 67
</form>
95 68
</div>
69

  
96 70
{% endblock %}
b/templates/base.html
2 2
<html>
3 3
<head>
4 4
<title>GRNET's FoD :: {% block title %}{% endblock %} </title>
5
<META HTTP-EQUIV="Pragma" CONTENT="no-cache">
6
<META HTTP-EQUIV="Expires" CONTENT="-1">
5 7
<script src="/static/js/jquery.min.js" type="text/javascript"></script>
6 8
<link rel="stylesheet" type="text/css" href="/static/css/base.css">
7 9
<link rel="stylesheet" type="text/css" href="/static/css/smoothness/jquery-ui-1.8.13.custom.css">
b/templates/user_routes.html
4 4
<script type="text/javascript" src="/static/js/jquery.dataTables.js"></script>
5 5
<script type="text/javascript">
6 6
	$(document).ready( function(){
7
		$('#create_dialog').dialog({
8
			height: 400,
9
            width: 500,
10
			modal: true,
11
			autoOpen: false,
12
		});
7 13
			$('#routes_table').dataTable( {
8 14
			"bJQueryUI": true,
9 15
			"oLanguage": {
......
11 17
			},
12 18
			"iDisplayLength": 25,
13 19
	} );
20
	$( ".button_place #routebutton" ).button({
21
            icons: {
22
                primary: "ui-icon-circle-plus"
23
            },
24
			});
14 25
		});
26
		
27
		
28
		
29

  
15 30
</script>
16 31
{% endblock %}
17 32
{% block title %}{% trans "My routes" %}{% endblock %}
18 33
{% block content %}
19
<h3>{% trans "My routes" %}</h3>
34
<div style="float:left">
35
	<h3 style="margin-top: 0px;">{% trans "My routes" %}</h3>
36
</div>
37
<div class='button_place' style="float:right">
38
	<a href="{% url add-route %}" id="routebutton">Add Route</a>
39
</div>
20 40

  
21 41
<table class="display" width="100%" id="routes_table">
22 42
<thead>
......
27 47
	<th style="text-align: center;">{% trans "Status" %}</th>
28 48
	{% comment %}<th style="text-align: center;">{% trans "Details" %}</th>{% endcomment %}
29 49
	<th style="text-align: center;">{% trans "Expires" %}</th>
50
	<th style="text-align: center;">{% trans "Response" %}</th>
30 51
</tr>
31 52
</thead>
32 53

  
......
40 61
	<td style="text-align: center;">{% if route.is_online %}Online{% else %}Offline{% endif %}</td>
41 62
	{% comment %}<td style="text-align: center;">{{ route.response }}</td>{% endcomment %}
42 63
	<td style="text-align: center;">{{ route.expires }}</td>
64
	<td style="text-align: center;">{{ route.response }}</td>
43 65
</tr>
44 66

  
45 67
{% endfor %}
46 68
</tbody>
47 69
</table>
48 70

  
71
<div id="create_dialog" title="Add a new Route">
72
	KOKO
73
	</div>
74

  
49 75
{% endblock %}
b/urls.py
8 8
    # Example:
9 9
    # (r'^flowspy/', include('flowspy.foo.urls')),
10 10
    url(r'^/?$', 'flowspy.flowspec.views.user_routes', name="user-routes"),
11
    url(r'^add/?$', 'flowspy.flowspec.views.add_route', name="add-route"),
11 12
    url(r'^user/login/?', 'django.contrib.auth.views.login', {'template_name': 'login.html'}, name="login"),
12 13
    url(r'^user/logout/?', 'django.contrib.auth.views.logout', {'next_page': '/'}, name="logout"),
13 14
    (r'^setlang/?$', 'django.views.i18n.set_language'),
b/utils/beanstalkc.py
1
#!/usr/bin/env python
2
"""beanstalkc - A beanstalkd Client Library for Python"""
3

  
4
__license__ = '''
5
Copyright (C) 2008-2010 Andreas Bolka
6

  
7
Licensed under the Apache License, Version 2.0 (the "License");
8
you may not use this file except in compliance with the License.
9
You may obtain a copy of the License at
10

  
11
    http://www.apache.org/licenses/LICENSE-2.0
12

  
13
Unless required by applicable law or agreed to in writing, software
14
distributed under the License is distributed on an "AS IS" BASIS,
15
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
See the License for the specific language governing permissions and
17
limitations under the License.
18
'''
19

  
20
__version__ = '0.2.0'
21

  
22
import logging
23
import socket
24
import re
25

  
26

  
27
DEFAULT_HOST = 'localhost'
28
DEFAULT_PORT = 11300
29
DEFAULT_PRIORITY = 2**31
30
DEFAULT_TTR = 120
31
DEFAULT_TIMEOUT = 1
32

  
33

  
34
class BeanstalkcException(Exception): pass
35
class UnexpectedResponse(BeanstalkcException): pass
36
class CommandFailed(BeanstalkcException): pass
37
class DeadlineSoon(BeanstalkcException): pass
38
class SocketError(BeanstalkcException): pass
39

  
40

  
41
class Connection(object):
42
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
43
                 connection_timeout=DEFAULT_TIMEOUT):
44
        self._socket = None
45
        self.host = host
46
        self.port = port
47
        self.connection_timeout = connection_timeout
48
        self.connect()
49

  
50
    def connect(self):
51
        """Connect to beanstalkd server, unless already connected."""
52
        if not self.closed:
53
            return
54
        try:
55
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
56
            self._socket.settimeout(self.connection_timeout)
57
            self._socket.connect((self.host, self.port))
58
            self._socket.settimeout(None)
59
            self._socket_file = self._socket.makefile('rb')
60
        except socket.error, e:
61
            self._socket = None
62
            raise SocketError(e)
63

  
64
    def close(self):
65
        """Close connection to server, if it is open."""
66
        if self.closed:
67
            return
68
        try:
69
            self._socket.sendall('quit\r\n')
70
            self._socket.close()
71
        except socket.error:
72
            pass
73
        finally:
74
            self._socket = None
75

  
76
    @property
77
    def closed(self):
78
        return self._socket is None
79

  
80
    def _interact(self, command, expected_ok, expected_err=[], size_field=None):
81
        try:
82
            self._socket.sendall(command)
83
            status, results = self._read_response()
84
            if status in expected_ok:
85
                if size_field is not None:
86
                    results.append(self._read_body(int(results[size_field])))
87
                return results
88
            elif status in expected_err:
89
                raise CommandFailed(command.split()[0], status, results)
90
            else:
91
                raise UnexpectedResponse(command.split()[0], status, results)
92
        except socket.error, e:
93
            self.close()
94
            raise SocketError(e)
95

  
96
    def _read_response(self):
97
        line = self._socket_file.readline()
98
        if not line:
99
            raise socket.error('no data read')
100
        response = line.split()
101
        return response[0], response[1:]
102

  
103
    def _read_body(self, size):
104
        body = self._socket_file.read(size)
105
        self._socket_file.read(2) # trailing crlf
106
        if size > 0 and not body:
107
            raise socket.error('no data read')
108
        return body
109

  
110
    def _interact_value(self, command, expected_ok, expected_err=[]):
111
        return self._interact(command, expected_ok, expected_err)[0]
112

  
113
    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
114
        jid, _, body = self._interact(command, expected_ok, expected_err,
115
                                      size_field=1)
116
        return Job(self, int(jid), body, reserved)
117

  
118
    def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
119
        _, body, = self._interact(command, expected_ok, expected_err,
120
                                  size_field=0)
121
        return parse_yaml_dict(body)
122

  
123
    def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
124
        _, body, = self._interact(command, expected_ok, expected_err,
125
                                  size_field=0)
126
        return parse_yaml_list(body)
127

  
128
    def _interact_peek(self, command):
129
        try:
130
            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
131
        except CommandFailed, (_, status, results):
132
            return None
133

  
134
    # -- public interface --
135

  
136
    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
137
        """Put a job into the current tube. Returns job id."""
138
        assert isinstance(body, str), 'Job body must be a str instance'
139
        jid = self._interact_value(
140
                'put %d %d %d %d\r\n%s\r\n' %
141
                    (priority, delay, ttr, len(body), body),
142
                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
143
        return int(jid)
144

  
145
    def reserve(self, timeout=None):
146
        """Reserve a job from one of the watched tubes, with optional timeout in
147
        seconds. Returns a Job object, or None if the request times out."""
148
        if timeout is not None:
149
            command = 'reserve-with-timeout %d\r\n' % timeout
150
        else:
151
            command = 'reserve\r\n'
152
        try:
153
            return self._interact_job(command,
154
                                      ['RESERVED'],
155
                                      ['DEADLINE_SOON', 'TIMED_OUT'])
156
        except CommandFailed, (_, status, results):
157
            if status == 'TIMED_OUT':
158
                return None
159
            elif status == 'DEADLINE_SOON':
160
                raise DeadlineSoon(results)
161

  
162
    def kick(self, bound=1):
163
        """Kick at most bound jobs into the ready queue."""
164
        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
165

  
166
    def peek(self, jid):
167
        """Peek at a job. Returns a Job, or None."""
168
        return self._interact_peek('peek %d\r\n' % jid)
169

  
170
    def peek_ready(self):
171
        """Peek at next ready job. Returns a Job, or None."""
172
        return self._interact_peek('peek-ready\r\n')
173

  
174
    def peek_delayed(self):
175
        """Peek at next delayed job. Returns a Job, or None."""
176
        return self._interact_peek('peek-delayed\r\n')
177

  
178
    def peek_buried(self):
179
        """Peek at next buried job. Returns a Job, or None."""
180
        return self._interact_peek('peek-buried\r\n')
181

  
182
    def tubes(self):
183
        """Return a list of all existing tubes."""
184
        return self._interact_yaml_list('list-tubes\r\n', ['OK'])
185

  
186
    def using(self):
187
        """Return a list of all tubes currently being used."""
188
        return self._interact_value('list-tube-used\r\n', ['USING'])
189

  
190
    def use(self, name):
191
        """Use a given tube."""
192
        return self._interact_value('use %s\r\n' % name, ['USING'])
193

  
194
    def watching(self):
195
        """Return a list of all tubes being watched."""
196
        return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
197

  
198
    def watch(self, name):
199
        """Watch a given tube."""
200
        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
201

  
202
    def ignore(self, name):
203
        """Stop watching a given tube."""
204
        try:
205
            return int(self._interact_value('ignore %s\r\n' % name,
206
                                            ['WATCHING'],
207
                                            ['NOT_IGNORED']))
208
        except CommandFailed:
209
            return 1
210

  
211
    def stats(self):
212
        """Return a dict of beanstalkd statistics."""
213
        return self._interact_yaml_dict('stats\r\n', ['OK'])
214

  
215
    def stats_tube(self, name):
216
        """Return a dict of stats about a given tube."""
217
        return self._interact_yaml_dict('stats-tube %s\r\n' % name,
218
                                        ['OK'],
219
                                        ['NOT_FOUND'])
220

  
221
    def pause_tube(self, name, delay):
222
        """Pause a tube for a given delay time, in seconds."""
223
        self._interact('pause-tube %s %d\r\n' %(name, delay),
224
                       ['PAUSED'],
225
                       ['NOT_FOUND'])
226

  
227
    # -- job interactors --
228

  
229
    def delete(self, jid):
230
        """Delete a job, by job id."""
231
        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
232

  
233
    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
234
        """Release a reserved job back into the ready queue."""
235
        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
236
                       ['RELEASED', 'BURIED'],
237
                       ['NOT_FOUND'])
238

  
239
    def bury(self, jid, priority=DEFAULT_PRIORITY):
240
        """Bury a job, by job id."""
241
        self._interact('bury %d %d\r\n' % (jid, priority),
242
                       ['BURIED'],
243
                       ['NOT_FOUND'])
244

  
245
    def touch(self, jid):
246
        """Touch a job, by job id, requesting more time to work on a reserved
247
        job before it expires."""
248
        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
249

  
250
    def stats_job(self, jid):
251
        """Return a dict of stats about a job, by job id."""
252
        return self._interact_yaml_dict('stats-job %d\r\n' % jid,
253
                                        ['OK'],
254
                                        ['NOT_FOUND'])
255

  
256

  
257
class Job(object):
258
    def __init__(self, conn, jid, body, reserved=True):
259
        self.conn = conn
260
        self.jid = jid
261
        self.body = body
262
        self.reserved = reserved
263

  
264
    def _priority(self):
265
        stats = self.stats()
266
        if isinstance(stats, dict):
267
            return stats['pri']
268
        return DEFAULT_PRIORITY
269

  
270
    # -- public interface --
271

  
272
    def delete(self):
273
        """Delete this job."""
274
        self.conn.delete(self.jid)
275
        self.reserved = False
276

  
277
    def release(self, priority=None, delay=0):
278
        """Release this job back into the ready queue."""
279
        if self.reserved:
280
            self.conn.release(self.jid, priority or self._priority(), delay)
281
            self.reserved = False
282

  
283
    def bury(self, priority=None):
284
        """Bury this job."""
285
        if self.reserved:
286
            self.conn.bury(self.jid, priority or self._priority())
287
            self.reserved = False
288

  
289
    def touch(self):
290
        """Touch this reserved job, requesting more time to work on it before it
291
        expires."""
292
        if self.reserved:
293
            self.conn.touch(self.jid)
294

  
295
    def stats(self):
296
        """Return a dict of stats about this job."""
297
        return self.conn.stats_job(self.jid)
298

  
299
def parse_yaml_dict(yaml):
300
    """Parse a YAML dict, in the form returned by beanstalkd."""
301
    dict = {}
302
    for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
303
        key, val = m.group(1), m.group(2)
304
        # Check the type of the value, and parse it.
305
        if key == 'name' or key == 'tube' or key == 'version':
306
            dict[key] = val   # String, even if it looks like a number
307
        elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
308
            dict[key] = int(val) # Integer value
309
        elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
310
            dict[key] = float(val) # Float value
311
        else:
312
            dict[key] = val     # String value
313
    return dict
314

  
315
def parse_yaml_list(yaml):
316
    """Parse a YAML list, in the form returned by beanstalkd."""
317
    return re.findall(r'^- (.*)$', yaml, re.M)
318

  
319
if __name__ == '__main__':
320
    import doctest, os, signal
321
    try:
322
        pid = os.spawnlp(os.P_NOWAIT,
323
                         'beanstalkd',
324
                         'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
325
        doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
326
        doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
327
    finally:
328
        os.kill(pid, signal.SIGTERM)

Also available in: Unified diff