Revision b95479a5 lib/jqueue.py

b/lib/jqueue.py
1 1
#
2 2
#
3 3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
34 34
import re
35 35
import time
36 36
import weakref
37
import threading
37 38

  
38 39
try:
39 40
  # pylint: disable-msg=E0611
......
55 56
from ganeti import runtime
56 57
from ganeti import netutils
57 58
from ganeti import compat
59
from ganeti import ht
58 60

  
59 61

  
60 62
JOBQUEUE_THREADS = 25
......
177 179
  # pylint: disable-msg=W0212
178 180
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179 181
               "received_timestamp", "start_timestamp", "end_timestamp",
180
               "__weakref__"]
182
               "__weakref__", "processor_lock"]
181 183

  
182 184
  def __init__(self, queue, job_id, ops):
183 185
    """Constructor for the _QueuedJob.
......
211 213
    """
212 214
    obj.ops_iter = None
213 215
    obj.cur_opctx = None
216
    obj.processor_lock = threading.Lock()
214 217

  
215 218
  def __repr__(self):
216 219
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
......
804 807
    self.log_prefix = log_prefix
805 808
    self.summary = op.input.Summary()
806 809

  
810
    # Create local copy to modify
811
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
812
      self.jobdeps = op.input.depends[:]
813
    else:
814
      self.jobdeps = None
815

  
807 816
    self._timeout_strategy_factory = timeout_strategy_factory
808 817
    self._ResetTimeoutStrategy()
809 818

  
......
927 936

  
928 937
    return update
929 938

  
939
  @staticmethod
  def _CheckDependencies(queue, job, opctx):
    """Resolves the job dependencies declared by an opcode, if any.

    Dependencies are handled in order. Each one is passed to the
    C{CheckAndRegister} method of the queue's dependency manager; satisfied
    dependencies are removed from C{opctx.jobdeps} and processing stops at
    the first one which isn't satisfied.

    @type queue: L{JobQueue}
    @param queue: Queue object
    @type job: L{_QueuedJob}
    @param job: Job object
    @type opctx: L{_OpExecContext}
    @param opctx: Opcode execution context
    @rtype: bool
    @return: Whether opcode will be re-scheduled by dependency tracker

    """
    op = opctx.op

    while opctx.jobdeps:
      (dep_job_id, dep_status) = opctx.jobdeps[0]

      (outcome, message) = queue.depmgr.CheckAndRegister(job, dep_job_id,
                                                         dep_status)
      assert ht.TNonEmptyString(message), "No dependency message"

      logging.info("%s: %s", opctx.log_prefix, message)

      if outcome == _JobDependencyManager.CONTINUE:
        # Dependency is satisfied, drop it and look at the next one
        opctx.jobdeps.pop(0)
        continue

      if outcome == _JobDependencyManager.WAIT:
        # Need to wait for notification, dependency tracker will re-add job
        # to workerpool
        return True

      if outcome == _JobDependencyManager.CANCEL:
        # Dependency job was cancelled, cancel this job as well
        job.Cancel()
        assert op.status == constants.OP_STATUS_CANCELING
        return False

      if outcome in (_JobDependencyManager.WRONGSTATUS,
                     _JobDependencyManager.ERROR):
        # Dependency job failed or there was an error, this job must fail
        op.status = constants.OP_STATUS_ERROR
        op.result = _EncodeOpError(errors.OpExecError(message))
        return False

      raise errors.ProgrammerError("Unknown dependency result '%s'" %
                                   outcome)

    # No dependencies (left), nothing to wait for
    return False
994

  
930 995
  def _ExecOpCodeUnlocked(self, opctx):
931 996
    """Processes one opcode and returns the result.
932 997

  
......
1013 1078
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1014 1079
              op.priority >= constants.OP_PRIO_HIGHEST)
1015 1080

  
1081
      waitjob = None
1082

  
1016 1083
      if op.status != constants.OP_STATUS_CANCELING:
1017 1084
        assert op.status in (constants.OP_STATUS_QUEUED,
1018 1085
                             constants.OP_STATUS_WAITLOCK)
......
1025 1092
        assert op.status == constants.OP_STATUS_WAITLOCK
1026 1093
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1027 1094
        assert job.start_timestamp and op.start_timestamp
1095
        assert waitjob is None
1096

  
1097
        # Check if waiting for a job is necessary
1098
        waitjob = self._CheckDependencies(queue, job, opctx)
1028 1099

  
1029
        logging.info("%s: opcode %s waiting for locks",
1030
                     opctx.log_prefix, opctx.summary)
1100
        assert op.status in (constants.OP_STATUS_WAITLOCK,
1101
                             constants.OP_STATUS_CANCELING,
1102
                             constants.OP_STATUS_ERROR)
1031 1103

  
1032
        queue.release()
1033
        try:
1034
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1035
        finally:
1036
          queue.acquire(shared=1)
1104
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1105
                                         constants.OP_STATUS_ERROR)):
1106
          logging.info("%s: opcode %s waiting for locks",
1107
                       opctx.log_prefix, opctx.summary)
1037 1108

  
1038
        op.status = op_status
1039
        op.result = op_result
1109
          assert not opctx.jobdeps, "Not all dependencies were removed"
1110

  
1111
          queue.release()
1112
          try:
1113
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1114
          finally:
1115
            queue.acquire(shared=1)
1116

  
1117
          op.status = op_status
1118
          op.result = op_result
1119

  
1120
          assert not waitjob
1040 1121

  
1041 1122
        if op.status == constants.OP_STATUS_WAITLOCK:
1042 1123
          # Couldn't get locks in time
......
1051 1132
          else:
1052 1133
            assert op.status in constants.OPS_FINALIZED
1053 1134

  
1054
      if op.status == constants.OP_STATUS_WAITLOCK:
1135
      if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
1055 1136
        finalize = False
1056 1137

  
1057
        if opctx.CheckPriorityIncrease():
1138
        if not waitjob and opctx.CheckPriorityIncrease():
1058 1139
          # Priority was changed, need to update on-disk file
1059 1140
          queue.UpdateJobUnlocked(job)
1060 1141

  
......
1113 1194
        # allowed. Once the file has been written, it can be archived anytime.
1114 1195
        queue.UpdateJobUnlocked(job)
1115 1196

  
1197
        assert not waitjob
1198

  
1116 1199
        if finalize:
1117 1200
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1201
          # TODO: Check locking
1202
          queue.depmgr.NotifyWaiters(job.id)
1118 1203
          return True
1119 1204

  
1120
      return False
1205
      assert not waitjob or queue.depmgr.JobWaiting(job)
1206

  
1207
      return bool(waitjob)
1121 1208
    finally:
1122 1209
      queue.release()
1123 1210

  
......
1129 1216
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    Holds the job's C{processor_lock} while delegating the actual work to
    L{_RunTaskInner}.

    @type job: L{_QueuedJob}
    @param job: the job to be processed
    @return: Whatever L{_RunTaskInner} returns

    """
    # A job can end up in the tasklist more than once: if it registers for a
    # dependency job and that job notifies before the first worker is done.
    # The per-job lock ensures only one worker is active on a single job.
    job_lock = job.processor_lock

    job_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job_lock.release()
1231

  
1232
  def _RunTaskInner(self, job):
1233
    """Executes a job.
1234

  
1235
    Must be called with per-job lock acquired.
1236

  
1237
    """
1139 1238
    queue = job.queue
1140 1239
    assert queue == self.pool.queue
1141 1240

  
......
1194 1293
    self.queue = queue
1195 1294

  
1196 1295

  
1296
class _JobDependencyManager:
1297
  """Keeps track of job dependencies.
1298

  
1299
  """
1300
  (WAIT,
1301
   ERROR,
1302
   CANCEL,
1303
   CONTINUE,
1304
   WRONGSTATUS) = range(1, 6)
1305

  
1306
  # TODO: Export waiter information to lock monitor
1307

  
1308
  def __init__(self, getstatus_fn, enqueue_fn):
1309
    """Initializes this class.
1310

  
1311
    """
1312
    self._getstatus_fn = getstatus_fn
1313
    self._enqueue_fn = enqueue_fn
1314

  
1315
    self._waiters = {}
1316
    self._lock = locking.SharedLock("JobDepMgr")
1317

  
1318
  @locking.ssynchronized(_LOCK, shared=1)
1319
  def JobWaiting(self, job):
1320
    """Checks if a job is waiting.
1321

  
1322
    """
1323
    return compat.any(job in jobs
1324
                      for jobs in self._waiters.values())
1325

  
1326
  @locking.ssynchronized(_LOCK)
1327
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1328
    """Checks if a dependency job has the requested status.
1329

  
1330
    If the other job is not yet in a finalized status, the calling job will be
1331
    notified (re-added to the workerpool) at a later point.
1332

  
1333
    @type job: L{_QueuedJob}
1334
    @param job: Job object
1335
    @type dep_job_id: string
1336
    @param dep_job_id: ID of dependency job
1337
    @type dep_status: list
1338
    @param dep_status: Required status
1339

  
1340
    """
1341
    assert ht.TString(job.id)
1342
    assert ht.TString(dep_job_id)
1343
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1344

  
1345
    if job.id == dep_job_id:
1346
      return (self.ERROR, "Job can't depend on itself")
1347

  
1348
    # Get status of dependency job
1349
    try:
1350
      status = self._getstatus_fn(dep_job_id)
1351
    except errors.JobLost, err:
1352
      return (self.ERROR, "Dependency error: %s" % err)
1353

  
1354
    assert status in constants.JOB_STATUS_ALL
1355

  
1356
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1357

  
1358
    if status not in constants.JOBS_FINALIZED:
1359
      # Register for notification and wait for job to finish
1360
      job_id_waiters.add(job)
1361
      return (self.WAIT,
1362
              "Need to wait for job %s, wanted status '%s'" %
1363
              (dep_job_id, dep_status))
1364

  
1365
    # Remove from waiters list
1366
    if job in job_id_waiters:
1367
      job_id_waiters.remove(job)
1368

  
1369
    if (status == constants.JOB_STATUS_CANCELED and
1370
        constants.JOB_STATUS_CANCELED not in dep_status):
1371
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1372

  
1373
    elif not dep_status or status in dep_status:
1374
      return (self.CONTINUE,
1375
              "Dependency job %s finished with status '%s'" %
1376
              (dep_job_id, status))
1377

  
1378
    else:
1379
      return (self.WRONGSTATUS,
1380
              "Dependency job %s finished with status '%s',"
1381
              " not one of '%s' as required" %
1382
              (dep_job_id, status, utils.CommaJoin(dep_status)))
1383

  
1384
  @locking.ssynchronized(_LOCK)
1385
  def NotifyWaiters(self, job_id):
1386
    """Notifies all jobs waiting for a certain job ID.
1387

  
1388
    @type job_id: string
1389
    @param job_id: Job ID
1390

  
1391
    """
1392
    assert ht.TString(job_id)
1393

  
1394
    jobs = self._waiters.pop(job_id, None)
1395
    if jobs:
1396
      # Re-add jobs to workerpool
1397
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1398
                    len(jobs), job_id)
1399
      self._enqueue_fn(jobs)
1400

  
1401
    # Remove all jobs without actual waiters
1402
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1403
                   if not waiters]:
1404
      del self._waiters[job_id]
1405

  
1406

  
1197 1407
def _RequireOpenQueue(fn):
1198 1408
  """Decorator for "public" functions.
1199 1409

  
......
1277 1487
    self._UpdateQueueSizeUnlocked()
1278 1488
    self._drained = jstore.CheckDrainFlag()
1279 1489

  
1490
    # Job dependencies
1491
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1492
                                        self._EnqueueJobs)
1493

  
1280 1494
    # Setup worker pool
1281 1495
    self._wpool = _JobQueueWorkerPool(self)
1282 1496
    try:
......
1808 2022
    self._wpool.AddManyTasks([(job, ) for job in jobs],
1809 2023
                             priority=[job.CalcPriority() for job in jobs])
1810 2024

  
2025
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID; a value which isn't a string is converted using
      L{_FormatJobID} first
    @return: Result of C{CalcStatus} on the job loaded from disk
    @raise errors.JobLost: If job can't be found

    """
    if isinstance(job_id, basestring):
      wanted_id = job_id
    else:
      wanted_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock,
    # hence the job is loaded from disk
    loaded_job = self.SafeLoadJobFromDisk(wanted_id, True)

    if not loaded_job:
      raise errors.JobLost("Job %s not found" % wanted_id)

    return loaded_job.CalcStatus()
2045

  
1811 2046
  @_RequireOpenQueue
1812 2047
  def UpdateJobUnlocked(self, job, replicate=True):
1813 2048
    """Update a job's on disk storage.

Also available in: Unified diff