root / lib / jqueue.py @ fcb21ad7

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37
import threading
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60

    
61

    
62
JOBQUEUE_THREADS = 25
63
JOBS_PER_ARCHIVE_DIRECTORY = 10000
64

    
65
# member lock names to be passed to @ssynchronized decorator
66
_LOCK = "_lock"
67
_QUEUE = "_queue"
68

    
69

    
70
class CancelJob(Exception):
71
  """Special exception to cancel a job.
72

73
  """
74

    
75

    
76
def TimeStampNow():
77
  """Returns the current timestamp.
78

79
  @rtype: tuple
80
  @return: the current time in the (seconds, microseconds) format
81

82
  """
83
  return utils.SplitTime(time.time())
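# Illustrative note (not part of the original module): per the docstring above,
# utils.SplitTime is assumed to break a float POSIX timestamp into an integer
# (seconds, microseconds) pair, e.g.
#   TimeStampNow()          # -> something like (1234567890, 250000)
#   utils.SplitTime(12.5)   # -> assumed to yield (12, 500000)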
84

    
85

    
86
class _QueuedOpCode(object):
87
  """Encapsulates an opcode object.
88

89
  @ivar log: holds the execution log and consists of tuples
90
  of the form C{(log_serial, timestamp, level, message)}
91
  @ivar input: the OpCode we encapsulate
92
  @ivar status: the current status
93
  @ivar result: the result of the LU execution
94
  @ivar start_timestamp: timestamp for the start of the execution
95
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96
  @ivar end_timestamp: timestamp for the end of the execution
97

98
  """
99
  __slots__ = ["input", "status", "result", "log", "priority",
100
               "start_timestamp", "exec_timestamp", "end_timestamp",
101
               "__weakref__"]
102

    
103
  def __init__(self, op):
104
    """Constructor for the _QuededOpCode.
105

106
    @type op: L{opcodes.OpCode}
107
    @param op: the opcode we encapsulate
108

109
    """
110
    self.input = op
111
    self.status = constants.OP_STATUS_QUEUED
112
    self.result = None
113
    self.log = []
114
    self.start_timestamp = None
115
    self.exec_timestamp = None
116
    self.end_timestamp = None
117

    
118
    # Get initial priority (it might change during the lifetime of this opcode)
119
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120

    
121
  @classmethod
122
  def Restore(cls, state):
123
    """Restore the _QueuedOpCode from the serialized form.
124

125
    @type state: dict
126
    @param state: the serialized state
127
    @rtype: _QueuedOpCode
128
    @return: a new _QueuedOpCode instance
129

130
    """
131
    obj = _QueuedOpCode.__new__(cls)
132
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133
    obj.status = state["status"]
134
    obj.result = state["result"]
135
    obj.log = state["log"]
136
    obj.start_timestamp = state.get("start_timestamp", None)
137
    obj.exec_timestamp = state.get("exec_timestamp", None)
138
    obj.end_timestamp = state.get("end_timestamp", None)
139
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140
    return obj
141

    
142
  def Serialize(self):
143
    """Serializes this _QueuedOpCode.
144

145
    @rtype: dict
146
    @return: the dictionary holding the serialized state
147

148
    """
149
    return {
150
      "input": self.input.__getstate__(),
151
      "status": self.status,
152
      "result": self.result,
153
      "log": self.log,
154
      "start_timestamp": self.start_timestamp,
155
      "exec_timestamp": self.exec_timestamp,
156
      "end_timestamp": self.end_timestamp,
157
      "priority": self.priority,
158
      }
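  # Rough round-trip sketch (illustrative only; opcodes.OpTestDelay is just an
  # assumed example of a concrete opcode class):
  #
  #   qop = _QueuedOpCode(opcodes.OpTestDelay(duration=1.0))
  #   state = qop.Serialize()              # plain dict, JSON-serializable
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.status == constants.OP_STATUS_QUEUED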
159

    
160

    
161
class _QueuedJob(object):
162
  """In-memory job representation.
163

164
  This is what we use to track the user-submitted jobs. Locking must
165
  be taken care of by users of this class.
166

167
  @type queue: L{JobQueue}
168
  @ivar queue: the parent queue
169
  @ivar id: the job ID
170
  @type ops: list
171
  @ivar ops: the list of _QueuedOpCode that constitute the job
172
  @type log_serial: int
173
  @ivar log_serial: holds the index for the next log entry
174
  @ivar received_timestamp: the timestamp for when the job was received
175
  @ivar start_timestamp: the timestamp for start of execution
176
  @ivar end_timestamp: the timestamp for end of execution
177
  @ivar writable: Whether the job is allowed to be modified
178

179
  """
180
  # pylint: disable-msg=W0212
181
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182
               "received_timestamp", "start_timestamp", "end_timestamp",
183
               "__weakref__", "processor_lock", "writable"]
184

    
185
  def __init__(self, queue, job_id, ops, writable):
186
    """Constructor for the _QueuedJob.
187

188
    @type queue: L{JobQueue}
189
    @param queue: our parent queue
190
    @type job_id: string
191
    @param job_id: our job id
192
    @type ops: list
193
    @param ops: the list of opcodes we hold, which will be encapsulated
194
        in _QueuedOpCodes
195
    @type writable: bool
196
    @param writable: Whether job can be modified
197

198
    """
199
    if not ops:
200
      raise errors.GenericError("A job needs at least one opcode")
201

    
202
    self.queue = queue
203
    self.id = job_id
204
    self.ops = [_QueuedOpCode(op) for op in ops]
205
    self.log_serial = 0
206
    self.received_timestamp = TimeStampNow()
207
    self.start_timestamp = None
208
    self.end_timestamp = None
209

    
210
    self._InitInMemory(self, writable)
211

    
212
  @staticmethod
213
  def _InitInMemory(obj, writable):
214
    """Initializes in-memory variables.
215

216
    """
217
    obj.writable = writable
218
    obj.ops_iter = None
219
    obj.cur_opctx = None
220

    
221
    # Read-only jobs are not processed and therefore don't need a lock
222
    if writable:
223
      obj.processor_lock = threading.Lock()
224
    else:
225
      obj.processor_lock = None
226

    
227
  def __repr__(self):
228
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
229
              "id=%s" % self.id,
230
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
231

    
232
    return "<%s at %#x>" % (" ".join(status), id(self))
233

    
234
  @classmethod
235
  def Restore(cls, queue, state, writable):
236
    """Restore a _QueuedJob from serialized state:
237

238
    @type queue: L{JobQueue}
239
    @param queue: to which queue the restored job belongs
240
    @type state: dict
241
    @param state: the serialized state
242
    @type writable: bool
243
    @param writable: Whether job can be modified
244
    @rtype: _QueuedJob
245
    @return: the restored _QueuedJob instance
246

247
    """
248
    obj = _QueuedJob.__new__(cls)
249
    obj.queue = queue
250
    obj.id = state["id"]
251
    obj.received_timestamp = state.get("received_timestamp", None)
252
    obj.start_timestamp = state.get("start_timestamp", None)
253
    obj.end_timestamp = state.get("end_timestamp", None)
254

    
255
    obj.ops = []
256
    obj.log_serial = 0
257
    for op_state in state["ops"]:
258
      op = _QueuedOpCode.Restore(op_state)
259
      for log_entry in op.log:
260
        obj.log_serial = max(obj.log_serial, log_entry[0])
261
      obj.ops.append(op)
262

    
263
    cls._InitInMemory(obj, writable)
264

    
265
    return obj
266

    
267
  def Serialize(self):
268
    """Serialize the _JobQueue instance.
269

270
    @rtype: dict
271
    @return: the serialized state
272

273
    """
274
    return {
275
      "id": self.id,
276
      "ops": [op.Serialize() for op in self.ops],
277
      "start_timestamp": self.start_timestamp,
278
      "end_timestamp": self.end_timestamp,
279
      "received_timestamp": self.received_timestamp,
280
      }
281

    
282
  def CalcStatus(self):
283
    """Compute the status of this job.
284

285
    This function iterates over all the _QueuedOpCodes in the job and
286
    based on their status, computes the job status.
287

288
    The algorithm is:
289
      - if we find a cancelled opcode, or one that finished with an error,
290
        the job status will be the same
291
      - otherwise, the last opcode whose status is one of:
292
          - waiting
293
          - canceling
294
          - running
295

296
        will determine the job status
297

298
      - otherwise, it means either all opcodes are queued, or success,
299
        and the job status will be the same
300

301
    @return: the job status
302

303
    """
304
    status = constants.JOB_STATUS_QUEUED
305

    
306
    all_success = True
307
    for op in self.ops:
308
      if op.status == constants.OP_STATUS_SUCCESS:
309
        continue
310

    
311
      all_success = False
312

    
313
      if op.status == constants.OP_STATUS_QUEUED:
314
        pass
315
      elif op.status == constants.OP_STATUS_WAITING:
316
        status = constants.JOB_STATUS_WAITING
317
      elif op.status == constants.OP_STATUS_RUNNING:
318
        status = constants.JOB_STATUS_RUNNING
319
      elif op.status == constants.OP_STATUS_CANCELING:
320
        status = constants.JOB_STATUS_CANCELING
321
        break
322
      elif op.status == constants.OP_STATUS_ERROR:
323
        status = constants.JOB_STATUS_ERROR
324
        # The whole job fails if one opcode failed
325
        break
326
      elif op.status == constants.OP_STATUS_CANCELED:
327
        status = constants.JOB_STATUS_CANCELED
328
        break
329

    
330
    if all_success:
331
      status = constants.JOB_STATUS_SUCCESS
332

    
333
    return status
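  # Worked example (illustrative, not part of the original file): for opcode
  # statuses [SUCCESS, RUNNING, QUEUED] the loop above leaves the job in
  # JOB_STATUS_RUNNING; for [SUCCESS, ERROR, QUEUED] it breaks out with
  # JOB_STATUS_ERROR; only when every opcode is SUCCESS does all_success stay
  # True and the job report JOB_STATUS_SUCCESS.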
334

    
335
  def CalcPriority(self):
336
    """Gets the current priority for this job.
337

338
    Only unfinished opcodes are considered. When all are done, the default
339
    priority is used.
340

341
    @rtype: int
342

343
    """
344
    priorities = [op.priority for op in self.ops
345
                  if op.status not in constants.OPS_FINALIZED]
346

    
347
    if not priorities:
348
      # All opcodes are done, assume default priority
349
      return constants.OP_PRIO_DEFAULT
350

    
351
    return min(priorities)
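  # Example (illustrative): with unfinished opcode priorities [0, -5, 10] the
  # job priority is min(...) == -5; lower values mean "more urgent", so the
  # most urgent pending opcode dictates the whole job's priority.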
352

    
353
  def GetLogEntries(self, newer_than):
354
    """Selectively returns the log entries.
355

356
    @type newer_than: None or int
357
    @param newer_than: if this is None, return all log entries,
358
        otherwise return only the log entries with serial higher
359
        than this value
360
    @rtype: list
361
    @return: the list of the log entries selected
362

363
    """
364
    if newer_than is None:
365
      serial = -1
366
    else:
367
      serial = newer_than
368

    
369
    entries = []
370
    for op in self.ops:
371
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
372

    
373
    return entries
374

    
375
  def GetInfo(self, fields):
376
    """Returns information about a job.
377

378
    @type fields: list
379
    @param fields: names of fields to return
380
    @rtype: list
381
    @return: list with one element for each field
382
    @raise errors.OpExecError: when an invalid field
383
        has been passed
384

385
    """
386
    row = []
387
    for fname in fields:
388
      if fname == "id":
389
        row.append(self.id)
390
      elif fname == "status":
391
        row.append(self.CalcStatus())
392
      elif fname == "priority":
393
        row.append(self.CalcPriority())
394
      elif fname == "ops":
395
        row.append([op.input.__getstate__() for op in self.ops])
396
      elif fname == "opresult":
397
        row.append([op.result for op in self.ops])
398
      elif fname == "opstatus":
399
        row.append([op.status for op in self.ops])
400
      elif fname == "oplog":
401
        row.append([op.log for op in self.ops])
402
      elif fname == "opstart":
403
        row.append([op.start_timestamp for op in self.ops])
404
      elif fname == "opexec":
405
        row.append([op.exec_timestamp for op in self.ops])
406
      elif fname == "opend":
407
        row.append([op.end_timestamp for op in self.ops])
408
      elif fname == "oppriority":
409
        row.append([op.priority for op in self.ops])
410
      elif fname == "received_ts":
411
        row.append(self.received_timestamp)
412
      elif fname == "start_ts":
413
        row.append(self.start_timestamp)
414
      elif fname == "end_ts":
415
        row.append(self.end_timestamp)
416
      elif fname == "summary":
417
        row.append([op.input.Summary() for op in self.ops])
418
      else:
419
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
420
    return row
421

    
422
  def MarkUnfinishedOps(self, status, result):
423
    """Mark unfinished opcodes with a given status and result.
424

425
    This is a utility function for marking all running or waiting to
426
    be run opcodes with a given status. Opcodes which are already
427
    finalised are not changed.
428

429
    @param status: a given opcode status
430
    @param result: the opcode result
431

432
    """
433
    not_marked = True
434
    for op in self.ops:
435
      if op.status in constants.OPS_FINALIZED:
436
        assert not_marked, "Finalized opcodes found after non-finalized ones"
437
        continue
438
      op.status = status
439
      op.result = result
440
      not_marked = False
441

    
442
  def Finalize(self):
443
    """Marks the job as finalized.
444

445
    """
446
    self.end_timestamp = TimeStampNow()
447

    
448
  def Cancel(self):
449
    """Marks job as canceled/-ing if possible.
450

451
    @rtype: tuple; (bool, string)
452
    @return: Boolean describing whether job was successfully canceled or marked
453
      as canceling and a text message
454

455
    """
456
    status = self.CalcStatus()
457

    
458
    if status == constants.JOB_STATUS_QUEUED:
459
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460
                             "Job canceled by request")
461
      self.Finalize()
462
      return (True, "Job %s canceled" % self.id)
463

    
464
    elif status == constants.JOB_STATUS_WAITING:
465
      # The worker will notice the new status and cancel the job
466
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467
      return (True, "Job %s will be canceled" % self.id)
468

    
469
    else:
470
      logging.debug("Job %s is no longer waiting in the queue", self.id)
471
      return (False, "Job %s is no longer waiting in the queue" % self.id)
472

    
473

    
474
class _OpExecCallbacks(mcpu.OpExecCbBase):
475
  def __init__(self, queue, job, op):
476
    """Initializes this class.
477

478
    @type queue: L{JobQueue}
479
    @param queue: Job queue
480
    @type job: L{_QueuedJob}
481
    @param job: Job object
482
    @type op: L{_QueuedOpCode}
483
    @param op: OpCode
484

485
    """
486
    assert queue, "Queue is missing"
487
    assert job, "Job is missing"
488
    assert op, "Opcode is missing"
489

    
490
    self._queue = queue
491
    self._job = job
492
    self._op = op
493

    
494
  def _CheckCancel(self):
495
    """Raises an exception to cancel the job if asked to.
496

497
    """
498
    # Cancel here if we were asked to
499
    if self._op.status == constants.OP_STATUS_CANCELING:
500
      logging.debug("Canceling opcode")
501
      raise CancelJob()
502

    
503
  @locking.ssynchronized(_QUEUE, shared=1)
504
  def NotifyStart(self):
505
    """Mark the opcode as running, not lock-waiting.
506

507
    This is called from the mcpu code as a notifier function, when the LU is
508
    finally about to start the Exec() method. Of course, to have end-user
509
    visible results, the opcode must be initially (before calling into
510
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
511

512
    """
513
    assert self._op in self._job.ops
514
    assert self._op.status in (constants.OP_STATUS_WAITING,
515
                               constants.OP_STATUS_CANCELING)
516

    
517
    # Cancel here if we were asked to
518
    self._CheckCancel()
519

    
520
    logging.debug("Opcode is now running")
521

    
522
    self._op.status = constants.OP_STATUS_RUNNING
523
    self._op.exec_timestamp = TimeStampNow()
524

    
525
    # And finally replicate the job status
526
    self._queue.UpdateJobUnlocked(self._job)
527

    
528
  @locking.ssynchronized(_QUEUE, shared=1)
529
  def _AppendFeedback(self, timestamp, log_type, log_msg):
530
    """Internal feedback append function, with locks
531

532
    """
533
    self._job.log_serial += 1
534
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
535
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
536

    
537
  def Feedback(self, *args):
538
    """Append a log entry.
539

540
    """
541
    assert len(args) < 3
542

    
543
    if len(args) == 1:
544
      log_type = constants.ELOG_MESSAGE
545
      log_msg = args[0]
546
    else:
547
      (log_type, log_msg) = args
548

    
549
    # The time is split to make serialization easier and not lose
550
    # precision.
551
    timestamp = utils.SplitTime(time.time())
552
    self._AppendFeedback(timestamp, log_type, log_msg)
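  # Call forms (illustrative): LU code may invoke this callback with a bare
  # message, in which case ELOG_MESSAGE is assumed:
  #   feedback_fn("copying disks")
  # or with an explicit (log_type, message) pair:
  #   feedback_fn(constants.ELOG_MESSAGE, "copying disks")
  # Either way the entry is stored in op.log as
  # (log_serial, (seconds, microseconds), log_type, message).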
553

    
554
  def CheckCancel(self):
555
    """Check whether job has been cancelled.
556

557
    """
558
    assert self._op.status in (constants.OP_STATUS_WAITING,
559
                               constants.OP_STATUS_CANCELING)
560

    
561
    # Cancel here if we were asked to
562
    self._CheckCancel()
563

    
564
  def SubmitManyJobs(self, jobs):
565
    """Submits jobs for processing.
566

567
    See L{JobQueue.SubmitManyJobs}.
568

569
    """
570
    # Locking is done in job queue
571
    return self._queue.SubmitManyJobs(jobs)
572

    
573

    
574
class _JobChangesChecker(object):
575
  def __init__(self, fields, prev_job_info, prev_log_serial):
576
    """Initializes this class.
577

578
    @type fields: list of strings
579
    @param fields: Fields requested by LUXI client
580
    @type prev_job_info: string
581
    @param prev_job_info: previous job info, as passed by the LUXI client
582
    @type prev_log_serial: string
583
    @param prev_log_serial: previous job serial, as passed by the LUXI client
584

585
    """
586
    self._fields = fields
587
    self._prev_job_info = prev_job_info
588
    self._prev_log_serial = prev_log_serial
589

    
590
  def __call__(self, job):
591
    """Checks whether job has changed.
592

593
    @type job: L{_QueuedJob}
594
    @param job: Job object
595

596
    """
597
    assert not job.writable, "Expected read-only job"
598

    
599
    status = job.CalcStatus()
600
    job_info = job.GetInfo(self._fields)
601
    log_entries = job.GetLogEntries(self._prev_log_serial)
602

    
603
    # Serializing and deserializing data can cause type changes (e.g. from
604
    # tuple to list) or precision loss. We're doing it here so that we get
605
    # the same modifications as the data received from the client. Without
606
    # this, the comparison afterwards might fail without the data being
607
    # significantly different.
608
    # TODO: we just deserialized from disk, investigate how to make sure that
609
    # the job info and log entries are compatible to avoid this further step.
610
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
611
    # efficient, though floats will be tricky
612
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
613
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
614

    
615
    # Don't even try to wait if the job is no longer running, there will be
616
    # no changes.
617
    if (status not in (constants.JOB_STATUS_QUEUED,
618
                       constants.JOB_STATUS_RUNNING,
619
                       constants.JOB_STATUS_WAITING) or
620
        job_info != self._prev_job_info or
621
        (log_entries and self._prev_log_serial != log_entries[0][0])):
622
      logging.debug("Job %s changed", job.id)
623
      return (job_info, log_entries)
624

    
625
    return None
626

    
627

    
628
class _JobFileChangesWaiter(object):
629
  def __init__(self, filename):
630
    """Initializes this class.
631

632
    @type filename: string
633
    @param filename: Path to job file
634
    @raise errors.InotifyError: if the notifier cannot be set up
635

636
    """
637
    self._wm = pyinotify.WatchManager()
638
    self._inotify_handler = \
639
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
640
    self._notifier = \
641
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
642
    try:
643
      self._inotify_handler.enable()
644
    except Exception:
645
      # pyinotify doesn't close file descriptors automatically
646
      self._notifier.stop()
647
      raise
648

    
649
  def _OnInotify(self, notifier_enabled):
650
    """Callback for inotify.
651

652
    """
653
    if not notifier_enabled:
654
      self._inotify_handler.enable()
655

    
656
  def Wait(self, timeout):
657
    """Waits for the job file to change.
658

659
    @type timeout: float
660
    @param timeout: Timeout in seconds
661
    @return: Whether there have been events
662

663
    """
664
    assert timeout >= 0
665
    have_events = self._notifier.check_events(timeout * 1000)
666
    if have_events:
667
      self._notifier.read_events()
668
    self._notifier.process_events()
669
    return have_events
670

    
671
  def Close(self):
672
    """Closes underlying notifier and its file descriptor.
673

674
    """
675
    self._notifier.stop()
676

    
677

    
678
class _JobChangesWaiter(object):
679
  def __init__(self, filename):
680
    """Initializes this class.
681

682
    @type filename: string
683
    @param filename: Path to job file
684

685
    """
686
    self._filewaiter = None
687
    self._filename = filename
688

    
689
  def Wait(self, timeout):
690
    """Waits for a job to change.
691

692
    @type timeout: float
693
    @param timeout: Timeout in seconds
694
    @return: Whether there have been events
695

696
    """
697
    if self._filewaiter:
698
      return self._filewaiter.Wait(timeout)
699

    
700
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
701
    # If this point is reached, return immediately and let caller check the job
702
    # file again in case there were changes since the last check. This avoids a
703
    # race condition.
704
    self._filewaiter = _JobFileChangesWaiter(self._filename)
705

    
706
    return True
707

    
708
  def Close(self):
709
    """Closes underlying waiter.
710

711
    """
712
    if self._filewaiter:
713
      self._filewaiter.Close()
714

    
715

    
716
class _WaitForJobChangesHelper(object):
717
  """Helper class using inotify to wait for changes in a job file.
718

719
  This class takes a previous job status and serial, and alerts the client when
720
  the current job status has changed.
721

722
  """
723
  @staticmethod
724
  def _CheckForChanges(job_load_fn, check_fn):
725
    job = job_load_fn()
726
    if not job:
727
      raise errors.JobLost()
728

    
729
    result = check_fn(job)
730
    if result is None:
731
      raise utils.RetryAgain()
732

    
733
    return result
734

    
735
  def __call__(self, filename, job_load_fn,
736
               fields, prev_job_info, prev_log_serial, timeout):
737
    """Waits for changes on a job.
738

739
    @type filename: string
740
    @param filename: File on which to wait for changes
741
    @type job_load_fn: callable
742
    @param job_load_fn: Function to load job
743
    @type fields: list of strings
744
    @param fields: Which fields to check for changes
745
    @type prev_job_info: list or None
746
    @param prev_job_info: Last job information returned
747
    @type prev_log_serial: int
748
    @param prev_log_serial: Last job message serial number
749
    @type timeout: float
750
    @param timeout: maximum time to wait in seconds
751

752
    """
753
    try:
754
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
755
      waiter = _JobChangesWaiter(filename)
756
      try:
757
        return utils.Retry(compat.partial(self._CheckForChanges,
758
                                          job_load_fn, check_fn),
759
                           utils.RETRY_REMAINING_TIME, timeout,
760
                           wait_fn=waiter.Wait)
761
      finally:
762
        waiter.Close()
763
    except (errors.InotifyError, errors.JobLost):
764
      return None
765
    except utils.RetryTimeout:
766
      return constants.JOB_NOTCHANGED
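  # Usage sketch (illustrative, with hypothetical arguments): the LUXI layer
  # would call an instance roughly as
  #   helper = _WaitForJobChangesHelper()
  #   helper(job_file_path, load_fn, ["status"], None, 0, timeout=60)
  # and receive the new (job_info, log_entries) tuple, None if the job was
  # lost, or constants.JOB_NOTCHANGED once the timeout expires.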
767

    
768

    
769
def _EncodeOpError(err):
770
  """Encodes an error which occurred while processing an opcode.
771

772
  """
773
  if isinstance(err, errors.GenericError):
774
    to_encode = err
775
  else:
776
    to_encode = errors.OpExecError(str(err))
777

    
778
  return errors.EncodeException(to_encode)
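# Illustrative behaviour: a non-Ganeti exception such as ValueError("boom") is
# wrapped in errors.OpExecError before encoding, while an exception that is
# already an errors.GenericError subclass is encoded as-is, so clients always
# receive an encoded Ganeti error.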
779

    
780

    
781
class _TimeoutStrategyWrapper:
782
  def __init__(self, fn):
783
    """Initializes this class.
784

785
    """
786
    self._fn = fn
787
    self._next = None
788

    
789
  def _Advance(self):
790
    """Gets the next timeout if necessary.
791

792
    """
793
    if self._next is None:
794
      self._next = self._fn()
795

    
796
  def Peek(self):
797
    """Returns the next timeout.
798

799
    """
800
    self._Advance()
801
    return self._next
802

    
803
  def Next(self):
804
    """Returns the current timeout and advances the internal state.
805

806
    """
807
    self._Advance()
808
    result = self._next
809
    self._next = None
810
    return result
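  # Minimal illustration (not part of the original file): wrapping a plain
  # iterator shows the Peek()/Next() contract.
  #
  #   timeouts = iter([1.0, 2.0, None])
  #   wrapper = _TimeoutStrategyWrapper(timeouts.next)
  #   wrapper.Peek()   # -> 1.0, does not consume the value
  #   wrapper.Next()   # -> 1.0, consumes it; a later Peek() returns 2.0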
811

    
812

    
813
class _OpExecContext:
814
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
815
    """Initializes this class.
816

817
    """
818
    self.op = op
819
    self.index = index
820
    self.log_prefix = log_prefix
821
    self.summary = op.input.Summary()
822

    
823
    # Create local copy to modify
824
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
825
      self.jobdeps = op.input.depends[:]
826
    else:
827
      self.jobdeps = None
828

    
829
    self._timeout_strategy_factory = timeout_strategy_factory
830
    self._ResetTimeoutStrategy()
831

    
832
  def _ResetTimeoutStrategy(self):
833
    """Creates a new timeout strategy.
834

835
    """
836
    self._timeout_strategy = \
837
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
838

    
839
  def CheckPriorityIncrease(self):
840
    """Checks whether priority can and should be increased.
841

842
    Called when locks couldn't be acquired.
843

844
    """
845
    op = self.op
846

    
847
    # Exhausted all retries and next round should not use blocking acquire
848
    # for locks?
849
    if (self._timeout_strategy.Peek() is None and
850
        op.priority > constants.OP_PRIO_HIGHEST):
851
      logging.debug("Increasing priority")
852
      op.priority -= 1
853
      self._ResetTimeoutStrategy()
854
      return True
855

    
856
    return False
857

    
858
  def GetNextLockTimeout(self):
859
    """Returns the next lock acquire timeout.
860

861
    """
862
    return self._timeout_strategy.Next()
863

    
864

    
865
class _JobProcessor(object):
866
  (DEFER,
867
   WAITDEP,
868
   FINISHED) = range(1, 4)
869

    
870
  def __init__(self, queue, opexec_fn, job,
871
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
872
    """Initializes this class.
873

874
    """
875
    self.queue = queue
876
    self.opexec_fn = opexec_fn
877
    self.job = job
878
    self._timeout_strategy_factory = _timeout_strategy_factory
879

    
880
  @staticmethod
881
  def _FindNextOpcode(job, timeout_strategy_factory):
882
    """Locates the next opcode to run.
883

884
    @type job: L{_QueuedJob}
885
    @param job: Job object
886
    @param timeout_strategy_factory: Callable to create new timeout strategy
887

888
    """
889
    # Create some sort of a cache to speed up locating next opcode for future
890
    # lookups
891
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
892
    # pending and one for processed ops.
893
    if job.ops_iter is None:
894
      job.ops_iter = enumerate(job.ops)
895

    
896
    # Find next opcode to run
897
    while True:
898
      try:
899
        (idx, op) = job.ops_iter.next()
900
      except StopIteration:
901
        raise errors.ProgrammerError("Called for a finished job")
902

    
903
      if op.status == constants.OP_STATUS_RUNNING:
904
        # Found an opcode already marked as running
905
        raise errors.ProgrammerError("Called for job marked as running")
906

    
907
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
908
                             timeout_strategy_factory)
909

    
910
      if op.status not in constants.OPS_FINALIZED:
911
        return opctx
912

    
913
      # This is a job that was partially completed before master daemon
914
      # shutdown, so it can be expected that some opcodes are already
915
      # completed successfully (if any did error out, then the whole job
916
      # should have been aborted and not resubmitted for processing).
917
      logging.info("%s: opcode %s already processed, skipping",
918
                   opctx.log_prefix, opctx.summary)
919

    
920
  @staticmethod
921
  def _MarkWaitlock(job, op):
922
    """Marks an opcode as waiting for locks.
923

924
    The job's start timestamp is also set if necessary.
925

926
    @type job: L{_QueuedJob}
927
    @param job: Job object
928
    @type op: L{_QueuedOpCode}
929
    @param op: Opcode object
930

931
    """
932
    assert op in job.ops
933
    assert op.status in (constants.OP_STATUS_QUEUED,
934
                         constants.OP_STATUS_WAITING)
935

    
936
    update = False
937

    
938
    op.result = None
939

    
940
    if op.status == constants.OP_STATUS_QUEUED:
941
      op.status = constants.OP_STATUS_WAITING
942
      update = True
943

    
944
    if op.start_timestamp is None:
945
      op.start_timestamp = TimeStampNow()
946
      update = True
947

    
948
    if job.start_timestamp is None:
949
      job.start_timestamp = op.start_timestamp
950
      update = True
951

    
952
    assert op.status == constants.OP_STATUS_WAITING
953

    
954
    return update
955

    
956
  @staticmethod
957
  def _CheckDependencies(queue, job, opctx):
958
    """Checks if an opcode has dependencies and if so, processes them.
959

960
    @type queue: L{JobQueue}
961
    @param queue: Queue object
962
    @type job: L{_QueuedJob}
963
    @param job: Job object
964
    @type opctx: L{_OpExecContext}
965
    @param opctx: Opcode execution context
966
    @rtype: bool
967
    @return: Whether opcode will be re-scheduled by dependency tracker
968

969
    """
970
    op = opctx.op
971

    
972
    result = False
973

    
974
    while opctx.jobdeps:
975
      (dep_job_id, dep_status) = opctx.jobdeps[0]
976

    
977
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
978
                                                          dep_status)
979
      assert ht.TNonEmptyString(depmsg), "No dependency message"
980

    
981
      logging.info("%s: %s", opctx.log_prefix, depmsg)
982

    
983
      if depresult == _JobDependencyManager.CONTINUE:
984
        # Remove dependency and continue
985
        opctx.jobdeps.pop(0)
986

    
987
      elif depresult == _JobDependencyManager.WAIT:
988
        # Need to wait for notification, dependency tracker will re-add job
989
        # to workerpool
990
        result = True
991
        break
992

    
993
      elif depresult == _JobDependencyManager.CANCEL:
994
        # Job was cancelled, cancel this job as well
995
        job.Cancel()
996
        assert op.status == constants.OP_STATUS_CANCELING
997
        break
998

    
999
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1000
                         _JobDependencyManager.ERROR):
1001
        # Job failed or there was an error, this job must fail
1002
        op.status = constants.OP_STATUS_ERROR
1003
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1004
        break
1005

    
1006
      else:
1007
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1008
                                     depresult)
1009

    
1010
    return result
1011

    
1012
  def _ExecOpCodeUnlocked(self, opctx):
1013
    """Processes one opcode and returns the result.
1014

1015
    """
1016
    op = opctx.op
1017

    
1018
    assert op.status == constants.OP_STATUS_WAITING
1019

    
1020
    timeout = opctx.GetNextLockTimeout()
1021

    
1022
    try:
1023
      # Make sure not to hold queue lock while calling ExecOpCode
1024
      result = self.opexec_fn(op.input,
1025
                              _OpExecCallbacks(self.queue, self.job, op),
1026
                              timeout=timeout, priority=op.priority)
1027
    except mcpu.LockAcquireTimeout:
1028
      assert timeout is not None, "Received timeout for blocking acquire"
1029
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1030

    
1031
      assert op.status in (constants.OP_STATUS_WAITING,
1032
                           constants.OP_STATUS_CANCELING)
1033

    
1034
      # Was job cancelled while we were waiting for the lock?
1035
      if op.status == constants.OP_STATUS_CANCELING:
1036
        return (constants.OP_STATUS_CANCELING, None)
1037

    
1038
      # Stay in waitlock while trying to re-acquire lock
1039
      return (constants.OP_STATUS_WAITING, None)
1040
    except CancelJob:
1041
      logging.exception("%s: Canceling job", opctx.log_prefix)
1042
      assert op.status == constants.OP_STATUS_CANCELING
1043
      return (constants.OP_STATUS_CANCELING, None)
1044
    except Exception, err: # pylint: disable-msg=W0703
1045
      logging.exception("%s: Caught exception in %s",
1046
                        opctx.log_prefix, opctx.summary)
1047
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1048
    else:
1049
      logging.debug("%s: %s successful",
1050
                    opctx.log_prefix, opctx.summary)
1051
      return (constants.OP_STATUS_SUCCESS, result)
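  # Result mapping (illustrative): this helper always returns an
  # (op_status, payload) pair -- (WAITING, None) after a lock timeout,
  # (CANCELING, None) when cancellation was requested, (ERROR, encoded
  # exception) on failure, and (SUCCESS, LU result) otherwise.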
1052

    
1053
  def __call__(self, _nextop_fn=None):
1054
    """Continues execution of a job.
1055

1056
    @param _nextop_fn: Callback function for tests
1057
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1058
      be deferred and C{WAITDEP} if the dependency manager
1059
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1060

1061
    """
1062
    queue = self.queue
1063
    job = self.job
1064

    
1065
    logging.debug("Processing job %s", job.id)
1066

    
1067
    queue.acquire(shared=1)
1068
    try:
1069
      opcount = len(job.ops)
1070

    
1071
      assert job.writable, "Expected writable job"
1072

    
1073
      # Don't do anything for finalized jobs
1074
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1075
        return self.FINISHED
1076

    
1077
      # Is a previous opcode still pending?
1078
      if job.cur_opctx:
1079
        opctx = job.cur_opctx
1080
        job.cur_opctx = None
1081
      else:
1082
        if __debug__ and _nextop_fn:
1083
          _nextop_fn()
1084
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1085

    
1086
      op = opctx.op
1087

    
1088
      # Consistency check
1089
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1090
                                     constants.OP_STATUS_CANCELING)
1091
                        for i in job.ops[opctx.index + 1:])
1092

    
1093
      assert op.status in (constants.OP_STATUS_QUEUED,
1094
                           constants.OP_STATUS_WAITING,
1095
                           constants.OP_STATUS_CANCELING)
1096

    
1097
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1098
              op.priority >= constants.OP_PRIO_HIGHEST)
1099

    
1100
      waitjob = None
1101

    
1102
      if op.status != constants.OP_STATUS_CANCELING:
1103
        assert op.status in (constants.OP_STATUS_QUEUED,
1104
                             constants.OP_STATUS_WAITING)
1105

    
1106
        # Prepare to start opcode
1107
        if self._MarkWaitlock(job, op):
1108
          # Write to disk
1109
          queue.UpdateJobUnlocked(job)
1110

    
1111
        assert op.status == constants.OP_STATUS_WAITING
1112
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1113
        assert job.start_timestamp and op.start_timestamp
1114
        assert waitjob is None
1115

    
1116
        # Check if waiting for a job is necessary
1117
        waitjob = self._CheckDependencies(queue, job, opctx)
1118

    
1119
        assert op.status in (constants.OP_STATUS_WAITING,
1120
                             constants.OP_STATUS_CANCELING,
1121
                             constants.OP_STATUS_ERROR)
1122

    
1123
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1124
                                         constants.OP_STATUS_ERROR)):
1125
          logging.info("%s: opcode %s waiting for locks",
1126
                       opctx.log_prefix, opctx.summary)
1127

    
1128
          assert not opctx.jobdeps, "Not all dependencies were removed"
1129

    
1130
          queue.release()
1131
          try:
1132
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1133
          finally:
1134
            queue.acquire(shared=1)
1135

    
1136
          op.status = op_status
1137
          op.result = op_result
1138

    
1139
          assert not waitjob
1140

    
1141
        if op.status == constants.OP_STATUS_WAITING:
1142
          # Couldn't get locks in time
1143
          assert not op.end_timestamp
1144
        else:
1145
          # Finalize opcode
1146
          op.end_timestamp = TimeStampNow()
1147

    
1148
          if op.status == constants.OP_STATUS_CANCELING:
1149
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1150
                                  for i in job.ops[opctx.index:])
1151
          else:
1152
            assert op.status in constants.OPS_FINALIZED
1153

    
1154
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1155
        finalize = False
1156

    
1157
        if not waitjob and opctx.CheckPriorityIncrease():
1158
          # Priority was changed, need to update on-disk file
1159
          queue.UpdateJobUnlocked(job)
1160

    
1161
        # Keep around for another round
1162
        job.cur_opctx = opctx
1163

    
1164
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1165
                op.priority >= constants.OP_PRIO_HIGHEST)
1166

    
1167
        # In no case must the status be finalized here
1168
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1169

    
1170
      else:
1171
        # Ensure all opcodes so far have been successful
1172
        assert (opctx.index == 0 or
1173
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1174
                           for i in job.ops[:opctx.index]))
1175

    
1176
        # Reset context
1177
        job.cur_opctx = None
1178

    
1179
        if op.status == constants.OP_STATUS_SUCCESS:
1180
          finalize = False
1181

    
1182
        elif op.status == constants.OP_STATUS_ERROR:
1183
          # Ensure failed opcode has an exception as its result
1184
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1185

    
1186
          to_encode = errors.OpExecError("Preceding opcode failed")
1187
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1188
                                _EncodeOpError(to_encode))
1189
          finalize = True
1190

    
1191
          # Consistency check
1192
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1193
                            errors.GetEncodedError(i.result)
1194
                            for i in job.ops[opctx.index:])
1195

    
1196
        elif op.status == constants.OP_STATUS_CANCELING:
1197
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1198
                                "Job canceled by request")
1199
          finalize = True
1200

    
1201
        else:
1202
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1203

    
1204
        if opctx.index == (opcount - 1):
1205
          # Finalize on last opcode
1206
          finalize = True
1207

    
1208
        if finalize:
1209
          # All opcodes have been run, finalize job
1210
          job.Finalize()
1211

    
1212
        # Write to disk. If the job status is final, this is the final write
1213
        # allowed. Once the file has been written, it can be archived anytime.
1214
        queue.UpdateJobUnlocked(job)
1215

    
1216
        assert not waitjob
1217

    
1218
        if finalize:
1219
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1220
          return self.FINISHED
1221

    
1222
      assert not waitjob or queue.depmgr.JobWaiting(job)
1223

    
1224
      if waitjob:
1225
        return self.WAITDEP
1226
      else:
1227
        return self.DEFER
1228
    finally:
1229
      assert job.writable, "Job became read-only while being processed"
1230
      queue.release()
1231

    
1232

    
1233
class _JobQueueWorker(workerpool.BaseWorker):
1234
  """The actual job workers.
1235

1236
  """
1237
  def RunTask(self, job): # pylint: disable-msg=W0221
1238
    """Job executor.
1239

1240
    @type job: L{_QueuedJob}
1241
    @param job: the job to be processed
1242

1243
    """
1244
    assert job.writable, "Expected writable job"
1245

    
1246
    # Ensure only one worker is active on a single job. If a job registers for
1247
    # a dependency job, and the other job notifies before the first worker is
1248
    # done, the job can end up in the tasklist more than once.
1249
    job.processor_lock.acquire()
1250
    try:
1251
      return self._RunTaskInner(job)
1252
    finally:
1253
      job.processor_lock.release()
1254

    
1255
  def _RunTaskInner(self, job):
1256
    """Executes a job.
1257

1258
    Must be called with per-job lock acquired.
1259

1260
    """
1261
    queue = job.queue
1262
    assert queue == self.pool.queue
1263

    
1264
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1265
    setname_fn(None)
1266

    
1267
    proc = mcpu.Processor(queue.context, job.id)
1268

    
1269
    # Create wrapper for setting thread name
1270
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1271
                                    proc.ExecOpCode)
1272

    
1273
    result = _JobProcessor(queue, wrap_execop_fn, job)()
1274

    
1275
    if result == _JobProcessor.FINISHED:
1276
      # Notify waiting jobs
1277
      queue.depmgr.NotifyWaiters(job.id)
1278

    
1279
    elif result == _JobProcessor.DEFER:
1280
      # Schedule again
1281
      raise workerpool.DeferTask(priority=job.CalcPriority())
1282

    
1283
    elif result == _JobProcessor.WAITDEP:
1284
      # No-op, dependency manager will re-schedule
1285
      pass
1286

    
1287
    else:
1288
      raise errors.ProgrammerError("Job processor returned unknown status %s" %
1289
                                   (result, ))
1290

    
1291
  @staticmethod
1292
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1293
    """Updates the worker thread name to include a short summary of the opcode.
1294

1295
    @param setname_fn: Callable setting worker thread name
1296
    @param execop_fn: Callable for executing opcode (usually
1297
                      L{mcpu.Processor.ExecOpCode})
1298

1299
    """
1300
    setname_fn(op)
1301
    try:
1302
      return execop_fn(op, *args, **kwargs)
1303
    finally:
1304
      setname_fn(None)
1305

    
1306
  @staticmethod
1307
  def _GetWorkerName(job, op):
1308
    """Sets the worker thread name.
1309

1310
    @type job: L{_QueuedJob}
1311
    @type op: L{opcodes.OpCode}
1312

1313
    """
1314
    parts = ["Job%s" % job.id]
1315

    
1316
    if op:
1317
      parts.append(op.TinySummary())
1318

    
1319
    return "/".join(parts)
1320

    
1321

    
1322
class _JobQueueWorkerPool(workerpool.WorkerPool):
1323
  """Simple class implementing a job-processing workerpool.
1324

1325
  """
1326
  def __init__(self, queue):
1327
    super(_JobQueueWorkerPool, self).__init__("Jq",
1328
                                              JOBQUEUE_THREADS,
1329
                                              _JobQueueWorker)
1330
    self.queue = queue
1331

    
1332

    
1333
class _JobDependencyManager:
1334
  """Keeps track of job dependencies.
1335

1336
  """
1337
  (WAIT,
1338
   ERROR,
1339
   CANCEL,
1340
   CONTINUE,
1341
   WRONGSTATUS) = range(1, 6)
1342

    
1343
  def __init__(self, getstatus_fn, enqueue_fn):
1344
    """Initializes this class.
1345

1346
    """
1347
    self._getstatus_fn = getstatus_fn
1348
    self._enqueue_fn = enqueue_fn
1349

    
1350
    self._waiters = {}
1351
    self._lock = locking.SharedLock("JobDepMgr")
1352

    
1353
  @locking.ssynchronized(_LOCK, shared=1)
1354
  def GetLockInfo(self, requested): # pylint: disable-msg=W0613
1355
    """Retrieves information about waiting jobs.
1356

1357
    @type requested: set
1358
    @param requested: Requested information, see C{query.LQ_*}
1359

1360
    """
1361
    # No need to sort here, that's being done by the lock manager and query
1362
    # library. There are no priorities for notifying jobs, hence all show up as
1363
    # one item under "pending".
1364
    return [("job/%s" % job_id, None, None,
1365
             [("job", [job.id for job in waiters])])
1366
            for job_id, waiters in self._waiters.items()
1367
            if waiters]
1368

    
1369
  @locking.ssynchronized(_LOCK, shared=1)
1370
  def JobWaiting(self, job):
1371
    """Checks if a job is waiting.
1372

1373
    """
1374
    return compat.any(job in jobs
1375
                      for jobs in self._waiters.values())
1376

    
1377
  @locking.ssynchronized(_LOCK)
1378
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1379
    """Checks if a dependency job has the requested status.
1380

1381
    If the other job is not yet in a finalized status, the calling job will be
1382
    notified (re-added to the workerpool) at a later point.
1383

1384
    @type job: L{_QueuedJob}
1385
    @param job: Job object
1386
    @type dep_job_id: string
1387
    @param dep_job_id: ID of dependency job
1388
    @type dep_status: list
1389
    @param dep_status: Required status
1390

1391
    """
1392
    assert ht.TString(job.id)
1393
    assert ht.TString(dep_job_id)
1394
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1395

    
1396
    if job.id == dep_job_id:
1397
      return (self.ERROR, "Job can't depend on itself")
1398

    
1399
    # Get status of dependency job
1400
    try:
1401
      status = self._getstatus_fn(dep_job_id)
1402
    except errors.JobLost, err:
1403
      return (self.ERROR, "Dependency error: %s" % err)
1404

    
1405
    assert status in constants.JOB_STATUS_ALL
1406

    
1407
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1408

    
1409
    if status not in constants.JOBS_FINALIZED:
1410
      # Register for notification and wait for job to finish
1411
      job_id_waiters.add(job)
1412
      return (self.WAIT,
1413
              "Need to wait for job %s, wanted status '%s'" %
1414
              (dep_job_id, dep_status))
1415

    
1416
    # Remove from waiters list
1417
    if job in job_id_waiters:
1418
      job_id_waiters.remove(job)
1419

    
1420
    if (status == constants.JOB_STATUS_CANCELED and
1421
        constants.JOB_STATUS_CANCELED not in dep_status):
1422
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1423

    
1424
    elif not dep_status or status in dep_status:
1425
      return (self.CONTINUE,
1426
              "Dependency job %s finished with status '%s'" %
1427
              (dep_job_id, status))
1428

    
1429
    else:
1430
      return (self.WRONGSTATUS,
1431
              "Dependency job %s finished with status '%s',"
1432
              " not one of '%s' as required" %
1433
              (dep_job_id, status, utils.CommaJoin(dep_status)))
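  # Example outcomes (illustrative): if job 10 depends on job 7 with
  # dep_status == [constants.JOB_STATUS_SUCCESS], this returns (WAIT, ...)
  # while job 7 is still running, (CONTINUE, ...) once it succeeded,
  # (CANCEL, ...) if it was cancelled, and (WRONGSTATUS, ...) if it finished
  # with an error instead.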
1434

    
1435
  @locking.ssynchronized(_LOCK)
1436
  def NotifyWaiters(self, job_id):
1437
    """Notifies all jobs waiting for a certain job ID.
1438

1439
    @type job_id: string
1440
    @param job_id: Job ID
1441

1442
    """
1443
    assert ht.TString(job_id)
1444

    
1445
    jobs = self._waiters.pop(job_id, None)
1446
    if jobs:
1447
      # Re-add jobs to workerpool
1448
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1449
                    len(jobs), job_id)
1450
      self._enqueue_fn(jobs)
1451

    
1452
    # Remove all jobs without actual waiters
1453
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1454
                   if not waiters]:
1455
      del self._waiters[job_id]
1456

    
1457

    
1458
def _RequireOpenQueue(fn):
1459
  """Decorator for "public" functions.
1460

1461
  This function should be used for all 'public' functions. That is,
1462
  functions usually called from other classes. Note that this should
1463
  be applied only to methods (not plain functions), since it expects
1464
  that the decorated function is called with a first argument that has
1465
  a '_queue_filelock' attribute.
1466

1467
  @warning: Use this decorator only after locking.ssynchronized
1468

1469
  Example::
1470
    @locking.ssynchronized(_LOCK)
1471
    @_RequireOpenQueue
1472
    def Example(self):
1473
      pass
1474

1475
  """
1476
  def wrapper(self, *args, **kwargs):
1477
    # pylint: disable-msg=W0212
1478
    assert self._queue_filelock is not None, "Queue should be open"
1479
    return fn(self, *args, **kwargs)
1480
  return wrapper
1481

    
1482

    
1483
class JobQueue(object):
1484
  """Queue used to manage the jobs.
1485

1486
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1487

1488
  """
1489
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1490

    
1491
  def __init__(self, context):
1492
    """Constructor for JobQueue.
1493

1494
    The constructor will initialize the job queue object and then
1495
    start loading the current jobs from disk, either for starting them
1496
    (if they were queue) or for aborting them (if they were already
1497
    running).
1498

1499
    @type context: GanetiContext
1500
    @param context: the context object for access to the configuration
1501
        data and other ganeti objects
1502

1503
    """
1504
    self.context = context
1505
    self._memcache = weakref.WeakValueDictionary()
1506
    self._my_hostname = netutils.Hostname.GetSysName()
1507

    
1508
    # The Big JobQueue lock. If a code block or method acquires it in shared
1509
    # mode safe it must guarantee concurrency with all the code acquiring it in
1510
    # shared mode, including itself. In order not to acquire it at all
1511
    # concurrency must be guaranteed with all code acquiring it in shared mode
1512
    # and all code acquiring it exclusively.
1513
    self._lock = locking.SharedLock("JobQueue")
1514

    
1515
    self.acquire = self._lock.acquire
1516
    self.release = self._lock.release
1517

    
1518
    # Initialize the queue, and acquire the filelock.
1519
    # This ensures no other process is working on the job queue.
1520
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1521

    
1522
    # Read serial file
1523
    self._last_serial = jstore.ReadSerial()
1524
    assert self._last_serial is not None, ("Serial file was modified between"
1525
                                           " check in jstore and here")
1526

    
1527
    # Get initial list of nodes
1528
    self._nodes = dict((n.name, n.primary_ip)
1529
                       for n in self.context.cfg.GetAllNodesInfo().values()
1530
                       if n.master_candidate)
1531

    
1532
    # Remove master node
1533
    self._nodes.pop(self._my_hostname, None)
1534

    
1535
    # TODO: Check consistency across nodes
1536

    
1537
    self._queue_size = 0
1538
    self._UpdateQueueSizeUnlocked()
1539
    self._drained = jstore.CheckDrainFlag()
1540

    
1541
    # Job dependencies
1542
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1543
                                        self._EnqueueJobs)
1544
    self.context.glm.AddToLockMonitor(self.depmgr)
1545

    
1546
    # Setup worker pool
1547
    self._wpool = _JobQueueWorkerPool(self)
1548
    try:
1549
      self._InspectQueue()
1550
    except:
1551
      self._wpool.TerminateWorkers()
1552
      raise
1553

    
1554
  @locking.ssynchronized(_LOCK)
1555
  @_RequireOpenQueue
1556
  def _InspectQueue(self):
1557
    """Loads the whole job queue and resumes unfinished jobs.
1558

1559
    This function needs the lock here because WorkerPool.AddTask() may start a
1560
    job while we're still doing our work.
1561

1562
    """
1563
    logging.info("Inspecting job queue")
1564

    
1565
    restartjobs = []
1566

    
1567
    all_job_ids = self._GetJobIDsUnlocked()
1568
    jobs_count = len(all_job_ids)
1569
    lastinfo = time.time()
1570
    for idx, job_id in enumerate(all_job_ids):
1571
      # Give an update every 1000 jobs or 10 seconds
1572
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1573
          idx == (jobs_count - 1)):
1574
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1575
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1576
        lastinfo = time.time()
1577

    
1578
      job = self._LoadJobUnlocked(job_id)
1579

    
1580
      # a failure in loading the job can cause 'None' to be returned
1581
      if job is None:
1582
        continue
1583

    
1584
      status = job.CalcStatus()
1585

    
1586
      if status == constants.JOB_STATUS_QUEUED:
1587
        restartjobs.append(job)
1588

    
1589
      elif status in (constants.JOB_STATUS_RUNNING,
1590
                      constants.JOB_STATUS_WAITING,
1591
                      constants.JOB_STATUS_CANCELING):
1592
        logging.warning("Unfinished job %s found: %s", job.id, job)
1593

    
1594
        if status == constants.JOB_STATUS_WAITING:
1595
          # Restart job
1596
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1597
          restartjobs.append(job)
1598
        else:
1599
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1600
                                "Unclean master daemon shutdown")
1601
          job.Finalize()
1602

    
1603
        self.UpdateJobUnlocked(job)
1604

    
1605
    if restartjobs:
1606
      logging.info("Restarting %s jobs", len(restartjobs))
1607
      self._EnqueueJobsUnlocked(restartjobs)
1608

    
1609
    logging.info("Job queue inspection finished")
1610

    
1611
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

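  # Worked example (illustrative only): with JOBS_PER_ARCHIVE_DIRECTORY set to
  # 10000 at the top of this module, job "12345" ends up in archive directory
  # "1", since int("12345") / 10000 == 1 under Python 2 integer division.
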
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: the list of newly generated job identifiers (as strings)

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result

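  # Worked example (illustrative only): if the last serial was 42 and three
  # serials are requested, the serial file is rewritten to contain 45 and the
  # job identifiers "43", "44" and "45" are returned.
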
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job should be writable; if None,
        the default for the location the job was loaded from is used
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of an error while reading the job, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

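  # Illustrative sketch (hypothetical caller): after SetDrainFlag(True), a
  # subsequent SubmitJob(ops) fails with errors.JobQueueDrainError until
  # SetDrainFlag(False) is called, as enforced in _SubmitJobUnlocked below.
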
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: tuple
    @return: On success, a tuple of (True, resolved dependencies); on
        failure, a tuple of (False, error message)

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)

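  # Worked example (illustrative only): with a resolve_fn mapping the relative
  # ID -1 to job "1234", the dependency list
  # [(-1, [constants.JOB_STATUS_SUCCESS])] resolves to
  # (True, [("1234", [constants.JOB_STATUS_SUCCESS])]), while an unresolvable
  # relative ID yields (False, <error message>) instead.
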
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

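  # Illustrative note: within one submission, a relative dependency of -1 in
  # the N-th job refers to the job submitted immediately before it, since
  # resolve_fn indexes into previous_job_ids + job_ids[:N].
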
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job"
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

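  # Illustrative polling sketch (hypothetical caller, simplified): clients
  # typically call WaitForJobChanges in a loop, feeding back the previously
  # returned job information and the last log serial seen, e.g.:
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, timeout)
  #   if result != constants.JOB_NOTCHANGED:
  #     (prev_info, log_entries) = result
  #
  # Serial bookkeeping and error handling are omitted here.
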
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple with the number of archived jobs and the number of jobs
        not yet inspected (due to the timeout)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list, one element per job, each element being a list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None