Revision aea5caef

b/lib/http/client.py
24 24

  
25 25
import logging
26 26
import pycurl
27
import threading
27 28
from cStringIO import StringIO
28 29

  
29 30
from ganeti import http
30 31
from ganeti import compat
31 32
from ganeti import netutils
33
from ganeti import locking
32 34

  
33 35

  
34 36
class HttpClientRequest(object):
......
378 380
    """
379 381
    return pycurl.CurlMulti()
380 382

  
381
  def ProcessRequests(self, requests):
383
  def ProcessRequests(self, requests, lock_monitor_cb=None):
382 384
    """Processes any number of HTTP client requests using pooled objects.
383 385

  
384 386
    @type requests: list of L{HttpClientRequest}
385 387
    @param requests: List of all requests
388
    @param lock_monitor_cb: Callable for registering with lock monitor
386 389

  
387 390
    """
388 391
    # For client cleanup
......
403 406

  
404 407
    assert len(curl_to_pclient) == len(requests)
405 408

  
409
    if lock_monitor_cb:
410
      monitor = _PendingRequestMonitor(threading.currentThread(),
411
                                       curl_to_pclient.values)
412
      lock_monitor_cb(monitor)
413
    else:
414
      monitor = _NoOpRequestMonitor
415

  
406 416
    # Process all requests and act based on the returned values
407 417
    for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
408 418
                                            curl_to_pclient.keys()):
409 419
      pclient = curl_to_pclient[curl]
410 420
      req = pclient.client.GetCurrentRequest()
411
      pclient.client.Done(msg)
421

  
422
      monitor.acquire(shared=0)
423
      try:
424
        pclient.client.Done(msg)
425
      finally:
426
        monitor.release()
412 427

  
413 428
      assert ((msg is None and req.success and req.error is None) ^
414 429
              (msg is not None and not req.success and req.error == msg))
......
416 431
    assert compat.all(pclient.client.GetCurrentRequest() is None
417 432
                      for pclient in curl_to_pclient.values())
418 433

  
419
    # Return clients to pool
420
    self._Return(curl_to_pclient.values())
434
    monitor.acquire(shared=0)
435
    try:
436
      # Don't try to read information from returned clients
437
      monitor.Disable()
438

  
439
      # Return clients to pool
440
      self._Return(curl_to_pclient.values())
441
    finally:
442
      monitor.release()
421 443

  
422 444
    assert compat.all(req.error is not None or
423 445
                      (req.success and
......
426 448
                      for req in requests)
427 449

  
428 450

  
451
class _NoOpRequestMonitor: # pylint: disable=W0232
452
  """No-op request monitor.
453

  
454
  """
455
  @staticmethod
456
  def acquire(*args, **kwargs):
457
    pass
458

  
459
  release = acquire
460
  Disable = acquire
461

  
462

  
463
class _PendingRequestMonitor:
464
  _LOCK = "_lock"
465

  
466
  def __init__(self, owner, pending_fn):
467
    """Initializes this class.
468

  
469
    """
470
    self._owner = owner
471
    self._pending_fn = pending_fn
472

  
473
    # The lock monitor runs in another thread, hence locking is necessary
474
    self._lock = locking.SharedLock("PendingHttpRequests")
475
    self.acquire = self._lock.acquire
476
    self.release = self._lock.release
477

  
478
  def Disable(self):
479
    """Disable monitor.
480

  
481
    """
482
    self._pending_fn = None
483

  
484
  @locking.ssynchronized(_LOCK, shared=1)
485
  def GetLockInfo(self, requested): # pylint: disable=W0613
486
    """Retrieves information about pending requests.
487

  
488
    @type requested: set
489
    @param requested: Requested information, see C{query.LQ_*}
490

  
491
    """
492
    # No need to sort here, that's being done by the lock manager and query
493
    # library. There are no priorities for requests, hence all show up as
494
    # one item under "pending".
495
    result = []
496

  
497
    if self._pending_fn:
498
      owner_name = self._owner.getName()
499

  
500
      for pclient in self._pending_fn():
501
        req = pclient.client.GetCurrentRequest()
502
        if req:
503
          result.append(("rpc/%s%s" % (req.host, req.path), None, None,
504
                         [("thread", [owner_name])]))
505

  
506
    return result
507

  
508

  
429 509
def _ProcessCurlRequests(multi, requests):
430 510
  """cURL request processor.
431 511

  
b/lib/rpc.py
368 368

  
369 369

  
370 370
class _RpcProcessor:
371
  def __init__(self, resolver, port):
371
  def __init__(self, resolver, port, lock_monitor_cb=None):
372 372
    """Initializes this class.
373 373

  
374 374
    @param resolver: callable accepting a list of hostnames, returning a list
......
376 376
      the special value L{_OFFLINE} to mark offline machines)
377 377
    @type port: int
378 378
    @param port: TCP port
379
    @param lock_monitor_cb: Callable for registering with lock monitor
379 380

  
380 381
    """
381 382
    self._resolver = resolver
382 383
    self._port = port
384
    self._lock_monitor_cb = lock_monitor_cb
383 385

  
384 386
  @staticmethod
385 387
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
......
452 454
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
453 455
                            str(body), read_timeout)
454 456

  
455
    http_pool.ProcessRequests(requests.values())
457
    http_pool.ProcessRequests(requests.values(),
458
                              lock_monitor_cb=self._lock_monitor_cb)
456 459

  
457 460
    assert not frozenset(results).intersection(requests)
458 461

  
......
489 492
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
490 493
                                              self._cfg.GetNodeInfo,
491 494
                                              self._cfg.GetAllNodesInfo),
492
                               netutils.GetDaemonPort(constants.NODED))
495
                               netutils.GetDaemonPort(constants.NODED),
496
                               lock_monitor_cb=context.glm.AddToLockMonitor)
493 497

  
494 498
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
495 499
    """Convert the given instance to a dict.
b/test/ganeti.rpc_unittest.py
51 51
    self._response_fn = response_fn
52 52
    self.reqcount = 0
53 53

  
54
  def ProcessRequests(self, reqs):
54
  def ProcessRequests(self, reqs, lock_monitor_cb=None):
55 55
    for req in reqs:
56 56
      self.reqcount += 1
57 57
      self._response_fn(req)

Also available in: Unified diff