Revision aea5caef lib/http/client.py
b/lib/http/client.py | ||
---|---|---|
24 | 24 |
|
25 | 25 |
import logging |
26 | 26 |
import pycurl |
27 |
import threading |
|
27 | 28 |
from cStringIO import StringIO |
28 | 29 |
|
29 | 30 |
from ganeti import http |
30 | 31 |
from ganeti import compat |
31 | 32 |
from ganeti import netutils |
33 |
from ganeti import locking |
|
32 | 34 |
|
33 | 35 |
|
34 | 36 |
class HttpClientRequest(object): |
... | ... | |
378 | 380 |
""" |
379 | 381 |
return pycurl.CurlMulti() |
380 | 382 |
|
381 |
def ProcessRequests(self, requests): |
|
383 |
def ProcessRequests(self, requests, lock_monitor_cb=None):
|
|
382 | 384 |
"""Processes any number of HTTP client requests using pooled objects. |
383 | 385 |
|
384 | 386 |
@type requests: list of L{HttpClientRequest} |
385 | 387 |
@param requests: List of all requests |
388 |
@param lock_monitor_cb: Callable for registering with lock monitor |
|
386 | 389 |
|
387 | 390 |
""" |
388 | 391 |
# For client cleanup |
... | ... | |
403 | 406 |
|
404 | 407 |
assert len(curl_to_pclient) == len(requests) |
405 | 408 |
|
409 |
if lock_monitor_cb: |
|
410 |
monitor = _PendingRequestMonitor(threading.currentThread(), |
|
411 |
curl_to_pclient.values) |
|
412 |
lock_monitor_cb(monitor) |
|
413 |
else: |
|
414 |
monitor = _NoOpRequestMonitor |
|
415 |
|
|
406 | 416 |
# Process all requests and act based on the returned values |
407 | 417 |
for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(), |
408 | 418 |
curl_to_pclient.keys()): |
409 | 419 |
pclient = curl_to_pclient[curl] |
410 | 420 |
req = pclient.client.GetCurrentRequest() |
411 |
pclient.client.Done(msg) |
|
421 |
|
|
422 |
monitor.acquire(shared=0) |
|
423 |
try: |
|
424 |
pclient.client.Done(msg) |
|
425 |
finally: |
|
426 |
monitor.release() |
|
412 | 427 |
|
413 | 428 |
assert ((msg is None and req.success and req.error is None) ^ |
414 | 429 |
(msg is not None and not req.success and req.error == msg)) |
... | ... | |
416 | 431 |
assert compat.all(pclient.client.GetCurrentRequest() is None |
417 | 432 |
for pclient in curl_to_pclient.values()) |
418 | 433 |
|
419 |
# Return clients to pool |
|
420 |
self._Return(curl_to_pclient.values()) |
|
434 |
monitor.acquire(shared=0) |
|
435 |
try: |
|
436 |
# Don't try to read information from returned clients |
|
437 |
monitor.Disable() |
|
438 |
|
|
439 |
# Return clients to pool |
|
440 |
self._Return(curl_to_pclient.values()) |
|
441 |
finally: |
|
442 |
monitor.release() |
|
421 | 443 |
|
422 | 444 |
assert compat.all(req.error is not None or |
423 | 445 |
(req.success and |
... | ... | |
426 | 448 |
for req in requests) |
427 | 449 |
|
428 | 450 |
|
451 |
class _NoOpRequestMonitor: # pylint: disable=W0232 |
|
452 |
"""No-op request monitor. |
|
453 |
|
|
454 |
""" |
|
455 |
@staticmethod |
|
456 |
def acquire(*args, **kwargs): |
|
457 |
pass |
|
458 |
|
|
459 |
release = acquire |
|
460 |
Disable = acquire |
|
461 |
|
|
462 |
|
|
463 |
class _PendingRequestMonitor: |
|
464 |
_LOCK = "_lock" |
|
465 |
|
|
466 |
def __init__(self, owner, pending_fn): |
|
467 |
"""Initializes this class. |
|
468 |
|
|
469 |
""" |
|
470 |
self._owner = owner |
|
471 |
self._pending_fn = pending_fn |
|
472 |
|
|
473 |
# The lock monitor runs in another thread, hence locking is necessary |
|
474 |
self._lock = locking.SharedLock("PendingHttpRequests") |
|
475 |
self.acquire = self._lock.acquire |
|
476 |
self.release = self._lock.release |
|
477 |
|
|
478 |
def Disable(self): |
|
479 |
"""Disable monitor. |
|
480 |
|
|
481 |
""" |
|
482 |
self._pending_fn = None |
|
483 |
|
|
484 |
@locking.ssynchronized(_LOCK, shared=1) |
|
485 |
def GetLockInfo(self, requested): # pylint: disable=W0613 |
|
486 |
"""Retrieves information about pending requests. |
|
487 |
|
|
488 |
@type requested: set |
|
489 |
@param requested: Requested information, see C{query.LQ_*} |
|
490 |
|
|
491 |
""" |
|
492 |
# No need to sort here, that's being done by the lock manager and query |
|
493 |
# library. There are no priorities for requests, hence all show up as |
|
494 |
# one item under "pending". |
|
495 |
result = [] |
|
496 |
|
|
497 |
if self._pending_fn: |
|
498 |
owner_name = self._owner.getName() |
|
499 |
|
|
500 |
for pclient in self._pending_fn(): |
|
501 |
req = pclient.client.GetCurrentRequest() |
|
502 |
if req: |
|
503 |
result.append(("rpc/%s%s" % (req.host, req.path), None, None, |
|
504 |
[("thread", [owner_name])])) |
|
505 |
|
|
506 |
return result |
|
507 |
|
|
508 |
|
|
429 | 509 |
def _ProcessCurlRequests(multi, requests): |
430 | 510 |
"""cURL request processor. |
431 | 511 |
|
Also available in: Unified diff