Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.serializer_unittest.py @ 560ef132

History | View | Annotate | Download (7.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the serializer module"""
23

    
24

    
25
import doctest
26
import unittest
27

    
28
from ganeti import errors
29
from ganeti import ht
30
from ganeti import objects
31
from ganeti import serializer
32

    
33
import testutils
34

    
35

    
36
class TestSerializer(testutils.GanetiTestCase):
37
  """Serializer tests"""
38

    
39
  _TESTDATA = [
40
    "test",
41
    255,
42
    [1, 2, 3],
43
    (1, 2, 3),
44
    {"1": 2,
45
     "foo": "bar"},
46
    ["abc", 1, 2, 3, 999,
47
      {
48
        "a1": ("Hello", "World"),
49
        "a2": "This is only a test",
50
        "a3": None,
51
        "osparams:": serializer.PrivateDict({
52
                      "foo": 5,
53
                     })
54
      }
55
    ]
56
  ]
57

    
58
  def _TestSerializer(self, dump_fn, load_fn):
59
    _dump_fn = lambda data: dump_fn(
60
      data,
61
      private_encoder=serializer.EncodeWithPrivateFields
62
    )
63
    for data in self._TESTDATA:
64
      self.failUnless(_dump_fn(data).endswith("\n"))
65
      self.assertEqualValues(load_fn(_dump_fn(data)), data)
66

    
67
  def testGeneric(self):
68
    self._TestSerializer(serializer.Dump, serializer.Load)
69

    
70
  def testSignedGeneric(self):
71
    self._TestSigned(serializer.DumpSigned, serializer.LoadSigned)
72

    
73
  def testJson(self):
74
    self._TestSerializer(serializer.DumpJson, serializer.LoadJson)
75

    
76
  def testSignedJson(self):
77
    self._TestSigned(serializer.DumpSignedJson, serializer.LoadSignedJson)
78

    
79
  def _TestSigned(self, dump_fn, load_fn):
80
    _dump_fn = lambda *args, **kwargs: dump_fn(
81
      *args,
82
      private_encoder=serializer.EncodeWithPrivateFields,
83
      **kwargs
84
    )
85
    for data in self._TESTDATA:
86
      self.assertEqualValues(load_fn(_dump_fn(data, "mykey"), "mykey"),
87
                             (data, ""))
88
      self.assertEqualValues(load_fn(_dump_fn(data, "myprivatekey",
89
                                              salt="mysalt"),
90
                                     "myprivatekey"),
91
                             (data, "mysalt"))
92

    
93
      keydict = {
94
        "mykey_id": "myprivatekey",
95
        }
96
      self.assertEqualValues(load_fn(_dump_fn(data, "myprivatekey",
97
                                              salt="mysalt",
98
                                              key_selector="mykey_id"),
99
                                     keydict.get),
100
                             (data, "mysalt"))
101
      self.assertRaises(errors.SignatureError, load_fn,
102
                        _dump_fn(data, "myprivatekey",
103
                                 salt="mysalt",
104
                                 key_selector="mykey_id"),
105
                        {}.get)
106

    
107
    self.assertRaises(errors.SignatureError, load_fn,
108
                      _dump_fn("test", "myprivatekey"),
109
                      "myotherkey")
110

    
111
    self.assertRaises(errors.SignatureError, load_fn,
112
                      serializer.DumpJson("This is a test"), "mykey")
113
    self.assertRaises(errors.SignatureError, load_fn,
114
                      serializer.DumpJson({}), "mykey")
115

    
116
    # Message missing salt and HMAC
117
    tdata = { "msg": "Foo", }
118
    self.assertRaises(errors.SignatureError, load_fn,
119
                      serializer.DumpJson(tdata), "mykey")
120

    
121

    
122
class TestLoadAndVerifyJson(unittest.TestCase):
123
  def testNoJson(self):
124
    self.assertRaises(errors.ParseError, serializer.LoadAndVerifyJson,
125
                      "", NotImplemented)
126
    self.assertRaises(errors.ParseError, serializer.LoadAndVerifyJson,
127
                      "}", NotImplemented)
128

    
129
  def testVerificationFails(self):
130
    self.assertRaises(errors.ParseError, serializer.LoadAndVerifyJson,
131
                      "{}", lambda _: False)
132

    
133
    verify_fn = ht.TListOf(ht.TNonEmptyString)
134
    try:
135
      serializer.LoadAndVerifyJson("{}", verify_fn)
136
    except errors.ParseError, err:
137
      self.assertTrue(str(err).endswith(str(verify_fn)))
138
    else:
139
      self.fail("Exception not raised")
140

    
141
  def testSuccess(self):
142
    self.assertEqual(serializer.LoadAndVerifyJson("{}", ht.TAny), {})
143
    self.assertEqual(serializer.LoadAndVerifyJson("\"Foo\"", ht.TAny), "Foo")
144

    
145

    
146
class TestPrivate(unittest.TestCase):
147

    
148
  def testEquality(self):
149
    pDict = serializer.PrivateDict()
150
    pDict["bar"] = "egg"
151
    nDict = {"bar": "egg"}
152
    self.assertEqual(pDict, nDict, "PrivateDict-dict equality failure")
153

    
154
  def testPrivateDictUnprivate(self):
155
    pDict = serializer.PrivateDict()
156
    pDict["bar"] = "egg"
157
    uDict = pDict.Unprivate()
158
    nDict = {"bar": "egg"}
159
    self.assertEquals(type(uDict), dict,
160
                      "PrivateDict.Unprivate() did not return a dict")
161
    self.assertEqual(pDict, uDict, "PrivateDict.Unprivate() equality failure")
162
    self.assertEqual(nDict, uDict, "PrivateDict.Unprivate() failed to return")
163

    
164
  def testAttributeTransparency(self):
165
    class Dummy(object):
166
      pass
167

    
168
    dummy = Dummy()
169
    dummy.bar = "egg"
170
    pDummy = serializer.Private(dummy)
171
    self.assertEqual(pDummy.bar, "egg", "Failed to access attribute of Private")
172

    
173
  def testCallTransparency(self):
174
    foo = serializer.Private("egg")
175
    self.assertEqual(foo.upper(), "EGG", "Failed to call Private instance")
176

    
177
  def testFillDict(self):
178
    pDict = serializer.PrivateDict()
179
    pDict["bar"] = "egg"
180
    self.assertEqual(pDict, objects.FillDict({}, pDict))
181

    
182
  def testLeak(self):
183
    pDict = serializer.PrivateDict()
184
    pDict["bar"] = "egg"
185
    self.assertNotIn("egg", str(pDict), "Value leaked in str(PrivateDict)")
186
    self.assertNotIn("egg", repr(pDict), "Value leaked in repr(PrivateDict)")
187
    self.assertNotIn("egg", "{0}".format(pDict),
188
                     "Value leaked in PrivateDict.__format__")
189
    self.assertNotIn("egg", serializer.Dump(pDict),
190
                     "Value leaked in serializer.Dump(PrivateDict)")
191

    
192
  def testProperAccess(self):
193
    pDict = serializer.PrivateDict()
194
    pDict["bar"] = "egg"
195

    
196
    self.assertIs("egg", pDict["bar"].Get(),
197
                  "Value not returned by Private.Get()")
198
    self.assertIs("egg", pDict.GetPrivate("bar"),
199
                  "Value not returned by Private.GetPrivate()")
200
    self.assertIs("egg", pDict.Unprivate()["bar"],
201
                  "Value not returned by PrivateDict.Unprivate()")
202
    self.assertIn(
203
      "egg",
204
      serializer.Dump(pDict,
205
                      private_encoder=serializer.EncodeWithPrivateFields)
206
    )
207

    
208
  def testDictGet(self):
209
    self.assertIs("tar", serializer.PrivateDict().GetPrivate("bar", "tar"),
210
                  "Private.GetPrivate() did not handle the default case")
211

    
212
  def testZeronessPrivate(self):
213
    self.assertTrue(serializer.Private("foo"),
214
                    "Private of non-empty string is false")
215
    self.assertFalse(serializer.Private(""), "Private empty string is true")
216

    
217

    
218
class TestCheckDoctests(unittest.TestCase):
219

    
220
  def testCheckSerializer(self):
221
    results = doctest.testmod(serializer)
222
    self.assertEquals(results.failed, 0, "Doctest failures detected")
223

    
224
if __name__ == "__main__":
225
  testutils.GanetiTestProgram()