root / lib / jqueue / __init__.py @ 58e4df3c

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import time
35
import weakref
36
import threading
37
import itertools
38
import operator
39
import os
40

    
41
try:
42
  # pylint: disable=E0611
43
  from pyinotify import pyinotify
44
except ImportError:
45
  import pyinotify
46

    
47
from ganeti import asyncnotifier
48
from ganeti import constants
49
from ganeti import serializer
50
from ganeti import workerpool
51
from ganeti import locking
52
from ganeti import luxi
53
from ganeti import opcodes
54
from ganeti import opcodes_base
55
from ganeti import errors
56
from ganeti import mcpu
57
from ganeti import utils
58
from ganeti import jstore
59
import ganeti.rpc.node as rpc
60
from ganeti import runtime
61
from ganeti import netutils
62
from ganeti import compat
63
from ganeti import ht
64
from ganeti import query
65
from ganeti import qlang
66
from ganeti import pathutils
67
from ganeti import vcluster
68

    
69

    
70
JOBQUEUE_THREADS = 1
71

    
72
# member lock names to be passed to @ssynchronized decorator
73
_LOCK = "_lock"
74
_QUEUE = "_queue"
75

    
76
#: Retrieves "id" attribute
77
_GetIdAttr = operator.attrgetter("id")
78

    
79

    
80
class CancelJob(Exception):
81
  """Special exception to cancel a job.
82

83
  """
84

    
85

    
86
class QueueShutdown(Exception):
87
  """Special exception to abort a job when the job queue is shutting down.
88

89
  """
90

    
91

    
92
def TimeStampNow():
93
  """Returns the current timestamp.
94

95
  @rtype: tuple
96
  @return: the current time in the (seconds, microseconds) format
97

98
  """
99
  return utils.SplitTime(time.time())
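  # Illustrative note (not part of the original module): utils.SplitTime
  # turns a float timestamp into a (seconds, microseconds) pair, so a value
  # like 1234567890.123456 would roughly become (1234567890, 123456). All
  # job and opcode timestamps in this module are stored in that format.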
100

    
101

    
102
def _CallJqUpdate(runner, names, file_name, content):
103
  """Updates job queue file after virtualizing filename.
104

105
  """
106
  virt_file_name = vcluster.MakeVirtualPath(file_name)
107
  return runner.call_jobqueue_update(names, virt_file_name, content)
108

    
109

    
110
class _SimpleJobQuery:
111
  """Wrapper for job queries.
112

113
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
114

115
  """
116
  def __init__(self, fields):
117
    """Initializes this class.
118

119
    """
120
    self._query = query.Query(query.JOB_FIELDS, fields)
121

    
122
  def __call__(self, job):
123
    """Executes a job query using cached field list.
124

125
    """
126
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
127

    
128

    
129
class _QueuedOpCode(object):
130
  """Encapsulates an opcode object.
131

132
  @ivar log: holds the execution log and consists of tuples
133
  of the form C{(log_serial, timestamp, level, message)}
134
  @ivar input: the OpCode we encapsulate
135
  @ivar status: the current status
136
  @ivar result: the result of the LU execution
137
  @ivar start_timestamp: timestamp for the start of the execution
138
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
139
  @ivar end_timestamp: timestamp for the end of the execution
140

141
  """
142
  __slots__ = ["input", "status", "result", "log", "priority",
143
               "start_timestamp", "exec_timestamp", "end_timestamp",
144
               "__weakref__"]
145

    
146
  def __init__(self, op):
147
    """Initializes instances of this class.
148

149
    @type op: L{opcodes.OpCode}
150
    @param op: the opcode we encapsulate
151

152
    """
153
    self.input = op
154
    self.status = constants.OP_STATUS_QUEUED
155
    self.result = None
156
    self.log = []
157
    self.start_timestamp = None
158
    self.exec_timestamp = None
159
    self.end_timestamp = None
160

    
161
    # Get initial priority (it might change during the lifetime of this opcode)
162
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
163

    
164
  @classmethod
165
  def Restore(cls, state):
166
    """Restore the _QueuedOpCode from the serialized form.
167

168
    @type state: dict
169
    @param state: the serialized state
170
    @rtype: _QueuedOpCode
171
    @return: a new _QueuedOpCode instance
172

173
    """
174
    obj = _QueuedOpCode.__new__(cls)
175
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
176
    obj.status = state["status"]
177
    obj.result = state["result"]
178
    obj.log = state["log"]
179
    obj.start_timestamp = state.get("start_timestamp", None)
180
    obj.exec_timestamp = state.get("exec_timestamp", None)
181
    obj.end_timestamp = state.get("end_timestamp", None)
182
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
183
    return obj
184

    
185
  def Serialize(self):
186
    """Serializes this _QueuedOpCode.
187

188
    @rtype: dict
189
    @return: the dictionary holding the serialized state
190

191
    """
192
    return {
193
      "input": self.input.__getstate__(),
194
      "status": self.status,
195
      "result": self.result,
196
      "log": self.log,
197
      "start_timestamp": self.start_timestamp,
198
      "exec_timestamp": self.exec_timestamp,
199
      "end_timestamp": self.end_timestamp,
200
      "priority": self.priority,
201
      }
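    # Sketch of the intended round trip (added for illustration, assumed):
    #   state = op.Serialize()            # plain, JSON-serializable dict
    #   op2 = _QueuedOpCode.Restore(state)
    # Restore() tolerates missing timestamp/priority keys by falling back to
    # None or OP_PRIO_DEFAULT, which keeps older serialized jobs loadable.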
202

    
203

    
204
class _QueuedJob(object):
205
  """In-memory job representation.
206

207
  This is what we use to track the user-submitted jobs. Locking must
208
  be taken care of by users of this class.
209

210
  @type queue: L{JobQueue}
211
  @ivar queue: the parent queue
212
  @ivar id: the job ID
213
  @type ops: list
214
  @ivar ops: the list of _QueuedOpCode that constitute the job
215
  @type log_serial: int
216
  @ivar log_serial: holds the index for the next log entry
217
  @ivar received_timestamp: the timestamp for when the job was received
218
  @ivar start_timestamp: the timestamp for start of execution
219
  @ivar end_timestamp: the timestamp for end of execution
220
  @ivar writable: Whether the job is allowed to be modified
221

222
  """
223
  # pylint: disable=W0212
224
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
225
               "received_timestamp", "start_timestamp", "end_timestamp",
226
               "processor_lock", "writable", "archived",
227
               "livelock", "process_id",
228
               "__weakref__"]
229

    
230
  def AddReasons(self, pickup=False):
231
    """Extend the reason trail
232

233
    Add the reason for all the opcodes of this job to be executed.
234

235
    """
236
    count = 0
237
    for queued_op in self.ops:
238
      op = queued_op.input
239
      if pickup:
240
        reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
241
      else:
242
        reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
243
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
244
                                                reason_src_prefix)
245
      reason_text = "job=%d;index=%d" % (self.id, count)
246
      reason = getattr(op, "reason", [])
247
      reason.append((reason_src, reason_text, utils.EpochNano()))
248
      op.reason = reason
249
      count = count + 1
250

    
251
  def __init__(self, queue, job_id, ops, writable):
252
    """Constructor for the _QueuedJob.
253

254
    @type queue: L{JobQueue}
255
    @param queue: our parent queue
256
    @type job_id: int
257
    @param job_id: our job id
258
    @type ops: list
259
    @param ops: the list of opcodes we hold, which will be encapsulated
260
        in _QueuedOpCodes
261
    @type writable: bool
262
    @param writable: Whether job can be modified
263

264
    """
265
    if not ops:
266
      raise errors.GenericError("A job needs at least one opcode")
267

    
268
    self.queue = queue
269
    self.id = int(job_id)
270
    self.ops = [_QueuedOpCode(op) for op in ops]
271
    self.AddReasons()
272
    self.log_serial = 0
273
    self.received_timestamp = TimeStampNow()
274
    self.start_timestamp = None
275
    self.end_timestamp = None
276
    self.archived = False
277
    self.livelock = None
278
    self.process_id = None
279

    
280
    self._InitInMemory(self, writable)
281

    
282
    assert not self.archived, "New jobs can not be marked as archived"
283

    
284
  @staticmethod
285
  def _InitInMemory(obj, writable):
286
    """Initializes in-memory variables.
287

288
    """
289
    obj.writable = writable
290
    obj.ops_iter = None
291
    obj.cur_opctx = None
292

    
293
    # Read-only jobs are not processed and therefore don't need a lock
294
    if writable:
295
      obj.processor_lock = threading.Lock()
296
    else:
297
      obj.processor_lock = None
298

    
299
  def __repr__(self):
300
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
301
              "id=%s" % self.id,
302
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
303

    
304
    return "<%s at %#x>" % (" ".join(status), id(self))
305

    
306
  @classmethod
307
  def Restore(cls, queue, state, writable, archived):
308
    """Restore a _QueuedJob from serialized state:
309

310
    @type queue: L{JobQueue}
311
    @param queue: to which queue the restored job belongs
312
    @type state: dict
313
    @param state: the serialized state
314
    @type writable: bool
315
    @param writable: Whether job can be modified
316
    @type archived: bool
317
    @param archived: Whether job was already archived
318
    @rtype: _QueuedJob
319
    @return: the restored _QueuedJob instance
320

321
    """
322
    obj = _QueuedJob.__new__(cls)
323
    obj.queue = queue
324
    obj.id = int(state["id"])
325
    obj.received_timestamp = state.get("received_timestamp", None)
326
    obj.start_timestamp = state.get("start_timestamp", None)
327
    obj.end_timestamp = state.get("end_timestamp", None)
328
    obj.archived = archived
329
    obj.livelock = state.get("livelock", None)
330
    obj.process_id = state.get("process_id", None)
331
    if obj.process_id is not None:
332
      obj.process_id = int(obj.process_id)
333

    
334
    obj.ops = []
335
    obj.log_serial = 0
336
    for op_state in state["ops"]:
337
      op = _QueuedOpCode.Restore(op_state)
338
      for log_entry in op.log:
339
        obj.log_serial = max(obj.log_serial, log_entry[0])
340
      obj.ops.append(op)
341

    
342
    cls._InitInMemory(obj, writable)
343

    
344
    return obj
345

    
346
  def Serialize(self):
347
    """Serialize the _JobQueue instance.
348

349
    @rtype: dict
350
    @return: the serialized state
351

352
    """
353
    return {
354
      "id": self.id,
355
      "ops": [op.Serialize() for op in self.ops],
356
      "start_timestamp": self.start_timestamp,
357
      "end_timestamp": self.end_timestamp,
358
      "received_timestamp": self.received_timestamp,
359
      "livelock": self.livelock,
360
      "process_id": self.process_id,
361
      }
362

    
363
  def CalcStatus(self):
364
    """Compute the status of this job.
365

366
    This function iterates over all the _QueuedOpCodes in the job and
367
    based on their status, computes the job status.
368

369
    The algorithm is:
370
      - if we find a cancelled opcode, or one that finished with an error, the job
371
        status will be the same
372
      - otherwise, the last opcode with the status one of:
373
          - waitlock
374
          - canceling
375
          - running
376

377
        will determine the job status
378

379
      - otherwise, it means either all opcodes are queued, or success,
380
        and the job status will be the same
381

382
    @return: the job status
383

384
    """
385
    status = constants.JOB_STATUS_QUEUED
386

    
387
    all_success = True
388
    for op in self.ops:
389
      if op.status == constants.OP_STATUS_SUCCESS:
390
        continue
391

    
392
      all_success = False
393

    
394
      if op.status == constants.OP_STATUS_QUEUED:
395
        pass
396
      elif op.status == constants.OP_STATUS_WAITING:
397
        status = constants.JOB_STATUS_WAITING
398
      elif op.status == constants.OP_STATUS_RUNNING:
399
        status = constants.JOB_STATUS_RUNNING
400
      elif op.status == constants.OP_STATUS_CANCELING:
401
        status = constants.JOB_STATUS_CANCELING
402
        break
403
      elif op.status == constants.OP_STATUS_ERROR:
404
        status = constants.JOB_STATUS_ERROR
405
        # The whole job fails if one opcode failed
406
        break
407
      elif op.status == constants.OP_STATUS_CANCELED:
408
        status = constants.JOB_STATUS_CANCELED
409
        break
410

    
411
    if all_success:
412
      status = constants.JOB_STATUS_SUCCESS
413

    
414
    return status
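    # Illustration (not from the original source): with opcode statuses
    # [success, running, queued] this returns JOB_STATUS_RUNNING, with
    # [success, error, queued] it returns JOB_STATUS_ERROR, and only
    # [success, success, success] yields JOB_STATUS_SUCCESS.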
415

    
416
  def CalcPriority(self):
417
    """Gets the current priority for this job.
418

419
    Only unfinished opcodes are considered. When all are done, the default
420
    priority is used.
421

422
    @rtype: int
423

424
    """
425
    priorities = [op.priority for op in self.ops
426
                  if op.status not in constants.OPS_FINALIZED]
427

    
428
    if not priorities:
429
      # All opcodes are done, assume default priority
430
      return constants.OP_PRIO_DEFAULT
431

    
432
    return min(priorities)
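    # Note added for clarity: lower numbers mean higher priority
    # (OP_PRIO_HIGHEST is the smallest value), so min() picks the most
    # urgent pending opcode, e.g. min([0, -10]) == -10.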
433

    
434
  def GetLogEntries(self, newer_than):
435
    """Selectively returns the log entries.
436

437
    @type newer_than: None or int
438
    @param newer_than: if this is None, return all log entries,
439
        otherwise return only the log entries with serial higher
440
        than this value
441
    @rtype: list
442
    @return: the list of the log entries selected
443

444
    """
445
    if newer_than is None:
446
      serial = -1
447
    else:
448
      serial = newer_than
449

    
450
    entries = []
451
    for op in self.ops:
452
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
453

    
454
    return entries
455

    
456
  def GetInfo(self, fields):
457
    """Returns information about a job.
458

459
    @type fields: list
460
    @param fields: names of fields to return
461
    @rtype: list
462
    @return: list with one element for each field
463
    @raise errors.OpExecError: when an invalid field
464
        has been passed
465

466
    """
467
    return _SimpleJobQuery(fields)(self)
468

    
469
  def MarkUnfinishedOps(self, status, result):
470
    """Mark unfinished opcodes with a given status and result.
471

472
    This is a utility function for marking all running or waiting to
473
    be run opcodes with a given status. Opcodes which are already
474
    finalised are not changed.
475

476
    @param status: a given opcode status
477
    @param result: the opcode result
478

479
    """
480
    not_marked = True
481
    for op in self.ops:
482
      if op.status in constants.OPS_FINALIZED:
483
        assert not_marked, "Finalized opcodes found after non-finalized ones"
484
        continue
485
      op.status = status
486
      op.result = result
487
      not_marked = False
488

    
489
  def Finalize(self):
490
    """Marks the job as finalized.
491

492
    """
493
    self.end_timestamp = TimeStampNow()
494

    
495
  def Cancel(self):
496
    """Marks job as canceled/-ing if possible.
497

498
    @rtype: tuple; (bool, string)
499
    @return: Boolean describing whether job was successfully canceled or marked
500
      as canceling and a text message
501

502
    """
503
    status = self.CalcStatus()
504

    
505
    if status == constants.JOB_STATUS_QUEUED:
506
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
507
                             "Job canceled by request")
508
      self.Finalize()
509
      return (True, "Job %s canceled" % self.id)
510

    
511
    elif status == constants.JOB_STATUS_WAITING:
512
      # The worker will notice the new status and cancel the job
513
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
514
      return (True, "Job %s will be canceled" % self.id)
515

    
516
    else:
517
      logging.debug("Job %s is no longer waiting in the queue", self.id)
518
      return (False, "Job %s is no longer waiting in the queue" % self.id)
519

    
520
  def ChangePriority(self, priority):
521
    """Changes the job priority.
522

523
    @type priority: int
524
    @param priority: New priority
525
    @rtype: tuple; (bool, string)
526
    @return: Boolean describing whether job's priority was successfully changed
527
      and a text message
528

529
    """
530
    status = self.CalcStatus()
531

    
532
    if status in constants.JOBS_FINALIZED:
533
      return (False, "Job %s is finished" % self.id)
534
    elif status == constants.JOB_STATUS_CANCELING:
535
      return (False, "Job %s is cancelling" % self.id)
536
    else:
537
      assert status in (constants.JOB_STATUS_QUEUED,
538
                        constants.JOB_STATUS_WAITING,
539
                        constants.JOB_STATUS_RUNNING)
540

    
541
      changed = False
542
      for op in self.ops:
543
        if (op.status == constants.OP_STATUS_RUNNING or
544
            op.status in constants.OPS_FINALIZED):
545
          assert not changed, \
546
            ("Found opcode for which priority should not be changed after"
547
             " priority has been changed for previous opcodes")
548
          continue
549

    
550
        assert op.status in (constants.OP_STATUS_QUEUED,
551
                             constants.OP_STATUS_WAITING)
552

    
553
        changed = True
554

    
555
        # Set new priority (doesn't modify opcode input)
556
        op.priority = priority
557

    
558
      if changed:
559
        return (True, ("Priorities of pending opcodes for job %s have been"
560
                       " changed to %s" % (self.id, priority)))
561
      else:
562
        return (False, "Job %s had no pending opcodes" % self.id)
563

    
564
  def SetPid(self, pid):
565
    """Sets the job's process ID
566

567
    @type pid: int
568
    @param pid: the process ID
569

570
    """
571
    status = self.CalcStatus()
572

    
573
    if status in (constants.JOB_STATUS_QUEUED,
574
                  constants.JOB_STATUS_WAITING):
575
      if self.process_id is not None:
576
        logging.warning("Replacing the process id %s of job %s with %s",
577
                        self.process_id, self.id, pid)
578
      self.process_id = pid
579
    else:
580
      logging.warning("Can set pid only for queued/waiting jobs")
581

    
582

    
583
class _OpExecCallbacks(mcpu.OpExecCbBase):
584
  def __init__(self, queue, job, op):
585
    """Initializes this class.
586

587
    @type queue: L{JobQueue}
588
    @param queue: Job queue
589
    @type job: L{_QueuedJob}
590
    @param job: Job object
591
    @type op: L{_QueuedOpCode}
592
    @param op: OpCode
593

594
    """
595
    assert queue, "Queue is missing"
596
    assert job, "Job is missing"
597
    assert op, "Opcode is missing"
598

    
599
    self._queue = queue
600
    self._job = job
601
    self._op = op
602

    
603
  def _CheckCancel(self):
604
    """Raises an exception to cancel the job if asked to.
605

606
    """
607
    # Cancel here if we were asked to
608
    if self._op.status == constants.OP_STATUS_CANCELING:
609
      logging.debug("Canceling opcode")
610
      raise CancelJob()
611

    
612
    # See if queue is shutting down
613
    if not self._queue.AcceptingJobsUnlocked():
614
      logging.debug("Queue is shutting down")
615
      raise QueueShutdown()
616

    
617
  @locking.ssynchronized(_QUEUE, shared=1)
618
  def NotifyStart(self):
619
    """Mark the opcode as running, not lock-waiting.
620

621
    This is called from the mcpu code as a notifier function, when the LU is
622
    finally about to start the Exec() method. Of course, to have end-user
623
    visible results, the opcode must be initially (before calling into
624
    Processor.ExecOpCode) set to OP_STATUS_WAITING.
625

626
    """
627
    assert self._op in self._job.ops
628
    assert self._op.status in (constants.OP_STATUS_WAITING,
629
                               constants.OP_STATUS_CANCELING)
630

    
631
    # Cancel here if we were asked to
632
    self._CheckCancel()
633

    
634
    logging.debug("Opcode is now running")
635

    
636
    self._op.status = constants.OP_STATUS_RUNNING
637
    self._op.exec_timestamp = TimeStampNow()
638

    
639
    # And finally replicate the job status
640
    self._queue.UpdateJobUnlocked(self._job)
641

    
642
  @locking.ssynchronized(_QUEUE, shared=1)
643
  def _AppendFeedback(self, timestamp, log_type, log_msg):
644
    """Internal feedback append function, with locks
645

646
    """
647
    self._job.log_serial += 1
648
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
649
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
650

    
651
  def Feedback(self, *args):
652
    """Append a log entry.
653

654
    """
655
    assert len(args) < 3
656

    
657
    if len(args) == 1:
658
      log_type = constants.ELOG_MESSAGE
659
      log_msg = args[0]
660
    else:
661
      (log_type, log_msg) = args
662

    
663
    # The time is split to make serialization easier and not lose
664
    # precision.
665
    timestamp = utils.SplitTime(time.time())
666
    self._AppendFeedback(timestamp, log_type, log_msg)
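    # Usage sketch (assumed, based on the argument handling above): callers
    # may pass a single message, e.g. cb.Feedback("starting"), which defaults
    # to ELOG_MESSAGE, or an explicit pair such as
    # cb.Feedback(constants.ELOG_MESSAGE, "50% done").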
667

    
668
  def CurrentPriority(self):
669
    """Returns current priority for opcode.
670

671
    """
672
    assert self._op.status in (constants.OP_STATUS_WAITING,
673
                               constants.OP_STATUS_CANCELING)
674

    
675
    # Cancel here if we were asked to
676
    self._CheckCancel()
677

    
678
    return self._op.priority
679

    
680
  def SubmitManyJobs(self, jobs):
681
    """Submits jobs for processing.
682

683
    See L{JobQueue.SubmitManyJobs}.
684

685
    """
686
    # Locking is done in job queue
687
    return self._queue.SubmitManyJobs(jobs)
688

    
689

    
690
class _JobChangesChecker(object):
691
  def __init__(self, fields, prev_job_info, prev_log_serial):
692
    """Initializes this class.
693

694
    @type fields: list of strings
695
    @param fields: Fields requested by LUXI client
696
    @type prev_job_info: string
697
    @param prev_job_info: previous job info, as passed by the LUXI client
698
    @type prev_log_serial: string
699
    @param prev_log_serial: previous job serial, as passed by the LUXI client
700

701
    """
702
    self._squery = _SimpleJobQuery(fields)
703
    self._prev_job_info = prev_job_info
704
    self._prev_log_serial = prev_log_serial
705

    
706
  def __call__(self, job):
707
    """Checks whether job has changed.
708

709
    @type job: L{_QueuedJob}
710
    @param job: Job object
711

712
    """
713
    assert not job.writable, "Expected read-only job"
714

    
715
    status = job.CalcStatus()
716
    job_info = self._squery(job)
717
    log_entries = job.GetLogEntries(self._prev_log_serial)
718

    
719
    # Serializing and deserializing data can cause type changes (e.g. from
720
    # tuple to list) or precision loss. We're doing it here so that we get
721
    # the same modifications as the data received from the client. Without
722
    # this, the comparison afterwards might fail without the data being
723
    # significantly different.
724
    # TODO: we just deserialized from disk, investigate how to make sure that
725
    # the job info and log entries are compatible to avoid this further step.
726
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
727
    # efficient, though floats will be tricky
728
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
729
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
730

    
731
    # Don't even try to wait if the job is no longer running, there will be
732
    # no changes.
733
    if (status not in (constants.JOB_STATUS_QUEUED,
734
                       constants.JOB_STATUS_RUNNING,
735
                       constants.JOB_STATUS_WAITING) or
736
        job_info != self._prev_job_info or
737
        (log_entries and self._prev_log_serial != log_entries[0][0])):
738
      logging.debug("Job %s changed", job.id)
739
      return (job_info, log_entries)
740

    
741
    return None
742

    
743

    
744
class _JobFileChangesWaiter(object):
745
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
746
    """Initializes this class.
747

748
    @type filename: string
749
    @param filename: Path to job file
750
    @raises errors.InotifyError: if the notifier cannot be set up
751

752
    """
753
    self._wm = _inotify_wm_cls()
754
    self._inotify_handler = \
755
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
756
    self._notifier = \
757
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
758
    try:
759
      self._inotify_handler.enable()
760
    except Exception:
761
      # pyinotify doesn't close file descriptors automatically
762
      self._notifier.stop()
763
      raise
764

    
765
  def _OnInotify(self, notifier_enabled):
766
    """Callback for inotify.
767

768
    """
769
    if not notifier_enabled:
770
      self._inotify_handler.enable()
771

    
772
  def Wait(self, timeout):
773
    """Waits for the job file to change.
774

775
    @type timeout: float
776
    @param timeout: Timeout in seconds
777
    @return: Whether there have been events
778

779
    """
780
    assert timeout >= 0
781
    have_events = self._notifier.check_events(timeout * 1000)
782
    if have_events:
783
      self._notifier.read_events()
784
    self._notifier.process_events()
785
    return have_events
786

    
787
  def Close(self):
788
    """Closes underlying notifier and its file descriptor.
789

790
    """
791
    self._notifier.stop()
792

    
793

    
794
class _JobChangesWaiter(object):
795
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
796
    """Initializes this class.
797

798
    @type filename: string
799
    @param filename: Path to job file
800

801
    """
802
    self._filewaiter = None
803
    self._filename = filename
804
    self._waiter_cls = _waiter_cls
805

    
806
  def Wait(self, timeout):
807
    """Waits for a job to change.
808

809
    @type timeout: float
810
    @param timeout: Timeout in seconds
811
    @return: Whether there have been events
812

813
    """
814
    if self._filewaiter:
815
      return self._filewaiter.Wait(timeout)
816

    
817
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
818
    # If this point is reached, return immediately and let caller check the job
819
    # file again in case there were changes since the last check. This avoids a
820
    # race condition.
821
    self._filewaiter = self._waiter_cls(self._filename)
822

    
823
    return True
824

    
825
  def Close(self):
826
    """Closes underlying waiter.
827

828
    """
829
    if self._filewaiter:
830
      self._filewaiter.Close()
831

    
832

    
833
class _WaitForJobChangesHelper(object):
834
  """Helper class using inotify to wait for changes in a job file.
835

836
  This class takes a previous job status and serial, and alerts the client when
837
  the current job status has changed.
838

839
  """
840
  @staticmethod
841
  def _CheckForChanges(counter, job_load_fn, check_fn):
842
    if counter.next() > 0:
843
      # If this isn't the first check the job is given some more time to change
844
      # again. This gives better performance for jobs generating many
845
      # changes/messages.
846
      time.sleep(0.1)
847

    
848
    job = job_load_fn()
849
    if not job:
850
      raise errors.JobLost()
851

    
852
    result = check_fn(job)
853
    if result is None:
854
      raise utils.RetryAgain()
855

    
856
    return result
857

    
858
  def __call__(self, filename, job_load_fn,
859
               fields, prev_job_info, prev_log_serial, timeout,
860
               _waiter_cls=_JobChangesWaiter):
861
    """Waits for changes on a job.
862

863
    @type filename: string
864
    @param filename: File on which to wait for changes
865
    @type job_load_fn: callable
866
    @param job_load_fn: Function to load job
867
    @type fields: list of strings
868
    @param fields: Which fields to check for changes
869
    @type prev_job_info: list or None
870
    @param prev_job_info: Last job information returned
871
    @type prev_log_serial: int
872
    @param prev_log_serial: Last job message serial number
873
    @type timeout: float
874
    @param timeout: maximum time to wait in seconds
875

876
    """
877
    counter = itertools.count()
878
    try:
879
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
880
      waiter = _waiter_cls(filename)
881
      try:
882
        return utils.Retry(compat.partial(self._CheckForChanges,
883
                                          counter, job_load_fn, check_fn),
884
                           utils.RETRY_REMAINING_TIME, timeout,
885
                           wait_fn=waiter.Wait)
886
      finally:
887
        waiter.Close()
888
    except errors.JobLost:
889
      return None
890
    except utils.RetryTimeout:
891
      return constants.JOB_NOTCHANGED
892

    
893

    
894
def _EncodeOpError(err):
895
  """Encodes an error which occurred while processing an opcode.
896

897
  """
898
  if isinstance(err, errors.GenericError):
899
    to_encode = err
900
  else:
901
    to_encode = errors.OpExecError(str(err))
902

    
903
  return errors.EncodeException(to_encode)
904

    
905

    
906
class _TimeoutStrategyWrapper:
907
  def __init__(self, fn):
908
    """Initializes this class.
909

910
    """
911
    self._fn = fn
912
    self._next = None
913

    
914
  def _Advance(self):
915
    """Gets the next timeout if necessary.
916

917
    """
918
    if self._next is None:
919
      self._next = self._fn()
920

    
921
  def Peek(self):
922
    """Returns the next timeout.
923

924
    """
925
    self._Advance()
926
    return self._next
927

    
928
  def Next(self):
929
    """Returns the current timeout and advances the internal state.
930

931
    """
932
    self._Advance()
933
    result = self._next
934
    self._next = None
935
    return result
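    # Clarifying sketch (not in the original): for a wrapped strategy whose
    # NextAttempt yields 1.0 and then None, Peek() returns 1.0 without
    # consuming it, Next() returns 1.0 and advances, and a later Peek() or
    # Next() returns None, which the callers treat as a blocking acquire.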
936

    
937

    
938
class _OpExecContext:
939
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
940
    """Initializes this class.
941

942
    """
943
    self.op = op
944
    self.index = index
945
    self.log_prefix = log_prefix
946
    self.summary = op.input.Summary()
947

    
948
    # Create local copy to modify
949
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
950
      self.jobdeps = op.input.depends[:]
951
    else:
952
      self.jobdeps = None
953

    
954
    self._timeout_strategy_factory = timeout_strategy_factory
955
    self._ResetTimeoutStrategy()
956

    
957
  def _ResetTimeoutStrategy(self):
958
    """Creates a new timeout strategy.
959

960
    """
961
    self._timeout_strategy = \
962
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
963

    
964
  def CheckPriorityIncrease(self):
965
    """Checks whether priority can and should be increased.
966

967
    Called when locks couldn't be acquired.
968

969
    """
970
    op = self.op
971

    
972
    # Exhausted all retries and next round should not use blocking acquire
973
    # for locks?
974
    if (self._timeout_strategy.Peek() is None and
975
        op.priority > constants.OP_PRIO_HIGHEST):
976
      logging.debug("Increasing priority")
977
      op.priority -= 1
978
      self._ResetTimeoutStrategy()
979
      return True
980

    
981
    return False
982

    
983
  def GetNextLockTimeout(self):
984
    """Returns the next lock acquire timeout.
985

986
    """
987
    return self._timeout_strategy.Next()
988

    
989

    
990
class _JobProcessor(object):
991
  (DEFER,
992
   WAITDEP,
993
   FINISHED) = range(1, 4)
994

    
995
  def __init__(self, queue, opexec_fn, job,
996
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
997
    """Initializes this class.
998

999
    """
1000
    self.queue = queue
1001
    self.opexec_fn = opexec_fn
1002
    self.job = job
1003
    self._timeout_strategy_factory = _timeout_strategy_factory
1004

    
1005
  @staticmethod
1006
  def _FindNextOpcode(job, timeout_strategy_factory):
1007
    """Locates the next opcode to run.
1008

1009
    @type job: L{_QueuedJob}
1010
    @param job: Job object
1011
    @param timeout_strategy_factory: Callable to create new timeout strategy
1012

1013
    """
1014
    # Create some sort of a cache to speed up locating next opcode for future
1015
    # lookups
1016
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
1017
    # pending and one for processed ops.
1018
    if job.ops_iter is None:
1019
      job.ops_iter = enumerate(job.ops)
1020

    
1021
    # Find next opcode to run
1022
    while True:
1023
      try:
1024
        (idx, op) = job.ops_iter.next()
1025
      except StopIteration:
1026
        raise errors.ProgrammerError("Called for a finished job")
1027

    
1028
      if op.status == constants.OP_STATUS_RUNNING:
1029
        # Found an opcode already marked as running
1030
        raise errors.ProgrammerError("Called for job marked as running")
1031

    
1032
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
1033
                             timeout_strategy_factory)
1034

    
1035
      if op.status not in constants.OPS_FINALIZED:
1036
        return opctx
1037

    
1038
      # This is a job that was partially completed before master daemon
1039
      # shutdown, so it can be expected that some opcodes are already
1040
      # completed successfully (if any did error out, then the whole job
1041
      # should have been aborted and not resubmitted for processing).
1042
      logging.info("%s: opcode %s already processed, skipping",
1043
                   opctx.log_prefix, opctx.summary)
1044

    
1045
  @staticmethod
1046
  def _MarkWaitlock(job, op):
1047
    """Marks an opcode as waiting for locks.
1048

1049
    The job's start timestamp is also set if necessary.
1050

1051
    @type job: L{_QueuedJob}
1052
    @param job: Job object
1053
    @type op: L{_QueuedOpCode}
1054
    @param op: Opcode object
1055

1056
    """
1057
    assert op in job.ops
1058
    assert op.status in (constants.OP_STATUS_QUEUED,
1059
                         constants.OP_STATUS_WAITING)
1060

    
1061
    update = False
1062

    
1063
    op.result = None
1064

    
1065
    if op.status == constants.OP_STATUS_QUEUED:
1066
      op.status = constants.OP_STATUS_WAITING
1067
      update = True
1068

    
1069
    if op.start_timestamp is None:
1070
      op.start_timestamp = TimeStampNow()
1071
      update = True
1072

    
1073
    if job.start_timestamp is None:
1074
      job.start_timestamp = op.start_timestamp
1075
      update = True
1076

    
1077
    assert op.status == constants.OP_STATUS_WAITING
1078

    
1079
    return update
1080

    
1081
  @staticmethod
1082
  def _CheckDependencies(queue, job, opctx):
1083
    """Checks if an opcode has dependencies and if so, processes them.
1084

1085
    @type queue: L{JobQueue}
1086
    @param queue: Queue object
1087
    @type job: L{_QueuedJob}
1088
    @param job: Job object
1089
    @type opctx: L{_OpExecContext}
1090
    @param opctx: Opcode execution context
1091
    @rtype: bool
1092
    @return: Whether opcode will be re-scheduled by dependency tracker
1093

1094
    """
1095
    op = opctx.op
1096

    
1097
    result = False
1098

    
1099
    while opctx.jobdeps:
1100
      (dep_job_id, dep_status) = opctx.jobdeps[0]
1101

    
1102
      (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1103
                                                          dep_status)
1104
      assert ht.TNonEmptyString(depmsg), "No dependency message"
1105

    
1106
      logging.info("%s: %s", opctx.log_prefix, depmsg)
1107

    
1108
      if depresult == _JobDependencyManager.CONTINUE:
1109
        # Remove dependency and continue
1110
        opctx.jobdeps.pop(0)
1111

    
1112
      elif depresult == _JobDependencyManager.WAIT:
1113
        # Need to wait for notification, dependency tracker will re-add job
1114
        # to workerpool
1115
        result = True
1116
        break
1117

    
1118
      elif depresult == _JobDependencyManager.CANCEL:
1119
        # Job was cancelled, cancel this job as well
1120
        job.Cancel()
1121
        assert op.status == constants.OP_STATUS_CANCELING
1122
        break
1123

    
1124
      elif depresult in (_JobDependencyManager.WRONGSTATUS,
1125
                         _JobDependencyManager.ERROR):
1126
        # Job failed or there was an error, this job must fail
1127
        op.status = constants.OP_STATUS_ERROR
1128
        op.result = _EncodeOpError(errors.OpExecError(depmsg))
1129
        break
1130

    
1131
      else:
1132
        raise errors.ProgrammerError("Unknown dependency result '%s'" %
1133
                                     depresult)
1134

    
1135
    return result
1136

    
1137
  def _ExecOpCodeUnlocked(self, opctx):
1138
    """Processes one opcode and returns the result.
1139

1140
    """
1141
    op = opctx.op
1142

    
1143
    assert op.status in (constants.OP_STATUS_WAITING,
1144
                         constants.OP_STATUS_CANCELING)
1145

    
1146
    # The very last check if the job was cancelled before trying to execute
1147
    if op.status == constants.OP_STATUS_CANCELING:
1148
      return (constants.OP_STATUS_CANCELING, None)
1149

    
1150
    timeout = opctx.GetNextLockTimeout()
1151

    
1152
    try:
1153
      # Make sure not to hold queue lock while calling ExecOpCode
1154
      result = self.opexec_fn(op.input,
1155
                              _OpExecCallbacks(self.queue, self.job, op),
1156
                              timeout=timeout)
1157
    except mcpu.LockAcquireTimeout:
1158
      assert timeout is not None, "Received timeout for blocking acquire"
1159
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1160

    
1161
      assert op.status in (constants.OP_STATUS_WAITING,
1162
                           constants.OP_STATUS_CANCELING)
1163

    
1164
      # Was job cancelled while we were waiting for the lock?
1165
      if op.status == constants.OP_STATUS_CANCELING:
1166
        return (constants.OP_STATUS_CANCELING, None)
1167

    
1168
      # Queue is shutting down, return to queued
1169
      if not self.queue.AcceptingJobsUnlocked():
1170
        return (constants.OP_STATUS_QUEUED, None)
1171

    
1172
      # Stay in waitlock while trying to re-acquire lock
1173
      return (constants.OP_STATUS_WAITING, None)
1174
    except CancelJob:
1175
      logging.exception("%s: Canceling job", opctx.log_prefix)
1176
      assert op.status == constants.OP_STATUS_CANCELING
1177
      return (constants.OP_STATUS_CANCELING, None)
1178

    
1179
    except QueueShutdown:
1180
      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1181

    
1182
      assert op.status == constants.OP_STATUS_WAITING
1183

    
1184
      # Job hadn't been started yet, so it should return to the queue
1185
      return (constants.OP_STATUS_QUEUED, None)
1186

    
1187
    except Exception, err: # pylint: disable=W0703
1188
      logging.exception("%s: Caught exception in %s",
1189
                        opctx.log_prefix, opctx.summary)
1190
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1191
    else:
1192
      logging.debug("%s: %s successful",
1193
                    opctx.log_prefix, opctx.summary)
1194
      return (constants.OP_STATUS_SUCCESS, result)
1195

    
1196
  def __call__(self, _nextop_fn=None):
1197
    """Continues execution of a job.
1198

1199
    @param _nextop_fn: Callback function for tests
1200
    @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1201
      be deferred and C{WAITDEP} if the dependency manager
1202
      (L{_JobDependencyManager}) will re-schedule the job when appropriate
1203

1204
    """
1205
    queue = self.queue
1206
    job = self.job
1207

    
1208
    logging.debug("Processing job %s", job.id)
1209

    
1210
    queue.acquire(shared=1)
1211
    try:
1212
      opcount = len(job.ops)
1213

    
1214
      assert job.writable, "Expected writable job"
1215

    
1216
      # Don't do anything for finalized jobs
1217
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1218
        return self.FINISHED
1219

    
1220
      # Is a previous opcode still pending?
1221
      if job.cur_opctx:
1222
        opctx = job.cur_opctx
1223
        job.cur_opctx = None
1224
      else:
1225
        if __debug__ and _nextop_fn:
1226
          _nextop_fn()
1227
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1228

    
1229
      op = opctx.op
1230

    
1231
      # Consistency check
1232
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1233
                                     constants.OP_STATUS_CANCELING)
1234
                        for i in job.ops[opctx.index + 1:])
1235

    
1236
      assert op.status in (constants.OP_STATUS_QUEUED,
1237
                           constants.OP_STATUS_WAITING,
1238
                           constants.OP_STATUS_CANCELING)
1239

    
1240
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1241
              op.priority >= constants.OP_PRIO_HIGHEST)
1242

    
1243
      waitjob = None
1244

    
1245
      if op.status != constants.OP_STATUS_CANCELING:
1246
        assert op.status in (constants.OP_STATUS_QUEUED,
1247
                             constants.OP_STATUS_WAITING)
1248

    
1249
        # Prepare to start opcode
1250
        if self._MarkWaitlock(job, op):
1251
          # Write to disk
1252
          queue.UpdateJobUnlocked(job)
1253

    
1254
        assert op.status == constants.OP_STATUS_WAITING
1255
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1256
        assert job.start_timestamp and op.start_timestamp
1257
        assert waitjob is None
1258

    
1259
        # Check if waiting for a job is necessary
1260
        waitjob = self._CheckDependencies(queue, job, opctx)
1261

    
1262
        assert op.status in (constants.OP_STATUS_WAITING,
1263
                             constants.OP_STATUS_CANCELING,
1264
                             constants.OP_STATUS_ERROR)
1265

    
1266
        if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1267
                                         constants.OP_STATUS_ERROR)):
1268
          logging.info("%s: opcode %s waiting for locks",
1269
                       opctx.log_prefix, opctx.summary)
1270

    
1271
          assert not opctx.jobdeps, "Not all dependencies were removed"
1272

    
1273
          queue.release()
1274
          try:
1275
            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1276
          finally:
1277
            queue.acquire(shared=1)
1278

    
1279
          op.status = op_status
1280
          op.result = op_result
1281

    
1282
          assert not waitjob
1283

    
1284
        if op.status in (constants.OP_STATUS_WAITING,
1285
                         constants.OP_STATUS_QUEUED):
1286
          # waiting: Couldn't get locks in time
1287
          # queued: Queue is shutting down
1288
          assert not op.end_timestamp
1289
        else:
1290
          # Finalize opcode
1291
          op.end_timestamp = TimeStampNow()
1292

    
1293
          if op.status == constants.OP_STATUS_CANCELING:
1294
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1295
                                  for i in job.ops[opctx.index:])
1296
          else:
1297
            assert op.status in constants.OPS_FINALIZED
1298

    
1299
      if op.status == constants.OP_STATUS_QUEUED:
1300
        # Queue is shutting down
1301
        assert not waitjob
1302

    
1303
        finalize = False
1304

    
1305
        # Reset context
1306
        job.cur_opctx = None
1307

    
1308
        # In no case must the status be finalized here
1309
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1310

    
1311
      elif op.status == constants.OP_STATUS_WAITING or waitjob:
1312
        finalize = False
1313

    
1314
        if not waitjob and opctx.CheckPriorityIncrease():
1315
          # Priority was changed, need to update on-disk file
1316
          queue.UpdateJobUnlocked(job)
1317

    
1318
        # Keep around for another round
1319
        job.cur_opctx = opctx
1320

    
1321
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1322
                op.priority >= constants.OP_PRIO_HIGHEST)
1323

    
1324
        # In no case must the status be finalized here
1325
        assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1326

    
1327
      else:
1328
        # Ensure all opcodes so far have been successful
1329
        assert (opctx.index == 0 or
1330
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1331
                           for i in job.ops[:opctx.index]))
1332

    
1333
        # Reset context
1334
        job.cur_opctx = None
1335

    
1336
        if op.status == constants.OP_STATUS_SUCCESS:
1337
          finalize = False
1338

    
1339
        elif op.status == constants.OP_STATUS_ERROR:
1340
          # Ensure failed opcode has an exception as its result
1341
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1342

    
1343
          to_encode = errors.OpExecError("Preceding opcode failed")
1344
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1345
                                _EncodeOpError(to_encode))
1346
          finalize = True
1347

    
1348
          # Consistency check
1349
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1350
                            errors.GetEncodedError(i.result)
1351
                            for i in job.ops[opctx.index:])
1352

    
1353
        elif op.status == constants.OP_STATUS_CANCELING:
1354
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1355
                                "Job canceled by request")
1356
          finalize = True
1357

    
1358
        else:
1359
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1360

    
1361
        if opctx.index == (opcount - 1):
1362
          # Finalize on last opcode
1363
          finalize = True
1364

    
1365
        if finalize:
1366
          # All opcodes have been run, finalize job
1367
          job.Finalize()
1368

    
1369
        # Write to disk. If the job status is final, this is the final write
1370
        # allowed. Once the file has been written, it can be archived anytime.
1371
        queue.UpdateJobUnlocked(job)
1372

    
1373
        assert not waitjob
1374

    
1375
        if finalize:
1376
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1377
          return self.FINISHED
1378

    
1379
      assert not waitjob or queue.depmgr.JobWaiting(job)
1380

    
1381
      if waitjob:
1382
        return self.WAITDEP
1383
      else:
1384
        return self.DEFER
1385
    finally:
1386
      assert job.writable, "Job became read-only while being processed"
1387
      queue.release()
1388

    
1389

    
1390
def _EvaluateJobProcessorResult(depmgr, job, result):
1391
  """Looks at a result from L{_JobProcessor} for a job.
1392

1393
  To be used in a L{_JobQueueWorker}.
1394

1395
  """
1396
  if result == _JobProcessor.FINISHED:
1397
    # Notify waiting jobs
1398
    depmgr.NotifyWaiters(job.id)
1399

    
1400
  elif result == _JobProcessor.DEFER:
1401
    # Schedule again
1402
    raise workerpool.DeferTask(priority=job.CalcPriority())
1403

    
1404
  elif result == _JobProcessor.WAITDEP:
1405
    # No-op, dependency manager will re-schedule
1406
    pass
1407

    
1408
  else:
1409
    raise errors.ProgrammerError("Job processor returned unknown status %s" %
1410
                                 (result, ))
1411

    
1412

    
1413
class _JobQueueWorker(workerpool.BaseWorker):
1414
  """The actual job workers.
1415

1416
  """
1417
  def RunTask(self, job): # pylint: disable=W0221
1418
    """Job executor.
1419

1420
    @type job: L{_QueuedJob}
1421
    @param job: the job to be processed
1422

1423
    """
1424
    assert job.writable, "Expected writable job"
1425

    
1426
    # Ensure only one worker is active on a single job. If a job registers for
1427
    # a dependency job, and the other job notifies before the first worker is
1428
    # done, the job can end up in the tasklist more than once.
1429
    job.processor_lock.acquire()
1430
    try:
1431
      return self._RunTaskInner(job)
1432
    finally:
1433
      job.processor_lock.release()
1434

    
1435
  def _RunTaskInner(self, job):
1436
    """Executes a job.
1437

1438
    Must be called with per-job lock acquired.
1439

1440
    """
1441
    queue = job.queue
1442
    assert queue == self.pool.queue
1443

    
1444
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1445
    setname_fn(None)
1446

    
1447
    proc = mcpu.Processor(queue.context, job.id)
1448

    
1449
    # Create wrapper for setting thread name
1450
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1451
                                    proc.ExecOpCode)
1452

    
1453
    _EvaluateJobProcessorResult(queue.depmgr, job,
1454
                                _JobProcessor(queue, wrap_execop_fn, job)())
1455

    
1456
  @staticmethod
1457
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1458
    """Updates the worker thread name to include a short summary of the opcode.
1459

1460
    @param setname_fn: Callable setting worker thread name
1461
    @param execop_fn: Callable for executing opcode (usually
1462
                      L{mcpu.Processor.ExecOpCode})
1463

1464
    """
1465
    setname_fn(op)
1466
    try:
1467
      return execop_fn(op, *args, **kwargs)
1468
    finally:
1469
      setname_fn(None)
1470

    
1471
  @staticmethod
1472
  def _GetWorkerName(job, op):
1473
    """Sets the worker thread name.
1474

1475
    @type job: L{_QueuedJob}
1476
    @type op: L{opcodes.OpCode}
1477

1478
    """
1479
    parts = ["Job%s" % job.id]
1480

    
1481
    if op:
1482
      parts.append(op.TinySummary())
1483

    
1484
    return "/".join(parts)
1485

    
1486

    
1487
class _JobQueueWorkerPool(workerpool.WorkerPool):
1488
  """Simple class implementing a job-processing workerpool.
1489

1490
  """
1491
  def __init__(self, queue):
1492
    super(_JobQueueWorkerPool, self).__init__("Jq",
1493
                                              JOBQUEUE_THREADS,
1494
                                              _JobQueueWorker)
1495
    self.queue = queue
1496

    
1497

    
1498
class _JobDependencyManager:
1499
  """Keeps track of job dependencies.
1500

1501
  """
1502
  (WAIT,
1503
   ERROR,
1504
   CANCEL,
1505
   CONTINUE,
1506
   WRONGSTATUS) = range(1, 6)
1507

    
1508
  def __init__(self, getstatus_fn, enqueue_fn):
1509
    """Initializes this class.
1510

1511
    """
1512
    self._getstatus_fn = getstatus_fn
1513
    self._enqueue_fn = enqueue_fn
1514

    
1515
    self._waiters = {}
1516
    self._lock = locking.SharedLock("JobDepMgr")
1517

    
1518
  @locking.ssynchronized(_LOCK, shared=1)
1519
  def GetLockInfo(self, requested): # pylint: disable=W0613
1520
    """Retrieves information about waiting jobs.
1521

1522
    @type requested: set
1523
    @param requested: Requested information, see C{query.LQ_*}
1524

1525
    """
1526
    # No need to sort here, that's being done by the lock manager and query
1527
    # library. There are no priorities for notifying jobs, hence all show up as
1528
    # one item under "pending".
1529
    return [("job/%s" % job_id, None, None,
1530
             [("job", [job.id for job in waiters])])
1531
            for job_id, waiters in self._waiters.items()
1532
            if waiters]
1533

    
1534
  @locking.ssynchronized(_LOCK, shared=1)
1535
  def JobWaiting(self, job):
1536
    """Checks if a job is waiting.
1537

1538
    """
1539
    return compat.any(job in jobs
1540
                      for jobs in self._waiters.values())
1541

    
1542
  @locking.ssynchronized(_LOCK)
1543
  def CheckAndRegister(self, job, dep_job_id, dep_status):
1544
    """Checks if a dependency job has the requested status.
1545

1546
    If the other job is not yet in a finalized status, the calling job will be
1547
    notified (re-added to the workerpool) at a later point.
1548

1549
    @type job: L{_QueuedJob}
1550
    @param job: Job object
1551
    @type dep_job_id: int
1552
    @param dep_job_id: ID of dependency job
1553
    @type dep_status: list
1554
    @param dep_status: Required status
1555

1556
    """
1557
    assert ht.TJobId(job.id)
1558
    assert ht.TJobId(dep_job_id)
1559
    assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1560

    
1561
    if job.id == dep_job_id:
1562
      return (self.ERROR, "Job can't depend on itself")
1563

    
1564
    # Get status of dependency job
1565
    try:
1566
      status = self._getstatus_fn(dep_job_id)
1567
    except errors.JobLost, err:
1568
      return (self.ERROR, "Dependency error: %s" % err)
1569

    
1570
    assert status in constants.JOB_STATUS_ALL
1571

    
1572
    job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1573

    
1574
    if status not in constants.JOBS_FINALIZED:
1575
      # Register for notification and wait for job to finish
1576
      job_id_waiters.add(job)
1577
      return (self.WAIT,
1578
              "Need to wait for job %s, wanted status '%s'" %
1579
              (dep_job_id, dep_status))
1580

    
1581
    # Remove from waiters list
1582
    if job in job_id_waiters:
1583
      job_id_waiters.remove(job)
1584

    
1585
    if (status == constants.JOB_STATUS_CANCELED and
1586
        constants.JOB_STATUS_CANCELED not in dep_status):
1587
      return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1588

    
1589
    elif not dep_status or status in dep_status:
1590
      return (self.CONTINUE,
1591
              "Dependency job %s finished with status '%s'" %
1592
              (dep_job_id, status))
1593

    
1594
    else:
1595
      return (self.WRONGSTATUS,
1596
              "Dependency job %s finished with status '%s',"
1597
              " not one of '%s' as required" %
1598
              (dep_job_id, status, utils.CommaJoin(dep_status)))
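    # Rough usage sketch (assumed, for illustration): for an opcode submitted
    # with depends=[[1234, [constants.JOB_STATUS_SUCCESS]]], the job processor
    # ends up calling CheckAndRegister(job, 1234, [JOB_STATUS_SUCCESS]) and
    # acts on the returned (WAIT/CONTINUE/CANCEL/WRONGSTATUS/ERROR, message)
    # pair as implemented in _JobProcessor._CheckDependencies above.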
1599

    
1600
  def _RemoveEmptyWaitersUnlocked(self):
1601
    """Remove all jobs without actual waiters.
1602

1603
    """
1604
    for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1605
                   if not waiters]:
1606
      del self._waiters[job_id]
1607

    
1608
  def NotifyWaiters(self, job_id):
1609
    """Notifies all jobs waiting for a certain job ID.
1610

1611
    @attention: Do not call until L{CheckAndRegister} returned a status other
1612
      than C{WAIT} for C{job_id}, or behaviour is undefined
1613
    @type job_id: int
1614
    @param job_id: Job ID
1615

1616
    """
1617
    assert ht.TJobId(job_id)
1618

    
1619
    self._lock.acquire()
1620
    try:
1621
      self._RemoveEmptyWaitersUnlocked()
1622

    
1623
      jobs = self._waiters.pop(job_id, None)
1624
    finally:
1625
      self._lock.release()
1626

    
1627
    if jobs:
1628
      # Re-add jobs to workerpool
1629
      logging.debug("Re-adding %s jobs which were waiting for job %s",
1630
                    len(jobs), job_id)
1631
      self._enqueue_fn(jobs)
1632

    
1633

    
1634
def _RequireNonDrainedQueue(fn):
1635
  """Decorator checking for a non-drained queue.
1636

1637
  To be used with functions submitting new jobs.
1638

1639
  """
1640
  def wrapper(self, *args, **kwargs):
1641
    """Wrapper function.
1642

1643
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1644

1645
    """
1646
    # Ok when sharing the big job queue lock, as the drain file is created when
1647
    # the lock is exclusive.
1648
    # Needs access to protected member, pylint: disable=W0212
1649
    if self._drained:
1650
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1651

    
1652
    if not self._accepting_jobs:
1653
      raise errors.JobQueueError("Job queue is shutting down, refusing job")
1654

    
1655
    return fn(self, *args, **kwargs)
1656
  return wrapper
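# Illustrative use of the decorator above (a sketch, not code from this
# module); any method creating new jobs would be wrapped so submissions are
# rejected while the queue is drained or shutting down:
#
#   class ExampleQueue(object):
#     def __init__(self):
#       self._drained = False
#       self._accepting_jobs = True
#
#     @_RequireNonDrainedQueue
#     def SubmitExampleJob(self, ops):
#       return "accepted"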
1657

    
1658

    
1659
class JobQueue(object):
1660
  """Queue used to manage the jobs.
1661

1662
  """
1663
  def __init__(self, context, cfg):
1664
    """Constructor for JobQueue.
1665

1666
    The constructor will initialize the job queue object and then
1667
    start loading the current jobs from disk, either for starting them
1668
    (if they were queued) or for aborting them (if they were already
1669
    running).
1670

1671
    @type context: GanetiContext
1672
    @param context: the context object for access to the configuration
1673
        data and other ganeti objects
1674

1675
    """
1676
    self.context = context
1677
    self._memcache = weakref.WeakValueDictionary()
1678
    self._my_hostname = netutils.Hostname.GetSysName()
1679

    
1680
    # The Big JobQueue lock. If a code block or method acquires it in shared
1681
    # mode, it must be safe to run concurrently with all other code acquiring
1682
    # it in shared mode, including itself. Code that does not acquire the
1683
    # lock at all must be safe to run concurrently with all other code,
1684
    # whether that code acquires it in shared or exclusive mode.
1685
    self._lock = locking.SharedLock("JobQueue")
1686

    
1687
    self.acquire = self._lock.acquire
1688
    self.release = self._lock.release
1689

    
1690
    # Accept jobs by default
1691
    self._accepting_jobs = True
1692

    
1693
    # Read serial file
1694
    self._last_serial = jstore.ReadSerial()
1695
    assert self._last_serial is not None, ("Serial file was modified between"
1696
                                           " check in jstore and here")
1697

    
1698
    # Get initial list of nodes
1699
    self._nodes = dict((n.name, n.primary_ip)
1700
                       for n in cfg.GetAllNodesInfo().values()
1701
                       if n.master_candidate)
1702

    
1703
    # Remove master node
1704
    self._nodes.pop(self._my_hostname, None)
1705

    
1706
    # TODO: Check consistency across nodes
1707

    
1708
    self._queue_size = None
1709
    self._UpdateQueueSizeUnlocked()
1710
    assert ht.TInt(self._queue_size)
1711
    self._drained = jstore.CheckDrainFlag()
1712

    
1713
    # Job dependencies
1714
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1715
                                        self._EnqueueJobs)
1716
    self.context.glm.AddToLockMonitor(self.depmgr)
1717

    
1718
    # Setup worker pool
1719
    self._wpool = _JobQueueWorkerPool(self)
1720

    
1721
  def _PickupJobUnlocked(self, job_id):
1722
    """Load a job from the job queue
1723

1724
    Pick up a job that already is in the job queue and start/resume it.
1725

1726
    """
1727
    job = self._LoadJobUnlocked(job_id)
1728

    
1729
    if job is None:
1730
      logging.warning("Job %s could not be read", job_id)
1731
      return
1732

    
1733
    job.AddReasons(pickup=True)
1734

    
1735
    status = job.CalcStatus()
1736
    if status == constants.JOB_STATUS_QUEUED:
1737
      job.SetPid(os.getpid())
1738
      self._EnqueueJobsUnlocked([job])
1739
      logging.info("Restarting job %s", job.id)
1740

    
1741
    elif status in (constants.JOB_STATUS_RUNNING,
1742
                    constants.JOB_STATUS_WAITING,
1743
                    constants.JOB_STATUS_CANCELING):
1744
      logging.warning("Unfinished job %s found: %s", job.id, job)
1745

    
1746
      if status == constants.JOB_STATUS_WAITING:
1747
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1748
        job.SetPid(os.getpid())
1749
        self._EnqueueJobsUnlocked([job])
1750
        logging.info("Restarting job %s", job.id)
1751
      else:
1752
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
1753
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1754
                              _EncodeOpError(to_encode))
1755
        job.Finalize()
1756

    
1757
    self.UpdateJobUnlocked(job)
1758

    
1759
  @locking.ssynchronized(_LOCK)
1760
  def PickupJob(self, job_id):
1761
    self._PickupJobUnlocked(job_id)
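  # Rough sketch of how a (re)starting master daemon could resume work
  # (illustrative only; the real startup code lives outside this module):
  # every job id still present in the queue directory is handed back to
  # PickupJob, which re-enqueues queued/waiting jobs and finalizes with an
  # error those that were running when the previous daemon died.
  #
  #   for job_id in JobQueue._GetJobIDsUnlocked():
  #     queue.PickupJob(job_id)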
1762

    
1763
  def _GetRpc(self, address_list):
1764
    """Gets RPC runner with context.
1765

1766
    """
1767
    return rpc.JobQueueRunner(self.context, address_list)
1768

    
1769
  @locking.ssynchronized(_LOCK)
1770
  def AddNode(self, node):
1771
    """Register a new node with the queue.
1772

1773
    @type node: L{objects.Node}
1774
    @param node: the node object to be added
1775

1776
    """
1777
    node_name = node.name
1778
    assert node_name != self._my_hostname
1779

    
1780
    # Clean queue directory on added node
1781
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1782
    msg = result.fail_msg
1783
    if msg:
1784
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1785
                      node_name, msg)
1786

    
1787
    if not node.master_candidate:
1788
      # remove if existing, ignoring errors
1789
      self._nodes.pop(node_name, None)
1790
      # and skip the replication of the job ids
1791
      return
1792

    
1793
    # Upload the whole queue excluding archived jobs
1794
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1795

    
1796
    # Upload current serial file
1797
    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1798

    
1799
    # Static address list
1800
    addrs = [node.primary_ip]
1801

    
1802
    for file_name in files:
1803
      # Read file content
1804
      content = utils.ReadFile(file_name)
1805

    
1806
      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1807
                             file_name, content)
1808
      msg = result[node_name].fail_msg
1809
      if msg:
1810
        logging.error("Failed to upload file %s to node %s: %s",
1811
                      file_name, node_name, msg)
1812

    
1813
    # Set queue drained flag
1814
    result = \
1815
      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1816
                                                       self._drained)
1817
    msg = result[node_name].fail_msg
1818
    if msg:
1819
      logging.error("Failed to set queue drained flag on node %s: %s",
1820
                    node_name, msg)
1821

    
1822
    self._nodes[node_name] = node.primary_ip
1823

    
1824
  @locking.ssynchronized(_LOCK)
1825
  def RemoveNode(self, node_name):
1826
    """Callback called when removing nodes from the cluster.
1827

1828
    @type node_name: str
1829
    @param node_name: the name of the node to remove
1830

1831
    """
1832
    self._nodes.pop(node_name, None)
1833

    
1834
  @staticmethod
1835
  def _CheckRpcResult(result, nodes, failmsg):
1836
    """Verifies the status of an RPC call.
1837

1838
    Since we aim to keep consistency even if this node (the current
1839
    master) fails, we will log errors if our RPC calls fail, and
1840
    especially log the case when more than half of the nodes fail.
1841

1842
    @param result: the data as returned from the rpc call
1843
    @type nodes: list
1844
    @param nodes: the list of nodes we made the call to
1845
    @type failmsg: str
1846
    @param failmsg: the identifier to be used for logging
1847

1848
    """
1849
    failed = []
1850
    success = []
1851

    
1852
    for node in nodes:
1853
      msg = result[node].fail_msg
1854
      if msg:
1855
        failed.append(node)
1856
        logging.error("RPC call %s (%s) failed on node %s: %s",
1857
                      result[node].call, failmsg, node, msg)
1858
      else:
1859
        success.append(node)
1860

    
1861
    # +1 for the master node
1862
    if (len(success) + 1) < len(failed):
1863
      # TODO: Handle failing nodes
1864
      logging.error("More than half of the nodes failed")
1865

    
1866
  def _GetNodeIp(self):
1867
    """Helper for returning the node name/ip list.
1868

1869
    @rtype: (list, list)
1870
    @return: a tuple of two lists, the first one with the node
1871
        names and the second one with the node addresses
1872

1873
    """
1874
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1875
    name_list = self._nodes.keys()
1876
    addr_list = [self._nodes[name] for name in name_list]
1877
    return name_list, addr_list
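  # The TODO above could be addressed roughly as follows (a sketch; the
  # empty-dict case needs special handling because zip() would then return
  # nothing to unpack):
  #
  #   if self._nodes:
  #     name_list, addr_list = map(list, zip(*self._nodes.items()))
  #   else:
  #     name_list, addr_list = [], []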
1878

    
1879
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1880
    """Writes a file locally and then replicates it to all nodes.
1881

1882
    This function will replace the contents of a file on the local
1883
    node and then replicate it to all the other nodes we have.
1884

1885
    @type file_name: str
1886
    @param file_name: the path of the file to be replicated
1887
    @type data: str
1888
    @param data: the new contents of the file
1889
    @type replicate: boolean
1890
    @param replicate: whether to spread the changes to the remote nodes
1891

1892
    """
1893
    getents = runtime.GetEnts()
1894
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1895
                    gid=getents.daemons_gid,
1896
                    mode=constants.JOB_QUEUE_FILES_PERMS)
1897

    
1898
    if replicate:
1899
      names, addrs = self._GetNodeIp()
1900
      result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1901
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1902

    
1903
  def _RenameFilesUnlocked(self, rename):
1904
    """Renames a file locally and then replicate the change.
1905

1906
    This function will rename a file in the local queue directory
1907
    and then replicate this rename to all the other nodes we have.
1908

1909
    @type rename: list of (old, new)
1910
    @param rename: List containing tuples mapping old to new names
1911

1912
    """
1913
    # Rename them locally
1914
    for old, new in rename:
1915
      utils.RenameFile(old, new, mkdir=True)
1916

    
1917
    # ... and on all nodes
1918
    names, addrs = self._GetNodeIp()
1919
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1920
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1921

    
1922
  @staticmethod
1923
  def _GetJobPath(job_id):
1924
    """Returns the job file for a given job id.
1925

1926
    @type job_id: int
1927
    @param job_id: the job identifier
1928
    @rtype: str
1929
    @return: the path to the job file
1930

1931
    """
1932
    return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1933

    
1934
  @staticmethod
1935
  def _GetArchivedJobPath(job_id):
1936
    """Returns the archived job file for a give job id.
1937

1938
    @type job_id: int
1939
    @param job_id: the job identifier
1940
    @rtype: str
1941
    @return: the path to the archived job file
1942

1943
    """
1944
    return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1945
                          jstore.GetArchiveDirectory(job_id),
1946
                          "job-%s" % job_id)
1947

    
1948
  @staticmethod
1949
  def _DetermineJobDirectories(archived):
1950
    """Build list of directories containing job files.
1951

1952
    @type archived: bool
1953
    @param archived: Whether to include directories for archived jobs
1954
    @rtype: list
1955

1956
    """
1957
    result = [pathutils.QUEUE_DIR]
1958

    
1959
    if archived:
1960
      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1961
      result.extend(map(compat.partial(utils.PathJoin, archive_path),
1962
                        utils.ListVisibleFiles(archive_path)))
1963

    
1964
    return result
1965

    
1966
  @classmethod
1967
  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1968
    """Return all known job IDs.
1969

1970
    The method only looks at disk because it's a requirement that all
1971
    jobs are present on disk (so in the _memcache we don't have any
1972
    extra IDs).
1973

1974
    @type sort: boolean
1975
    @param sort: perform sorting on the returned job ids
1976
    @rtype: list
1977
    @return: the list of job IDs
1978

1979
    """
1980
    jlist = []
1981

    
1982
    for path in cls._DetermineJobDirectories(archived):
1983
      for filename in utils.ListVisibleFiles(path):
1984
        m = constants.JOB_FILE_RE.match(filename)
1985
        if m:
1986
          jlist.append(int(m.group(1)))
1987

    
1988
    if sort:
1989
      jlist.sort()
1990
    return jlist
1991

    
1992
  def _LoadJobUnlocked(self, job_id):
1993
    """Loads a job from the disk or memory.
1994

1995
    Given a job id, this will return the cached job object if
1996
    existing, or try to load the job from the disk. If loading from
1997
    disk, it will also add the job to the cache.
1998

1999
    @type job_id: int
2000
    @param job_id: the job id
2001
    @rtype: L{_QueuedJob} or None
2002
    @return: either None or the job object
2003

2004
    """
2005
    assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2006

    
2007
    job = self._memcache.get(job_id, None)
2008
    if job:
2009
      logging.debug("Found job %s in memcache", job_id)
2010
      assert job.writable, "Found read-only job in memcache"
2011
      return job
2012

    
2013
    try:
2014
      job = self._LoadJobFromDisk(job_id, False)
2015
      if job is None:
2016
        return job
2017
    except errors.JobFileCorrupted:
2018
      old_path = self._GetJobPath(job_id)
2019
      new_path = self._GetArchivedJobPath(job_id)
2020
      if old_path == new_path:
2021
        # job already archived (future case)
2022
        logging.exception("Can't parse job %s", job_id)
2023
      else:
2024
        # non-archived case
2025
        logging.exception("Can't parse job %s, will archive.", job_id)
2026
        self._RenameFilesUnlocked([(old_path, new_path)])
2027
      return None
2028

    
2029
    assert job.writable, "Job just loaded is not writable"
2030

    
2031
    self._memcache[job_id] = job
2032
    logging.debug("Added job %s to the cache", job_id)
2033
    return job
2034

    
2035
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2036
    """Load the given job file from disk.
2037

2038
    Given a job file, read, load and restore it in a _QueuedJob format.
2039

2040
    @type job_id: int
2041
    @param job_id: job identifier
2042
    @type try_archived: bool
2043
    @param try_archived: Whether to try loading an archived job
    @type writable: bool or None
    @param writable: Whether the returned job should be writable; if
        C{None}, the job is writable unless it was loaded from the archive
2044
    @rtype: L{_QueuedJob} or None
2045
    @return: either None or the job object
2046

2047
    """
2048
    path_functions = [(self._GetJobPath, False)]
2049

    
2050
    if try_archived:
2051
      path_functions.append((self._GetArchivedJobPath, True))
2052

    
2053
    raw_data = None
2054
    archived = None
2055

    
2056
    for (fn, archived) in path_functions:
2057
      filepath = fn(job_id)
2058
      logging.debug("Loading job from %s", filepath)
2059
      try:
2060
        raw_data = utils.ReadFile(filepath)
2061
      except EnvironmentError, err:
2062
        if err.errno != errno.ENOENT:
2063
          raise
2064
      else:
2065
        break
2066

    
2067
    if not raw_data:
2068
      return None
2069

    
2070
    if writable is None:
2071
      writable = not archived
2072

    
2073
    try:
2074
      data = serializer.LoadJson(raw_data)
2075
      job = _QueuedJob.Restore(self, data, writable, archived)
2076
    except Exception, err: # pylint: disable=W0703
2077
      raise errors.JobFileCorrupted(err)
2078

    
2079
    return job
2080

    
2081
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2082
    """Load the given job file from disk.
2083

2084
    Given a job file, read, load and restore it in a _QueuedJob format.
2085
    If the job cannot be read or parsed, None is returned and the
2086
    exception is logged.
2087

2088
    @type job_id: int
2089
    @param job_id: job identifier
2090
    @type try_archived: bool
2091
    @param try_archived: Whether to try loading an archived job
2092
    @rtype: L{_QueuedJob} or None
2093
    @return: either None or the job object
2094

2095
    """
2096
    try:
2097
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2098
    except (errors.JobFileCorrupted, EnvironmentError):
2099
      logging.exception("Can't load/parse job %s", job_id)
2100
      return None
2101

    
2102
  def _UpdateQueueSizeUnlocked(self):
2103
    """Update the queue size.
2104

2105
    """
2106
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2107

    
2108
  @locking.ssynchronized(_LOCK)
2109
  def SetDrainFlag(self, drain_flag):
2110
    """Sets the drain flag for the queue.
2111

2112
    @type drain_flag: boolean
2113
    @param drain_flag: Whether to set or unset the drain flag
2114

2115
    """
2116
    # Change flag locally
2117
    jstore.SetDrainFlag(drain_flag)
2118

    
2119
    self._drained = drain_flag
2120

    
2121
    # ... and on all nodes
2122
    (names, addrs) = self._GetNodeIp()
2123
    result = \
2124
      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2125
    self._CheckRpcResult(result, self._nodes,
2126
                         "Setting queue drain flag to %s" % drain_flag)
2127

    
2128
    return True
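  # Sketch of the intended interplay with submissions (illustrative; the
  # drain check itself is enforced on the submitting side, see
  # _RequireNonDrainedQueue above):
  #
  #   queue.SetDrainFlag(True)                 # new jobs are refused
  #   JobQueue.SubmitJobToDrainedQueue(ops)    # ...but this still works
  #   queue.SetDrainFlag(False)                # normal submissions resume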
2129

    
2130
  @classmethod
2131
  def SubmitJob(cls, ops):
2132
    """Create and store a new job.
2133

2134
    """
2135
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
2136

    
2137
  @classmethod
2138
  def SubmitJobToDrainedQueue(cls, ops):
2139
    """Forcefully create and store a new job.
2140

2141
    Do so, even if the job queue is drained.
2142

2143
    """
2144
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
2145
        .SubmitJobToDrainedQueue(ops)
2146

    
2147
  @classmethod
2148
  def SubmitManyJobs(cls, jobs):
2149
    """Create and store multiple jobs.
2150

2151
    """
2152
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
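  # Minimal usage sketch for the three submission helpers above (the opcode
  # chosen is only an example):
  #
  #   from ganeti import opcodes
  #   job_id = JobQueue.SubmitJob([opcodes.OpClusterVerifyConfig()])
  #   results = JobQueue.SubmitManyJobs([[opcodes.OpClusterVerifyConfig()],
  #                                      [opcodes.OpClusterVerifyConfig()]])
  #
  # All three simply forward to a LUXI client connected to
  # pathutils.QUERY_SOCKET, so they can be used from any process allowed to
  # talk to that socket.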
2153

    
2154
  @staticmethod
2155
  def _FormatSubmitError(msg, ops):
2156
    """Formats errors which occurred while submitting a job.
2157

2158
    """
2159
    return ("%s; opcodes %s" %
2160
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2161

    
2162
  @staticmethod
2163
  def _ResolveJobDependencies(resolve_fn, deps):
2164
    """Resolves relative job IDs in dependencies.
2165

2166
    @type resolve_fn: callable
2167
    @param resolve_fn: Function to resolve a relative job ID
2168
    @type deps: list
2169
    @param deps: Dependencies
2170
    @rtype: tuple; (boolean, string or list)
2171
    @return: If successful (first tuple item), the returned list contains
2172
      resolved job IDs along with the requested status; if not successful,
2173
      the second element is an error message
2174

2175
    """
2176
    result = []
2177

    
2178
    for (dep_job_id, dep_status) in deps:
2179
      if ht.TRelativeJobId(dep_job_id):
2180
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2181
        try:
2182
          job_id = resolve_fn(dep_job_id)
2183
        except IndexError:
2184
          # Abort
2185
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2186
      else:
2187
        job_id = dep_job_id
2188

    
2189
      result.append((job_id, dep_status))
2190

    
2191
    return (True, result)
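  # Worked example (hypothetical input): when several jobs are submitted in
  # one batch, a dependency may use a relative ID such as -1 ("the job
  # submitted just before this one"). With a resolve_fn that maps -1 to the
  # absolute ID 17,
  #
  #   _ResolveJobDependencies(resolve_fn,
  #                           [(-1, [constants.JOB_STATUS_SUCCESS])])
  #
  # returns (True, [(17, ["success"])]); if resolve_fn raises IndexError
  # instead, the result is (False, <error message>).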
2192

    
2193
  @locking.ssynchronized(_LOCK)
2194
  def _EnqueueJobs(self, jobs):
2195
    """Helper function to add jobs to worker pool's queue.
2196

2197
    @type jobs: list
2198
    @param jobs: List of all jobs
2199

2200
    """
2201
    return self._EnqueueJobsUnlocked(jobs)
2202

    
2203
  def _EnqueueJobsUnlocked(self, jobs):
2204
    """Helper function to add jobs to worker pool's queue.
2205

2206
    @type jobs: list
2207
    @param jobs: List of all jobs
2208

2209
    """
2210
    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2211
    self._wpool.AddManyTasks([(job, ) for job in jobs],
2212
                             priority=[job.CalcPriority() for job in jobs],
2213
                             task_id=map(_GetIdAttr, jobs))
2214

    
2215
  def _GetJobStatusForDependencies(self, job_id):
2216
    """Gets the status of a job for dependencies.
2217

2218
    @type job_id: int
2219
    @param job_id: Job ID
2220
    @raise errors.JobLost: If job can't be found
2221

2222
    """
2223
    # Not using in-memory cache as doing so would require an exclusive lock
2224

    
2225
    # Try to load from disk
2226
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2227

    
2228
    assert job is None or not job.writable, \
        "Got writable job" # pylint: disable=E1101
2229

    
2230
    if job:
2231
      return job.CalcStatus()
2232

    
2233
    raise errors.JobLost("Job %s not found" % job_id)
2234

    
2235
  def UpdateJobUnlocked(self, job, replicate=True):
2236
    """Update a job's on disk storage.
2237

2238
    After a job has been modified, this function needs to be called in
2239
    order to write the changes to disk and replicate them to the other
2240
    nodes.
2241

2242
    @type job: L{_QueuedJob}
2243
    @param job: the changed job
2244
    @type replicate: boolean
2245
    @param replicate: whether to replicate the change to remote nodes
2246

2247
    """
2248
    if __debug__:
2249
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2250
      assert (finalized ^ (job.end_timestamp is None))
2251
      assert job.writable, "Can't update read-only job"
2252
      assert not job.archived, "Can't update archived job"
2253

    
2254
    filename = self._GetJobPath(job.id)
2255
    data = serializer.DumpJson(job.Serialize())
2256
    logging.debug("Writing job %s to %s", job.id, filename)
2257
    self._UpdateJobQueueFile(filename, data, replicate)
2258

    
2259
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2260
                        timeout):
2261
    """Waits for changes in a job.
2262

2263
    @type job_id: int
2264
    @param job_id: Job identifier
2265
    @type fields: list of strings
2266
    @param fields: Which fields to check for changes
2267
    @type prev_job_info: list or None
2268
    @param prev_job_info: Last job information returned
2269
    @type prev_log_serial: int
2270
    @param prev_log_serial: Last job message serial number
2271
    @type timeout: float
2272
    @param timeout: maximum time to wait in seconds
2273
    @rtype: tuple (job info, log entries)
2274
    @return: a tuple of the job information as required via
2275
        the fields parameter, and the log entries as a list
2276

2277
        if the job has not changed and the timeout has expired,
2278
        we instead return a special value,
2279
        L{constants.JOB_NOTCHANGED}, which should be interpreted
2280
        as such by the clients
2281

2282
    """
2283
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2284
                             writable=False)
2285

    
2286
    helper = _WaitForJobChangesHelper()
2287

    
2288
    return helper(self._GetJobPath(job_id), load_fn,
2289
                  fields, prev_job_info, prev_log_serial, timeout)
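  # Rough client-side polling loop built on this method (a sketch only; real
  # clients normally go through LUXI rather than calling the queue directly,
  # and a lost job would need extra handling):
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                        # timeout expired, just poll again
  #     (prev_info, log_entries) = result
  #     for entry in log_entries:
  #       prev_serial = max(prev_serial, entry[0])
  #     if prev_info[0] in constants.JOBS_FINALIZED:
  #       break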
2290

    
2291
  def HasJobBeenFinalized(self, job_id):
2292
    """Checks if a job has been finalized.
2293

2294
    @type job_id: int
2295
    @param job_id: Job identifier
2296
    @rtype: boolean
2297
    @return: True if the job has been finalized,
2298
        False if the job exists but is not yet finalized,
2299
        None if the job doesn't exist
2300

2301
    """
2302
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2303
    if job is not None:
2304
      return job.CalcStatus() in constants.JOBS_FINALIZED
2305
    else:
2306
      return None
2307

    
2308
  @locking.ssynchronized(_LOCK)
2309
  def CancelJob(self, job_id):
2310
    """Cancels a job.
2311

2312
    This will only succeed if the job has not started yet.
2313

2314
    @type job_id: int
2315
    @param job_id: job ID of job to be cancelled.
2316

2317
    """
2318
    logging.info("Cancelling job %s", job_id)
2319

    
2320
    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2321

    
2322
  @locking.ssynchronized(_LOCK)
2323
  def ChangeJobPriority(self, job_id, priority):
2324
    """Changes a job's priority.
2325

2326
    @type job_id: int
2327
    @param job_id: ID of the job whose priority should be changed
2328
    @type priority: int
2329
    @param priority: New priority
2330

2331
    """
2332
    logging.info("Changing priority of job %s to %s", job_id, priority)
2333

    
2334
    if priority not in constants.OP_PRIO_SUBMIT_VALID:
2335
      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2336
      raise errors.GenericError("Invalid priority %s, allowed are %s" %
2337
                                (priority, allowed))
2338

    
2339
    def fn(job):
2340
      (success, msg) = job.ChangePriority(priority)
2341

    
2342
      if success:
2343
        try:
2344
          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2345
        except workerpool.NoSuchTask:
2346
          logging.debug("Job %s is not in workerpool at this time", job.id)
2347

    
2348
      return (success, msg)
2349

    
2350
    return self._ModifyJobUnlocked(job_id, fn)
2351

    
2352
  def _ModifyJobUnlocked(self, job_id, mod_fn):
2353
    """Modifies a job.
2354

2355
    @type job_id: int
2356
    @param job_id: Job ID
2357
    @type mod_fn: callable
2358
    @param mod_fn: Modifying function, receiving job object as parameter,
2359
      returning tuple of (status boolean, message string)
2360

2361
    """
2362
    job = self._LoadJobUnlocked(job_id)
2363
    if not job:
2364
      logging.debug("Job %s not found", job_id)
2365
      return (False, "Job %s not found" % job_id)
2366

    
2367
    assert job.writable, "Can't modify read-only job"
2368
    assert not job.archived, "Can't modify archived job"
2369

    
2370
    (success, msg) = mod_fn(job)
2371

    
2372
    if success:
2373
      # If the job was finalized (e.g. cancelled), this is the final write
2374
      # allowed. The job can be archived anytime.
2375
      self.UpdateJobUnlocked(job)
2376

    
2377
    return (success, msg)
2378

    
2379
  def _ArchiveJobsUnlocked(self, jobs):
2380
    """Archives jobs.
2381

2382
    @type jobs: list of L{_QueuedJob}
2383
    @param jobs: Job objects
2384
    @rtype: int
2385
    @return: Number of archived jobs
2386

2387
    """
2388
    archive_jobs = []
2389
    rename_files = []
2390
    for job in jobs:
2391
      assert job.writable, "Can't archive read-only job"
2392
      assert not job.archived, "Can't archive an already archived job"
2393

    
2394
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2395
        logging.debug("Job %s is not yet done", job.id)
2396
        continue
2397

    
2398
      archive_jobs.append(job)
2399

    
2400
      old = self._GetJobPath(job.id)
2401
      new = self._GetArchivedJobPath(job.id)
2402
      rename_files.append((old, new))
2403

    
2404
    # TODO: What if 1..n files fail to rename?
2405
    self._RenameFilesUnlocked(rename_files)
2406

    
2407
    logging.debug("Successfully archived job(s) %s",
2408
                  utils.CommaJoin(job.id for job in archive_jobs))
2409

    
2410
    # Since we did not check above whether the renames succeeded or failed,
2411
    # we update the cached queue size from the filesystem. Once the TODO
2412
    # above is addressed, the number of actually archived jobs can be used
2413
    # instead.
2414
    self._UpdateQueueSizeUnlocked()
2415
    return len(archive_jobs)
2416

    
2417
  @locking.ssynchronized(_LOCK)
2418
  def ArchiveJob(self, job_id):
2419
    """Archives a job.
2420

2421
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
2422

2423
    @type job_id: int
2424
    @param job_id: Job ID of job to be archived.
2425
    @rtype: bool
2426
    @return: Whether job was archived
2427

2428
    """
2429
    logging.info("Archiving job %s", job_id)
2430

    
2431
    job = self._LoadJobUnlocked(job_id)
2432
    if not job:
2433
      logging.debug("Job %s not found", job_id)
2434
      return False
2435

    
2436
    return self._ArchiveJobsUnlocked([job]) == 1
2437

    
2438
  @locking.ssynchronized(_LOCK)
2439
  def AutoArchiveJobs(self, age, timeout):
2440
    """Archives all jobs based on age.
2441

2442
    The method will archive all jobs which are older than the age
2443
    parameter. For jobs that don't have an end timestamp, the start
2444
    timestamp (or, failing that, the received timestamp) will be
    considered. The special '-1' age will cause
2445
    archival of all jobs (that are not running or queued).
2446

2447
    @type age: int
2448
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) to spend on archival
        before returning
2449

2450
    """
2451
    logging.info("Archiving jobs with age more than %s seconds", age)
2452

    
2453
    now = time.time()
2454
    end_time = now + timeout
2455
    archived_count = 0
2456
    last_touched = 0
2457

    
2458
    all_job_ids = self._GetJobIDsUnlocked()
2459
    pending = []
2460
    for idx, job_id in enumerate(all_job_ids):
2461
      last_touched = idx + 1
2462

    
2463
      # Not optimal because jobs could be pending
2464
      # TODO: Measure average duration for job archival and take number of
2465
      # pending jobs into account.
2466
      if time.time() > end_time:
2467
        break
2468

    
2469
      # Returns None if the job failed to load
2470
      job = self._LoadJobUnlocked(job_id)
2471
      if job:
2472
        if job.end_timestamp is None:
2473
          if job.start_timestamp is None:
2474
            job_age = job.received_timestamp
2475
          else:
2476
            job_age = job.start_timestamp
2477
        else:
2478
          job_age = job.end_timestamp
2479

    
2480
        if age == -1 or now - job_age[0] > age:
2481
          pending.append(job)
2482

    
2483
          # Archive 10 jobs at a time
2484
          if len(pending) >= 10:
2485
            archived_count += self._ArchiveJobsUnlocked(pending)
2486
            pending = []
2487

    
2488
    if pending:
2489
      archived_count += self._ArchiveJobsUnlocked(pending)
2490

    
2491
    return (archived_count, len(all_job_ids) - last_touched)
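  # Example invocation (numbers are illustrative): archive every finalized
  # job that ended more than six hours ago, spending at most 30 seconds on
  # the operation; the return value reports how many jobs were archived and
  # how many were left unexamined when the time budget ran out.
  #
  #   (archived, unexamined) = queue.AutoArchiveJobs(6 * 3600, 30)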
2492

    
2493
  def _Query(self, fields, qfilter):
2494
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2495
                       namefield="id")
2496

    
2497
    # Archived jobs are only looked at if the "archived" field is referenced
2498
    # either as a requested field or in the filter. By default archived jobs
2499
    # are ignored.
2500
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2501

    
2502
    job_ids = qobj.RequestedNames()
2503

    
2504
    list_all = (job_ids is None)
2505

    
2506
    if list_all:
2507
      # Since files are added to/removed from the queue atomically, there's no
2508
      # risk of getting the job ids in an inconsistent state.
2509
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2510

    
2511
    jobs = []
2512

    
2513
    for job_id in job_ids:
2514
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2515
      if job is not None or not list_all:
2516
        jobs.append((job_id, job))
2517

    
2518
    return (qobj, jobs, list_all)
2519

    
2520
  def QueryJobs(self, fields, qfilter):
2521
    """Returns a list of jobs in queue.
2522

2523
    @type fields: sequence
2524
    @param fields: List of wanted fields
2525
    @type qfilter: None or query2 filter (list)
2526
    @param qfilter: Query filter
2527

2528
    """
2529
    (qobj, ctx, _) = self._Query(fields, qfilter)
2530

    
2531
    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
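  # Example query (a sketch; the filter uses the standard query2 list
  # syntax): fetch ID and status of all jobs that are currently running.
  #
  #   qfilter = ["=", "status", constants.JOB_STATUS_RUNNING]
  #   response = queue.QueryJobs(["id", "status"], qfilter)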
2532

    
2533
  def OldStyleQueryJobs(self, job_ids, fields):
2534
    """Returns a list of jobs in queue.
2535

2536
    @type job_ids: list
2537
    @param job_ids: sequence of job identifiers or None for all
2538
    @type fields: list
2539
    @param fields: names of fields to return
2540
    @rtype: list
2541
    @return: a list with one element per job, each element being a list with
2542
        the requested fields
2543

2544
    """
2545
    # backwards compat:
2546
    job_ids = [int(jid) for jid in job_ids]
2547
    qfilter = qlang.MakeSimpleFilter("id", job_ids)
2548

    
2549
    (qobj, ctx, _) = self._Query(fields, qfilter)
2550

    
2551
    return qobj.OldStyleQuery(ctx, sort_by_name=False)
2552

    
2553
  @locking.ssynchronized(_LOCK)
2554
  def PrepareShutdown(self):
2555
    """Prepare to stop the job queue.
2556

2557
    Disables execution of jobs in the workerpool and returns whether there are
2558
    any jobs currently running. If the latter is the case, the job queue is not
2559
    yet ready for shutdown. Once this returns C{False}, L{Shutdown} can
2560
    be called without interfering with any job. Queued and unfinished jobs will
2561
    be resumed next time.
2562

2563
    Once this function has been called no new job submissions will be accepted
2564
    (see L{_RequireNonDrainedQueue}).
2565

2566
    @rtype: bool
2567
    @return: Whether there are any running jobs
2568

2569
    """
2570
    if self._accepting_jobs:
2571
      self._accepting_jobs = False
2572

    
2573
      # Tell worker pool to stop processing pending tasks
2574
      self._wpool.SetActive(False)
2575

    
2576
    return self._wpool.HasRunningTasks()
2577

    
2578
  def AcceptingJobsUnlocked(self):
2579
    """Returns whether jobs are accepted.
2580

2581
    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2582
    queue is shutting down.
2583

2584
    @rtype: bool
2585

2586
    """
2587
    return self._accepting_jobs
2588

    
2589
  @locking.ssynchronized(_LOCK)
2590
  def Shutdown(self):
2591
    """Stops the job queue.
2592

2593
    This shuts down all the worker threads and closes the queue.
2594

2595
    """
2596
    self._wpool.TerminateWorkers()
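# Typical shutdown sequence as driven by the owning daemon (a sketch; the
# polling interval is an arbitrary choice here):
#
#   while queue.PrepareShutdown():   # disables new jobs; True while running
#     time.sleep(1)
#   queue.Shutdown()                 # terminate the worker threads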