class RouteAdmin(admin.ModelAdmin):
actions = ['deactivate']
-
+
def deactivate(self, request, queryset):
applier = PR.Applier(route_objects=queryset)
commit, response = applier.apply(configuration=applier.delete_routes())
list_display = ('name', 'is_online', 'applier', 'get_match', 'get_then', 'response')
fieldsets = [
- (None, {'fields': ['name',]}),
+ (None, {'fields': ['name','applier']}),
("Match", {'fields': ['source', 'sourceport', 'destination', 'destinationport', 'port']}),
('Advanced Match Statements', {'fields': ['dscp', 'fragmenttype', 'icmpcode', 'icmptype', 'packetlength', 'protocol', 'tcpflag'], 'classes': ['collapse']}),
("Then", {'fields': ['then' ]}),
--- /dev/null
+from django import forms
+from django.utils.safestring import mark_safe
+from django.utils.translation import ugettext as _
+from django.utils.translation import ugettext_lazy
+from django.template.defaultfilters import filesizeformat
+
+from flowspy.flowspec.models import *
+from ipaddr import *
+
+class RouteForm(forms.ModelForm):
+# name = forms.CharField(help_text=ugettext_lazy("A unique route name,"
+# " e.g. uoa_block_p80"), label=ugettext_lazy("Route Name"), required=False)
+# source = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
+# " e.g.10.10.0.1/32"), label=ugettext_lazy("Source Address"), required=False)
+# source_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of source ports to block"), label=ugettext_lazy("Source Ports"), required=False)
+# destination = forms.CharField(help_text=ugettext_lazy("A qualified IP Network address. CIDR notation,"
+# " e.g.10.10.0.1/32"), label=ugettext_lazy("Destination Address"), required=False)
+# destination_ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of destination ports to block"), label=ugettext_lazy("Destination Ports"), required=False)
+# ports = forms.ModelMultipleChoiceField(queryset=MatchPort.objects.all(), help_text=ugettext_lazy("A set of ports to block"), label=ugettext_lazy("Ports"), required=False)
+ class Meta:
+ model = Route
+
+ def clean_source(self):
+ data = self.cleaned_data['source']
+ if data:
+ try:
+ address = IPNetwork(data)
+ return self.cleaned_data["source"]
+ except Exception:
+ raise forms.ValidationError('Invalid network address format')
+
+ def clean_destination(self):
+ data = self.cleaned_data['destination']
+ if data:
+ try:
+ address = IPNetwork(data)
+ return self.cleaned_data["destination"]
+ except Exception:
+ raise forms.ValidationError('Invalid network address format')
+
+ def clean(self):
+ source = self.cleaned_data.get('source', None)
+ sourceports = self.cleaned_data.get('sourceport', None)
+ ports = self.cleaned_data.get('port', None)
+ destination = self.cleaned_data.get('destination', None)
+ destinationports = self.cleaned_data.get('destinationport', None)
+ if (sourceports and ports):
+ raise forms.ValidationError('Cannot create rule for source ports and ports at the same time. Select either ports or source ports')
+ if (destinationports and ports):
+ raise forms.ValidationError('Cannot create rule for destination ports and ports at the same time. Select either ports or destination ports')
+ if sourceports and not source:
+ raise forms.ValidationError('Once source port is matched, source has to be filled as well. Either deselect source port or fill source address')
+ if destinationports and not destination:
+ raise forms.ValidationError('Once destination port is matched, destination has to be filled as well. Either deselect destination port or fill destination address')
+ if not (source or sourceports or ports or destination or destinationports):
+ raise forms.ValidationError('Fill at least a Route Match Condition')
+ return self.cleaned_data
\ No newline at end of file
from ipaddr import *
from datetime import *
import logging
+from flowspec.tasks import *
+from time import sleep
FORMAT = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(format=FORMAT)
class Route(models.Model):
name = models.CharField(max_length=128)
- applier = models.ForeignKey(User)
+ applier = models.ForeignKey(User, blank=True, null=True)
source = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Source Address")
sourceport = models.ManyToManyField(MatchPort, blank=True, null=True, related_name="matchSourcePort", verbose_name="Source Port")
destination = models.CharField(max_length=32, blank=True, null=True, help_text=u"Network address. Use address/CIDR notation", verbose_name="Destination Address")
last_updated = models.DateTimeField(auto_now=True)
is_online = models.BooleanField(default=False)
is_active = models.BooleanField(default=False)
- expires = models.DateField(default=days_offset)
+ expires = models.DateField(default=days_offset, blank=True, null=True,)
response = models.CharField(max_length=512, blank=True, null=True)
comments = models.TextField(null=True, blank=True, verbose_name="Comments")
except Exception:
raise ValidationError('Invalid network address format at Source Field')
- def save(self, *args, **kwargs):
- applier = PR.Applier(route_object=self)
- commit, response = applier.apply()
- if commit:
- self.is_online = True
- self.is_active = True
- self.response = response
- else:
- self.is_online = False
- self.response = response
- super(Route, self).save(*args, **kwargs)
+# def save(self, *args, **kwargs):
+# edit = False
+# if self.pk:
+# #This is an edit
+# edit = True
+# super(Route, self).save(*args, **kwargs)
+# if not edit:
+# response = add.delay(self)
+# logger.info("Got save job id: %s" %response)
+
+ def commit_add(self, *args, **kwargs):
+ response = add.delay(self)
+ logger.info("Got save job id: %s" %response)
+#
+# def delete(self, *args, **kwargs):
+# response = delete.delay(self)
+# logger.info("Got delete job id: %s" %response)
+
def is_synced(self):
--- /dev/null
+from utils import proxy as PR
+from celery.task import task
+
+@task
+def add(route):
+ applier = PR.Applier(route_object=route)
+ commit, response = applier.apply()
+ if commit:
+ is_online = True
+ is_active = True
+ else:
+ is_online = False
+ is_active = True
+ route.is_online = is_online
+ route.is_active = is_active
+ route.response = response
+ route.save()
+#
+#@task
+#def delete(route):
+#
+# applier = PR.Applier(route_object=route)
+# commit, response = applier.apply(configuration=applier.delete_routes())
+# if commit:
+# rows = queryset.update(is_online=False, is_active=False)
+# queryset.update(response="Successfully removed route from network")
+# self.message_user(request, "Successfully removed %s routes from network" % rows)
+# else:
+# self.message_user(request, "Could not remove routes from network")
+# if commit:
+# is_online = False
+# is_active = False
+# response = "Successfully removed route from network"
+# else:
+# is_online = False
+# is_active = True
+# route.is_online = is_online
+# route.is_active = is_active
+# route.response = response
+# route.save()
\ No newline at end of file
+++ /dev/null
-"""
-This file demonstrates two different styles of tests (one doctest and one
-unittest). These will both pass when you run "manage.py test".
-
-Replace these with more appropriate tests for your application.
-"""
-
-from django.test import TestCase
-
-class SimpleTest(TestCase):
- def test_basic_addition(self):
- """
- Tests that 1 + 1 always equals 2.
- """
- self.failUnlessEqual(1 + 1, 2)
-
-__test__ = {"doctest": """
-Another way to test that 1 + 1 is equal to 2.
-
->>> 1 + 1 == 2
-True
-"""}
-
import re
import socket
from django import forms
-from django.core.cache import cache
from django.views.decorators.csrf import csrf_exempt
+from django.core import urlresolvers
from django.contrib.auth.decorators import login_required
from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse
from django.shortcuts import get_object_or_404, render_to_response
from django.core.urlresolvers import reverse
from django.contrib import messages
+from flowspy.flowspec.forms import *
from flowspy.flowspec.models import *
+def days_offset(): return datetime.now() + timedelta(days = settings.EXPIRATION_DAYS_OFFSET)
+
def user_routes(request):
if request.user.is_anonymous():
return HttpResponseRedirect(reverse('login'))
user_routes = Route.objects.filter(applier=request.user)
- print user_routes
return render_to_response('user_routes.html', {'routes': user_routes},
context_instance=RequestContext(request))
+def add_route(request):
+ if request.method == "GET":
+ form = RouteForm()
+ return render_to_response('apply.html', {'form': form},
+ context_instance=RequestContext(request))
+
+ else:
+ form = RouteForm(request.POST)
+ if form.is_valid():
+ route=form.save(commit=False)
+ route.applier = request.user
+ route.expires = days_offset()
+ route.save()
+ form.save_m2m()
+ route.commit_add()
+ return HttpResponseRedirect(urlresolvers.reverse("user-routes"))
+ else:
+ return render_to_response('apply.html', {'form': form},
+ context_instance=RequestContext(request))
{% extends "base.html" %}
{% load i18n %}
-{% block title %}{% trans "Create new Virtual Machine" %}{% endblock %}
-{% block breadcrumbs %}:: {% trans "Create Instance" %}{% endblock %}
+{% block title %}{% trans "Create new Route" %}{% endblock %}
+{% block breadcrumbs %}:: {% trans "Create Route" %}{% endblock %}
{% block content %}
<style type="text/css">
th {
</style>
<div align="center">
-<h3>{% trans "Apply for a new instance" %}</h3>
+<h3>{% trans "Apply for a new route" %}</h3>
<form method="POST">
{% csrf_token %}
+{% if form.non_field_errors %}
+<p class="error">{{ form.non_field_errors|join:", "}}</p>
+{% endif %}
<fieldset>
-<legend>{% trans "Instance information" %}</legend>
+ <legend>{% trans "Route Basic Info" %}</legend>
<table>
-<tr><th>{{ form.hostname.label_tag }}</th><td>{{ form.hostname }}<span class="error">{{ form.hostname.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.hostname.help_text }}</td></tr>
-<tr><th>{{ form.memory.label_tag }}</th><td>{{ form.memory }}<span class="error">{{ form.memory.errors|join:", " }}</span></td></tr>
-<tr><th>{{ form.vcpus.label_tag }}</th><td>{{ form.vcpus }}<span class="error">{{ form.vcpus.errors|join:", " }}</span></td></tr>
-<tr><th>{{ form.disk_size.label_tag }}</th><td>{{ form.disk_size }}<span class="error">{{ form.disk_size.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.disk_size.help_text }}</td></tr>
-<tr><th>{{ form.hosts_mail_server.label_tag }}</th><td>{{ form.hosts_mail_server }}<span class="error">{{ form.hosts_mail_server.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.hosts_mail_server.help_text }}</td></tr>
-<tr><th>{{ form.operating_system.label_tag }}</th><td>{{ form.operating_system }}<span class="error">{{ form.operating_system.errors|join:", " }}</span></td></tr>
-{% if form.network %}
-<tr><th>{{ form.network.label_tag }}</th><td>{{ form.network }}<span class="error">{{ form.network.errors|join:", " }}</span></td></tr>
-<tr class="help"><td></td><td>{{ form.network.help_text|linebreaks }}</td></tr>
-{% endif %}
+<tr><th>{{ form.name.label_tag }}</th><td>{{ form.name }}<span class="error">{{ form.name.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.name.help_text }}</td></tr>
</table>
</fieldset>
<fieldset>
-<legend>{% trans "Use/Comments" %}</legend>
-{% blocktrans %}
-<p>Give a short description of the intended use of this virtual machine, that justifies the parameter selection above. Feel free to include any additional comments.</p>
-{% endblocktrans %}
-<p>{{ form.comments }}
-{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %}
-</p>
+<legend>{% trans "Route Match Conditions" %}</legend>
+<table>
+<tr><th>{{ form.source.label_tag }}</th><td>{{ form.source }}<span class="error">{{ form.source.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.source.help_text }}</td></tr>
+<tr><th>{{ form.sourceport.label_tag }}</th><td>{{ form.sourceport }}<span class="error">{{ form.sourceport.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.sourceport.help_text }}</td></tr>
+<tr><th>{{ form.destination.label_tag }}</th><td>{{ form.destination }}<span class="error">{{ form.destination.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.destination.help_text }}</td></tr>
+<tr><th>{{ form.destinationport.label_tag }}</th><td>{{ form.destinationport }}<span class="error">{{ form.destinationport.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.destinationport.help_text }}</td></tr>
+<tr><th>{{ form.port.label_tag }}</th><td>{{ form.port }}<span class="error">{{ form.port.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.port.help_text }}</td></tr>
+</table>
</fieldset>
-
<fieldset>
-<legend>{% trans "Administrative contact" %}</legend>
-{% blocktrans %}
-<p>If you are applying on behalf of a NOC under GRNET's constituency, please select the appropriate organization. Otherwise, fill-in the admin contact information below.</p>
-{% endblocktrans %}
-
-{% if form.non_field_errors %}
-<p class="error">{{ form.non_field_errors|join:", "}}</p>
-{% endif %}
-
+<legend>{% trans "Route Actions" %}</legend>
<table>
-<tr><th>{{ form.organization.label_tag }}</th><td>{{ form.organization }}<span class="error">{{ form.organization.errors|join:", " }}</span></td></tr>
-
-
-<tr><td colspan="3"><div align="center">{% trans "OR" %}</div></td></tr>
-
-
-<tr><th colspan="3"><div align="center">{% trans "Administrative contact" %}</div></th></tr>
-<tr><th>{% trans "Name" %}</th><td>{{ form.admin_contact_name }}<span class="error">{{ form.admin_contact_name.errors|join:", " }}</span></td></tr>
-<tr><th>E-mail</th><td>{{ form.admin_contact_email }}<span class="error">{{ form.admin_contact_email.errors|join:", " }}</span></td></tr>
-<tr><th>{% trans "Phone" %}</th><td>{{ form.admin_contact_phone }}<span class="error">{{ form.admin_contact_phone.errors|join:", " }}</span></td></tr>
+<tr><th>{{ form.then.label_tag }}</th><td>{{ form.then }}<span class="error">{{ form.then.errors|join:", " }}</span></td></tr>
+<tr class="help"><td></td><td>{{ form.then.help_text }}</td></tr>
</table>
</fieldset>
-
<fieldset>
-<legend>{% trans "Miscellaneous" %}</legend>
+<legend>{% trans "Use/Comments" %}</legend>
{% blocktrans %}
-<p>We kindly remind you of the following:</p>
-<ul align="left">
-<li>You are solely responsible for the data on your VM. You have to take care of back-ups etc.</li>
-<li>We reserve the right to temporarily suspend the operation of your VM in case it causes malfunctions to our infrastructure</li>
-</ul>
+<p>Give a short description of the intended use of this route, that justifies the parameter selection above. Feel free to include any additional comments.</p>
{% endblocktrans %}
-<p>{{ form.accept_tos }} {% trans "I have read the above and accept them, along with the" %} <a href="/about/terms-of-service/" target="_blank">{% trans "Terms of Service" %}</a></p>
-{% if form.accept_tos.errors %}
-<p class="error">
-{% trans "You must accept the terms of service before continuing." %}
+<p>{{ form.comments }}
+{% if form.errors %}<br /><span class="error">{{ form.comments.errors|join:", " }}</span>{% endif %}
</p>
-{% endif %}
</fieldset>
<p><input type="submit" value="{% trans "Apply" %}" /></p>
</form>
</div>
+
{% endblock %}
<html>
<head>
<title>GRNET's FoD :: {% block title %}{% endblock %} </title>
+<META HTTP-EQUIV="Pragma" CONTENT="no-cache">
+<META HTTP-EQUIV="Expires" CONTENT="-1">
<script src="/static/js/jquery.min.js" type="text/javascript"></script>
<link rel="stylesheet" type="text/css" href="/static/css/base.css">
<link rel="stylesheet" type="text/css" href="/static/css/smoothness/jquery-ui-1.8.13.custom.css">
<script type="text/javascript" src="/static/js/jquery.dataTables.js"></script>
<script type="text/javascript">
$(document).ready( function(){
+ $('#create_dialog').dialog({
+ height: 400,
+ width: 500,
+ modal: true,
+ autoOpen: false,
+ });
$('#routes_table').dataTable( {
"bJQueryUI": true,
"oLanguage": {
},
"iDisplayLength": 25,
} );
+ $( ".button_place #routebutton" ).button({
+ icons: {
+ primary: "ui-icon-circle-plus"
+ },
+ });
});
+
+
+
+
</script>
{% endblock %}
{% block title %}{% trans "My routes" %}{% endblock %}
{% block content %}
-<h3>{% trans "My routes" %}</h3>
+<div style="float:left">
+ <h3 style="margin-top: 0px;">{% trans "My routes" %}</h3>
+</div>
+<div class='button_place' style="float:right">
+ <a href="{% url add-route %}" id="routebutton">Add Route</a>
+</div>
<table class="display" width="100%" id="routes_table">
<thead>
<th style="text-align: center;">{% trans "Status" %}</th>
{% comment %}<th style="text-align: center;">{% trans "Details" %}</th>{% endcomment %}
<th style="text-align: center;">{% trans "Expires" %}</th>
+ <th style="text-align: center;">{% trans "Response" %}</th>
</tr>
</thead>
<td style="text-align: center;">{% if route.is_online %}Online{% else %}Offline{% endif %}</td>
{% comment %}<td style="text-align: center;">{{ route.response }}</td>{% endcomment %}
<td style="text-align: center;">{{ route.expires }}</td>
+ <td style="text-align: center;">{{ route.response }}</td>
</tr>
{% endfor %}
</tbody>
</table>
+<div id="create_dialog" title="Add a new Route">
+ KOKO
+ </div>
+
{% endblock %}
# Example:
# (r'^flowspy/', include('flowspy.foo.urls')),
url(r'^/?$', 'flowspy.flowspec.views.user_routes', name="user-routes"),
+ url(r'^add/?$', 'flowspy.flowspec.views.add_route', name="add-route"),
url(r'^user/login/?', 'django.contrib.auth.views.login', {'template_name': 'login.html'}, name="login"),
url(r'^user/logout/?', 'django.contrib.auth.views.logout', {'next_page': '/'}, name="logout"),
(r'^setlang/?$', 'django.views.i18n.set_language'),
--- /dev/null
+#!/usr/bin/env python
+"""beanstalkc - A beanstalkd Client Library for Python"""
+
+__license__ = '''
+Copyright (C) 2008-2010 Andreas Bolka
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+__version__ = '0.2.0'
+
+import logging
+import socket
+import re
+
+
+DEFAULT_HOST = 'localhost'
+DEFAULT_PORT = 11300
+DEFAULT_PRIORITY = 2**31
+DEFAULT_TTR = 120
+DEFAULT_TIMEOUT = 1
+
+
+class BeanstalkcException(Exception): pass
+class UnexpectedResponse(BeanstalkcException): pass
+class CommandFailed(BeanstalkcException): pass
+class DeadlineSoon(BeanstalkcException): pass
+class SocketError(BeanstalkcException): pass
+
+
+class Connection(object):
+ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT,
+ connection_timeout=DEFAULT_TIMEOUT):
+ self._socket = None
+ self.host = host
+ self.port = port
+ self.connection_timeout = connection_timeout
+ self.connect()
+
+ def connect(self):
+ """Connect to beanstalkd server, unless already connected."""
+ if not self.closed:
+ return
+ try:
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(self.connection_timeout)
+ self._socket.connect((self.host, self.port))
+ self._socket.settimeout(None)
+ self._socket_file = self._socket.makefile('rb')
+ except socket.error, e:
+ self._socket = None
+ raise SocketError(e)
+
+ def close(self):
+ """Close connection to server, if it is open."""
+ if self.closed:
+ return
+ try:
+ self._socket.sendall('quit\r\n')
+ self._socket.close()
+ except socket.error:
+ pass
+ finally:
+ self._socket = None
+
+ @property
+ def closed(self):
+ return self._socket is None
+
+ def _interact(self, command, expected_ok, expected_err=[], size_field=None):
+ try:
+ self._socket.sendall(command)
+ status, results = self._read_response()
+ if status in expected_ok:
+ if size_field is not None:
+ results.append(self._read_body(int(results[size_field])))
+ return results
+ elif status in expected_err:
+ raise CommandFailed(command.split()[0], status, results)
+ else:
+ raise UnexpectedResponse(command.split()[0], status, results)
+ except socket.error, e:
+ self.close()
+ raise SocketError(e)
+
+ def _read_response(self):
+ line = self._socket_file.readline()
+ if not line:
+ raise socket.error('no data read')
+ response = line.split()
+ return response[0], response[1:]
+
+ def _read_body(self, size):
+ body = self._socket_file.read(size)
+ self._socket_file.read(2) # trailing crlf
+ if size > 0 and not body:
+ raise socket.error('no data read')
+ return body
+
+ def _interact_value(self, command, expected_ok, expected_err=[]):
+ return self._interact(command, expected_ok, expected_err)[0]
+
+ def _interact_job(self, command, expected_ok, expected_err, reserved=True):
+ jid, _, body = self._interact(command, expected_ok, expected_err,
+ size_field=1)
+ return Job(self, int(jid), body, reserved)
+
+ def _interact_yaml_dict(self, command, expected_ok, expected_err=[]):
+ _, body, = self._interact(command, expected_ok, expected_err,
+ size_field=0)
+ return parse_yaml_dict(body)
+
+ def _interact_yaml_list(self, command, expected_ok, expected_err=[]):
+ _, body, = self._interact(command, expected_ok, expected_err,
+ size_field=0)
+ return parse_yaml_list(body)
+
+ def _interact_peek(self, command):
+ try:
+ return self._interact_job(command, ['FOUND'], ['NOT_FOUND'], False)
+ except CommandFailed, (_, status, results):
+ return None
+
+ # -- public interface --
+
+ def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
+ """Put a job into the current tube. Returns job id."""
+ assert isinstance(body, str), 'Job body must be a str instance'
+ jid = self._interact_value(
+ 'put %d %d %d %d\r\n%s\r\n' %
+ (priority, delay, ttr, len(body), body),
+ ['INSERTED', 'BURIED'], ['JOB_TOO_BIG'])
+ return int(jid)
+
+ def reserve(self, timeout=None):
+ """Reserve a job from one of the watched tubes, with optional timeout in
+ seconds. Returns a Job object, or None if the request times out."""
+ if timeout is not None:
+ command = 'reserve-with-timeout %d\r\n' % timeout
+ else:
+ command = 'reserve\r\n'
+ try:
+ return self._interact_job(command,
+ ['RESERVED'],
+ ['DEADLINE_SOON', 'TIMED_OUT'])
+ except CommandFailed, (_, status, results):
+ if status == 'TIMED_OUT':
+ return None
+ elif status == 'DEADLINE_SOON':
+ raise DeadlineSoon(results)
+
+ def kick(self, bound=1):
+ """Kick at most bound jobs into the ready queue."""
+ return int(self._interact_value('kick %d\r\n' % bound, ['KICKED']))
+
+ def peek(self, jid):
+ """Peek at a job. Returns a Job, or None."""
+ return self._interact_peek('peek %d\r\n' % jid)
+
+ def peek_ready(self):
+ """Peek at next ready job. Returns a Job, or None."""
+ return self._interact_peek('peek-ready\r\n')
+
+ def peek_delayed(self):
+ """Peek at next delayed job. Returns a Job, or None."""
+ return self._interact_peek('peek-delayed\r\n')
+
+ def peek_buried(self):
+ """Peek at next buried job. Returns a Job, or None."""
+ return self._interact_peek('peek-buried\r\n')
+
+ def tubes(self):
+ """Return a list of all existing tubes."""
+ return self._interact_yaml_list('list-tubes\r\n', ['OK'])
+
+ def using(self):
+ """Return a list of all tubes currently being used."""
+ return self._interact_value('list-tube-used\r\n', ['USING'])
+
+ def use(self, name):
+ """Use a given tube."""
+ return self._interact_value('use %s\r\n' % name, ['USING'])
+
+ def watching(self):
+ """Return a list of all tubes being watched."""
+ return self._interact_yaml_list('list-tubes-watched\r\n', ['OK'])
+
+ def watch(self, name):
+ """Watch a given tube."""
+ return int(self._interact_value('watch %s\r\n' % name, ['WATCHING']))
+
+ def ignore(self, name):
+ """Stop watching a given tube."""
+ try:
+ return int(self._interact_value('ignore %s\r\n' % name,
+ ['WATCHING'],
+ ['NOT_IGNORED']))
+ except CommandFailed:
+ return 1
+
+ def stats(self):
+ """Return a dict of beanstalkd statistics."""
+ return self._interact_yaml_dict('stats\r\n', ['OK'])
+
+ def stats_tube(self, name):
+ """Return a dict of stats about a given tube."""
+ return self._interact_yaml_dict('stats-tube %s\r\n' % name,
+ ['OK'],
+ ['NOT_FOUND'])
+
+ def pause_tube(self, name, delay):
+ """Pause a tube for a given delay time, in seconds."""
+ self._interact('pause-tube %s %d\r\n' %(name, delay),
+ ['PAUSED'],
+ ['NOT_FOUND'])
+
+ # -- job interactors --
+
+ def delete(self, jid):
+ """Delete a job, by job id."""
+ self._interact('delete %d\r\n' % jid, ['DELETED'], ['NOT_FOUND'])
+
+ def release(self, jid, priority=DEFAULT_PRIORITY, delay=0):
+ """Release a reserved job back into the ready queue."""
+ self._interact('release %d %d %d\r\n' % (jid, priority, delay),
+ ['RELEASED', 'BURIED'],
+ ['NOT_FOUND'])
+
+ def bury(self, jid, priority=DEFAULT_PRIORITY):
+ """Bury a job, by job id."""
+ self._interact('bury %d %d\r\n' % (jid, priority),
+ ['BURIED'],
+ ['NOT_FOUND'])
+
+ def touch(self, jid):
+ """Touch a job, by job id, requesting more time to work on a reserved
+ job before it expires."""
+ self._interact('touch %d\r\n' % jid, ['TOUCHED'], ['NOT_FOUND'])
+
+ def stats_job(self, jid):
+ """Return a dict of stats about a job, by job id."""
+ return self._interact_yaml_dict('stats-job %d\r\n' % jid,
+ ['OK'],
+ ['NOT_FOUND'])
+
+
+class Job(object):
+ def __init__(self, conn, jid, body, reserved=True):
+ self.conn = conn
+ self.jid = jid
+ self.body = body
+ self.reserved = reserved
+
+ def _priority(self):
+ stats = self.stats()
+ if isinstance(stats, dict):
+ return stats['pri']
+ return DEFAULT_PRIORITY
+
+ # -- public interface --
+
+ def delete(self):
+ """Delete this job."""
+ self.conn.delete(self.jid)
+ self.reserved = False
+
+ def release(self, priority=None, delay=0):
+ """Release this job back into the ready queue."""
+ if self.reserved:
+ self.conn.release(self.jid, priority or self._priority(), delay)
+ self.reserved = False
+
+ def bury(self, priority=None):
+ """Bury this job."""
+ if self.reserved:
+ self.conn.bury(self.jid, priority or self._priority())
+ self.reserved = False
+
+ def touch(self):
+ """Touch this reserved job, requesting more time to work on it before it
+ expires."""
+ if self.reserved:
+ self.conn.touch(self.jid)
+
+ def stats(self):
+ """Return a dict of stats about this job."""
+ return self.conn.stats_job(self.jid)
+
+def parse_yaml_dict(yaml):
+ """Parse a YAML dict, in the form returned by beanstalkd."""
+ dict = {}
+ for m in re.finditer(r'^\s*([^:\s]+)\s*:\s*([^\s]*)$', yaml, re.M):
+ key, val = m.group(1), m.group(2)
+ # Check the type of the value, and parse it.
+ if key == 'name' or key == 'tube' or key == 'version':
+ dict[key] = val # String, even if it looks like a number
+ elif re.match(r'^(0|-?[1-9][0-9]*)$', val) is not None:
+ dict[key] = int(val) # Integer value
+ elif re.match(r'^(-?\d+(\.\d+)?(e[-+]?[1-9][0-9]*)?)$', val) is not None:
+ dict[key] = float(val) # Float value
+ else:
+ dict[key] = val # String value
+ return dict
+
+def parse_yaml_list(yaml):
+ """Parse a YAML list, in the form returned by beanstalkd."""
+ return re.findall(r'^- (.*)$', yaml, re.M)
+
+if __name__ == '__main__':
+ import doctest, os, signal
+ try:
+ pid = os.spawnlp(os.P_NOWAIT,
+ 'beanstalkd',
+ 'beanstalkd', '-l', '127.0.0.1', '-p', '14711')
+ doctest.testfile('TUTORIAL.md', optionflags=doctest.ELLIPSIS)
+ doctest.testfile('test/network.doctest', optionflags=doctest.ELLIPSIS)
+ finally:
+ os.kill(pid, signal.SIGTERM)