Revision 85f03e0d

b/lib/jqueue.py
50 50
  of the form (timestamp, level, message).
51 51

  
52 52
  """
53
  def __init__(self, op):
54
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
53
  def __new__(cls, *args, **kwargs):
54
    obj = object.__new__(cls, *args, **kwargs)
55
    # Create a special lock for logging
56
    obj._log_lock = threading.Lock()
57
    return obj
55 58

  
56
  def __Setup(self, input_, status, result, log):
57
    self._lock = threading.Lock()
58
    self.input = input_
59
    self.status = status
60
    self.result = result
61
    self.log = log
59
  def __init__(self, op):
60
    self.input = op
61
    self.status = constants.OP_STATUS_QUEUED
62
    self.result = None
63
    self.log = []
62 64

  
63 65
  @classmethod
64 66
  def Restore(cls, state):
65
    obj = object.__new__(cls)
66
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67
                state["status"], state["result"], state["log"])
67
    obj = _QueuedOpCode.__new__(cls)
68
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69
    obj.status = state["status"]
70
    obj.result = state["result"]
71
    obj.log = state["log"]
68 72
    return obj
69 73

  
70
  @utils.LockedMethod
71 74
  def Serialize(self):
72
    return {
73
      "input": self.input.__getstate__(),
74
      "status": self.status,
75
      "result": self.result,
76
      "log": self.log,
77
      }
78

  
79
  @utils.LockedMethod
80
  def GetInput(self):
81
    """Returns the original opcode.
82

  
83
    """
84
    return self.input
85

  
86
  @utils.LockedMethod
87
  def SetStatus(self, status, result):
88
    """Update the opcode status and result.
89

  
90
    """
91
    self.status = status
92
    self.result = result
93

  
94
  @utils.LockedMethod
95
  def GetStatus(self):
96
    """Get the opcode status.
97

  
98
    """
99
    return self.status
100

  
101
  @utils.LockedMethod
102
  def GetResult(self):
103
    """Get the opcode result.
104

  
105
    """
106
    return self.result
75
    self._log_lock.acquire()
76
    try:
77
      return {
78
        "input": self.input.__getstate__(),
79
        "status": self.status,
80
        "result": self.result,
81
        "log": self.log,
82
        }
83
    finally:
84
      self._log_lock.release()
107 85

  
108
  @utils.LockedMethod
109 86
  def Log(self, *args):
110 87
    """Append a log entry.
111 88

  
112 89
    """
113
    assert len(args) < 2
90
    assert len(args) < 3
114 91

  
115 92
    if len(args) == 1:
116 93
      log_type = constants.ELOG_MESSAGE
117 94
      log_msg = args[0]
118 95
    else:
119 96
      log_type, log_msg = args
120
    self.log.append((time.time(), log_type, log_msg))
121 97

  
122
  @utils.LockedMethod
98
    self._log_lock.acquire()
99
    try:
100
      self.log.append((time.time(), log_type, log_msg))
101
    finally:
102
      self._log_lock.release()
103

  
123 104
  def RetrieveLog(self, start_at=0):
124 105
    """Retrieve (a part of) the execution log.
125 106

  
126 107
    """
127
    return self.log[start_at:]
108
    self._log_lock.acquire()
109
    try:
110
      return self.log[start_at:]
111
    finally:
112
      self._log_lock.release()
128 113

  
129 114

  
130 115
class _QueuedJob(object):
......
133 118
  This is what we use to track the user-submitted jobs.
134 119

  
135 120
  """
136
  def __init__(self, storage, job_id, ops):
121
  def __init__(self, queue, job_id, ops):
137 122
    if not ops:
138 123
      # TODO
139 124
      raise Exception("No opcodes")
140 125

  
141
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142

  
143
  def __Setup(self, storage, job_id, ops, run_op_index):
144
    self._lock = threading.Lock()
145
    self.storage = storage
126
    self.queue = queue
146 127
    self.id = job_id
147
    self._ops = ops
148
    self.run_op_index = run_op_index
128
    self.ops = [_QueuedOpCode(op) for op in ops]
129
    self.run_op_index = -1
149 130

  
150 131
  @classmethod
151
  def Restore(cls, storage, state):
152
    obj = object.__new__(cls)
153
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
132
  def Restore(cls, queue, state):
133
    obj = _QueuedJob.__new__(cls)
134
    obj.queue = queue
135
    obj.id = state["id"]
136
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137
    obj.run_op_index = state["run_op_index"]
155 138
    return obj
156 139

  
157 140
  def Serialize(self):
158 141
    return {
159 142
      "id": self.id,
160
      "ops": [op.Serialize() for op in self._ops],
143
      "ops": [op.Serialize() for op in self.ops],
161 144
      "run_op_index": self.run_op_index,
162 145
      }
163 146

  
164
  def _SetStatus(self, status, msg):
165
    try:
166
      for op in self._ops:
167
        op.SetStatus(status, msg)
168
    finally:
169
      self.storage.UpdateJob(self)
170

  
171
  def SetUnclean(self, msg):
172
    return self._SetStatus(constants.OP_STATUS_ERROR, msg)
173

  
174
  def SetCanceled(self, msg):
175
    return self._SetStatus(constants.JOB_STATUS_CANCELED, msg)
176

  
177
  def GetStatus(self):
147
  def CalcStatus(self):
178 148
    status = constants.JOB_STATUS_QUEUED
179 149

  
180 150
    all_success = True
181
    for op in self._ops:
182
      op_status = op.GetStatus()
183
      if op_status == constants.OP_STATUS_SUCCESS:
151
    for op in self.ops:
152
      if op.status == constants.OP_STATUS_SUCCESS:
184 153
        continue
185 154

  
186 155
      all_success = False
187 156

  
188
      if op_status == constants.OP_STATUS_QUEUED:
157
      if op.status == constants.OP_STATUS_QUEUED:
189 158
        pass
190
      elif op_status == constants.OP_STATUS_RUNNING:
159
      elif op.status == constants.OP_STATUS_RUNNING:
191 160
        status = constants.JOB_STATUS_RUNNING
192
      elif op_status == constants.OP_STATUS_ERROR:
161
      elif op.status == constants.OP_STATUS_ERROR:
193 162
        status = constants.JOB_STATUS_ERROR
194 163
        # The whole job fails if one opcode failed
195 164
        break
196
      elif op_status == constants.OP_STATUS_CANCELED:
165
      elif op.status == constants.OP_STATUS_CANCELED:
197 166
        status = constants.OP_STATUS_CANCELED
198 167
        break
199 168

  
......
202 171

  
203 172
    return status
204 173

  
205
  @utils.LockedMethod
206
  def GetRunOpIndex(self):
207
    return self.run_op_index
208 174

  
209
  def Run(self, proc):
175
class _JobQueueWorker(workerpool.BaseWorker):
176
  def RunTask(self, job):
210 177
    """Job executor.
211 178

  
212
    This functions processes a this job in the context of given processor
213
    instance.
214

  
215
    Args:
216
    - proc: Ganeti Processor to run the job with
179
    This functions processes a job.
217 180

  
218 181
    """
219
    try:
220
      count = len(self._ops)
221
      for idx, op in enumerate(self._ops):
222
        try:
223
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
224

  
225
          self._lock.acquire()
226
          try:
227
            self.run_op_index = idx
228
          finally:
229
            self._lock.release()
230

  
231
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
232
          self.storage.UpdateJob(self)
233

  
234
          result = proc.ExecOpCode(op.input, op.Log)
235

  
236
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
237
          self.storage.UpdateJob(self)
238
          logging.debug("Op %s/%s: Successfully finished %s",
239
                        idx + 1, count, op)
240
        except Exception, err:
241
          try:
242
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
243
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
244
          finally:
245
            self.storage.UpdateJob(self)
246
          raise
247

  
248
    except errors.GenericError, err:
249
      logging.exception("Ganeti exception")
250
    except:
251
      logging.exception("Unhandled exception")
252

  
253

  
254
class _JobQueueWorker(workerpool.BaseWorker):
255
  def RunTask(self, job):
256 182
    logging.debug("Worker %s processing job %s",
257 183
                  self.worker_id, job.id)
258
    # TODO: feedback function
259 184
    proc = mcpu.Processor(self.pool.context)
185
    queue = job.queue
260 186
    try:
261
      job.Run(proc)
187
      try:
188
        count = len(job.ops)
189
        for idx, op in enumerate(job.ops):
190
          try:
191
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
192

  
193
            queue.acquire()
194
            try:
195
              job.run_op_index = idx
196
              op.status = constants.OP_STATUS_RUNNING
197
              op.result = None
198
              queue.UpdateJobUnlocked(job)
199

  
200
              input = op.input
201
            finally:
202
              queue.release()
203

  
204
            result = proc.ExecOpCode(input, op.Log)
205

  
206
            queue.acquire()
207
            try:
208
              op.status = constants.OP_STATUS_SUCCESS
209
              op.result = result
210
              queue.UpdateJobUnlocked(job)
211
            finally:
212
              queue.release()
213

  
214
            logging.debug("Op %s/%s: Successfully finished %s",
215
                          idx + 1, count, op)
216
          except Exception, err:
217
            queue.acquire()
218
            try:
219
              try:
220
                op.status = constants.OP_STATUS_ERROR
221
                op.result = str(err)
222
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
223
              finally:
224
                queue.UpdateJobUnlocked(job)
225
            finally:
226
              queue.release()
227
            raise
228

  
229
      except errors.GenericError, err:
230
        logging.exception("Ganeti exception")
231
      except:
232
        logging.exception("Unhandled exception")
262 233
    finally:
234
      queue.acquire()
235
      try:
236
        job_id = job.id
237
        status = job.CalcStatus()
238
      finally:
239
        queue.release()
263 240
      logging.debug("Worker %s finished job %s, status = %s",
264
                    self.worker_id, job.id, job.GetStatus())
241
                    self.worker_id, job_id, status)
265 242

  
266 243

  
267 244
class _JobQueueWorkerPool(workerpool.WorkerPool):
......
271 248
    self.context = context
272 249

  
273 250

  
274
class JobStorageBase(object):
275
  def __init__(self, id_prefix):
276
    self.id_prefix = id_prefix
277

  
278
    if id_prefix:
279
      prefix_pattern = re.escape("%s-" % id_prefix)
280
    else:
281
      prefix_pattern = ""
282

  
283
    # Apart from the prefix, all job IDs are numeric
284
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
285

  
286
  def OwnsJobId(self, job_id):
287
    return self._re_job_id.match(job_id)
288

  
289
  def FormatJobID(self, job_id):
290
    if not isinstance(job_id, (int, long)):
291
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
292
    if job_id < 0:
293
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
294

  
295
    if self.id_prefix:
296
      prefix = "%s-" % self.id_prefix
297
    else:
298
      prefix = ""
299

  
300
    return "%s%010d" % (prefix, job_id)
301

  
302
  def _ShouldJobBeArchivedUnlocked(self, job):
303
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
304
                               constants.JOB_STATUS_SUCCESS,
305
                               constants.JOB_STATUS_ERROR):
306
      logging.debug("Job %s is not yet done", job.id)
307
      return False
308
    return True
309

  
310

  
311
class DiskJobStorage(JobStorageBase):
251
class JobQueue(object):
312 252
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
313 253

  
314
  def __init__(self, id_prefix):
315
    JobStorageBase.__init__(self, id_prefix)
316

  
317
    self._lock = threading.Lock()
254
  def __init__(self, context):
318 255
    self._memcache = {}
319 256
    self._my_hostname = utils.HostInfo().name
320 257

  
258
    # Locking
259
    self._lock = threading.Lock()
260
    self.acquire = self._lock.acquire
261
    self.release = self._lock.release
262

  
321 263
    # Make sure our directories exists
322 264
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
323 265
      try:
......
363 305
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
364 306
                                      " file")
365 307

  
308
    # Setup worker pool
309
    self._wpool = _JobQueueWorkerPool(context)
310

  
311
    # We need to lock here because WorkerPool.AddTask() may start a job while
312
    # we're still doing our work.
313
    self.acquire()
314
    try:
315
      for job in self._GetJobsUnlocked(None):
316
        status = job.CalcStatus()
317

  
318
        if status in (constants.JOB_STATUS_QUEUED, ):
319
          self._wpool.AddTask(job)
320

  
321
        elif status in (constants.JOB_STATUS_RUNNING, ):
322
          logging.warning("Unfinished job %s found: %s", job.id, job)
323
          try:
324
            for op in job.ops:
325
              op.status = constants.OP_STATUS_ERROR
326
              op.result = "Unclean master daemon shutdown"
327
          finally:
328
            self.UpdateJobUnlocked(job)
329
    finally:
330
      self.release()
331

  
366 332
  @staticmethod
367 333
  def _ReadSerial():
368 334
    """Try to read the job serial file.
......
384 350

  
385 351
    return serial
386 352

  
387
  def Close(self):
388
    assert self.lock_fd, "Queue should be open"
389

  
390
    self.lock_fd.close()
391
    self.lock_fd = None
392

  
393 353
  def _InitQueueUnlocked(self):
394 354
    assert self.lock_fd, "Queue should be open"
395 355

  
......
399 359
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
400 360
                      data="%s\n" % 0)
401 361

  
362
  def _FormatJobID(self, job_id):
363
    if not isinstance(job_id, (int, long)):
364
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
365
    if job_id < 0:
366
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
367

  
368
    return str(job_id)
369

  
402 370
  def _NewSerialUnlocked(self, nodes):
403 371
    """Generates a new job identifier.
404 372

  
......
430 398
      if not result[node]:
431 399
        logging.error("copy of job queue file to node %s failed", node)
432 400

  
433
    return self.FormatJobID(serial)
401
    return self._FormatJobID(serial)
434 402

  
435
  def _GetJobPath(self, job_id):
403
  @staticmethod
404
  def _GetJobPath(job_id):
436 405
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
437 406

  
438
  def _GetArchivedJobPath(self, job_id):
407
  @staticmethod
408
  def _GetArchivedJobPath(job_id):
439 409
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
440 410

  
441
  def _ExtractJobID(self, name):
442
    m = self._RE_JOB_FILE.match(name)
411
  @classmethod
412
  def _ExtractJobID(cls, name):
413
    m = cls._RE_JOB_FILE.match(name)
443 414
    if m:
444 415
      return m.group(1)
445 416
    else:
......
498 469
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
499 470

  
500 471
  @utils.LockedMethod
501
  def GetJobs(self, job_ids):
502
    return self._GetJobsUnlocked(job_ids)
472
  def SubmitJob(self, ops, nodes):
473
    """Create and store a new job.
503 474

  
504
  @utils.LockedMethod
505
  def AddJob(self, ops, nodes):
506
    """Create and store on disk a new job.
475
    This enters the job into our job queue and also puts it on the new
476
    queue, in order for it to be picked up by the queue processors.
507 477

  
508 478
    @type ops: list
509 479
    @param ops: The list of OpCodes that will become the new job.
......
519 489
    job = _QueuedJob(self, job_id, ops)
520 490

  
521 491
    # Write to disk
522
    self._UpdateJobUnlocked(job)
492
    self.UpdateJobUnlocked(job)
523 493

  
524 494
    logging.debug("Added new job %s to the cache", job_id)
525 495
    self._memcache[job_id] = job
526 496

  
527
    return job
497
    # Add to worker pool
498
    self._wpool.AddTask(job)
499

  
500
    return job.id
528 501

  
529
  def _UpdateJobUnlocked(self, job):
502
  def UpdateJobUnlocked(self, job):
530 503
    assert self.lock_fd, "Queue should be open"
531 504

  
532 505
    filename = self._GetJobPath(job.id)
......
543 516

  
544 517
    """
545 518
    assert isinstance(exclude, list)
519

  
546 520
    for job in self._memcache.values():
547 521
      if job.id in exclude:
548 522
        continue
549
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
550
                                 constants.JOB_STATUS_RUNNING):
523
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
524
                                  constants.JOB_STATUS_RUNNING):
551 525
        logging.debug("Cleaning job %s from the cache", job.id)
552 526
        try:
553 527
          del self._memcache[job.id]
......
555 529
          pass
556 530

  
557 531
  @utils.LockedMethod
558
  def UpdateJob(self, job):
559
    return self._UpdateJobUnlocked(job)
560

  
561
  # TODO: Figure out locking
562
  #@utils.LockedMethod
563 532
  def CancelJob(self, job_id):
564 533
    """Cancels a job.
565 534

  
......
569 538
    """
570 539
    logging.debug("Cancelling job %s", job_id)
571 540

  
572
    self._lock.acquire()
573
    try:
574
      job = self._LoadJobUnlocked(job_id)
575
    finally:
576
      self._lock.release()
541
    job = self._LoadJobUnlocked(job_id)
577 542
    if not job:
578 543
      logging.debug("Job %s not found", job_id)
579 544
      return
580 545

  
581
    if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,):
546
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
582 547
      logging.debug("Job %s is no longer in the queue", job.id)
583 548
      return
584 549

  
585
    job.SetCanceled("Job cancelled by request")
550
    try:
551
      for op in job.ops:
552
        op.status = constants.OP_STATUS_ERROR
553
        op.result = "Job cancelled by request"
554
    finally:
555
      self.UpdateJobUnlocked(job)
586 556

  
587 557
  @utils.LockedMethod
588 558
  def ArchiveJob(self, job_id):
......
599 569
      logging.debug("Job %s not found", job_id)
600 570
      return
601 571

  
602
    if not self._ShouldJobBeArchivedUnlocked(job):
572
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
573
                                constants.JOB_STATUS_SUCCESS,
574
                                constants.JOB_STATUS_ERROR):
575
      logging.debug("Job %s is not yet done", job.id)
603 576
      return
604 577

  
605 578
    try:
......
614 587
      # and to be on the safe side.
615 588
      self._CleanCacheUnlocked([])
616 589

  
617

  
618
class JobQueue:
619
  """The job queue.
620

  
621
  """
622
  def __init__(self, context):
623
    self._lock = threading.Lock()
624
    self._jobs = DiskJobStorage("")
625
    self._wpool = _JobQueueWorkerPool(context)
626

  
627
    for job in self._jobs.GetJobs(None):
628
      status = job.GetStatus()
629
      if status in (constants.JOB_STATUS_QUEUED, ):
630
        self._wpool.AddTask(job)
631

  
632
      elif status in (constants.JOB_STATUS_RUNNING, ):
633
        logging.warning("Unfinished job %s found: %s", job.id, job)
634
        job.SetUnclean("Unclean master daemon shutdown")
635

  
636
  @utils.LockedMethod
637
  def SubmitJob(self, ops, nodes):
638
    """Add a new job to the queue.
639

  
640
    This enters the job into our job queue and also puts it on the new
641
    queue, in order for it to be picked up by the queue processors.
642

  
643
    @type ops: list
644
    @param ops: the sequence of opcodes that will become the new job
645
    @type nodes: list
646
    @param nodes: the list of nodes to which the queue should be
647
                  distributed
648

  
649
    """
650
    job = self._jobs.AddJob(ops, nodes)
651

  
652
    # Add to worker pool
653
    self._wpool.AddTask(job)
654

  
655
    return job.id
656

  
657
  def ArchiveJob(self, job_id):
658
    self._jobs.ArchiveJob(job_id)
659

  
660
  @utils.LockedMethod
661
  def CancelJob(self, job_id):
662
    self._jobs.CancelJob(job_id)
663

  
664
  def _GetJobInfo(self, job, fields):
590
  def _GetJobInfoUnlocked(self, job, fields):
665 591
    row = []
666 592
    for fname in fields:
667 593
      if fname == "id":
668 594
        row.append(job.id)
669 595
      elif fname == "status":
670
        row.append(job.GetStatus())
596
        row.append(job.CalcStatus())
671 597
      elif fname == "ops":
672
        row.append([op.GetInput().__getstate__() for op in job._ops])
598
        row.append([op.input.__getstate__() for op in job.ops])
673 599
      elif fname == "opresult":
674
        row.append([op.GetResult() for op in job._ops])
600
        row.append([op.result for op in job.ops])
675 601
      elif fname == "opstatus":
676
        row.append([op.GetStatus() for op in job._ops])
602
        row.append([op.status for op in job.ops])
677 603
      elif fname == "ticker":
678
        ji = job.GetRunOpIndex()
604
        ji = job.run_op_index
679 605
        if ji < 0:
680 606
          lmsg = None
681 607
        else:
682
          lmsg = job._ops[ji].RetrieveLog(-1)
608
          lmsg = job.ops[ji].RetrieveLog(-1)
683 609
          # message might be empty here
684 610
          if lmsg:
685 611
            lmsg = lmsg[0]
......
690 616
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
691 617
    return row
692 618

  
619
  @utils.LockedMethod
693 620
  def QueryJobs(self, job_ids, fields):
694 621
    """Returns a list of jobs in queue.
695 622

  
......
698 625
    - fields: Names of fields to return
699 626

  
700 627
    """
701
    self._lock.acquire()
702
    try:
703
      jobs = []
628
    jobs = []
704 629

  
705
      for job in self._jobs.GetJobs(job_ids):
706
        if job is None:
707
          jobs.append(None)
708
        else:
709
          jobs.append(self._GetJobInfo(job, fields))
630
    for job in self._GetJobsUnlocked(job_ids):
631
      if job is None:
632
        jobs.append(None)
633
      else:
634
        jobs.append(self._GetJobInfoUnlocked(job, fields))
710 635

  
711
      return jobs
712
    finally:
713
      self._lock.release()
636
    return jobs
714 637

  
715 638
  @utils.LockedMethod
716 639
  def Shutdown(self):
717 640
    """Stops the job queue.
718 641

  
719 642
    """
643
    assert self.lock_fd, "Queue should be open"
644

  
720 645
    self._wpool.TerminateWorkers()
721
    self._jobs.Close()
646

  
647
    self.lock_fd.close()
648
    self.lock_fd = None

Also available in: Unified diff