Revision 942817f2

b/lib/jqueue/__init__.py
67 67
from ganeti import vcluster
68 68

  
69 69

  
70
JOBQUEUE_THREADS = 25
70
JOBQUEUE_THREADS = 1
71 71

  
72 72
# member lock names to be passed to @ssynchronized decorator
73 73
_LOCK = "_lock"
......
1631 1631
      self._enqueue_fn(jobs)
1632 1632

  
1633 1633

  
1634
def _RequireOpenQueue(fn):
1635
  """Decorator for "public" functions.
1636

  
1637
  This function should be used for all 'public' functions. That is,
1638
  functions usually called from other classes. Note that this should
1639
  be applied only to methods (not plain functions), since it expects
1640
  that the decorated function is called with a first argument that has
1641
  a '_queue_filelock' argument.
1642

  
1643
  @warning: Use this decorator only after locking.ssynchronized
1644

  
1645
  Example::
1646
    @locking.ssynchronized(_LOCK)
1647
    @_RequireOpenQueue
1648
    def Example(self):
1649
      pass
1650

  
1651
  """
1652
  def wrapper(self, *args, **kwargs):
1653
    # pylint: disable=W0212
1654
    assert self._queue_filelock is not None, "Queue should be open"
1655
    return fn(self, *args, **kwargs)
1656
  return wrapper
1657

  
1658

  
1659 1634
def _RequireNonDrainedQueue(fn):
1660 1635
  """Decorator checking for a non-drained queue.
1661 1636

  
......
1715 1690
    # Accept jobs by default
1716 1691
    self._accepting_jobs = True
1717 1692

  
1718
    # Initialize the queue, and acquire the filelock.
1719
    # This ensures no other process is working on the job queue.
1720
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1721

  
1722 1693
    # Read serial file
1723 1694
    self._last_serial = jstore.ReadSerial()
1724 1695
    assert self._last_serial is not None, ("Serial file was modified between"
......
1796 1767
    return rpc.JobQueueRunner(self.context, address_list)
1797 1768

  
1798 1769
  @locking.ssynchronized(_LOCK)
1799
  @_RequireOpenQueue
1800 1770
  def AddNode(self, node):
1801 1771
    """Register a new node with the queue.
1802 1772

  
......
1852 1822
    self._nodes[node_name] = node.primary_ip
1853 1823

  
1854 1824
  @locking.ssynchronized(_LOCK)
1855
  @_RequireOpenQueue
1856 1825
  def RemoveNode(self, node_name):
1857 1826
    """Callback called when removing nodes from the cluster.
1858 1827

  
......
2137 2106
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2138 2107

  
2139 2108
  @locking.ssynchronized(_LOCK)
2140
  @_RequireOpenQueue
2141 2109
  def SetDrainFlag(self, drain_flag):
2142 2110
    """Sets the drain flag for the queue.
2143 2111

  
......
2264 2232

  
2265 2233
    raise errors.JobLost("Job %s not found" % job_id)
2266 2234

  
2267
  @_RequireOpenQueue
2268 2235
  def UpdateJobUnlocked(self, job, replicate=True):
2269 2236
    """Update a job's on disk storage.
2270 2237

  
......
2339 2306
      return None
2340 2307

  
2341 2308
  @locking.ssynchronized(_LOCK)
2342
  @_RequireOpenQueue
2343 2309
  def CancelJob(self, job_id):
2344 2310
    """Cancels a job.
2345 2311

  
......
2354 2320
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2355 2321

  
2356 2322
  @locking.ssynchronized(_LOCK)
2357
  @_RequireOpenQueue
2358 2323
  def ChangeJobPriority(self, job_id, priority):
2359 2324
    """Changes a job's priority.
2360 2325

  
......
2411 2376

  
2412 2377
    return (success, msg)
2413 2378

  
2414
  @_RequireOpenQueue
2415 2379
  def _ArchiveJobsUnlocked(self, jobs):
2416 2380
    """Archives jobs.
2417 2381

  
......
2451 2415
    return len(archive_jobs)
2452 2416

  
2453 2417
  @locking.ssynchronized(_LOCK)
2454
  @_RequireOpenQueue
2455 2418
  def ArchiveJob(self, job_id):
2456 2419
    """Archives a job.
2457 2420

  
......
2473 2436
    return self._ArchiveJobsUnlocked([job]) == 1
2474 2437

  
2475 2438
  @locking.ssynchronized(_LOCK)
2476
  @_RequireOpenQueue
2477 2439
  def AutoArchiveJobs(self, age, timeout):
2478 2440
    """Archives all jobs based on age.
2479 2441

  
......
2625 2587
    return self._accepting_jobs
2626 2588

  
2627 2589
  @locking.ssynchronized(_LOCK)
2628
  @_RequireOpenQueue
2629 2590
  def Shutdown(self):
2630 2591
    """Stops the job queue.
2631 2592

  
......
2633 2594

  
2634 2595
    """
2635 2596
    self._wpool.TerminateWorkers()
2636

  
2637
    self._queue_filelock.Close()
2638
    self._queue_filelock = None

Also available in: Unified diff