Revision a744b676 test/ganeti.utils_unittest.py

b/test/ganeti.utils_unittest.py
1 1
#!/usr/bin/python
2 2
#
3 3

  
4
# Copyright (C) 2006, 2007 Google Inc.
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
21 21

  
22 22
"""Script for unittesting the utils module"""
23 23

  
24
import unittest
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
25 28
import os
26
import time
27
import tempfile
28 29
import os.path
29
import os
30
import stat
30
import re
31
import shutil
31 32
import signal
32 33
import socket
33
import shutil
34
import re
35
import select
34
import stat
36 35
import string
37
import fcntl
38
import OpenSSL
36
import tempfile
37
import time
38
import unittest
39 39
import warnings
40
import distutils.version
41
import glob
42
import errno
40
import OpenSSL
43 41

  
44
import ganeti
45 42
import testutils
46 43
from ganeti import constants
47 44
from ganeti import compat
48 45
from ganeti import utils
49 46
from ganeti import errors
50
from ganeti import serializer
51
from ganeti.utils import IsProcessAlive, RunCmd, \
52
     RemoveFile, MatchNameComponent, FormatUnit, \
53
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
54
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
55
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
56
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
57
     UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile
58

  
59
from ganeti.errors import LockError, UnitParseError, GenericError, \
60
     ProgrammerError, OpPrereqError
47
from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
48
     ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
49
     TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
50
     ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
61 51

  
62 52

  
63 53
class TestIsProcessAlive(unittest.TestCase):
......
65 55

  
66 56
  def testExists(self):
67 57
    mypid = os.getpid()
68
    self.assert_(IsProcessAlive(mypid),
69
                 "can't find myself running")
58
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
70 59

  
71 60
  def testNotExisting(self):
72 61
    pid_non_existing = os.fork()
......
75 64
    elif pid_non_existing < 0:
76 65
      raise SystemError("can't fork")
77 66
    os.waitpid(pid_non_existing, 0)
78
    self.assertFalse(IsProcessAlive(pid_non_existing),
67
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
79 68
                     "nonexisting process detected")
80 69

  
81 70

  
......
121 110
        self.assertEqual(result, value.strip())
122 111

  
123 112
  def test(self):
124
    sp = utils.PathJoin(self.tmpdir, "status")
113
    sp = PathJoin(self.tmpdir, "status")
125 114

  
126 115
    utils.WriteFile(sp, data="\n".join([
127 116
      "Name:   bash",
......
139 128
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
140 129

  
141 130
  def testNoSigCgt(self):
142
    sp = utils.PathJoin(self.tmpdir, "status")
131
    sp = PathJoin(self.tmpdir, "status")
143 132

  
144 133
    utils.WriteFile(sp, data="\n".join([
145 134
      "Name:   bash",
......
149 138
                      1234, 10, status_path=sp)
150 139

  
151 140
  def testNoSuchFile(self):
152
    sp = utils.PathJoin(self.tmpdir, "notexist")
141
    sp = PathJoin(self.tmpdir, "notexist")
153 142

  
154 143
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
155 144

  
......
193 182
    read_pid = utils.ReadPidFile(pid_file)
194 183
    self.failUnlessEqual(read_pid, os.getpid())
195 184
    self.failUnless(utils.IsProcessAlive(read_pid))
196
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
185
    self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
197 186
    utils.RemovePidFile('test')
198 187
    self.failIf(os.path.exists(pid_file),
199 188
                "PID file should not exist anymore")
......
227 216
    utils.KillProcess(new_pid, waitpid=True)
228 217
    self.failIf(utils.IsProcessAlive(new_pid))
229 218
    utils.RemovePidFile('child')
230
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
219
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
231 220

  
232 221
  def tearDown(self):
233 222
    for name in os.listdir(self.dir):
......
881 870
    shutil.rmtree(self.tmpdir)
882 871

  
883 872
  def testEmpty(self):
884
    filename = utils.PathJoin(self.tmpdir, "config.data")
873
    filename = PathJoin(self.tmpdir, "config.data")
885 874
    utils.WriteFile(filename, data="")
886 875
    bname = utils.CreateBackup(filename)
887 876
    self.assertFileContent(bname, "")
......
891 880
    utils.CreateBackup(filename)
892 881
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
893 882

  
894
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
883
    fifoname = PathJoin(self.tmpdir, "fifo")
895 884
    os.mkfifo(fifoname)
896 885
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
897 886

  
......
901 890
      for rep in [1, 2, 10, 127]:
902 891
        testdata = data * rep
903 892

  
904
        filename = utils.PathJoin(self.tmpdir, "test.data_")
893
        filename = PathJoin(self.tmpdir, "test.data_")
905 894
        utils.WriteFile(filename, data=testdata)
906 895
        self.assertFileContent(filename, testdata)
907 896

  
......
953 942
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
954 943
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
955 944

  
945

  
956 946
class TestParseUnit(unittest.TestCase):
957 947
  """Test case for the ParseUnit function"""
958 948

  
......
999 989
  def testInvalidInput(self):
1000 990
    for sep in ('-', '_', ',', 'a'):
1001 991
      for suffix, _ in TestParseUnit.SCALES:
1002
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
992
        self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
1003 993

  
1004 994
    for suffix, _ in TestParseUnit.SCALES:
1005
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
995
      self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
1006 996

  
1007 997

  
1008 998
class TestSshKeys(testutils.GanetiTestCase):
......
1023 1013
      handle.close()
1024 1014

  
1025 1015
  def testAddingNewKey(self):
1026
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1016
    utils.AddAuthorizedKey(self.tmpname,
1017
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1027 1018

  
1028 1019
    self.assertFileContent(self.tmpname,
1029 1020
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
......
1032 1023
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1033 1024

  
1034 1025
  def testAddingAlmostButNotCompletelyTheSameKey(self):
1035
    AddAuthorizedKey(self.tmpname,
1026
    utils.AddAuthorizedKey(self.tmpname,
1036 1027
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1037 1028

  
1038 1029
    self.assertFileContent(self.tmpname,
......
1042 1033
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1043 1034

  
1044 1035
  def testAddingExistingKeyWithSomeMoreSpaces(self):
1045
    AddAuthorizedKey(self.tmpname,
1036
    utils.AddAuthorizedKey(self.tmpname,
1046 1037
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1047 1038

  
1048 1039
    self.assertFileContent(self.tmpname,
......
1051 1042
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1052 1043

  
1053 1044
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
1054
    RemoveAuthorizedKey(self.tmpname,
1045
    utils.RemoveAuthorizedKey(self.tmpname,
1055 1046
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1056 1047

  
1057 1048
    self.assertFileContent(self.tmpname,
......
1059 1050
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1060 1051

  
1061 1052
  def testRemovingNonExistingKey(self):
1062
    RemoveAuthorizedKey(self.tmpname,
1053
    utils.RemoveAuthorizedKey(self.tmpname,
1063 1054
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
1064 1055

  
1065 1056
    self.assertFileContent(self.tmpname,
......
1185 1176
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1186 1177

  
1187 1178

  
1188
class _BaseTcpPingTest:
1189
  """Base class for TcpPing tests against listen(2)ing port"""
1190
  family = None
1191
  address = None
1192

  
1193
  def setUp(self):
1194
    self.listener = socket.socket(self.family, socket.SOCK_STREAM)
1195
    self.listener.bind((self.address, 0))
1196
    self.listenerport = self.listener.getsockname()[1]
1197
    self.listener.listen(1)
1198

  
1199
  def tearDown(self):
1200
    self.listener.shutdown(socket.SHUT_RDWR)
1201
    del self.listener
1202
    del self.listenerport
1203

  
1204
  def testTcpPingToLocalHostAccept(self):
1205
    self.assert_(TcpPing(self.address,
1206
                         self.listenerport,
1207
                         timeout=constants.TCP_PING_TIMEOUT,
1208
                         live_port_needed=True,
1209
                         source=self.address,
1210
                         ),
1211
                 "failed to connect to test listener")
1212

  
1213
    self.assert_(TcpPing(self.address, self.listenerport,
1214
                         timeout=constants.TCP_PING_TIMEOUT,
1215
                         live_port_needed=True),
1216
                 "failed to connect to test listener (no source)")
1217

  
1218

  
1219
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
1220
  """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
1221
  family = socket.AF_INET
1222
  address = constants.IP4_ADDRESS_LOCALHOST
1223

  
1224
  def setUp(self):
1225
    unittest.TestCase.setUp(self)
1226
    _BaseTcpPingTest.setUp(self)
1227

  
1228
  def tearDown(self):
1229
    unittest.TestCase.tearDown(self)
1230
    _BaseTcpPingTest.tearDown(self)
1231

  
1232

  
1233
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
1234
  """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
1235
  family = socket.AF_INET6
1236
  address = constants.IP6_ADDRESS_LOCALHOST
1237

  
1238
  def setUp(self):
1239
    unittest.TestCase.setUp(self)
1240
    _BaseTcpPingTest.setUp(self)
1241

  
1242
  def tearDown(self):
1243
    unittest.TestCase.tearDown(self)
1244
    _BaseTcpPingTest.tearDown(self)
1245

  
1246

  
1247
class _BaseTcpPingDeafTest:
1248
  """Base class for TcpPing tests against non listen(2)ing port"""
1249
  family = None
1250
  address = None
1251

  
1252
  def setUp(self):
1253
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
1254
    self.deaflistener.bind((self.address, 0))
1255
    self.deaflistenerport = self.deaflistener.getsockname()[1]
1256

  
1257
  def tearDown(self):
1258
    del self.deaflistener
1259
    del self.deaflistenerport
1260

  
1261
  def testTcpPingToLocalHostAcceptDeaf(self):
1262
    self.assertFalse(TcpPing(self.address,
1263
                             self.deaflistenerport,
1264
                             timeout=constants.TCP_PING_TIMEOUT,
1265
                             live_port_needed=True,
1266
                             source=self.address,
1267
                             ), # need successful connect(2)
1268
                     "successfully connected to deaf listener")
1269

  
1270
    self.assertFalse(TcpPing(self.address,
1271
                             self.deaflistenerport,
1272
                             timeout=constants.TCP_PING_TIMEOUT,
1273
                             live_port_needed=True,
1274
                             ), # need successful connect(2)
1275
                     "successfully connected to deaf listener (no source)")
1276

  
1277
  def testTcpPingToLocalHostNoAccept(self):
1278
    self.assert_(TcpPing(self.address,
1279
                         self.deaflistenerport,
1280
                         timeout=constants.TCP_PING_TIMEOUT,
1281
                         live_port_needed=False,
1282
                         source=self.address,
1283
                         ), # ECONNREFUSED is OK
1284
                 "failed to ping alive host on deaf port")
1285

  
1286
    self.assert_(TcpPing(self.address,
1287
                         self.deaflistenerport,
1288
                         timeout=constants.TCP_PING_TIMEOUT,
1289
                         live_port_needed=False,
1290
                         ), # ECONNREFUSED is OK
1291
                 "failed to ping alive host on deaf port (no source)")
1292

  
1293

  
1294
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
1295
  """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
1296
  family = socket.AF_INET
1297
  address = constants.IP4_ADDRESS_LOCALHOST
1298

  
1299
  def setUp(self):
1300
    self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
1301
    self.deaflistener.bind((self.address, 0))
1302
    self.deaflistenerport = self.deaflistener.getsockname()[1]
1303

  
1304
  def tearDown(self):
1305
    del self.deaflistener
1306
    del self.deaflistenerport
1307

  
1308

  
1309
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
1310
  """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
1311
  family = socket.AF_INET6
1312
  address = constants.IP6_ADDRESS_LOCALHOST
1313

  
1314
  def setUp(self):
1315
    unittest.TestCase.setUp(self)
1316
    _BaseTcpPingDeafTest.setUp(self)
1317

  
1318
  def tearDown(self):
1319
    unittest.TestCase.tearDown(self)
1320
    _BaseTcpPingDeafTest.tearDown(self)
1321

  
1322

  
1323
class TestOwnIpAddress(unittest.TestCase):
1324
  """Testcase for OwnIpAddress"""
1325

  
1326
  def testOwnLoopback(self):
1327
    """check having the loopback ip"""
1328
    self.failUnless(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
1329
                    "Should own the loopback address")
1330

  
1331
  def testNowOwnAddress(self):
1332
    """check that I don't own an address"""
1333

  
1334
    # Network 192.0.2.0/24 is reserved for test/documentation as per
1335
    # RFC 5735, so we *should* not have an address of this range... if
1336
    # this fails, we should extend the test to multiple addresses
1337
    DST_IP = "192.0.2.1"
1338
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
1339

  
1340

  
1341
def _GetSocketCredentials(path):
1342
  """Connect to a Unix socket and return remote credentials.
1343

  
1344
  """
1345
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1346
  try:
1347
    sock.settimeout(10)
1348
    sock.connect(path)
1349
    return utils.GetSocketCredentials(sock)
1350
  finally:
1351
    sock.close()
1352

  
1353

  
1354
class TestGetSocketCredentials(unittest.TestCase):
1355
  def setUp(self):
1356
    self.tmpdir = tempfile.mkdtemp()
1357
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")
1358

  
1359
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1360
    self.listener.settimeout(10)
1361
    self.listener.bind(self.sockpath)
1362
    self.listener.listen(1)
1363

  
1364
  def tearDown(self):
1365
    self.listener.shutdown(socket.SHUT_RDWR)
1366
    self.listener.close()
1367
    shutil.rmtree(self.tmpdir)
1368

  
1369
  def test(self):
1370
    (c2pr, c2pw) = os.pipe()
1371

  
1372
    # Start child process
1373
    child = os.fork()
1374
    if child == 0:
1375
      try:
1376
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
1377

  
1378
        os.write(c2pw, data)
1379
        os.close(c2pw)
1380

  
1381
        os._exit(0)
1382
      finally:
1383
        os._exit(1)
1384

  
1385
    os.close(c2pw)
1386

  
1387
    # Wait for one connection
1388
    (conn, _) = self.listener.accept()
1389
    conn.recv(1)
1390
    conn.close()
1391

  
1392
    # Wait for result
1393
    result = os.read(c2pr, 4096)
1394
    os.close(c2pr)
1395

  
1396
    # Check child's exit code
1397
    (_, status) = os.waitpid(child, 0)
1398
    self.assertFalse(os.WIFSIGNALED(status))
1399
    self.assertEqual(os.WEXITSTATUS(status), 0)
1400

  
1401
    # Check result
1402
    (pid, uid, gid) = serializer.LoadJson(result)
1403
    self.assertEqual(pid, os.getpid())
1404
    self.assertEqual(uid, os.getuid())
1405
    self.assertEqual(gid, os.getgid())
1406

  
1407

  
1408 1179
class TestListVisibleFiles(unittest.TestCase):
1409 1180
  """Test case for ListVisibleFiles"""
1410 1181

  
......
1712 1483

  
1713 1484
  def _fdt(self, dict, allowed_values=None):
1714 1485
    if allowed_values is None:
1715
      ForceDictType(dict, self.key_types)
1486
      utils.ForceDictType(dict, self.key_types)
1716 1487
    else:
1717
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1488
      utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1718 1489

  
1719 1490
    return dict
1720 1491

  
......
1744 1515

  
1745 1516
  def _pathTestHelper(self, path, result):
1746 1517
    if result:
1747
      self.assert_(IsNormAbsPath(path),
1518
      self.assert_(utils.IsNormAbsPath(path),
1748 1519
          "Path %s should result absolute and normalized" % path)
1749 1520
    else:
1750
      self.assertFalse(IsNormAbsPath(path),
1521
      self.assertFalse(utils.IsNormAbsPath(path),
1751 1522
          "Path %s should not result absolute and normalized" % path)
1752 1523

  
1753 1524
  def testBase(self):
......
1939 1710
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1940 1711

  
1941 1712

  
1942
class TestHostInfo(unittest.TestCase):
1943
  """Testing case for HostInfo"""
1944

  
1945
  def testUppercase(self):
1946
    data = "AbC.example.com"
1947
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1948

  
1949
  def testTooLongName(self):
1950
    data = "a.b." + "c" * 255
1951
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1952

  
1953
  def testTrailingDot(self):
1954
    data = "a.b.c"
1955
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1956

  
1957
  def testInvalidName(self):
1958
    data = [
1959
      "a b",
1960
      "a/b",
1961
      ".a.b",
1962
      "a..b",
1963
      ]
1964
    for value in data:
1965
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1966

  
1967
  def testValidName(self):
1968
    data = [
1969
      "a.b",
1970
      "a-b",
1971
      "a_b",
1972
      "a.b.c",
1973
      ]
1974
    for value in data:
1975
      HostInfo.NormalizeName(value)
1976

  
1977

  
1978 1713
class TestValidateServiceName(unittest.TestCase):
1979 1714
  def testValid(self):
1980 1715
    testnames = [
......
1998 1733
      ]
1999 1734

  
2000 1735
    for name in testnames:
2001
      self.assertRaises(OpPrereqError, utils.ValidateServiceName, name)
1736
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
2002 1737

  
2003 1738

  
2004 1739
class TestParseAsn1Generalizedtime(unittest.TestCase):
......
2127 1862
    shutil.rmtree(self.tmpdir)
2128 1863

  
2129 1864
  def testNonExisting(self):
2130
    path = utils.PathJoin(self.tmpdir, "foo")
1865
    path = PathJoin(self.tmpdir, "foo")
2131 1866
    utils.Makedirs(path)
2132 1867
    self.assert_(os.path.isdir(path))
2133 1868

  
2134 1869
  def testExisting(self):
2135
    path = utils.PathJoin(self.tmpdir, "foo")
1870
    path = PathJoin(self.tmpdir, "foo")
2136 1871
    os.mkdir(path)
2137 1872
    utils.Makedirs(path)
2138 1873
    self.assert_(os.path.isdir(path))
2139 1874

  
2140 1875
  def testRecursiveNonExisting(self):
2141
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
1876
    path = PathJoin(self.tmpdir, "foo/bar/baz")
2142 1877
    utils.Makedirs(path)
2143 1878
    self.assert_(os.path.isdir(path))
2144 1879

  
2145 1880
  def testRecursiveExisting(self):
2146
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
1881
    path = PathJoin(self.tmpdir, "B/moo/xyz")
2147 1882
    self.assertFalse(os.path.exists(path))
2148
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
1883
    os.mkdir(PathJoin(self.tmpdir, "B"))
2149 1884
    utils.Makedirs(path)
2150 1885
    self.assert_(os.path.isdir(path))
2151 1886

  
......
2275 2010
    shutil.rmtree(self.tmpdir)
2276 2011

  
2277 2012
  def testNonExistent(self):
2278
    path = utils.PathJoin(self.tmpdir, "nonexist")
2013
    path = PathJoin(self.tmpdir, "nonexist")
2279 2014
    self.assert_(utils.ReadLockedPidFile(path) is None)
2280 2015

  
2281 2016
  def testUnlocked(self):
2282
    path = utils.PathJoin(self.tmpdir, "pid")
2017
    path = PathJoin(self.tmpdir, "pid")
2283 2018
    utils.WriteFile(path, data="123")
2284 2019
    self.assert_(utils.ReadLockedPidFile(path) is None)
2285 2020

  
2286 2021
  def testLocked(self):
2287
    path = utils.PathJoin(self.tmpdir, "pid")
2022
    path = PathJoin(self.tmpdir, "pid")
2288 2023
    utils.WriteFile(path, data="123")
2289 2024

  
2290 2025
    fl = utils.FileLock.Open(path)
......
2298 2033
    self.assert_(utils.ReadLockedPidFile(path) is None)
2299 2034

  
2300 2035
  def testError(self):
2301
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
2302
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
2036
    path = PathJoin(self.tmpdir, "foobar", "pid")
2037
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2303 2038
    # open(2) should return ENOTDIR
2304 2039
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2305 2040

  
......
2450 2185

  
2451 2186
  def testEnsureDirs(self):
2452 2187
    utils.EnsureDirs([
2453
        (utils.PathJoin(self.dir, "foo"), 0777),
2454
        (utils.PathJoin(self.dir, "bar"), 0000),
2188
        (PathJoin(self.dir, "foo"), 0777),
2189
        (PathJoin(self.dir, "bar"), 0000),
2455 2190
        ])
2456
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2457
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2191
    self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2192
    self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2458 2193

  
2459 2194
  def tearDown(self):
2460
    os.rmdir(utils.PathJoin(self.dir, "foo"))
2461
    os.rmdir(utils.PathJoin(self.dir, "bar"))
2195
    os.rmdir(PathJoin(self.dir, "foo"))
2196
    os.rmdir(PathJoin(self.dir, "bar"))
2462 2197
    os.rmdir(self.dir)
2463 2198
    os.umask(self.old_umask)
2464 2199

  
......
2506 2241
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2507 2242

  
2508 2243

  
2509
class TestIsValidIP4(unittest.TestCase):
2510
  def test(self):
2511
    self.assert_(utils.IsValidIP4("127.0.0.1"))
2512
    self.assert_(utils.IsValidIP4("0.0.0.0"))
2513
    self.assert_(utils.IsValidIP4("255.255.255.255"))
2514
    self.assertFalse(utils.IsValidIP4("0"))
2515
    self.assertFalse(utils.IsValidIP4("1"))
2516
    self.assertFalse(utils.IsValidIP4("1.1.1"))
2517
    self.assertFalse(utils.IsValidIP4("255.255.255.256"))
2518
    self.assertFalse(utils.IsValidIP4("::1"))
2519

  
2520

  
2521
class TestIsValidIP6(unittest.TestCase):
2522
  def test(self):
2523
    self.assert_(utils.IsValidIP6("::"))
2524
    self.assert_(utils.IsValidIP6("::1"))
2525
    self.assert_(utils.IsValidIP6("1" + (":1" * 7)))
2526
    self.assert_(utils.IsValidIP6("ffff" + (":ffff" * 7)))
2527
    self.assertFalse(utils.IsValidIP6("0"))
2528
    self.assertFalse(utils.IsValidIP6(":1"))
2529
    self.assertFalse(utils.IsValidIP6("f" + (":f" * 6)))
2530
    self.assertFalse(utils.IsValidIP6("fffg" + (":ffff" * 7)))
2531
    self.assertFalse(utils.IsValidIP6("fffff" + (":ffff" * 7)))
2532
    self.assertFalse(utils.IsValidIP6("1" + (":1" * 8)))
2533
    self.assertFalse(utils.IsValidIP6("127.0.0.1"))
2534

  
2535

  
2536
class TestIsValidIP(unittest.TestCase):
2537
  def test(self):
2538
    self.assert_(utils.IsValidIP("0.0.0.0"))
2539
    self.assert_(utils.IsValidIP("127.0.0.1"))
2540
    self.assert_(utils.IsValidIP("::"))
2541
    self.assert_(utils.IsValidIP("::1"))
2542
    self.assertFalse(utils.IsValidIP("0"))
2543
    self.assertFalse(utils.IsValidIP("1.1.1.256"))
2544
    self.assertFalse(utils.IsValidIP("a:g::1"))
2545

  
2546

  
2547
class TestGetAddressFamily(unittest.TestCase):
2548
  def test(self):
2549
    self.assertEqual(utils.GetAddressFamily("127.0.0.1"), socket.AF_INET)
2550
    self.assertEqual(utils.GetAddressFamily("10.2.0.127"), socket.AF_INET)
2551
    self.assertEqual(utils.GetAddressFamily("::1"), socket.AF_INET6)
2552
    self.assertEqual(utils.GetAddressFamily("fe80::a00:27ff:fe08:5048"),
2553
                     socket.AF_INET6)
2554
    self.assertRaises(errors.GenericError, utils.GetAddressFamily, "0")
2555

  
2556

  
2557 2244
if __name__ == '__main__':
2558 2245
  testutils.GanetiTestProgram()

Also available in: Unified diff