
root / lib / jqueue.py @ 63b4bb1e


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38

    
39
try:
40
  # pylint: disable=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59
from ganeti import ht
60

    
61

    
62
JOBQUEUE_THREADS = 25
63
JOBS_PER_ARCHIVE_DIRECTORY = 10000
64

    
65
# member lock names to be passed to @ssynchronized decorator
66
_LOCK = "_lock"
67
_QUEUE = "_queue"
68

    
69

    
70
class CancelJob(Exception):
71
  """Special exception to cancel a job.
72

73
  """
74

    
75

    
76
def TimeStampNow():
77
  """Returns the current timestamp.
78

79
  @rtype: tuple
80
  @return: the current time in the (seconds, microseconds) format
81

82
  """
83
  return utils.SplitTime(time.time())
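  # Illustrative note (added, not in the original module): utils.SplitTime is
  # assumed to split a float timestamp into integer (seconds, microseconds) so
  # it can be serialized without precision loss; e.g.
  # utils.SplitTime(1234567890.123456) would give roughly (1234567890, 123456).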
84

    
85

    
86
class _QueuedOpCode(object):
87
  """Encapsulates an opcode object.
88

89
  @ivar log: holds the execution log and consists of tuples
90
  of the form C{(log_serial, timestamp, level, message)}
91
  @ivar input: the OpCode we encapsulate
92
  @ivar status: the current status
93
  @ivar result: the result of the LU execution
94
  @ivar start_timestamp: timestamp for the start of the execution
95
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96
  @ivar end_timestamp: timestamp for the end of the execution
97

98
  """
99
  __slots__ = ["input", "status", "result", "log", "priority",
100
               "start_timestamp", "exec_timestamp", "end_timestamp",
101
               "__weakref__"]
102

    
103
  def __init__(self, op):
104
    """Constructor for the _QuededOpCode.
105

106
    @type op: L{opcodes.OpCode}
107
    @param op: the opcode we encapsulate
108

109
    """
110
    self.input = op
111
    self.status = constants.OP_STATUS_QUEUED
112
    self.result = None
113
    self.log = []
114
    self.start_timestamp = None
115
    self.exec_timestamp = None
116
    self.end_timestamp = None
117

    
118
    # Get initial priority (it might change during the lifetime of this opcode)
119
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120

    
121
  @classmethod
122
  def Restore(cls, state):
123
    """Restore the _QueuedOpCode from the serialized form.
124

125
    @type state: dict
126
    @param state: the serialized state
127
    @rtype: _QueuedOpCode
128
    @return: a new _QueuedOpCode instance
129

130
    """
131
    obj = _QueuedOpCode.__new__(cls)
132
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133
    obj.status = state["status"]
134
    obj.result = state["result"]
135
    obj.log = state["log"]
136
    obj.start_timestamp = state.get("start_timestamp", None)
137
    obj.exec_timestamp = state.get("exec_timestamp", None)
138
    obj.end_timestamp = state.get("end_timestamp", None)
139
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140
    return obj
141

    
142
  def Serialize(self):
143
    """Serializes this _QueuedOpCode.
144

145
    @rtype: dict
146
    @return: the dictionary holding the serialized state
147

148
    """
149
    return {
150
      "input": self.input.__getstate__(),
151
      "status": self.status,
152
      "result": self.result,
153
      "log": self.log,
154
      "start_timestamp": self.start_timestamp,
155
      "exec_timestamp": self.exec_timestamp,
156
      "end_timestamp": self.end_timestamp,
157
      "priority": self.priority,
158
      }
159

    
160

    
161
class _QueuedJob(object):
162
  """In-memory job representation.
163

164
  This is what we use to track the user-submitted jobs. Locking must
165
  be taken care of by users of this class.
166

167
  @type queue: L{JobQueue}
168
  @ivar queue: the parent queue
169
  @ivar id: the job ID
170
  @type ops: list
171
  @ivar ops: the list of _QueuedOpCode that constitute the job
172
  @type log_serial: int
173
  @ivar log_serial: holds the index for the next log entry
174
  @ivar received_timestamp: the timestamp for when the job was received
175
  @ivar start_timestamp: the timestamp for start of execution
176
  @ivar end_timestamp: the timestamp for end of execution
177
  @ivar writable: Whether the job is allowed to be modified
178

179
  """
180
  # pylint: disable=W0212
181
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182
               "received_timestamp", "start_timestamp", "end_timestamp",
183
               "__weakref__", "processor_lock", "writable"]
184

    
185
  def __init__(self, queue, job_id, ops, writable):
186
    """Constructor for the _QueuedJob.
187

188
    @type queue: L{JobQueue}
189
    @param queue: our parent queue
190
    @type job_id: string
191
    @param job_id: our job id
192
    @type ops: list
193
    @param ops: the list of opcodes we hold, which will be encapsulated
194
        in _QueuedOpCodes
195
    @type writable: bool
196
    @param writable: Whether job can be modified
197

198
    """
199
    if not ops:
200
      raise errors.GenericError("A job needs at least one opcode")
201

    
202
    self.queue = queue
203
    self.id = job_id
204
    self.ops = [_QueuedOpCode(op) for op in ops]
205
    self.log_serial = 0
206
    self.received_timestamp = TimeStampNow()
207
    self.start_timestamp = None
208
    self.end_timestamp = None
209

    
210
    self._InitInMemory(self, writable)
211

    
212
  @staticmethod
213
  def _InitInMemory(obj, writable):
214
    """Initializes in-memory variables.
215

216
    """
217
    obj.writable = writable
218
    obj.ops_iter = None
219
    obj.cur_opctx = None
220

    
221
    # Read-only jobs are not processed and therefore don't need a lock
222
    if writable:
223
      obj.processor_lock = threading.Lock()
224
    else:
225
      obj.processor_lock = None
226

    
227
  def __repr__(self):
228
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
229
              "id=%s" % self.id,
230
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
231

    
232
    return "<%s at %#x>" % (" ".join(status), id(self))
233

    
234
  @classmethod
235
  def Restore(cls, queue, state, writable):
236
    """Restore a _QueuedJob from serialized state:
237

238
    @type queue: L{JobQueue}
239
    @param queue: to which queue the restored job belongs
240
    @type state: dict
241
    @param state: the serialized state
242
    @type writable: bool
243
    @param writable: Whether job can be modified
244
    @rtype: _QueuedJob
245
    @return: the restored _QueuedJob instance
246

247
    """
248
    obj = _QueuedJob.__new__(cls)
249
    obj.queue = queue
250
    obj.id = state["id"]
251
    obj.received_timestamp = state.get("received_timestamp", None)
252
    obj.start_timestamp = state.get("start_timestamp", None)
253
    obj.end_timestamp = state.get("end_timestamp", None)
254

    
255
    obj.ops = []
256
    obj.log_serial = 0
257
    for op_state in state["ops"]:
258
      op = _QueuedOpCode.Restore(op_state)
259
      for log_entry in op.log:
260
        obj.log_serial = max(obj.log_serial, log_entry[0])
261
      obj.ops.append(op)
262

    
263
    cls._InitInMemory(obj, writable)
264

    
265
    return obj
266

    
267
  def Serialize(self):
268
    """Serialize the _JobQueue instance.
269

270
    @rtype: dict
271
    @return: the serialized state
272

273
    """
274
    return {
275
      "id": self.id,
276
      "ops": [op.Serialize() for op in self.ops],
277
      "start_timestamp": self.start_timestamp,
278
      "end_timestamp": self.end_timestamp,
279
      "received_timestamp": self.received_timestamp,
280
      }
281

    
282
  def CalcStatus(self):
283
    """Compute the status of this job.
284

285
    This function iterates over all the _QueuedOpCodes in the job and
286
    based on their status, computes the job status.
287

288
    The algorithm is:
289
      - if we find a cancelled opcode, or one that finished with error, the job
290
        status will be the same
291
      - otherwise, the last opcode whose status is one of:
292
          - waiting
293
          - canceling
294
          - running
295

296
        will determine the job status
297

298
      - otherwise, it means either all opcodes are queued, or success,
299
        and the job status will be the same
300

301
    @return: the job status
302

303
    """
304
    status = constants.JOB_STATUS_QUEUED
305

    
306
    all_success = True
307
    for op in self.ops:
308
      if op.status == constants.OP_STATUS_SUCCESS:
309
        continue
310

    
311
      all_success = False
312

    
313
      if op.status == constants.OP_STATUS_QUEUED:
314
        pass
315
      elif op.status == constants.OP_STATUS_WAITING:
316
        status = constants.JOB_STATUS_WAITING
317
      elif op.status == constants.OP_STATUS_RUNNING:
318
        status = constants.JOB_STATUS_RUNNING
319
      elif op.status == constants.OP_STATUS_CANCELING:
320
        status = constants.JOB_STATUS_CANCELING
321
        break
322
      elif op.status == constants.OP_STATUS_ERROR:
323
        status = constants.JOB_STATUS_ERROR
324
        # The whole job fails if one opcode failed
325
        break
326
      elif op.status == constants.OP_STATUS_CANCELED:
327
        status = constants.JOB_STATUS_CANCELED
328
        break
329

    
330
    if all_success:
331
      status = constants.JOB_STATUS_SUCCESS
332

    
333
    return status
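    # Illustrative note (added, not from the original source): for opcode
    # statuses [SUCCESS, RUNNING, QUEUED] the loop above reports
    # JOB_STATUS_RUNNING; only when every opcode is SUCCESS does the job
    # become JOB_STATUS_SUCCESS, and a single ERROR or CANCELED opcode
    # finalizes the whole job with that status.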
334

    
335
  def CalcPriority(self):
336
    """Gets the current priority for this job.
337

338
    Only unfinished opcodes are considered. When all are done, the default
339
    priority is used.
340

341
    @rtype: int
342

343
    """
344
    priorities = [op.priority for op in self.ops
345
                  if op.status not in constants.OPS_FINALIZED]
346

    
347
    if not priorities:
348
      # All opcodes are done, assume default priority
349
      return constants.OP_PRIO_DEFAULT
350

    
351
    return min(priorities)
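    # Illustrative note (added, assuming the standard OP_PRIO_* constants):
    # priorities follow the "lower value means higher priority" convention,
    # so a job with opcodes at OP_PRIO_DEFAULT and OP_PRIO_HIGH reports the
    # OP_PRIO_HIGH value through min() above.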
352

    
353
  def GetLogEntries(self, newer_than):
354
    """Selectively returns the log entries.
355

356
    @type newer_than: None or int
357
    @param newer_than: if this is None, return all log entries,
358
        otherwise return only the log entries with serial higher
359
        than this value
360
    @rtype: list
361
    @return: the list of the log entries selected
362

363
    """
364
    if newer_than is None:
365
      serial = -1
366
    else:
367
      serial = newer_than
368

    
369
    entries = []
370
    for op in self.ops:
371
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
372

    
373
    return entries
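    # Illustrative note (added): each log entry is a tuple starting with its
    # serial number (see _QueuedOpCode.log), so a call with newer_than=3
    # returns only entries whose serial is 4 or higher.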
374

    
375
  def GetInfo(self, fields):
376
    """Returns information about a job.
377

378
    @type fields: list
379
    @param fields: names of fields to return
380
    @rtype: list
381
    @return: list with one element for each field
382
    @raise errors.OpExecError: when an invalid field
383
        has been passed
384

385
    """
386
    row = []
387
    for fname in fields:
388
      if fname == "id":
389
        row.append(self.id)
390
      elif fname == "status":
391
        row.append(self.CalcStatus())
392
      elif fname == "priority":
393
        row.append(self.CalcPriority())
394
      elif fname == "ops":
395
        row.append([op.input.__getstate__() for op in self.ops])
396
      elif fname == "opresult":
397
        row.append([op.result for op in self.ops])
398
      elif fname == "opstatus":
399
        row.append([op.status for op in self.ops])
400
      elif fname == "oplog":
401
        row.append([op.log for op in self.ops])
402
      elif fname == "opstart":
403
        row.append([op.start_timestamp for op in self.ops])
404
      elif fname == "opexec":
405
        row.append([op.exec_timestamp for op in self.ops])
406
      elif fname == "opend":
407
        row.append([op.end_timestamp for op in self.ops])
408
      elif fname == "oppriority":
409
        row.append([op.priority for op in self.ops])
410
      elif fname == "received_ts":
411
        row.append(self.received_timestamp)
412
      elif fname == "start_ts":
413
        row.append(self.start_timestamp)
414
      elif fname == "end_ts":
415
        row.append(self.end_timestamp)
416
      elif fname == "summary":
417
        row.append([op.input.Summary() for op in self.ops])
418
      else:
419
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
420
    return row
421

    
422
  def MarkUnfinishedOps(self, status, result):
423
    """Mark unfinished opcodes with a given status and result.
424

425
    This is a utility function for marking all running or waiting to
426
    be run opcodes with a given status. Opcodes which are already
427
    finalized are not changed.
428

429
    @param status: a given opcode status
430
    @param result: the opcode result
431

432
    """
433
    not_marked = True
434
    for op in self.ops:
435
      if op.status in constants.OPS_FINALIZED:
436
        assert not_marked, "Finalized opcodes found after non-finalized ones"
437
        continue
438
      op.status = status
439
      op.result = result
440
      not_marked = False
441

    
442
  def Finalize(self):
443
    """Marks the job as finalized.
444

445
    """
446
    self.end_timestamp = TimeStampNow()
447

    
448
  def Cancel(self):
449
    """Marks job as canceled/-ing if possible.
450

451
    @rtype: tuple; (bool, string)
452
    @return: Boolean describing whether job was successfully canceled or marked
453
      as canceling and a text message
454

455
    """
456
    status = self.CalcStatus()
457

    
458
    if status == constants.JOB_STATUS_QUEUED:
459
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460
                             "Job canceled by request")
461
      self.Finalize()
462
      return (True, "Job %s canceled" % self.id)
463

    
464
    elif status == constants.JOB_STATUS_WAITING:
465
      # The worker will notice the new status and cancel the job
466
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467
      return (True, "Job %s will be canceled" % self.id)
468

    
469
    else:
470
      logging.debug("Job %s is no longer waiting in the queue", self.id)
471
      return (False, "Job %s is no longer waiting in the queue" % self.id)
472

    
473

    
474
class _OpExecCallbacks(mcpu.OpExecCbBase):
475
  def __init__(self, queue, job, op):
476
    """Initializes this class.
477

478
    @type queue: L{JobQueue}
479
    @param queue: Job queue
480
    @type job: L{_QueuedJob}
481
    @param job: Job object
482
    @type op: L{_QueuedOpCode}
483
    @param op: OpCode
484

485
    """
486
    assert queue, "Queue is missing"
487
    assert job, "Job is missing"
488
    assert op, "Opcode is missing"
489

    
490
    self._queue = queue
491
    self._job = job
492
    self._op = op
493

    
494
  def _CheckCancel(self):
495
    """Raises an exception to cancel the job if asked to.
496

497
    """
498
    # Cancel here if we were asked to
499
    if self._op.status == constants.OP_STATUS_CANCELING:
500
      logging.debug("Canceling opcode")
501
      raise CancelJob()
502

    
503
  @locking.ssynchronized(_QUEUE, shared=1)
504
  def NotifyStart(self):
505
    """Mark the opcode as running, not lock-waiting.
506

507
    This is called from the mcpu code as a notifier function, when the LU is
508
    finally about to start the Exec() method. Of course, to have end-user
509
    visible results, the opcode must be initially (before calling into
510
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
511

512
    """
513
    assert self._op in self._job.ops
514
    assert self._op.status in (constants.OP_STATUS_WAITING,
515
                               constants.OP_STATUS_CANCELING)
516

    
517
    # Cancel here if we were asked to
518
    self._CheckCancel()
519

    
520
    logging.debug("Opcode is now running")
521

    
522
    self._op.status = constants.OP_STATUS_RUNNING
523
    self._op.exec_timestamp = TimeStampNow()
524

    
525
    # And finally replicate the job status
526
    self._queue.UpdateJobUnlocked(self._job)
527

    
528
  @locking.ssynchronized(_QUEUE, shared=1)
529
  def _AppendFeedback(self, timestamp, log_type, log_msg):
530
    """Internal feedback append function, with locks
531

532
    """
533
    self._job.log_serial += 1
534
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
535
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
536

    
537
  def Feedback(self, *args):
538
    """Append a log entry.
539

540
    """
541
    assert len(args) < 3
542

    
543
    if len(args) == 1:
544
      log_type = constants.ELOG_MESSAGE
545
      log_msg = args[0]
546
    else:
547
      (log_type, log_msg) = args
548

    
549
    # The time is split to make serialization easier and not lose
550
    # precision.
551
    timestamp = utils.SplitTime(time.time())
552
    self._AppendFeedback(timestamp, log_type, log_msg)
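    # Usage sketch (added, assumed caller behaviour): logical units typically
    # invoke this as feedback_fn("some message") or
    # feedback_fn(constants.ELOG_MESSAGE, "some message"); both forms end up
    # in _AppendFeedback with a split timestamp.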
553

    
554
  def CheckCancel(self):
555
    """Check whether job has been cancelled.
556

557
    """
558
    assert self._op.status in (constants.OP_STATUS_WAITING,
559
                               constants.OP_STATUS_CANCELING)
560

    
561
    # Cancel here if we were asked to
562
    self._CheckCancel()
563

    
564
  def SubmitManyJobs(self, jobs):
565
    """Submits jobs for processing.
566

567
    See L{JobQueue.SubmitManyJobs}.
568

569
    """
570
    # Locking is done in job queue
571
    return self._queue.SubmitManyJobs(jobs)
572

    
573

    
574
class _JobChangesChecker(object):
575
  def __init__(self, fields, prev_job_info, prev_log_serial):
576
    """Initializes this class.
577

578
    @type fields: list of strings
579
    @param fields: Fields requested by LUXI client
580
    @type prev_job_info: string
581
    @param prev_job_info: previous job info, as passed by the LUXI client
582
    @type prev_log_serial: string
583
    @param prev_log_serial: previous job serial, as passed by the LUXI client
584

585
    """
586
    self._fields = fields
587
    self._prev_job_info = prev_job_info
588
    self._prev_log_serial = prev_log_serial
589

    
590
  def __call__(self, job):
591
    """Checks whether job has changed.
592

593
    @type job: L{_QueuedJob}
594
    @param job: Job object
595

596
    """
597
    assert not job.writable, "Expected read-only job"
598

    
599
    status = job.CalcStatus()
600
    job_info = job.GetInfo(self._fields)
601
    log_entries = job.GetLogEntries(self._prev_log_serial)
602

    
603
    # Serializing and deserializing data can cause type changes (e.g. from
604
    # tuple to list) or precision loss. We're doing it here so that we get
605
    # the same modifications as the data received from the client. Without
606
    # this, the comparison afterwards might fail without the data being
607
    # significantly different.
608
    # TODO: we just deserialized from disk, investigate how to make sure that
609
    # the job info and log entries are compatible to avoid this further step.
610
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
611
    # efficient, though floats will be tricky
612
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
613
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
614

    
615
    # Don't even try to wait if the job is no longer running, there will be
616
    # no changes.
617
    if (status not in (constants.JOB_STATUS_QUEUED,
618
                       constants.JOB_STATUS_RUNNING,
619
                       constants.JOB_STATUS_WAITING) or
620
        job_info != self._prev_job_info or
621
        (log_entries and self._prev_log_serial != log_entries[0][0])):
622
      logging.debug("Job %s changed", job.id)
623
      return (job_info, log_entries)
624

    
625
    return None
626

    
627

    
628
class _JobFileChangesWaiter(object):
629
  def __init__(self, filename):
630
    """Initializes this class.
631

632
    @type filename: string
633
    @param filename: Path to job file
634
    @raises errors.InotifyError: if the notifier cannot be set up
635

636
    """
637
    self._wm = pyinotify.WatchManager()
638
    self._inotify_handler = \
639
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
640
    self._notifier = \
641
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
642
    try:
643
      self._inotify_handler.enable()
644
    except Exception:
645
      # pyinotify doesn't close file descriptors automatically
646
      self._notifier.stop()
647
      raise
648

    
649
  def _OnInotify(self, notifier_enabled):
650
    """Callback for inotify.
651

652
    """
653
    if not notifier_enabled:
654
      self._inotify_handler.enable()
655

    
656
  def Wait(self, timeout):
657
    """Waits for the job file to change.
658

659
    @type timeout: float
660
    @param timeout: Timeout in seconds
661
    @return: Whether there have been events
662

663
    """
664
    assert timeout >= 0
665
    have_events = self._notifier.check_events(timeout * 1000)
666
    if have_events:
667
      self._notifier.read_events()
668
    self._notifier.process_events()
669
    return have_events
670

    
671
  def Close(self):
672
    """Closes underlying notifier and its file descriptor.
673

674
    """
675
    self._notifier.stop()
676

    
677

    
678
class _JobChangesWaiter(object):
679
  def __init__(self, filename):
680
    """Initializes this class.
681

682
    @type filename: string
683
    @param filename: Path to job file
684

685
    """
686
    self._filewaiter = None
687
    self._filename = filename
688

    
689
  def Wait(self, timeout):
690
    """Waits for a job to change.
691

692
    @type timeout: float
693
    @param timeout: Timeout in seconds
694
    @return: Whether there have been events
695

696
    """
697
    if self._filewaiter:
698
      return self._filewaiter.Wait(timeout)
699

    
700
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
701
    # If this point is reached, return immediately and let caller check the job
702
    # file again in case there were changes since the last check. This avoids a
703
    # race condition.
704
    self._filewaiter = _JobFileChangesWaiter(self._filename)
705

    
706
    return True
707

    
708
  def Close(self):
709
    """Closes underlying waiter.
710

711
    """
712
    if self._filewaiter:
713
      self._filewaiter.Close()
714

    
715

    
716
class _WaitForJobChangesHelper(object):
717
  """Helper class using inotify to wait for changes in a job file.
718

719
  This class takes a previous job status and serial, and alerts the client when
720
  the current job status has changed.
721

722
  """
723
  @staticmethod
724
  def _CheckForChanges(counter, job_load_fn, check_fn):
725
    if counter.next() > 0:
726
      # If this isn't the first check the job is given some more time to change
727
      # again. This gives better performance for jobs generating many
728
      # changes/messages.
729
      time.sleep(0.1)
730

    
731
    job = job_load_fn()
732
    if not job:
733
      raise errors.JobLost()
734

    
735
    result = check_fn(job)
736
    if result is None:
737
      raise utils.RetryAgain()
738

    
739
    return result
740

    
741
  def __call__(self, filename, job_load_fn,
742
               fields, prev_job_info, prev_log_serial, timeout):
743
    """Waits for changes on a job.
744

745
    @type filename: string
746
    @param filename: File on which to wait for changes
747
    @type job_load_fn: callable
748
    @param job_load_fn: Function to load job
749
    @type fields: list of strings
750
    @param fields: Which fields to check for changes
751
    @type prev_job_info: list or None
752
    @param prev_job_info: Last job information returned
753
    @type prev_log_serial: int
754
    @param prev_log_serial: Last job message serial number
755
    @type timeout: float
756
    @param timeout: maximum time to wait in seconds
757

758
    """
759
    counter = itertools.count()
760
    try:
761
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
762
      waiter = _JobChangesWaiter(filename)
763
      try:
764
        return utils.Retry(compat.partial(self._CheckForChanges,
765
                                          counter, job_load_fn, check_fn),
766
                           utils.RETRY_REMAINING_TIME, timeout,
767
                           wait_fn=waiter.Wait)
768
      finally:
769
        waiter.Close()
770
    except (errors.InotifyError, errors.JobLost):
771
      return None
772
    except utils.RetryTimeout:
773
      return constants.JOB_NOTCHANGED
774

    
775

    
776
def _EncodeOpError(err):
777
  """Encodes an error which occurred while processing an opcode.
778

779
  """
780
  if isinstance(err, errors.GenericError):
781
    to_encode = err
782
  else:
783
    to_encode = errors.OpExecError(str(err))
784

    
785
  return errors.EncodeException(to_encode)
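  # Illustrative note (added, assuming the usual errors.EncodeException
  # behaviour): the encoded form is a serializable (class name, args) pair,
  # e.g. roughly ("OpExecError", ("disk failure",)), which clients can turn
  # back into an exception object.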
786

    
787

    
788
class _TimeoutStrategyWrapper:
789
  def __init__(self, fn):
790
    """Initializes this class.
791

792
    """
793
    self._fn = fn
794
    self._next = None
795

    
796
  def _Advance(self):
797
    """Gets the next timeout if necessary.
798

799
    """
800
    if self._next is None:
801
      self._next = self._fn()
802

    
803
  def Peek(self):
804
    """Returns the next timeout.
805

806
    """
807
    self._Advance()
808
    return self._next
809

    
810
  def Next(self):
811
    """Returns the current timeout and advances the internal state.
812

813
    """
814
    self._Advance()
815
    result = self._next
816
    self._next = None
817
    return result
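    # Illustrative note (added): with a strategy yielding 1.0 then 2.0,
    # Peek() returns 1.0 without consuming it, a subsequent Next() returns
    # the same 1.0 and clears it, and the following Peek()/Next() move on
    # to 2.0.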
818

    
819

    
820
class _OpExecContext:
821
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
822
    """Initializes this class.
823

824
    """
825
    self.op = op
826
    self.index = index
827
    self.log_prefix = log_prefix
828
    self.summary = op.input.Summary()
829

    
830
    # Create local copy to modify
831
    if getattr(op.input, opcodes.DEPEND_ATTR, None):
832
      self.jobdeps = op.input.depends[:]
833
    else:
834
      self.jobdeps = None
835

    
836
    self._timeout_strategy_factory = timeout_strategy_factory
837
    self._ResetTimeoutStrategy()
838

    
839
  def _ResetTimeoutStrategy(self):
840
    """Creates a new timeout strategy.
841

842
    """
843
    self._timeout_strategy = \
844
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
845

    
846
  def CheckPriorityIncrease(self):
847
    """Checks whether priority can and should be increased.
848

849
    Called when locks couldn't be acquired.
850

851
    """
852
    op = self.op
853

    
854
    # Exhausted all retries and next round should not use blocking acquire
855
    # for locks?
856
    if (self._timeout_strategy.Peek() is None and
857
        op.priority > constants.OP_PRIO_HIGHEST):
858
      logging.debug("Increasing priority")
859
      op.priority -= 1
860
      self._ResetTimeoutStrategy()
861
      return True
862

    
863
    return False
864

    
865
  def GetNextLockTimeout(self):
866
    """Returns the next lock acquire timeout.
867

868
    """
869
    return self._timeout_strategy.Next()
870

    
871

    
872
class _JobProcessor(object):
873
  (DEFER,
874
   WAITDEP,
875
   FINISHED) = range(1, 4)
876

    
877
  def __init__(self, queue, opexec_fn, job,
878
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
879
    """Initializes this class.
880

881
    """
882
    self.queue = queue
883
    self.opexec_fn = opexec_fn
884
    self.job = job
885
    self._timeout_strategy_factory = _timeout_strategy_factory
886

    
887
  @staticmethod
888
  def _FindNextOpcode(job, timeout_strategy_factory):
889
    """Locates the next opcode to run.
890

891
    @type job: L{_QueuedJob}
892
    @param job: Job object
893
    @param timeout_strategy_factory: Callable to create new timeout strategy
894

895
    """
896
    # Create some sort of a cache to speed up locating next opcode for future
897
    # lookups
898
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
899
    # pending and one for processed ops.
900
    if job.ops_iter is None:
901
      job.ops_iter = enumerate(job.ops)
902

    
903
    # Find next opcode to run
904
    while True:
905
      try:
906
        (idx, op) = job.ops_iter.next()
907
      except StopIteration:
908
        raise errors.ProgrammerError("Called for a finished job")
909

    
910
      if op.status == constants.OP_STATUS_RUNNING:
911
        # Found an opcode already marked as running
912
        raise errors.ProgrammerError("Called for job marked as running")
913

    
914
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
915
                             timeout_strategy_factory)
916

    
917
      if op.status not in constants.OPS_FINALIZED:
918
        return opctx
919

    
920
      # This is a job that was partially completed before master daemon
921
      # shutdown, so it can be expected that some opcodes are already
922
      # completed successfully (if any did error out, then the whole job
923
      # should have been aborted and not resubmitted for processing).
924
      logging.info("%s: opcode %s already processed, skipping",
925
                   opctx.log_prefix, opctx.summary)
926

    
927
  @staticmethod
928
  def _MarkWaitlock(job, op):
929
    """Marks an opcode as waiting for locks.
930

931
    The job's start timestamp is also set if necessary.
932

933
    @type job: L{_QueuedJob}
934
    @param job: Job object
935
    @type op: L{_QueuedOpCode}
936
    @param op: Opcode object
937

938
    """
939
    assert op in job.ops
940
    assert op.status in (constants.OP_STATUS_QUEUED,
941
                         constants.OP_STATUS_WAITING)
942

    
943
    update = False
944

    
945
    op.result = None
946

    
947
    if op.status == constants.OP_STATUS_QUEUED:
948
      op.status = constants.OP_STATUS_WAITING
949
      update = True
950

    
951
    if op.start_timestamp is None:
952
      op.start_timestamp = TimeStampNow()
953
      update = True
954

    
955
    if job.start_timestamp is None:
956
      job.start_timestamp = op.start_timestamp
957
      update = True
958

    
959
    assert op.status == constants.OP_STATUS_WAITING
960

    
961
    return update
962

    
963
  @staticmethod
964
  def _CheckDependencies(queue, job, opctx):
965
    """Checks if an opcode has dependencies and if so, processes them.
966

967
    @type queue: L{JobQueue}
968
    @param queue: Queue object
969
    @type job: L{_QueuedJob}
970
    @param job: Job object
971
    @type opctx: L{_OpExecContext}
972
    @param opctx: Opcode execution context
973
    @rtype: bool
974
    @return: Whether opcode will be re-scheduled by dependency tracker
975

976
    """
977
    op = opctx.op
978

    
979
    result = False
980

    
981
    while opctx.jobdeps:
982
      (dep_job_id, dep_status) = opctx.jobdeps[0]
983

    
984
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
985
                                                          dep_status)
986
      assert ht.TNonEmptyString(depmsg), "No dependency message"
987

    
988
      logging.info("%s: %s", opctx.log_prefix, depmsg)
989

    
990
      if depresult == _JobDependencyManager.CONTINUE:
991
        # Remove dependency and continue
992
        opctx.jobdeps.pop(0)
993

    
994
      elif depresult == _JobDependencyManager.WAIT:
995
        # Need to wait for notification, dependency tracker will re-add job
996
        # to workerpool
997
        result = True
998
        break
999

    
1000
      elif depresult == _JobDependencyManager.CANCEL:
1001
        # Job was cancelled, cancel this job as well
1002
        job.Cancel()
1003
        assert op.status == constants.OP_STATUS_CANCELING
1004
        break
1005

    
1006
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1007
                         _JobDependencyManager.ERROR):
1008
        # Job failed or there was an error, this job must fail
1009
        op.status = constants.OP_STATUS_ERROR
1010
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1011
        break
1012

    
1013
      else:
1014
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1015
                                     depresult)
1016

    
1017
    return result
1018

    
1019
  def _ExecOpCodeUnlocked(self, opctx):
1020
    """Processes one opcode and returns the result.
1021

1022
    """
1023
    op = opctx.op
1024

    
1025
    assert op.status == constants.OP_STATUS_WAITING
1026

    
1027
    timeout = opctx.GetNextLockTimeout()
1028

    
1029
    try:
1030
      # Make sure not to hold queue lock while calling ExecOpCode
1031
      result = self.opexec_fn(op.input,
1032
                              _OpExecCallbacks(self.queue, self.job, op),
1033
                              timeout=timeout, priority=op.priority)
1034
    except mcpu.LockAcquireTimeout:
1035
      assert timeout is not None, "Received timeout for blocking acquire"
1036
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1037

    
1038
      assert op.status in (constants.OP_STATUS_WAITING,
1039
                           constants.OP_STATUS_CANCELING)
1040

    
1041
      # Was job cancelled while we were waiting for the lock?
1042
      if op.status == constants.OP_STATUS_CANCELING:
1043
        return (constants.OP_STATUS_CANCELING, None)
1044

    
1045
      # Stay in waitlock while trying to re-acquire lock
1046
      return (constants.OP_STATUS_WAITING, None)
1047
    except CancelJob:
1048
      logging.exception("%s: Canceling job", opctx.log_prefix)
1049
      assert op.status == constants.OP_STATUS_CANCELING
1050
      return (constants.OP_STATUS_CANCELING, None)
1051
    except Exception, err: # pylint: disable=W0703
1052
      logging.exception("%s: Caught exception in %s",
1053
                        opctx.log_prefix, opctx.summary)
1054
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1055
    else:
1056
      logging.debug("%s: %s successful",
1057
                    opctx.log_prefix, opctx.summary)
1058
      return (constants.OP_STATUS_SUCCESS, result)
1059

    
1060
  def __call__(self, _nextop_fn=None):
1061
    """Continues execution of a job.
1062

1063
    @param _nextop_fn: Callback function for tests
1064
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1065
      be deferred and C{WAITDEP} if the dependency manager
1066
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1067

1068
    """
1069
    queue = self.queue
1070
    job = self.job
1071

    
1072
    logging.debug("Processing job %s", job.id)
1073

    
1074
    queue.acquire(shared=1)
1075
    try:
1076
      opcount = len(job.ops)
1077

    
1078
      assert job.writable, "Expected writable job"
1079

    
1080
      # Don't do anything for finalized jobs
1081
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1082
        return self.FINISHED
1083

    
1084
      # Is a previous opcode still pending?
1085
      if job.cur_opctx:
1086
        opctx = job.cur_opctx
1087
        job.cur_opctx = None
1088
      else:
1089
        if __debug__ and _nextop_fn:
1090
          _nextop_fn()
1091
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1092

    
1093
      op = opctx.op
1094

    
1095
      # Consistency check
1096
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1097
                                     constants.OP_STATUS_CANCELING)
1098
                        for i in job.ops[opctx.index + 1:])
1099

    
1100
      assert op.status in (constants.OP_STATUS_QUEUED,
1101
                           constants.OP_STATUS_WAITING,
1102
                           constants.OP_STATUS_CANCELING)
1103

    
1104
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1105
              op.priority >= constants.OP_PRIO_HIGHEST)
1106

    
1107
      waitjob = None
1108

    
1109
      if op.status != constants.OP_STATUS_CANCELING:
1110
        assert op.status in (constants.OP_STATUS_QUEUED,
1111
                             constants.OP_STATUS_WAITING)
1112

    
1113
        # Prepare to start opcode
1114
        if self._MarkWaitlock(job, op):
1115
          # Write to disk
1116
          queue.UpdateJobUnlocked(job)
1117

    
1118
        assert op.status == constants.OP_STATUS_WAITING
1119
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1120
        assert job.start_timestamp and op.start_timestamp
1121
        assert waitjob is None
1122

    
1123
        # Check if waiting for a job is necessary
1124
        waitjob = self._CheckDependencies(queue, job, opctx)
1125

    
1126
        assert op.status in (constants.OP_STATUS_WAITING,
1127
                             constants.OP_STATUS_CANCELING,
1128
                             constants.OP_STATUS_ERROR)
1129

    
1130
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1131
                                         constants.OP_STATUS_ERROR)):
1132
          logging.info("%s: opcode %s waiting for locks",
1133
                       opctx.log_prefix, opctx.summary)
1134

    
1135
          assert not opctx.jobdeps, "Not all dependencies were removed"
1136

    
1137
          queue.release()
1138
          try:
1139
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1140
          finally:
1141
            queue.acquire(shared=1)
1142

    
1143
          op.status = op_status
1144
          op.result = op_result
1145

    
1146
          assert not waitjob
1147

    
1148
        if op.status == constants.OP_STATUS_WAITING:
1149
          # Couldn't get locks in time
1150
          assert not op.end_timestamp
1151
        else:
1152
          # Finalize opcode
1153
          op.end_timestamp = TimeStampNow()
1154

    
1155
          if op.status == constants.OP_STATUS_CANCELING:
1156
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1157
                                  for i in job.ops[opctx.index:])
1158
          else:
1159
            assert op.status in constants.OPS_FINALIZED
1160

    
1161
      if op.status == constants.OP_STATUS_WAITING or waitjob:
1162
        finalize = False
1163

    
1164
        if not waitjob and opctx.CheckPriorityIncrease():
1165
          # Priority was changed, need to update on-disk file
1166
          queue.UpdateJobUnlocked(job)
1167

    
1168
        # Keep around for another round
1169
        job.cur_opctx = opctx
1170

    
1171
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1172
                op.priority >= constants.OP_PRIO_HIGHEST)
1173

    
1174
        # In no case must the status be finalized here
1175
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1176

    
1177
      else:
1178
        # Ensure all opcodes so far have been successful
1179
        assert (opctx.index == 0 or
1180
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1181
                           for i in job.ops[:opctx.index]))
1182

    
1183
        # Reset context
1184
        job.cur_opctx = None
1185

    
1186
        if op.status == constants.OP_STATUS_SUCCESS:
1187
          finalize = False
1188

    
1189
        elif op.status == constants.OP_STATUS_ERROR:
1190
          # Ensure failed opcode has an exception as its result
1191
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1192

    
1193
          to_encode = errors.OpExecError("Preceding opcode failed")
1194
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1195
                                _EncodeOpError(to_encode))
1196
          finalize = True
1197

    
1198
          # Consistency check
1199
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1200
                            errors.GetEncodedError(i.result)
1201
                            for i in job.ops[opctx.index:])
1202

    
1203
        elif op.status == constants.OP_STATUS_CANCELING:
1204
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1205
                                "Job canceled by request")
1206
          finalize = True
1207

    
1208
        else:
1209
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1210

    
1211
        if opctx.index == (opcount - 1):
1212
          # Finalize on last opcode
1213
          finalize = True
1214

    
1215
        if finalize:
1216
          # All opcodes have been run, finalize job
1217
          job.Finalize()
1218

    
1219
        # Write to disk. If the job status is final, this is the final write
1220
        # allowed. Once the file has been written, it can be archived anytime.
1221
        queue.UpdateJobUnlocked(job)
1222

    
1223
        assert not waitjob
1224

    
1225
        if finalize:
1226
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1227
          return self.FINISHED
1228

    
1229
      assert not waitjob or queue.depmgr.JobWaiting(job)
1230

    
1231
      if waitjob:
1232
        return self.WAITDEP
1233
      else:
1234
        return self.DEFER
1235
    finally:
1236
      assert job.writable, "Job became read-only while being processed"
1237
      queue.release()
1238

    
1239

    
1240
def _EvaluateJobProcessorResult(depmgr, job, result):
1241
  """Looks at a result from L{_JobProcessor} for a job.
1242

1243
  To be used in a L{_JobQueueWorker}.
1244

1245
  """
1246
  if result == _JobProcessor.FINISHED:
1247
    # Notify waiting jobs
1248
    depmgr.NotifyWaiters(job.id)
1249

    
1250
  elif result == _JobProcessor.DEFER:
1251
    # Schedule again
1252
    raise workerpool.DeferTask(priority=job.CalcPriority())
1253

    
1254
  elif result == _JobProcessor.WAITDEP:
1255
    # No-op, dependency manager will re-schedule
1256
    pass
1257

    
1258
  else:
1259
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1260
                                 (result, ))
1261

    
1262

    
1263
class _JobQueueWorker(workerpool.BaseWorker):
1264
  """The actual job workers.
1265

1266
  """
1267
  def RunTask(self, job): # pylint: disable=W0221
1268
    """Job executor.
1269

1270
    @type job: L{_QueuedJob}
1271
    @param job: the job to be processed
1272

1273
    """
1274
    assert job.writable, "Expected writable job"
1275

    
1276
    # Ensure only one worker is active on a single job. If a job registers for
1277
    # a dependency job, and the other job notifies before the first worker is
1278
    # done, the job can end up in the tasklist more than once.
1279
    job.processor_lock.acquire()
1280
    try:
1281
      return self._RunTaskInner(job)
1282
    finally:
1283
      job.processor_lock.release()
1284

    
1285
  def _RunTaskInner(self, job):
1286
    """Executes a job.
1287

1288
    Must be called with per-job lock acquired.
1289

1290
    """
1291
    queue = job.queue
1292
    assert queue == self.pool.queue
1293

    
1294
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1295
    setname_fn(None)
1296

    
1297
    proc = mcpu.Processor(queue.context, job.id)
1298

    
1299
    # Create wrapper for setting thread name
1300
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1301
                                    proc.ExecOpCode)
1302

    
1303
    _EvaluateJobProcessorResult(queue.depmgr, job,
1304
                                _JobProcessor(queue, wrap_execop_fn, job)())
1305

    
1306
  @staticmethod
1307
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1308
    """Updates the worker thread name to include a short summary of the opcode.
1309

1310
    @param setname_fn: Callable setting worker thread name
1311
    @param execop_fn: Callable for executing opcode (usually
1312
                      L{mcpu.Processor.ExecOpCode})
1313

1314
    """
1315
    setname_fn(op)
1316
    try:
1317
      return execop_fn(op, *args, **kwargs)
1318
    finally:
1319
      setname_fn(None)
1320

    
1321
  @staticmethod
1322
  def _GetWorkerName(job, op):
1323
    """Sets the worker thread name.
1324

1325
    @type job: L{_QueuedJob}
1326
    @type op: L{opcodes.OpCode}
1327

1328
    """
1329
    parts = ["Job%s" % job.id]
1330

    
1331
    if op:
1332
      parts.append(op.TinySummary())
1333

    
1334
    return "/".join(parts)
1335

    
1336

    
1337
class _JobQueueWorkerPool(workerpool.WorkerPool):
1338
  """Simple class implementing a job-processing workerpool.
1339

1340
  """
1341
  def __init__(self, queue):
1342
    super(_JobQueueWorkerPool, self).__init__("Jq",
1343
                                              JOBQUEUE_THREADS,
1344
                                              _JobQueueWorker)
1345
    self.queue = queue
1346

    
1347

    
1348
class _JobDependencyManager:
1349
  """Keeps track of job dependencies.
1350

1351
  """
1352
  (WAIT,
1353
   ERROR,
1354
   CANCEL,
1355
   CONTINUE,
1356
   WRONGSTATUS) = range(1, 6)
1357

    
1358
  def __init__(self, getstatus_fn, enqueue_fn):
1359
    """Initializes this class.
1360

1361
    """
1362
    self._getstatus_fn = getstatus_fn
1363
    self._enqueue_fn = enqueue_fn
1364

    
1365
    self._waiters = {}
1366
    self._lock = locking.SharedLock("JobDepMgr")
1367

    
1368
  @locking.ssynchronized(_LOCK, shared=1)
1369
  def GetLockInfo(self, requested): # pylint: disable=W0613
1370
    """Retrieves information about waiting jobs.
1371

1372
    @type requested: set
1373
    @param requested: Requested information, see C{query.LQ_*}
1374

1375
    """
1376
    # No need to sort here, that's being done by the lock manager and query
1377
    # library. There are no priorities for notifying jobs, hence all show up as
1378
    # one item under "pending".
1379
    return [("job/%s" % job_id, None, None,
1380
             [("job", [job.id for job in waiters])])
1381
            for job_id, waiters in self._waiters.items()
1382
            if waiters]
1383

    
1384
  @locking.ssynchronized(_LOCK, shared=1)
1385
  def JobWaiting(self, job):
1386
    """Checks if a job is waiting.
1387

1388
    """
1389
    return compat.any(job in jobs
1390
                      for jobs in self._waiters.values())
1391

    
1392
  @locking.ssynchronized(_LOCK)
1393
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1394
    """Checks if a dependency job has the requested status.
1395

1396
    If the other job is not yet in a finalized status, the calling job will be
1397
    notified (re-added to the workerpool) at a later point.
1398

1399
    @type job: L{_QueuedJob}
1400
    @param job: Job object
1401
    @type dep_job_id: string
1402
    @param dep_job_id: ID of dependency job
1403
    @type dep_status: list
1404
    @param dep_status: Required status
1405

1406
    """
1407
    assert ht.TString(job.id)
1408
    assert ht.TString(dep_job_id)
1409
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1410

    
1411
    if job.id == dep_job_id:
1412
      return (self.ERROR, "Job can't depend on itself")
1413

    
1414
    # Get status of dependency job
1415
    try:
1416
      status = self._getstatus_fn(dep_job_id)
1417
    except errors.JobLost, err:
1418
      return (self.ERROR, "Dependency error: %s" % err)
1419

    
1420
    assert status in constants.JOB_STATUS_ALL
1421

    
1422
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1423

    
1424
    if status not in constants.JOBS_FINALIZED:
1425
      # Register for notification and wait for job to finish
1426
      job_id_waiters.add(job)
1427
      return (self.WAIT,
1428
              "Need to wait for job %s, wanted status '%s'" %
1429
              (dep_job_id, dep_status))
1430

    
1431
    # Remove from waiters list
1432
    if job in job_id_waiters:
1433
      job_id_waiters.remove(job)
1434

    
1435
    if (status == constants.JOB_STATUS_CANCELED and
1436
        constants.JOB_STATUS_CANCELED not in dep_status):
1437
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1438

    
1439
    elif not dep_status or status in dep_status:
1440
      return (self.CONTINUE,
1441
              "Dependency job %s finished with status '%s'" %
1442
              (dep_job_id, status))
1443

    
1444
    else:
1445
      return (self.WRONGSTATUS,
1446
              "Dependency job %s finished with status '%s',"
1447
              " not one of '%s' as required" %
1448
              (dep_job_id, status, utils.CommaJoin(dep_status)))
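    # Usage sketch (added, assuming the opcode "depends" format used by
    # _OpExecContext): for a dependency entry like
    # ("123", [constants.JOB_STATUS_SUCCESS]) this returns WAIT while job 123
    # is unfinished, CONTINUE once it succeeded, and WRONGSTATUS if it ended
    # in any other finalized state.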
1449

    
1450
  def _RemoveEmptyWaitersUnlocked(self):
1451
    """Remove all jobs without actual waiters.
1452

1453
    """
1454
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1455
                   if not waiters]:
1456
      del self._waiters[job_id]
1457

    
1458
  def NotifyWaiters(self, job_id):
1459
    """Notifies all jobs waiting for a certain job ID.
1460

1461
    @attention: Do not call until L{CheckAndRegister} returned a status other
1462
      than C{WAIT} for C{job_id}, or behaviour is undefined
1463
    @type job_id: string
1464
    @param job_id: Job ID
1465

1466
    """
1467
    assert ht.TString(job_id)
1468

    
1469
    self._lock.acquire()
1470
    try:
1471
      self._RemoveEmptyWaitersUnlocked()
1472

    
1473
      jobs = self._waiters.pop(job_id, None)
1474
    finally:
1475
      self._lock.release()
1476

    
1477
    if jobs:
1478
      # Re-add jobs to workerpool
1479
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1480
                    len(jobs), job_id)
1481
      self._enqueue_fn(jobs)
1482

    
1483

    
1484
def _RequireOpenQueue(fn):
1485
  """Decorator for "public" functions.
1486

1487
  This function should be used for all 'public' functions. That is,
1488
  functions usually called from other classes. Note that this should
1489
  be applied only to methods (not plain functions), since it expects
1490
  that the decorated function is called with a first argument that has
1491
  a '_queue_filelock' attribute.
1492

1493
  @warning: Use this decorator only after locking.ssynchronized
1494

1495
  Example::
1496
    @locking.ssynchronized(_LOCK)
1497
    @_RequireOpenQueue
1498
    def Example(self):
1499
      pass
1500

1501
  """
1502
  def wrapper(self, *args, **kwargs):
1503
    # pylint: disable=W0212
1504
    assert self._queue_filelock is not None, "Queue should be open"
1505
    return fn(self, *args, **kwargs)
1506
  return wrapper
1507

    
1508

    
1509
def _RequireNonDrainedQueue(fn):
1510
  """Decorator checking for a non-drained queue.
1511

1512
  To be used with functions submitting new jobs.
1513

1514
  """
1515
  def wrapper(self, *args, **kwargs):
1516
    """Wrapper function.
1517

1518
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1519

1520
    """
1521
    # Ok when sharing the big job queue lock, as the drain file is created when
1522
    # the lock is exclusive.
1523
    # Needs access to protected member, pylint: disable=W0212
1524
    if self._drained:
1525
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1526

    
1527
    if not self._accepting_jobs:
1528
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1529

    
1530
    return fn(self, *args, **kwargs)
1531
  return wrapper
1532

    
1533

    
1534
class JobQueue(object):
1535
  """Queue used to manage the jobs.
1536

1537
  """
1538
  def __init__(self, context):
1539
    """Constructor for JobQueue.
1540

1541
    The constructor will initialize the job queue object and then
1542
    start loading the current jobs from disk, either for starting them
1543
    (if they were queued) or for aborting them (if they were already
1544
    running).
1545

1546
    @type context: GanetiContext
1547
    @param context: the context object for access to the configuration
1548
        data and other ganeti objects
1549

1550
    """
1551
    self.context = context
1552
    self._memcache = weakref.WeakValueDictionary()
1553
    self._my_hostname = netutils.Hostname.GetSysName()
1554

    
1555
    # The Big JobQueue lock. If a code block or method acquires it in shared
1556
    # mode, it must guarantee concurrency with all the code acquiring it in
1557
    # shared mode, including itself. In order not to acquire it at all
1558
    # concurrency must be guaranteed with all code acquiring it in shared mode
1559
    # and all code acquiring it exclusively.
1560
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Accept jobs by default
    self._accepting_jobs = True

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)
    self._drained = jstore.CheckDrainFlag()

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)
    self.context.glm.AddToLockMonitor(self.depmgr)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITING,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITING:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")
          job.Finalize()

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobsUnlocked(restartjobs)

    logging.info("Job queue inspection finished")

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    """
    return rpc.JobQueueRunner(self.context, address_list)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    # Static address list
    addrs = [node.primary_ip]

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
                                                        content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
1735
    @type nodes: list
1736
    @param nodes: the list of nodes we made the call to
1737
    @type failmsg: str
1738
    @param failmsg: the identifier to be used for logging
1739

1740
    """
1741
    failed = []
1742
    success = []
1743

    
1744
    for node in nodes:
1745
      msg = result[node].fail_msg
1746
      if msg:
1747
        failed.append(node)
1748
        logging.error("RPC call %s (%s) failed on node %s: %s",
1749
                      result[node].call, failmsg, node, msg)
1750
      else:
1751
        success.append(node)
1752

    
1753
    # +1 for the master node
1754
    if (len(success) + 1) < len(failed):
1755
      # TODO: Handle failing nodes
1756
      logging.error("More than half of the nodes failed")
1757

    
1758
  def _GetNodeIp(self):
1759
    """Helper for returning the node name/ip list.
1760

1761
    @rtype: (list, list)
1762
    @return: a tuple of two lists, the first one with the node
1763
        names and the second one with the node addresses
1764

1765
    """
1766
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1767
    name_list = self._nodes.keys()
1768
    addr_list = [self._nodes[name] for name in name_list]
1769
    return name_list, addr_list
1770

    
1771
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1772
    """Writes a file locally and then replicates it to all nodes.
1773

1774
    This function will replace the contents of a file on the local
1775
    node and then replicate it to all the other nodes we have.
1776

1777
    @type file_name: str
1778
    @param file_name: the path of the file to be replicated
1779
    @type data: str
1780
    @param data: the new contents of the file
1781
    @type replicate: boolean
1782
    @param replicate: whether to spread the changes to the remote nodes
1783

1784
    """
1785
    getents = runtime.GetEnts()
1786
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1787
                    gid=getents.masterd_gid)
1788

    
1789
    if replicate:
1790
      names, addrs = self._GetNodeIp()
1791
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1792
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1793

    
1794
  def _RenameFilesUnlocked(self, rename):
1795
    """Renames a file locally and then replicate the change.
1796

1797
    This function will rename a file in the local queue directory
1798
    and then replicate this rename to all the other nodes we have.
1799

1800
    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
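
  # Illustrative worked example (comment only, not part of the original code):
  # with JOBS_PER_ARCHIVE_DIRECTORY set to 10000, archived jobs are bucketed
  # into directories of ten thousand jobs each, e.g.
  #   _GetArchiveDirectory("42")    -> "0"
  #   _GetArchiveDirectory("12345") -> "1"
  #   _GetArchiveDirectory("20000") -> "2"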

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.
1848

1849
    Job identifiers are unique during the lifetime of a cluster.
1850

1851
    @type count: integer
1852
    @param count: how many serials to return
1853
    @rtype: str
1854
    @return: a string representing the job identifier.
1855

1856
    """
    assert ht.TPositiveInt(count)

    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]

    # Keep it only if we were able to write the file
    self._last_serial = serial

    assert len(result) == count

    return result
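
  # Illustrative sketch (assumed starting state, `queue` being a JobQueue
  # instance): if the last serial read from disk was 7, asking for three new
  # serials writes "10\n" to the serial file and hands back consecutive IDs:
  #   queue._NewSerialsUnlocked(3)  # -> ["8", "9", "10"]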

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @staticmethod
  def _GetJobIDsUnlocked(sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = constants.JOB_FILE_RE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist
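
  # Illustrative note: job files sit directly in the queue directory and are
  # named after their ID (see _GetJobPath, e.g. "job-17"), so a directory
  # holding "job-2", "job-10" and unrelated files such as "serial" would,
  # with sort=True, yield ["2", "10"] here via utils.NiceSort.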

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      assert job.writable, "Found read-only job in memcache"
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    assert job.writable, "Job just loaded is not writable"

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [(self._GetJobPath, True)]

    if try_archived:
      path_functions.append((self._GetArchivedJobPath, False))

    raw_data = None
    writable_default = None

    for (fn, writable_default) in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    if writable is None:
      writable = writable_default

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data, writable)
    except Exception, err: # pylint: disable=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops, True)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
      if not opcodes.TNoRelativeJobDependencies(dependencies):
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
                                  " match %s: %s" %
                                  (idx, opcodes.TNoRelativeJobDependencies,
                                   dependencies))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    (job_id, ) = self._NewSerialsUnlocked(1)
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
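
  # Illustrative sketch (hypothetical caller, not part of this module): code
  # owning a JobQueue instance `queue` would submit a list of opcodes as one
  # job and get the new job ID back, e.g.
  #   job_id = queue.SubmitJob(ops)  # ops: list of opcodes.OpCode instances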

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  @_RequireNonDrainedQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    all_job_ids = self._NewSerialsUnlocked(len(jobs))

    (results, added_jobs) = \
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])

    self._EnqueueJobsUnlocked(added_jobs)

    return results

  @staticmethod
  def _FormatSubmitError(msg, ops):
    """Formats errors which occurred while submitting a job.

    """
    return ("%s; opcodes %s" %
            (msg, utils.CommaJoin(op.Summary() for op in ops)))

  @staticmethod
  def _ResolveJobDependencies(resolve_fn, deps):
    """Resolves relative job IDs in dependencies.

    @type resolve_fn: callable
    @param resolve_fn: Function to resolve a relative job ID
    @type deps: list
    @param deps: Dependencies
    @rtype: list
    @return: Resolved dependencies

    """
    result = []

    for (dep_job_id, dep_status) in deps:
      if ht.TRelativeJobId(dep_job_id):
        assert ht.TInt(dep_job_id) and dep_job_id < 0
        try:
          job_id = resolve_fn(dep_job_id)
        except IndexError:
          # Abort
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
      else:
        job_id = dep_job_id

      result.append((job_id, dep_status))

    return (True, result)
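
  # Illustrative example (assumed inputs): when several jobs are submitted in
  # one call, a negative job ID in a dependency refers to a job submitted just
  # before the current one. If the batch received the IDs ["12", "13"], a
  # dependency (-1, <statuses>) on the second job resolves to ("12", <statuses>),
  # while an absolute dependency such as ("7", <statuses>) is passed through as-is.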

  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []

    def resolve_fn(job_idx, reljobid):
      assert reljobid < 0
      return (previous_job_ids + job_ids[:job_idx])[reljobid]

    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
      for op in ops:
        if getattr(op, opcodes.DEPEND_ATTR, None):
          (status, data) = \
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
                                         op.depends)
          if not status:
            # Abort resolving dependencies
            assert ht.TNonEmptyString(data), "No error message"
            break
          # Use resolved dependencies
          op.depends = data
      else:
        try:
          job = self._SubmitJobUnlocked(job_id, ops)
        except errors.GenericError, err:
          status = False
          data = self._FormatSubmitError(str(err), ops)
        else:
          status = True
          data = job_id
          added_jobs.append(job)

      results.append((status, data))

    return (results, added_jobs)

  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)

  def _EnqueueJobsUnlocked(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

    @type job_id: string
    @param job_id: Job ID
    @raise errors.JobLost: If job can't be found

    """
    if not isinstance(job_id, basestring):
      job_id = self._FormatJobID(job_id)

    # Not using in-memory cache as doing so would require an exclusive lock

    # Try to load from disk
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)

    if job:
      assert not job.writable, "Got writable job" # pylint: disable=E1101
      return job.CalcStatus()

    raise errors.JobLost("Job %s not found" % job_id)

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
                             writable=False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
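
  # Illustrative sketch (hypothetical caller): a client polls by passing back
  # the previously returned job information and log serial, treating
  # constants.JOB_NOTCHANGED as "timeout expired, nothing new, ask again":
  #   result = queue.WaitForJobChanges(job_id, ["status"], None, -1, 30.0)
  #   if result != constants.JOB_NOTCHANGED:
  #     (job_info, log_entries) = result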

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't cancel read-only job"

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)
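
  # Illustrative note: callers get back a (success, message) pair; the message
  # explains why cancelling was refused, for instance when the job had already
  # started running (see _QueuedJob.Cancel() for the exact wording).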

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      assert job.writable, "Can't archive read-only job"

      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
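
  # Illustrative note: the returned pair is (number of jobs archived, number
  # of jobs that were not even considered before the timeout expired); with a
  # generous timeout and age=-1 every finalized job is archived and the second
  # element is 0.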

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Disables execution of jobs in the workerpool and returns whether there are
    any jobs currently running. If the latter is the case, the job queue is not
    yet ready for shutdown. Once this function returns C{False}, L{Shutdown} can
    be called without interfering with any job. Queued and unfinished jobs will
    be resumed next time.

    Once this function has been called no new job submissions will be accepted
    (see L{_RequireNonDrainedQueue}).

    @rtype: bool
    @return: Whether there are any running jobs

    """
    if self._accepting_jobs:
      self._accepting_jobs = False

      # Tell worker pool to stop processing pending tasks
      self._wpool.SetActive(False)

    return self._wpool.HasRunningTasks()

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None
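
  # Illustrative shutdown sequence (hypothetical caller such as the master
  # daemon): stop accepting new jobs, wait until no job is running any more,
  # then close the queue:
  #   while queue.PrepareShutdown():
  #     time.sleep(1)  # jobs still running, poll again
  #   queue.Shutdown()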