root / lib / rapi / testutils.py @ d9492490
History | View | Annotate | Download (4.8 kB)
1 |
#
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2012 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Remote API test utilities.
|
23 |
|
24 |
"""
|
25 |
|
26 |
import logging |
27 |
import re |
28 |
import mimetools |
29 |
import base64 |
30 |
import pycurl |
31 |
from cStringIO import StringIO |
32 |
|
33 |
from ganeti import errors |
34 |
from ganeti import opcodes |
35 |
from ganeti import http |
36 |
|
37 |
|
38 |
_URI_RE = re.compile(r"https://(?P<host>.*):(?P<port>\d+)(?P<path>/.*)")
|
39 |
|
40 |
|
41 |
class VerificationError(Exception): |
42 |
"""Dedicated error class for test utilities.
|
43 |
|
44 |
This class is used to hide all of Ganeti's internal exception, so that
|
45 |
external users of these utilities don't have to integrate Ganeti's exception
|
46 |
hierarchy.
|
47 |
|
48 |
"""
|
49 |
|
50 |
|
51 |
def _GetOpById(op_id): |
52 |
"""Tries to get an opcode class based on its C{OP_ID}.
|
53 |
|
54 |
"""
|
55 |
try:
|
56 |
return opcodes.OP_MAPPING[op_id]
|
57 |
except KeyError: |
58 |
raise VerificationError("Unknown opcode ID '%s'" % op_id) |
59 |
|
60 |
|
61 |
def _HideInternalErrors(fn): |
62 |
"""Hides Ganeti-internal exceptions, see L{VerificationError}.
|
63 |
|
64 |
"""
|
65 |
def wrapper(*args, **kwargs): |
66 |
try:
|
67 |
return fn(*args, **kwargs)
|
68 |
except errors.GenericError, err:
|
69 |
raise VerificationError("Unhandled Ganeti error: %s" % err) |
70 |
|
71 |
return wrapper
|
72 |
|
73 |
|
74 |
@_HideInternalErrors
|
75 |
def VerifyOpInput(op_id, data): |
76 |
"""Verifies opcode parameters according to their definition.
|
77 |
|
78 |
@type op_id: string
|
79 |
@param op_id: Opcode ID (C{OP_ID} attribute), e.g. C{OP_CLUSTER_VERIFY}
|
80 |
@type data: dict
|
81 |
@param data: Opcode parameter values
|
82 |
@raise VerificationError: Parameter verification failed
|
83 |
|
84 |
"""
|
85 |
op_cls = _GetOpById(op_id) |
86 |
|
87 |
try:
|
88 |
op = op_cls(**data) # pylint: disable=W0142
|
89 |
except TypeError, err: |
90 |
raise VerificationError("Unable to create opcode instance: %s" % err) |
91 |
|
92 |
try:
|
93 |
op.Validate(False)
|
94 |
except errors.OpPrereqError, err:
|
95 |
raise VerificationError("Parameter validation for opcode '%s' failed: %s" % |
96 |
(op_id, err)) |
97 |
|
98 |
|
99 |
@_HideInternalErrors
|
100 |
def VerifyOpResult(op_id, result): |
101 |
"""Verifies opcode results used in tests (e.g. in a mock).
|
102 |
|
103 |
@type op_id: string
|
104 |
@param op_id: Opcode ID (C{OP_ID} attribute), e.g. C{OP_CLUSTER_VERIFY}
|
105 |
@param result: Mocked opcode result
|
106 |
@raise VerificationError: Return value verification failed
|
107 |
|
108 |
"""
|
109 |
resultcheck_fn = _GetOpById(op_id).OP_RESULT |
110 |
|
111 |
if not resultcheck_fn: |
112 |
logging.warning("Opcode '%s' has no result type definition", op_id)
|
113 |
elif not resultcheck_fn(result): |
114 |
raise VerificationError("Given result does not match result description" |
115 |
" for opcode '%s': %s" % (op_id, resultcheck_fn))
|
116 |
|
117 |
|
118 |
def _GetPathFromUri(uri): |
119 |
"""Gets the path and query from a URI.
|
120 |
|
121 |
"""
|
122 |
match = _URI_RE.match(uri) |
123 |
if match:
|
124 |
return match.groupdict()["path"] |
125 |
else:
|
126 |
return None |
127 |
|
128 |
|
129 |
class FakeCurl: |
130 |
"""Fake cURL object.
|
131 |
|
132 |
"""
|
133 |
def __init__(self, handler): |
134 |
"""Initialize this class
|
135 |
|
136 |
@param handler: Request handler instance
|
137 |
|
138 |
"""
|
139 |
self._handler = handler
|
140 |
self._opts = {}
|
141 |
self._info = {}
|
142 |
|
143 |
def setopt(self, opt, value): |
144 |
self._opts[opt] = value
|
145 |
|
146 |
def getopt(self, opt): |
147 |
return self._opts.get(opt) |
148 |
|
149 |
def unsetopt(self, opt): |
150 |
self._opts.pop(opt, None) |
151 |
|
152 |
def getinfo(self, info): |
153 |
return self._info[info] |
154 |
|
155 |
def perform(self): |
156 |
method = self._opts[pycurl.CUSTOMREQUEST]
|
157 |
url = self._opts[pycurl.URL]
|
158 |
request_body = self._opts[pycurl.POSTFIELDS]
|
159 |
writefn = self._opts[pycurl.WRITEFUNCTION]
|
160 |
|
161 |
if pycurl.HTTPHEADER in self._opts: |
162 |
baseheaders = "\n".join(self._opts[pycurl.HTTPHEADER]) |
163 |
else:
|
164 |
baseheaders = ""
|
165 |
|
166 |
headers = mimetools.Message(StringIO(baseheaders), 0)
|
167 |
|
168 |
if request_body:
|
169 |
headers[http.HTTP_CONTENT_LENGTH] = str(len(request_body)) |
170 |
|
171 |
if self._opts.get(pycurl.HTTPAUTH, 0) & pycurl.HTTPAUTH_BASIC: |
172 |
try:
|
173 |
userpwd = self._opts[pycurl.USERPWD]
|
174 |
except KeyError: |
175 |
raise errors.ProgrammerError("Basic authentication requires username" |
176 |
" and password")
|
177 |
|
178 |
headers[http.HTTP_AUTHORIZATION] = \ |
179 |
"%s %s" % (http.auth.HTTP_BASIC_AUTH, base64.b64encode(userpwd))
|
180 |
|
181 |
path = _GetPathFromUri(url) |
182 |
(code, resp_body) = \ |
183 |
self._handler.FetchResponse(path, method, headers, request_body)
|
184 |
|
185 |
self._info[pycurl.RESPONSE_CODE] = code
|
186 |
if resp_body is not None: |
187 |
writefn(resp_body) |