Revision 33231500 lib/rpc.py
b/lib/rpc.py | ||
---|---|---|
34 | 34 |
import logging |
35 | 35 |
import zlib |
36 | 36 |
import base64 |
37 |
import pycurl |
|
38 |
import threading |
|
37 | 39 |
|
38 | 40 |
from ganeti import utils |
39 | 41 |
from ganeti import objects |
... | ... | |
47 | 49 |
import ganeti.http.client # pylint: disable-msg=W0611 |
48 | 50 |
|
49 | 51 |
|
50 |
# Module level variable |
|
51 |
_http_manager = None |
|
52 |
# Timeout for connecting to nodes (seconds) |
|
53 |
_RPC_CONNECT_TIMEOUT = 5 |
|
54 |
|
|
55 |
_RPC_CLIENT_HEADERS = [ |
|
56 |
"Content-type: %s" % http.HTTP_APP_JSON, |
|
57 |
] |
|
52 | 58 |
|
53 | 59 |
# Various time constants for the timeout table |
54 | 60 |
_TMO_URGENT = 60 # one minute |
... | ... | |
72 | 78 |
def Init(): |
73 | 79 |
"""Initializes the module-global HTTP client manager. |
74 | 80 |
|
75 |
Must be called before using any RPC function. |
|
81 |
Must be called before using any RPC function and while exactly one thread is |
|
82 |
running. |
|
76 | 83 |
|
77 | 84 |
""" |
78 |
global _http_manager # pylint: disable-msg=W0603 |
|
79 |
|
|
80 |
assert not _http_manager, "RPC module initialized more than once" |
|
85 |
# curl_global_init(3) and curl_global_cleanup(3) must be called with only |
|
86 |
# one thread running. This check is just a safety measure -- it doesn't |
|
87 |
# cover all cases. |
|
88 |
assert threading.activeCount() == 1, \ |
|
89 |
"Found more than one active thread when initializing pycURL" |
|
81 | 90 |
|
82 |
http.InitSsl()
|
|
91 |
logging.info("Using PycURL %s", pycurl.version)
|
|
83 | 92 |
|
84 |
_http_manager = http.client.HttpClientManager()
|
|
93 |
pycurl.global_init(pycurl.GLOBAL_ALL)
|
|
85 | 94 |
|
86 | 95 |
|
87 | 96 |
def Shutdown(): |
88 | 97 |
"""Stops the module-global HTTP client manager. |
89 | 98 |
|
90 |
Must be called before quitting the program. |
|
99 |
Must be called before quitting the program and while exactly one thread is |
|
100 |
running. |
|
91 | 101 |
|
92 | 102 |
""" |
93 |
global _http_manager # pylint: disable-msg=W0603 |
|
103 |
pycurl.global_cleanup() |
|
104 |
|
|
105 |
|
|
106 |
def _ConfigRpcCurl(curl): |
|
107 |
noded_cert = str(constants.NODED_CERT_FILE) |
|
94 | 108 |
|
95 |
if _http_manager: |
|
96 |
_http_manager.Shutdown() |
|
97 |
_http_manager = None |
|
109 |
curl.setopt(pycurl.FOLLOWLOCATION, False) |
|
110 |
curl.setopt(pycurl.CAINFO, noded_cert) |
|
111 |
curl.setopt(pycurl.SSL_VERIFYHOST, 0) |
|
112 |
curl.setopt(pycurl.SSL_VERIFYPEER, True) |
|
113 |
curl.setopt(pycurl.SSLCERTTYPE, "PEM") |
|
114 |
curl.setopt(pycurl.SSLCERT, noded_cert) |
|
115 |
curl.setopt(pycurl.SSLKEYTYPE, "PEM") |
|
116 |
curl.setopt(pycurl.SSLKEY, noded_cert) |
|
117 |
curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT) |
|
118 |
|
|
119 |
|
|
120 |
class _RpcThreadLocal(threading.local): |
|
121 |
def GetHttpClientPool(self): |
|
122 |
"""Returns a per-thread HTTP client pool. |
|
123 |
|
|
124 |
@rtype: L{http.client.HttpClientPool} |
|
125 |
|
|
126 |
""" |
|
127 |
try: |
|
128 |
pool = self.hcp |
|
129 |
except AttributeError: |
|
130 |
pool = http.client.HttpClientPool(_ConfigRpcCurl) |
|
131 |
self.hcp = pool |
|
132 |
|
|
133 |
return pool |
|
134 |
|
|
135 |
|
|
136 |
_thread_local = _RpcThreadLocal() |
|
98 | 137 |
|
99 | 138 |
|
100 | 139 |
def _RpcTimeout(secs): |
... | ... | |
218 | 257 |
self.procedure = procedure |
219 | 258 |
self.body = body |
220 | 259 |
self.port = port |
221 |
self.nc = {} |
|
222 |
|
|
223 |
self._ssl_params = \ |
|
224 |
http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE, |
|
225 |
ssl_cert_path=constants.NODED_CERT_FILE) |
|
260 |
self._request = {} |
|
226 | 261 |
|
227 | 262 |
def ConnectList(self, node_list, address_list=None, read_timeout=None): |
228 | 263 |
"""Add a list of nodes to the target nodes. |
... | ... | |
260 | 295 |
if read_timeout is None: |
261 | 296 |
read_timeout = _TIMEOUTS[self.procedure] |
262 | 297 |
|
263 |
self.nc[name] = \ |
|
264 |
http.client.HttpClientRequest(address, self.port, http.HTTP_PUT, |
|
265 |
"/%s" % self.procedure, |
|
266 |
post_data=self.body, |
|
267 |
ssl_params=self._ssl_params, |
|
268 |
ssl_verify_peer=True, |
|
298 |
self._request[name] = \ |
|
299 |
http.client.HttpClientRequest(str(address), self.port, |
|
300 |
http.HTTP_PUT, str("/%s" % self.procedure), |
|
301 |
headers=_RPC_CLIENT_HEADERS, |
|
302 |
post_data=str(self.body), |
|
269 | 303 |
read_timeout=read_timeout) |
270 | 304 |
|
271 |
def GetResults(self): |
|
305 |
def GetResults(self, http_pool=None):
|
|
272 | 306 |
"""Call nodes and return results. |
273 | 307 |
|
274 | 308 |
@rtype: list |
275 | 309 |
@return: List of RPC results |
276 | 310 |
|
277 | 311 |
""" |
278 |
assert _http_manager, "RPC module not initialized" |
|
312 |
if not http_pool: |
|
313 |
http_pool = _thread_local.GetHttpClientPool() |
|
279 | 314 |
|
280 |
_http_manager.ExecRequests(self.nc.values())
|
|
315 |
http_pool.ProcessRequests(self._request.values())
|
|
281 | 316 |
|
282 | 317 |
results = {} |
283 | 318 |
|
284 |
for name, req in self.nc.iteritems():
|
|
319 |
for name, req in self._request.iteritems():
|
|
285 | 320 |
if req.success and req.resp_status_code == http.HTTP_OK: |
286 | 321 |
results[name] = RpcResult(data=serializer.LoadJson(req.resp_body), |
287 | 322 |
node=name, call=self.procedure) |
Also available in: Unified diff