Revision 33231500 lib/rpc.py

b/lib/rpc.py
34 34
import logging
35 35
import zlib
36 36
import base64
37
import pycurl
38
import threading
37 39

  
38 40
from ganeti import utils
39 41
from ganeti import objects
......
47 49
import ganeti.http.client  # pylint: disable-msg=W0611
48 50

  
49 51

  
50
# Module level variable
51
_http_manager = None
52
# Timeout for connecting to nodes (seconds)
53
_RPC_CONNECT_TIMEOUT = 5
54

  
55
_RPC_CLIENT_HEADERS = [
56
  "Content-type: %s" % http.HTTP_APP_JSON,
57
  ]
52 58

  
53 59
# Various time constants for the timeout table
54 60
_TMO_URGENT = 60 # one minute
......
72 78
def Init():
73 79
  """Initializes the module-global HTTP client manager.
74 80

  
75
  Must be called before using any RPC function.
81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
76 83

  
77 84
  """
78
  global _http_manager # pylint: disable-msg=W0603
79

  
80
  assert not _http_manager, "RPC module initialized more than once"
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
81 90

  
82
  http.InitSsl()
91
  logging.info("Using PycURL %s", pycurl.version)
83 92

  
84
  _http_manager = http.client.HttpClientManager()
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
85 94

  
86 95

  
87 96
def Shutdown():
88 97
  """Stops the module-global HTTP client manager.
89 98

  
90
  Must be called before quitting the program.
99
  Must be called before quitting the program and while exactly one thread is
100
  running.
91 101

  
92 102
  """
93
  global _http_manager # pylint: disable-msg=W0603
103
  pycurl.global_cleanup()
104

  
105

  
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
94 108

  
95
  if _http_manager:
96
    _http_manager.Shutdown()
97
    _http_manager = None
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

  
119

  
120
class _RpcThreadLocal(threading.local):
121
  def GetHttpClientPool(self):
122
    """Returns a per-thread HTTP client pool.
123

  
124
    @rtype: L{http.client.HttpClientPool}
125

  
126
    """
127
    try:
128
      pool = self.hcp
129
    except AttributeError:
130
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
131
      self.hcp = pool
132

  
133
    return pool
134

  
135

  
136
_thread_local = _RpcThreadLocal()
98 137

  
99 138

  
100 139
def _RpcTimeout(secs):
......
218 257
    self.procedure = procedure
219 258
    self.body = body
220 259
    self.port = port
221
    self.nc = {}
222

  
223
    self._ssl_params = \
224
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
225
                         ssl_cert_path=constants.NODED_CERT_FILE)
260
    self._request = {}
226 261

  
227 262
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
228 263
    """Add a list of nodes to the target nodes.
......
260 295
    if read_timeout is None:
261 296
      read_timeout = _TIMEOUTS[self.procedure]
262 297

  
263
    self.nc[name] = \
264
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
265
                                    "/%s" % self.procedure,
266
                                    post_data=self.body,
267
                                    ssl_params=self._ssl_params,
268
                                    ssl_verify_peer=True,
298
    self._request[name] = \
299
      http.client.HttpClientRequest(str(address), self.port,
300
                                    http.HTTP_PUT, str("/%s" % self.procedure),
301
                                    headers=_RPC_CLIENT_HEADERS,
302
                                    post_data=str(self.body),
269 303
                                    read_timeout=read_timeout)
270 304

  
271
  def GetResults(self):
305
  def GetResults(self, http_pool=None):
272 306
    """Call nodes and return results.
273 307

  
274 308
    @rtype: list
275 309
    @return: List of RPC results
276 310

  
277 311
    """
278
    assert _http_manager, "RPC module not initialized"
312
    if not http_pool:
313
      http_pool = _thread_local.GetHttpClientPool()
279 314

  
280
    _http_manager.ExecRequests(self.nc.values())
315
    http_pool.ProcessRequests(self._request.values())
281 316

  
282 317
    results = {}
283 318

  
284
    for name, req in self.nc.iteritems():
319
    for name, req in self._request.iteritems():
285 320
      if req.success and req.resp_status_code == http.HTTP_OK:
286 321
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
287 322
                                  node=name, call=self.procedure)

Also available in: Unified diff