Added celery and beanstalk
authorLeonidas Poulopoulos <leopoul@noc.grnet.gr>
Mon, 21 Nov 2011 17:34:42 +0000 (19:34 +0200)
committerLeonidas Poulopoulos <leopoul@noc.grnet.gr>
Mon, 21 Nov 2011 17:34:42 +0000 (19:34 +0200)
12 files changed:
flowspec/admin.py
flowspec/forms.py [new file with mode: 0644]
flowspec/models.py
flowspec/tasks.py [new file with mode: 0644]
flowspec/tests.py [deleted file]
flowspec/views.py
flowspec_dev.db
templates/apply.html
templates/base.html
templates/user_routes.html
urls.py
utils/beanstalkc.py [new file with mode: 0644]

index f6c3ad9..35052d1 100644 (file)
@@ -5,7 +5,7 @@ from utils import proxy as PR
 class RouteAdmin(admin.ModelAdmin):
     
     actions = ['deactivate']
-
+    
     def deactivate(self, request, queryset):
         applier = PR.Applier(route_objects=queryset)
         commit, response = applier.apply(configuration=applier.delete_routes())
@@ -19,7 +19,7 @@ class RouteAdmin(admin.ModelAdmin):
 
     list_display = ('name', 'is_online', 'applier', 'get_match', 'get_then', 'response')
     fieldsets = [
-        (None,               {'fields': ['name',]}),
+        (None,               {'fields': ['name','applier']}),
         ("Match",               {'fields': ['source', 'sourceport', 'destination', 'destinationport', 'port']}),
         ('Advanced Match Statements', {'fields': ['dscp', 'fragmenttype', 'icmpcode', 'icmptype', 'packetlength', 'protocol', 'tcpflag'], 'classes': ['collapse']}),
         ("Then",               {'fields': ['then' ]}),
diff --git a/flowspec/forms.py b/flowspec/forms.py
new file mode 100644 (file)
index 0000000..bdd78a6
--- /dev/null
@@ -0,0 +1,57 @@
+from django import forms
+from django.utils.safestring import mark_safe
+from django.utils.translation import ugettext as _
+from django.utils.translation import ugettext_lazy
+from django.template.defaultfilters import filesizeformat
+
+from flowspy.flowspec.models import * 
+from ipaddr import *
+
+class RouteForm(forms.ModelForm):
+#    name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
+#                                         " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
+#    source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
+#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
+#    source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
+#    destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
+#                                         " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
+#    destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
+#    ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
+    class Meta:
+        model = Route
+    
+    def clean_source(self):
+        data = self.cleaned_data['source']
+        if data:
+            try:
+                address = IPNetwork(data)
+                return self.cleaned_data["source"]
+            except Exception:
+                raise forms.ValidationError('Invalid network address format')
+
+    def clean_destination(self):
+        data = self.cleaned_data['destination']
+        if data:
+            try:
+                address = IPNetwork(data)
+                return self.cleaned_data["destination"]
+            except Exception:
+                raise forms.ValidationError('Invalid network address format')
+
+    def clean(self):
+        source = self.cleaned_data.get('source', None)
+        sourceports = self.cleaned_data.get('sourceport', None)
+        ports = self.cleaned_data.get('port', None)
+        destination = self.cleaned_data.get('destination', None)
+        destinationports = self.cleaned_data.get('destinationport', None)
+        if (sourceports and ports):
+            raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
+        if (destinationports and ports):
+            raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
+        if sourceports and not source:
+            raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
+        if destinationports and not destination:
+            raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
+        if not (source or sourceports or ports or destination or destinationports):
+            raise forms.ValidationError('Fill at least a Route Match Condition')
+        return self.cleaned_data
\ No newline at end of file
index b3f917b..257c660 100644 (file)
@@ -8,6 +8,8 @@ from utils import proxy as PR
 from ipaddr import *
 from datetime import *
 import logging
+from flowspec.tasks import *
+from time import sleep
 
 FORMAT = '%(asctime)s %(levelname)s: %(message)s'
 logging.basicConfig(format=FORMAT)
@@ -61,7 +63,7 @@ class ThenAction(models.Model):
 
 class Route(models.Model):
     name = models.CharField(max_length=128)
-    applier = models.ForeignKey(User)
+    applier = models.ForeignKey(User, blank=True, null=True)
     source = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Source Address")
     sourceport = models.ManyToManyField(MatchPort, blank=True, null=True, related_name="matchSourcePort", verbose_name="Source Port")
     destination = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Destination Address")
@@ -79,7 +81,7 @@ class Route(models.Model):
     last_updated = models.DateTimeField(auto_now=True)
     is_online = models.BooleanField(default=False)
     is_active = models.BooleanField(default=False)
-    expires = models.DateField(default=days_offset)
+    expires = models.DateField(default=days_offset, blank=True, null=True,)
     response = models.CharField(max_length=512, blank=True, null=True)
     comments = models.TextField(null=True, blank=True, verbose_name="Comments")
 
@@ -106,17 +108,24 @@ class Route(models.Model):
             except Exception:
                 raise ValidationError('Invalid network address format at Source Field')
     
-    def save(self, *args, **kwargs):
-        applier = PR.Applier(route_object=self)
-        commit, response = applier.apply()
-        if commit:
-            self.is_online = True
-            self.is_active = True
-            self.response = response
-        else:
-            self.is_online = False
-            self.response = response
-        super(Route, self).save(*args, **kwargs)
+#    def save(self, *args, **kwargs):
+#        edit = False
+#        if self.pk:
+#            #This is an edit
+#            edit = True
+#        super(Route, self).save(*args, **kwargs)
+#        if not edit:
+#            response = add.delay(self)
+#            logger.info("Got save job id: %s" %response)
+    
+    def commit_add(self, *args, **kwargs):
+        response = add.delay(self)
+        logger.info("Got save job id: %s" %response)
+#    
+#    def delete(self, *args, **kwargs):
+#        response = delete.delay(self)
+#        logger.info("Got delete job id: %s" %response)
+        
 
     def is_synced(self):
         
diff --git a/flowspec/tasks.py b/flowspec/tasks.py
new file mode 100644 (file)
index 0000000..3a997af
--- /dev/null
@@ -0,0 +1,40 @@
+from utils import proxy as PR
+from celery.task import task
+
+@task
+def add(route):
+    applier = PR.Applier(route_object=route)
+    commit, response = applier.apply()
+    if commit:
+        is_online = True
+        is_active = True
+    else:
+        is_online = False
+        is_active = True
+    route.is_online = is_online
+    route.is_active = is_active
+    route.response = response
+    route.save()
+#
+#@task
+#def delete(route):
+#    
+#    applier = PR.Applier(route_object=route)
+#    commit, response = applier.apply(configuration=applier.delete_routes())
+#    if commit:
+#            rows = queryset.update(is_online=False, is_active=False)
+#            queryset.update(response="Successfully removed route from network")
+#            self.message_user(request, "Successfully removed %s routes from network" % rows)
+#        else:
+#            self.message_user(request, "Could not remove routes from network")
+#    if commit:
+#        is_online = False
+#        is_active = False
+#        response = "Successfully removed route from network"
+#    else:
+#        is_online = False
+#        is_active = True
+#    route.is_online = is_online
+#    route.is_active = is_active
+#    route.response = response
+#    route.save()
\ No newline at end of file
diff --git a/flowspec/tests.py b/flowspec/tests.py
deleted file mode 100644 (file)
index 2247054..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-"""
-This file demonstrates two different styles of tests (one doctest and one
-unittest). These will both pass when you run "manage.py test".
-
-Replace these with more appropriate tests for your application.
-"""
-
-from django.test import TestCase
-
-class SimpleTest(TestCase):
-    def test_basic_addition(self):
-        """
-        Tests that 1 + 1 always equals 2.
-        """
-        self.failUnlessEqual(1 + 1, 2)
-
-__test__ = {"doctest": """
-Another way to test that 1 + 1 is equal to 2.
-
->>> 1 + 1 == 2
-True
-"""}
-
index 77ae75a..d8216fb 100644 (file)
@@ -3,8 +3,8 @@ import urllib2
 import re
 import socket
 from django import forms
-from django.core.cache import cache
 from django.views.decorators.csrf import csrf_exempt
+from django.core import urlresolvers
 from django.contrib.auth.decorators import login_required
 from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse
 from django.shortcuts import get_object_or_404, render_to_response
@@ -15,14 +15,35 @@ from django.utils import simplejson
 from django.core.urlresolvers import reverse
 from django.contrib import messages
 
+from flowspy.flowspec.forms import * 
 from flowspy.flowspec.models import *
 
+def days_offset(): return datetime.now() + timedelta(days = settings.EXPIRATION_DAYS_OFFSET)
+
 def user_routes(request):
     if request.user.is_anonymous():
         return HttpResponseRedirect(reverse('login'))
     user_routes = Route.objects.filter(applier=request.user)
-    print user_routes
     return render_to_response('user_routes.html', {'routes': user_routes},
                               context_instance=RequestContext(request))
 
 
+def add_route(request):
+    if request.method == "GET":
+        form = RouteForm()
+        return render_to_response('apply.html', {'form': form},
+                                  context_instance=RequestContext(request))
+
+    else:
+        form = RouteForm(request.POST)
+        if form.is_valid():
+            route=form.save(commit=False)
+            route.applier = request.user
+            route.expires = days_offset()
+            route.save()
+            form.save_m2m()
+            route.commit_add()
+            return HttpResponseRedirect(urlresolvers.reverse("user-routes"))
+        else:
+            return render_to_response('apply.html', {'form': form},
+                                      context_instance=RequestContext(request))
index 9a32e30..fdb41ff 100644 (file)
Binary files a/flowspec_dev.db and b/flowspec_dev.db differ
index 23b15f1..5c83c45 100644 (file)
@@ -1,7 +1,7 @@
 {% extends "base.html" %}
 {% load i18n %}
-{% block title %}{% trans "Create new Virtual Machine" %}{% endblock %}
-{% block breadcrumbs %}:: {% trans "Create Instance" %}{% endblock %}
+{% block title %}{% trans "Create new Route" %}{% endblock %}
+{% block breadcrumbs %}:: {% trans "Create Route" %}{% endblock %}
 {% block content %}
 <style type="text/css">
 th {
@@ -17,80 +17,54 @@ th {
 </style>
 
 <div align="center">
-<h3>{% trans "Apply for a new instance" %}</h3>
+<h3>{% trans "Apply for a new route" %}</h3>
 <form method="POST">
 {% csrf_token %}
+{% if form.non_field_errors %}
+<p class="error">{{ form.non_field_errors|join:", "}}</p>
+{% endif %}
 <fieldset>
-<legend>{% trans "Instance information" %}</legend>
+       <legend>{% trans "Route Basic Info" %}</legend>
 <table>
-<tr><th>{{ form.hostname.label_tag }}</th><td>{{ form.hostname }}<span class="error">{{ form.hostname.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.hostname.help_text }}</td></tr>
-<tr><th>{{ form.memory.label_tag }}</th><td>{{ form.memory }}<span class="error">{{ form.memory.errors|join:", " }}</span></td></tr>
-<tr><th>{{ form.vcpus.label_tag }}</th><td>{{ form.vcpus }}<span class="error">{{ form.vcpus.errors|join:", " }}</span></td></tr>
-<tr><th>{{ form.disk_size.label_tag }}</th><td>{{ form.disk_size }}<span class="error">{{ form.disk_size.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.disk_size.help_text }}</td></tr>
-<tr><th>{{ form.hosts_mail_server.label_tag }}</th><td>{{ form.hosts_mail_server }}<span class="error">{{ form.hosts_mail_server.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.hosts_mail_server.help_text }}</td></tr>
-<tr><th>{{ form.operating_system.label_tag }}</th><td>{{ form.operating_system }}<span class="error">{{ form.operating_system.errors|join:", " }}</span></td></tr>
-{% if form.network %}
-<tr><th>{{ form.network.label_tag }}</th><td>{{ form.network }}<span class="error">{{ form.network.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.network.help_text|linebreaks }}</td></tr>
-{% endif %}
+<tr><th>{{ form.name.label_tag }}</th><td>{{ form.name }}<span class="error">{{ form.name.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.name.help_text }}</td></tr>
 </table>
 </fieldset>
 
 <fieldset>
-<legend>{% trans "Use/Comments" %}</legend>
-{% blocktrans %}
-<p>Give a short description of the intended use of this virtual machine, that justifies the parameter selection above. Feel free to include any additional comments.</p>
-{% endblocktrans %}
-<p>{{ form.comments }}
-{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %}
-</p>
+<legend>{% trans "Route Match Conditions" %}</legend>
+<table>
+<tr><th>{{ form.source.label_tag }}</th><td>{{ form.source }}<span class="error">{{ form.source.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.source.help_text }}</td></tr>
+<tr><th>{{ form.sourceport.label_tag }}</th><td>{{ form.sourceport }}<span class="error">{{ form.sourceport.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.sourceport.help_text }}</td></tr>
+<tr><th>{{ form.destination.label_tag }}</th><td>{{ form.destination }}<span class="error">{{ form.destination.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.destination.help_text }}</td></tr>
+<tr><th>{{ form.destinationport.label_tag }}</th><td>{{ form.destinationport }}<span class="error">{{ form.destinationport.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.destinationport.help_text }}</td></tr>
+<tr><th>{{ form.port.label_tag }}</th><td>{{ form.port }}<span class="error">{{ form.port.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.port.help_text }}</td></tr>
+</table>
 </fieldset>
-
 <fieldset>
-<legend>{% trans "Administrative contact" %}</legend>
-{% blocktrans %}
-<p>If you are applying on behalf of a NOC under GRNET's constituency, please select the appropriate organization. Otherwise, fill-in the admin contact information below.</p>
-{% endblocktrans %}
-
-{% if form.non_field_errors %}
-<p class="error">{{ form.non_field_errors|join:", "}}</p>
-{% endif %}
-
+<legend>{% trans "Route Actions" %}</legend>
 <table>
-<tr><th>{{ form.organization.label_tag }}</th><td>{{ form.organization }}<span class="error">{{ form.organization.errors|join:", " }}</span></td></tr>
-
-
-<tr><td colspan="3"><div align="center">{% trans "OR" %}</div></td></tr>
-
-
-<tr><th colspan="3"><div align="center">{% trans "Administrative contact" %}</div></th></tr>
-<tr><th>{% trans "Name" %}</th><td>{{ form.admin_contact_name }}<span class="error">{{ form.admin_contact_name.errors|join:", " }}</span></td></tr>
-<tr><th>E-mail</th><td>{{ form.admin_contact_email }}<span class="error">{{ form.admin_contact_email.errors|join:", " }}</span></td></tr>
-<tr><th>{% trans "Phone" %}</th><td>{{ form.admin_contact_phone }}<span class="error">{{ form.admin_contact_phone.errors|join:", " }}</span></td></tr>
+<tr><th>{{ form.then.label_tag }}</th><td>{{ form.then }}<span class="error">{{ form.then.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.then.help_text }}</td></tr>
 </table>
 </fieldset>
-
 <fieldset>
-<legend>{% trans "Miscellaneous" %}</legend>
+<legend>{% trans "Use/Comments" %}</legend>
 {% blocktrans %}
-<p>We kindly remind you of the following:</p>
-<ul align="left">
-<li>You are solely responsible for the data on your VM. You have to take care of back-ups etc.</li>
-<li>We reserve the right to temporarily suspend the operation of your VM in case it causes malfunctions to our infrastructure</li>
-</ul>
+<p>Give a short description of the intended use of this route, that justifies the parameter selection above. Feel free to include any additional comments.</p>
 {% endblocktrans %}
-<p>{{ form.accept_tos }} {% trans "I have read the above and accept them, along with the" %} <a href="/about/terms-of-service/" target="_blank">{% trans "Terms of Service" %}</a></p>
-{% if form.accept_tos.errors %}
-<p class="error">
-{% trans "You must accept the terms of service before continuing." %}
+<p>{{ form.comments }}
+{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %}
 </p>
-{% endif %}
 </fieldset>
 
 <p><input type="submit" value="{% trans "Apply" %}" /></p>
 </form>
 </div>
+
 {% endblock %}
index 19973f0..9ce5e2e 100644 (file)
@@ -2,6 +2,8 @@
 <html>
 <head>
 <title>GRNET's FoD :: {% block title %}{% endblock %} </title>
+<META HTTP-EQUIV="Pragma" CONTENT="no-cache">
+<META HTTP-EQUIV="Expires" CONTENT="-1">
 <script src="/static/js/jquery.min.js" type="text/javascript"></script>
 <link rel="stylesheet" type="text/css" href="/static/css/base.css">
 <link rel="stylesheet" type="text/css" href="/static/css/smoothness/jquery-ui-1.8.13.custom.css">
index 6fe303d..c29c4f7 100644 (file)
@@ -4,6 +4,12 @@
 <script type="text/javascript" src="/static/js/jquery.dataTables.js"></script>
 <script type="text/javascript">
        $(document).ready( function(){
+               $('#create_dialog').dialog({
+                       height: 400,
+            width: 500,
+                       modal: true,
+                       autoOpen: false,
+               });
                        $('#routes_table').dataTable( {
                        "bJQueryUI": true,
                        "oLanguage": {
                        },
                        "iDisplayLength": 25,
        } );
+       $( ".button_place #routebutton" ).button({
+            icons: {
+                primary: "ui-icon-circle-plus"
+            },
+                       });
                });
+               
+               
+               
+
 </script>
 {% endblock %}
 {% block title %}{% trans "My routes" %}{% endblock %}
 {% block content %}
-<h3>{% trans "My routes" %}</h3>
+<div style="float:left">
+       <h3 style="margin-top: 0px;">{% trans "My routes" %}</h3>
+</div>
+<div class='button_place' style="float:right">
+       <a href="{% url add-route %}" id="routebutton">Add Route</a>
+</div>
 
 <table class="display" width="100%" id="routes_table">
 <thead>
@@ -27,6 +47,7 @@
        <th style="text-align: center;">{% trans "Status" %}</th>
        {% comment %}<th style="text-align: center;">{% trans "Details" %}</th>{% endcomment %}
        <th style="text-align: center;">{% trans "Expires" %}</th>
+       <th style="text-align: center;">{% trans "Response" %}</th>
 </tr>
 </thead>
 
        <td style="text-align: center;">{% if route.is_online %}Online{% else %}Offline{% endif %}</td>
        {% comment %}<td style="text-align: center;">{{ route.response }}</td>{% endcomment %}
        <td style="text-align: center;">{{ route.expires }}</td>
+       <td style="text-align: center;">{{ route.response }}</td>
 </tr>
 
 {% endfor %}
 </tbody>
 </table>
 
+<div id="create_dialog" title="Add a new Route">
+       KOKO
+       </div>
+
 {% endblock %}
diff --git a/urls.py b/urls.py
index 7fa8548..8cc86e6 100644 (file)
--- a/urls.py
+++ b/urls.py
@@ -8,6 +8,7 @@ urlpatterns = patterns('',
     # Example:
     # (r'^flowspy/', include('flowspy.foo.urls')),
     url(r'^/?$', 'flowspy.flowspec.views.user_routes', name="user-routes"),
+    url(r'^add/?$', 'flowspy.flowspec.views.add_route', name="add-route"),
     url(r'^user/login/?', 'django.contrib.auth.views.login', {'template_name': 'login.html'}, name="login"),
     url(r'^user/logout/?', 'django.contrib.auth.views.logout', {'next_page': '/'}, name="logout"),
     (r'^setlang/?$', 'django.views.i18n.set_language'),
diff --git a/utils/beanstalkc.py b/utils/beanstalkc.py
new file mode 100644 (file)
index 0000000..bb976df
--- /dev/null
@@ -0,0 +1,328 @@
+#!/usr/bin/env python
+"""beanstalkc - A beanstalkd Client Library for Python"""
+
+__license__ = '''
+Copyright (C) 2008-2010 Andreas Bolka
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+__version__ = '0.2.0'
+
+import logging
+import socket
+import re
+
+
+DEFAULT_HOST = 'localhost'
+DEFAULT_PORT = 11300
+DEFAULT_PRIORITY = 2**31
+DEFAULT_TTR = 120
+DEFAULT_TIMEOUT = 1
+
+
+class BeanstalkcException(Exception): pass
+class UnexpectedResponse(BeanstalkcException): pass
+class CommandFailed(BeanstalkcException): pass
+class DeadlineSoon(BeanstalkcException): pass
+class SocketError(BeanstalkcException): pass
+
+
+class Connection(object):
+    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
+                 connection_timeout=DEFAULT_TIMEOUT):
+        self._socket = None
+        self.host = host
+        self.port = port
+        self.connection_timeout = connection_timeout
+        self.connect()
+
+    def connect(self):
+        """Connect to beanstalkd server, unless already connected."""
+        if not self.closed:
+            return
+        try:
+            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self._socket.settimeout(self.connection_timeout)
+            self._socket.connect((self.host, self.port))
+            self._socket.settimeout(None)
+            self._socket_file = self._socket.makefile('rb')
+        except socket.error, e:
+            self._socket = None
+            raise SocketError(e)
+
+    def close(self):
+        """Close connection to server, if it is open."""
+        if self.closed:
+            return
+        try:
+            self._socket.sendall('quit\r\n')
+            self._socket.close()
+        except socket.error:
+            pass
+        finally:
+            self._socket = None
+
+    @property
+    def closed(self):
+        return self._socket is None
+
+    def _interact(self, command, expected_ok, expected_err=[], size_field=None):
+        try:
+            self._socket.sendall(command)
+            status, results = self._read_response()
+            if status in expected_ok:
+                if size_field is not None:
+                    results.append(self._read_body(int(results[size_field])))
+                return results
+            elif status in expected_err:
+                raise CommandFailed(command.split()[0], status, results)
+            else:
+                raise UnexpectedResponse(command.split()[0], status, results)
+        except socket.error, e:
+            self.close()
+            raise SocketError(e)
+
+    def _read_response(self):
+        line = self._socket_file.readline()
+        if not line:
+            raise socket.error('no data read')
+        response = line.split()
+        return response[0], response[1:]
+
+    def _read_body(self, size):
+        body = self._socket_file.read(size)
+        self._socket_file.read(2) # trailing crlf
+        if size > 0 and not body:
+            raise socket.error('no data read')
+        return body
+
+    def _interact_value(self, command, expected_ok, expected_err=[]):
+        return self._interact(command, expected_ok, expected_err)[0]
+
+    def _interact_job(self, command, expected_ok, expected_err, reserved=True):
+        jid, _, body = self._interact(command, expected_ok, expected_err,
+                                      size_field=1)
+        return Job(self, int(jid), body, reserved)
+
+    def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
+        _, body, = self._interact(command, expected_ok, expected_err,
+                                  size_field=0)
+        return parse_yaml_dict(body)
+
+    def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
+        _, body, = self._interact(command, expected_ok, expected_err,
+                                  size_field=0)
+        return parse_yaml_list(body)
+
+    def _interact_peek(self, command):
+        try:
+            return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
+        except CommandFailed, (_, status, results):
+            return None
+
+    # -- public interface --
+
+    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
+        """Put a job into the current tube. Returns job id."""
+        assert isinstance(body, str), 'Job body must be a str instance'
+        jid = self._interact_value(
+                'put %d %d %d %d\r\n%s\r\n' %
+                    (priority, delay, ttr, len(body), body),
+                ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
+        return int(jid)
+
+    def reserve(self, timeout=None):
+        """Reserve a job from one of the watched tubes, with optional timeout in
+        seconds. Returns a Job object, or None if the request times out."""
+        if timeout is not None:
+            command = 'reserve-with-timeout %d\r\n' % timeout
+        else:
+            command = 'reserve\r\n'
+        try:
+            return self._interact_job(command,
+                                      ['RESERVED'],
+                                      ['DEADLINE_SOON', 'TIMED_OUT'])
+        except CommandFailed, (_, status, results):
+            if status == 'TIMED_OUT':
+                return None
+            elif status == 'DEADLINE_SOON':
+                raise DeadlineSoon(results)
+
+    def kick(self, bound=1):
+        """Kick at most bound jobs into the ready queue."""
+        return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
+
+    def peek(self, jid):
+        """Peek at a job. Returns a Job, or None."""
+        return self._interact_peek('peek %d\r\n' % jid)
+
+    def peek_ready(self):
+        """Peek at next ready job. Returns a Job, or None."""
+        return self._interact_peek('peek-ready\r\n')
+
+    def peek_delayed(self):
+        """Peek at next delayed job. Returns a Job, or None."""
+        return self._interact_peek('peek-delayed\r\n')
+
+    def peek_buried(self):
+        """Peek at next buried job. Returns a Job, or None."""
+        return self._interact_peek('peek-buried\r\n')
+
+    def tubes(self):
+        """Return a list of all existing tubes."""
+        return self._interact_yaml_list('list-tubes\r\n', ['OK'])
+
+    def using(self):
+        """Return a list of all tubes currently being used."""
+        return self._interact_value('list-tube-used\r\n', ['USING'])
+
+    def use(self, name):
+        """Use a given tube."""
+        return self._interact_value('use %s\r\n' % name, ['USING'])
+
+    def watching(self):
+        """Return a list of all tubes being watched."""
+        return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
+
+    def watch(self, name):
+        """Watch a given tube."""
+        return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
+
+    def ignore(self, name):
+        """Stop watching a given tube."""
+        try:
+            return int(self._interact_value('ignore %s\r\n' % name,
+                                            ['WATCHING'],
+                                            ['NOT_IGNORED']))
+        except CommandFailed:
+            return 1
+
+    def stats(self):
+        """Return a dict of beanstalkd statistics."""
+        return self._interact_yaml_dict('stats\r\n', ['OK'])
+
+    def stats_tube(self, name):
+        """Return a dict of stats about a given tube."""
+        return self._interact_yaml_dict('stats-tube %s\r\n' % name,
+                                        ['OK'],
+                                        ['NOT_FOUND'])
+
+    def pause_tube(self, name, delay):
+        """Pause a tube for a given delay time, in seconds."""
+        self._interact('pause-tube %s %d\r\n' %(name, delay),
+                       ['PAUSED'],
+                       ['NOT_FOUND'])
+
+    # -- job interactors --
+
+    def delete(self, jid):
+        """Delete a job, by job id."""
+        self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
+
+    def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
+        """Release a reserved job back into the ready queue."""
+        self._interact('release %d %d %d\r\n' % (jid, priority, delay),
+                       ['RELEASED', 'BURIED'],
+                       ['NOT_FOUND'])
+
+    def bury(self, jid, priority=DEFAULT_PRIORITY):
+        """Bury a job, by job id."""
+        self._interact('bury %d %d\r\n' % (jid, priority),
+                       ['BURIED'],
+                       ['NOT_FOUND'])
+
+    def touch(self, jid):
+        """Touch a job, by job id, requesting more time to work on a reserved
+        job before it expires."""
+        self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
+
+    def stats_job(self, jid):
+        """Return a dict of stats about a job, by job id."""
+        return self._interact_yaml_dict('stats-job %d\r\n' % jid,
+                                        ['OK'],
+                                        ['NOT_FOUND'])
+
+
+class Job(object):
+    def __init__(self, conn, jid, body, reserved=True):
+        self.conn = conn
+        self.jid = jid
+        self.body = body
+        self.reserved = reserved
+
+    def _priority(self):
+        stats = self.stats()
+        if isinstance(stats, dict):
+            return stats['pri']
+        return DEFAULT_PRIORITY
+
+    # -- public interface --
+
+    def delete(self):
+        """Delete this job."""
+        self.conn.delete(self.jid)
+        self.reserved = False
+
+    def release(self, priority=None, delay=0):
+        """Release this job back into the ready queue."""
+        if self.reserved:
+            self.conn.release(self.jid, priority or self._priority(), delay)
+            self.reserved = False
+
+    def bury(self, priority=None):
+        """Bury this job."""
+        if self.reserved:
+            self.conn.bury(self.jid, priority or self._priority())
+            self.reserved = False
+
+    def touch(self):
+        """Touch this reserved job, requesting more time to work on it before it
+        expires."""
+        if self.reserved:
+            self.conn.touch(self.jid)
+
+    def stats(self):
+        """Return a dict of stats about this job."""
+        return self.conn.stats_job(self.jid)
+
+def parse_yaml_dict(yaml):
+    """Parse a YAML dict, in the form returned by beanstalkd."""
+    dict = {}
+    for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
+        key, val = m.group(1), m.group(2)
+        # Check the type of the value, and parse it.
+        if key == 'name' or key == 'tube' or key == 'version':
+            dict[key] = val   # String, even if it looks like a number
+        elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
+            dict[key] = int(val) # Integer value
+        elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
+            dict[key] = float(val) # Float value
+        else:
+            dict[key] = val     # String value
+    return dict
+
+def parse_yaml_list(yaml):
+    """Parse a YAML list, in the form returned by beanstalkd."""
+    return re.findall(r'^- (.*)$', yaml, re.M)
+
+if __name__ == '__main__':
+    import doctest, os, signal
+    try:
+        pid = os.spawnlp(os.P_NOWAIT,
+                         'beanstalkd',
+                         'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
+        doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
+        doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
+    finally:
+        os.kill(pid, signal.SIGTERM)