Revision e9d46ce1

b/accounts/models.py
1
from django.db import models
2
from django.contrib.auth.models import User
3
from flowspy.peers.models import *
4

  
5

  
6
class UserProfile(models.Model):
7
    user = models.OneToOneField(User)
8
    domain = models.ForeignKey(Peer)
9

  
10
    def get_address_space(self):
11
        networks = self.domain.networks.all()
12
        if not networks:
13
            return False
14
        return networks
b/accounts/tests.py
1
"""
2
This file demonstrates two different styles of tests (one doctest and one
3
unittest). These will both pass when you run "manage.py test".
4

  
5
Replace these with more appropriate tests for your application.
6
"""
7

  
8
from django.test import TestCase
9

  
10
class SimpleTest(TestCase):
11
    def test_basic_addition(self):
12
        """
13
        Tests that 1 + 1 always equals 2.
14
        """
15
        self.failUnlessEqual(1 + 1, 2)
16

  
17
__test__ = {"doctest": """
18
Another way to test that 1 + 1 is equal to 2.
19

  
20
>>> 1 + 1 == 2
21
True
22
"""}
23

  
b/accounts/views.py
1
# Create your views here.
b/peers/models.py
1
from django.db import models
2
from utils.whois import *
3

  
4
# Create your models here.
5
class PeerRange(models.Model):
6
    network = models.CharField(max_length=128)
7
    def __unicode__(self):
8
        return self.network
9
    class Meta:
10
        db_table = u'peer_range'
11

  
12
# Create your models here.
13
class Peer(models.Model):
14
    peer_id = models.IntegerField(primary_key=True)
15
    peer_name = models.CharField(max_length=128)
16
    peer_as = models.IntegerField()
17
    peer_tag = models.CharField(max_length=64)
18
    domain_name = models.CharField(max_length=128)
19
    networks = models.ManyToManyField(PeerRange, null=True, blank=True)
20
    def __unicode__(self):
21
        return self.peer_name
22
    class Meta:
23
        db_table = u'peer'
24
        
25
    def fill_networks(self):
26
        network_range = []
27
        peer_as = "AS%s" %self.peer_as
28
        network_range = whois(peer_as)
29
        if network_range:
30
            for network_item in network_range:
31
                range, created = PeerRange.objects.get_or_create(network=network_item.compressed)
32
                if not range.network in self.networks.all():
33
                    self.networks.add(range)
34
            self.save()
35
                    
36
            
37
        
38
        
b/peers/tests.py
1
"""
2
This file demonstrates two different styles of tests (one doctest and one
3
unittest). These will both pass when you run "manage.py test".
4

  
5
Replace these with more appropriate tests for your application.
6
"""
7

  
8
from django.test import TestCase
9

  
10
class SimpleTest(TestCase):
11
    def test_basic_addition(self):
12
        """
13
        Tests that 1 + 1 always equals 2.
14
        """
15
        self.failUnlessEqual(1 + 1, 2)
16

  
17
__test__ = {"doctest": """
18
Another way to test that 1 + 1 is equal to 2.
19

  
20
>>> 1 + 1 == 2
21
True
22
"""}
23

  
b/peers/views.py
1
# Create your views here.
b/utils/whois.py
1
import socket
2
from ipaddr import *
3
import re
4

  
5
RIPEWHOIS = 'whois.ripe.net'
6
GRNETWHOIS = 'whois.grnet.gr'
7

  
8
def query(query, hostname, flags):
9
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10
    s.connect((hostname, 43))
11
    s.send(" -i origin -r -K -T route -T route6 " + query + "\r\n")
12
    response = ''
13
    while True:
14
        d = s.recv(4096)
15
        response += d
16
        if not d:
17
            break
18
    s.close()
19
    query = response.splitlines()
20
    routes4 = []
21
    routes6 = []
22
    final_routes4 = []
23
    final_routes6 = []
24
    for line in query:
25
        m = re.match(r"(^route6?\:\s+)(?P<subnets>\S+)", line)
26
        if m:
27
            if IPNetwork(m.group('subnets')).version == 4:
28
                routes4.append(IPNetwork(m.group('subnets')))
29
            if IPNetwork(m.group('subnets')).version == 6:
30
                routes6.append(IPNetwork(m.group('subnets')))
31
    final_routes = []
32
    if len(routes4):
33
        final_routes4 = collapse_address_list(routes4)
34
    if len(routes6):
35
        final_routes6 = collapse_address_list(routes6)
36
    final_routes = final_routes4 + final_routes6
37
    return final_routes
38

  
39
def whois(queryas):
40
    routes = query(queryas,GRNETWHOIS, None)
41
    if not routes:
42
        routes = query(queryas,RIPEWHOIS, None)
43
    return routes

Also available in: Unified diff