24 |
24 |
|
25 |
25 |
import logging
|
26 |
26 |
import pycurl
|
|
27 |
import threading
|
27 |
28 |
from cStringIO import StringIO
|
28 |
29 |
|
29 |
30 |
from ganeti import http
|
30 |
31 |
from ganeti import compat
|
31 |
32 |
from ganeti import netutils
|
|
33 |
from ganeti import locking
|
32 |
34 |
|
33 |
35 |
|
34 |
36 |
class HttpClientRequest(object):
|
... | ... | |
378 |
380 |
"""
|
379 |
381 |
return pycurl.CurlMulti()
|
380 |
382 |
|
381 |
|
def ProcessRequests(self, requests):
|
|
383 |
def ProcessRequests(self, requests, lock_monitor_cb=None):
|
382 |
384 |
"""Processes any number of HTTP client requests using pooled objects.
|
383 |
385 |
|
384 |
386 |
@type requests: list of L{HttpClientRequest}
|
385 |
387 |
@param requests: List of all requests
|
|
388 |
@param lock_monitor_cb: Callable for registering with lock monitor
|
386 |
389 |
|
387 |
390 |
"""
|
388 |
391 |
# For client cleanup
|
... | ... | |
403 |
406 |
|
404 |
407 |
assert len(curl_to_pclient) == len(requests)
|
405 |
408 |
|
|
409 |
if lock_monitor_cb:
|
|
410 |
monitor = _PendingRequestMonitor(threading.currentThread(),
|
|
411 |
curl_to_pclient.values)
|
|
412 |
lock_monitor_cb(monitor)
|
|
413 |
else:
|
|
414 |
monitor = _NoOpRequestMonitor
|
|
415 |
|
406 |
416 |
# Process all requests and act based on the returned values
|
407 |
417 |
for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
|
408 |
418 |
curl_to_pclient.keys()):
|
409 |
419 |
pclient = curl_to_pclient[curl]
|
410 |
420 |
req = pclient.client.GetCurrentRequest()
|
411 |
|
pclient.client.Done(msg)
|
|
421 |
|
|
422 |
monitor.acquire(shared=0)
|
|
423 |
try:
|
|
424 |
pclient.client.Done(msg)
|
|
425 |
finally:
|
|
426 |
monitor.release()
|
412 |
427 |
|
413 |
428 |
assert ((msg is None and req.success and req.error is None) ^
|
414 |
429 |
(msg is not None and not req.success and req.error == msg))
|
... | ... | |
416 |
431 |
assert compat.all(pclient.client.GetCurrentRequest() is None
|
417 |
432 |
for pclient in curl_to_pclient.values())
|
418 |
433 |
|
419 |
|
# Return clients to pool
|
420 |
|
self._Return(curl_to_pclient.values())
|
|
434 |
monitor.acquire(shared=0)
|
|
435 |
try:
|
|
436 |
# Don't try to read information from returned clients
|
|
437 |
monitor.Disable()
|
|
438 |
|
|
439 |
# Return clients to pool
|
|
440 |
self._Return(curl_to_pclient.values())
|
|
441 |
finally:
|
|
442 |
monitor.release()
|
421 |
443 |
|
422 |
444 |
assert compat.all(req.error is not None or
|
423 |
445 |
(req.success and
|
... | ... | |
426 |
448 |
for req in requests)
|
427 |
449 |
|
428 |
450 |
|
|
451 |
class _NoOpRequestMonitor: # pylint: disable=W0232
|
|
452 |
"""No-op request monitor.
|
|
453 |
|
|
454 |
"""
|
|
455 |
@staticmethod
|
|
456 |
def acquire(*args, **kwargs):
|
|
457 |
pass
|
|
458 |
|
|
459 |
release = acquire
|
|
460 |
Disable = acquire
|
|
461 |
|
|
462 |
|
|
463 |
class _PendingRequestMonitor:
|
|
464 |
_LOCK = "_lock"
|
|
465 |
|
|
466 |
def __init__(self, owner, pending_fn):
|
|
467 |
"""Initializes this class.
|
|
468 |
|
|
469 |
"""
|
|
470 |
self._owner = owner
|
|
471 |
self._pending_fn = pending_fn
|
|
472 |
|
|
473 |
# The lock monitor runs in another thread, hence locking is necessary
|
|
474 |
self._lock = locking.SharedLock("PendingHttpRequests")
|
|
475 |
self.acquire = self._lock.acquire
|
|
476 |
self.release = self._lock.release
|
|
477 |
|
|
478 |
def Disable(self):
|
|
479 |
"""Disable monitor.
|
|
480 |
|
|
481 |
"""
|
|
482 |
self._pending_fn = None
|
|
483 |
|
|
484 |
@locking.ssynchronized(_LOCK, shared=1)
|
|
485 |
def GetLockInfo(self, requested): # pylint: disable=W0613
|
|
486 |
"""Retrieves information about pending requests.
|
|
487 |
|
|
488 |
@type requested: set
|
|
489 |
@param requested: Requested information, see C{query.LQ_*}
|
|
490 |
|
|
491 |
"""
|
|
492 |
# No need to sort here, that's being done by the lock manager and query
|
|
493 |
# library. There are no priorities for requests, hence all show up as
|
|
494 |
# one item under "pending".
|
|
495 |
result = []
|
|
496 |
|
|
497 |
if self._pending_fn:
|
|
498 |
owner_name = self._owner.getName()
|
|
499 |
|
|
500 |
for pclient in self._pending_fn():
|
|
501 |
req = pclient.client.GetCurrentRequest()
|
|
502 |
if req:
|
|
503 |
result.append(("rpc/%s%s" % (req.host, req.path), None, None,
|
|
504 |
[("thread", [owner_name])]))
|
|
505 |
|
|
506 |
return result
|
|
507 |
|
|
508 |
|
429 |
509 |
def _ProcessCurlRequests(multi, requests):
|
430 |
510 |
"""cURL request processor.
|
431 |
511 |
|