Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b80cc518

History | View | Annotate | Download (53.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
83

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar stop_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log", "priority",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

    
120
  @classmethod
121
  def Restore(cls, state):
122
    """Restore the _QueuedOpCode from the serialized form.
123

124
    @type state: dict
125
    @param state: the serialized state
126
    @rtype: _QueuedOpCode
127
    @return: a new _QueuedOpCode instance
128

129
    """
130
    obj = _QueuedOpCode.__new__(cls)
131
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132
    obj.status = state["status"]
133
    obj.result = state["result"]
134
    obj.log = state["log"]
135
    obj.start_timestamp = state.get("start_timestamp", None)
136
    obj.exec_timestamp = state.get("exec_timestamp", None)
137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139
    return obj
140

    
141
  def Serialize(self):
142
    """Serializes this _QueuedOpCode.
143

144
    @rtype: dict
145
    @return: the dictionary holding the serialized state
146

147
    """
148
    return {
149
      "input": self.input.__getstate__(),
150
      "status": self.status,
151
      "result": self.result,
152
      "log": self.log,
153
      "start_timestamp": self.start_timestamp,
154
      "exec_timestamp": self.exec_timestamp,
155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
157
      }
158

    
159

    
160
class _QueuedJob(object):
161
  """In-memory job representation.
162

163
  This is what we use to track the user-submitted jobs. Locking must
164
  be taken care of by users of this class.
165

166
  @type queue: L{JobQueue}
167
  @ivar queue: the parent queue
168
  @ivar id: the job ID
169
  @type ops: list
170
  @ivar ops: the list of _QueuedOpCode that constitute the job
171
  @type log_serial: int
172
  @ivar log_serial: holds the index for the next log entry
173
  @ivar received_timestamp: the timestamp for when the job was received
174
  @ivar start_timestmap: the timestamp for start of execution
175
  @ivar end_timestamp: the timestamp for end of execution
176

177
  """
178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter",
180
               "received_timestamp", "start_timestamp", "end_timestamp",
181
               "__weakref__"]
182

    
183
  def __init__(self, queue, job_id, ops):
184
    """Constructor for the _QueuedJob.
185

186
    @type queue: L{JobQueue}
187
    @param queue: our parent queue
188
    @type job_id: job_id
189
    @param job_id: our job id
190
    @type ops: list
191
    @param ops: the list of opcodes we hold, which will be encapsulated
192
        in _QueuedOpCodes
193

194
    """
195
    if not ops:
196
      raise errors.GenericError("A job needs at least one opcode")
197

    
198
    self.queue = queue
199
    self.id = job_id
200
    self.ops = [_QueuedOpCode(op) for op in ops]
201
    self.log_serial = 0
202
    self.received_timestamp = TimeStampNow()
203
    self.start_timestamp = None
204
    self.end_timestamp = None
205

    
206
    self._InitInMemory(self)
207

    
208
  @staticmethod
209
  def _InitInMemory(obj):
210
    """Initializes in-memory variables.
211

212
    """
213
    obj.ops_iter = None
214

    
215
  def __repr__(self):
216
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217
              "id=%s" % self.id,
218
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219

    
220
    return "<%s at %#x>" % (" ".join(status), id(self))
221

    
222
  @classmethod
223
  def Restore(cls, queue, state):
224
    """Restore a _QueuedJob from serialized state:
225

226
    @type queue: L{JobQueue}
227
    @param queue: to which queue the restored job belongs
228
    @type state: dict
229
    @param state: the serialized state
230
    @rtype: _JobQueue
231
    @return: the restored _JobQueue instance
232

233
    """
234
    obj = _QueuedJob.__new__(cls)
235
    obj.queue = queue
236
    obj.id = state["id"]
237
    obj.received_timestamp = state.get("received_timestamp", None)
238
    obj.start_timestamp = state.get("start_timestamp", None)
239
    obj.end_timestamp = state.get("end_timestamp", None)
240

    
241
    obj.ops = []
242
    obj.log_serial = 0
243
    for op_state in state["ops"]:
244
      op = _QueuedOpCode.Restore(op_state)
245
      for log_entry in op.log:
246
        obj.log_serial = max(obj.log_serial, log_entry[0])
247
      obj.ops.append(op)
248

    
249
    cls._InitInMemory(obj)
250

    
251
    return obj
252

    
253
  def Serialize(self):
254
    """Serialize the _JobQueue instance.
255

256
    @rtype: dict
257
    @return: the serialized state
258

259
    """
260
    return {
261
      "id": self.id,
262
      "ops": [op.Serialize() for op in self.ops],
263
      "start_timestamp": self.start_timestamp,
264
      "end_timestamp": self.end_timestamp,
265
      "received_timestamp": self.received_timestamp,
266
      }
267

    
268
  def CalcStatus(self):
269
    """Compute the status of this job.
270

271
    This function iterates over all the _QueuedOpCodes in the job and
272
    based on their status, computes the job status.
273

274
    The algorithm is:
275
      - if we find a cancelled, or finished with error, the job
276
        status will be the same
277
      - otherwise, the last opcode with the status one of:
278
          - waitlock
279
          - canceling
280
          - running
281

282
        will determine the job status
283

284
      - otherwise, it means either all opcodes are queued, or success,
285
        and the job status will be the same
286

287
    @return: the job status
288

289
    """
290
    status = constants.JOB_STATUS_QUEUED
291

    
292
    all_success = True
293
    for op in self.ops:
294
      if op.status == constants.OP_STATUS_SUCCESS:
295
        continue
296

    
297
      all_success = False
298

    
299
      if op.status == constants.OP_STATUS_QUEUED:
300
        pass
301
      elif op.status == constants.OP_STATUS_WAITLOCK:
302
        status = constants.JOB_STATUS_WAITLOCK
303
      elif op.status == constants.OP_STATUS_RUNNING:
304
        status = constants.JOB_STATUS_RUNNING
305
      elif op.status == constants.OP_STATUS_CANCELING:
306
        status = constants.JOB_STATUS_CANCELING
307
        break
308
      elif op.status == constants.OP_STATUS_ERROR:
309
        status = constants.JOB_STATUS_ERROR
310
        # The whole job fails if one opcode failed
311
        break
312
      elif op.status == constants.OP_STATUS_CANCELED:
313
        status = constants.OP_STATUS_CANCELED
314
        break
315

    
316
    if all_success:
317
      status = constants.JOB_STATUS_SUCCESS
318

    
319
    return status
320

    
321
  def CalcPriority(self):
322
    """Gets the current priority for this job.
323

324
    Only unfinished opcodes are considered. When all are done, the default
325
    priority is used.
326

327
    @rtype: int
328

329
    """
330
    priorities = [op.priority for op in self.ops
331
                  if op.status not in constants.OPS_FINALIZED]
332

    
333
    if not priorities:
334
      # All opcodes are done, assume default priority
335
      return constants.OP_PRIO_DEFAULT
336

    
337
    return min(priorities)
338

    
339
  def GetLogEntries(self, newer_than):
340
    """Selectively returns the log entries.
341

342
    @type newer_than: None or int
343
    @param newer_than: if this is None, return all log entries,
344
        otherwise return only the log entries with serial higher
345
        than this value
346
    @rtype: list
347
    @return: the list of the log entries selected
348

349
    """
350
    if newer_than is None:
351
      serial = -1
352
    else:
353
      serial = newer_than
354

    
355
    entries = []
356
    for op in self.ops:
357
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358

    
359
    return entries
360

    
361
  def GetInfo(self, fields):
362
    """Returns information about a job.
363

364
    @type fields: list
365
    @param fields: names of fields to return
366
    @rtype: list
367
    @return: list with one element for each field
368
    @raise errors.OpExecError: when an invalid field
369
        has been passed
370

371
    """
372
    row = []
373
    for fname in fields:
374
      if fname == "id":
375
        row.append(self.id)
376
      elif fname == "status":
377
        row.append(self.CalcStatus())
378
      elif fname == "ops":
379
        row.append([op.input.__getstate__() for op in self.ops])
380
      elif fname == "opresult":
381
        row.append([op.result for op in self.ops])
382
      elif fname == "opstatus":
383
        row.append([op.status for op in self.ops])
384
      elif fname == "oplog":
385
        row.append([op.log for op in self.ops])
386
      elif fname == "opstart":
387
        row.append([op.start_timestamp for op in self.ops])
388
      elif fname == "opexec":
389
        row.append([op.exec_timestamp for op in self.ops])
390
      elif fname == "opend":
391
        row.append([op.end_timestamp for op in self.ops])
392
      elif fname == "received_ts":
393
        row.append(self.received_timestamp)
394
      elif fname == "start_ts":
395
        row.append(self.start_timestamp)
396
      elif fname == "end_ts":
397
        row.append(self.end_timestamp)
398
      elif fname == "summary":
399
        row.append([op.input.Summary() for op in self.ops])
400
      else:
401
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
402
    return row
403

    
404
  def MarkUnfinishedOps(self, status, result):
405
    """Mark unfinished opcodes with a given status and result.
406

407
    This is an utility function for marking all running or waiting to
408
    be run opcodes with a given status. Opcodes which are already
409
    finalised are not changed.
410

411
    @param status: a given opcode status
412
    @param result: the opcode result
413

414
    """
415
    not_marked = True
416
    for op in self.ops:
417
      if op.status in constants.OPS_FINALIZED:
418
        assert not_marked, "Finalized opcodes found after non-finalized ones"
419
        continue
420
      op.status = status
421
      op.result = result
422
      not_marked = False
423

    
424
  def Cancel(self):
425
    """Marks job as canceled/-ing if possible.
426

427
    @rtype: tuple; (bool, string)
428
    @return: Boolean describing whether job was successfully canceled or marked
429
      as canceling and a text message
430

431
    """
432
    status = self.CalcStatus()
433

    
434
    if status not in (constants.JOB_STATUS_QUEUED,
435
                      constants.JOB_STATUS_WAITLOCK):
436
      logging.debug("Job %s is no longer waiting in the queue", self.id)
437
      return (False, "Job %s is no longer waiting in the queue" % self.id)
438

    
439
    if status == constants.JOB_STATUS_QUEUED:
440
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441
                             "Job canceled by request")
442
      msg = "Job %s canceled" % self.id
443

    
444
    elif status == constants.JOB_STATUS_WAITLOCK:
445
      # The worker will notice the new status and cancel the job
446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
447
      msg = "Job %s will be canceled" % self.id
448

    
449
    return (True, msg)
450

    
451

    
452
class _OpExecCallbacks(mcpu.OpExecCbBase):
453
  def __init__(self, queue, job, op):
454
    """Initializes this class.
455

456
    @type queue: L{JobQueue}
457
    @param queue: Job queue
458
    @type job: L{_QueuedJob}
459
    @param job: Job object
460
    @type op: L{_QueuedOpCode}
461
    @param op: OpCode
462

463
    """
464
    assert queue, "Queue is missing"
465
    assert job, "Job is missing"
466
    assert op, "Opcode is missing"
467

    
468
    self._queue = queue
469
    self._job = job
470
    self._op = op
471

    
472
  def _CheckCancel(self):
473
    """Raises an exception to cancel the job if asked to.
474

475
    """
476
    # Cancel here if we were asked to
477
    if self._op.status == constants.OP_STATUS_CANCELING:
478
      logging.debug("Canceling opcode")
479
      raise CancelJob()
480

    
481
  @locking.ssynchronized(_QUEUE, shared=1)
482
  def NotifyStart(self):
483
    """Mark the opcode as running, not lock-waiting.
484

485
    This is called from the mcpu code as a notifier function, when the LU is
486
    finally about to start the Exec() method. Of course, to have end-user
487
    visible results, the opcode must be initially (before calling into
488
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
489

490
    """
491
    assert self._op in self._job.ops
492
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
493
                               constants.OP_STATUS_CANCELING)
494

    
495
    # Cancel here if we were asked to
496
    self._CheckCancel()
497

    
498
    logging.debug("Opcode is now running")
499

    
500
    self._op.status = constants.OP_STATUS_RUNNING
501
    self._op.exec_timestamp = TimeStampNow()
502

    
503
    # And finally replicate the job status
504
    self._queue.UpdateJobUnlocked(self._job)
505

    
506
  @locking.ssynchronized(_QUEUE, shared=1)
507
  def _AppendFeedback(self, timestamp, log_type, log_msg):
508
    """Internal feedback append function, with locks
509

510
    """
511
    self._job.log_serial += 1
512
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
513
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
514

    
515
  def Feedback(self, *args):
516
    """Append a log entry.
517

518
    """
519
    assert len(args) < 3
520

    
521
    if len(args) == 1:
522
      log_type = constants.ELOG_MESSAGE
523
      log_msg = args[0]
524
    else:
525
      (log_type, log_msg) = args
526

    
527
    # The time is split to make serialization easier and not lose
528
    # precision.
529
    timestamp = utils.SplitTime(time.time())
530
    self._AppendFeedback(timestamp, log_type, log_msg)
531

    
532
  def CheckCancel(self):
533
    """Check whether job has been cancelled.
534

535
    """
536
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
537
                               constants.OP_STATUS_CANCELING)
538

    
539
    # Cancel here if we were asked to
540
    self._CheckCancel()
541

    
542

    
543
class _JobChangesChecker(object):
544
  def __init__(self, fields, prev_job_info, prev_log_serial):
545
    """Initializes this class.
546

547
    @type fields: list of strings
548
    @param fields: Fields requested by LUXI client
549
    @type prev_job_info: string
550
    @param prev_job_info: previous job info, as passed by the LUXI client
551
    @type prev_log_serial: string
552
    @param prev_log_serial: previous job serial, as passed by the LUXI client
553

554
    """
555
    self._fields = fields
556
    self._prev_job_info = prev_job_info
557
    self._prev_log_serial = prev_log_serial
558

    
559
  def __call__(self, job):
560
    """Checks whether job has changed.
561

562
    @type job: L{_QueuedJob}
563
    @param job: Job object
564

565
    """
566
    status = job.CalcStatus()
567
    job_info = job.GetInfo(self._fields)
568
    log_entries = job.GetLogEntries(self._prev_log_serial)
569

    
570
    # Serializing and deserializing data can cause type changes (e.g. from
571
    # tuple to list) or precision loss. We're doing it here so that we get
572
    # the same modifications as the data received from the client. Without
573
    # this, the comparison afterwards might fail without the data being
574
    # significantly different.
575
    # TODO: we just deserialized from disk, investigate how to make sure that
576
    # the job info and log entries are compatible to avoid this further step.
577
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
578
    # efficient, though floats will be tricky
579
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
580
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
581

    
582
    # Don't even try to wait if the job is no longer running, there will be
583
    # no changes.
584
    if (status not in (constants.JOB_STATUS_QUEUED,
585
                       constants.JOB_STATUS_RUNNING,
586
                       constants.JOB_STATUS_WAITLOCK) or
587
        job_info != self._prev_job_info or
588
        (log_entries and self._prev_log_serial != log_entries[0][0])):
589
      logging.debug("Job %s changed", job.id)
590
      return (job_info, log_entries)
591

    
592
    return None
593

    
594

    
595
class _JobFileChangesWaiter(object):
596
  def __init__(self, filename):
597
    """Initializes this class.
598

599
    @type filename: string
600
    @param filename: Path to job file
601
    @raises errors.InotifyError: if the notifier cannot be setup
602

603
    """
604
    self._wm = pyinotify.WatchManager()
605
    self._inotify_handler = \
606
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
607
    self._notifier = \
608
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
609
    try:
610
      self._inotify_handler.enable()
611
    except Exception:
612
      # pyinotify doesn't close file descriptors automatically
613
      self._notifier.stop()
614
      raise
615

    
616
  def _OnInotify(self, notifier_enabled):
617
    """Callback for inotify.
618

619
    """
620
    if not notifier_enabled:
621
      self._inotify_handler.enable()
622

    
623
  def Wait(self, timeout):
624
    """Waits for the job file to change.
625

626
    @type timeout: float
627
    @param timeout: Timeout in seconds
628
    @return: Whether there have been events
629

630
    """
631
    assert timeout >= 0
632
    have_events = self._notifier.check_events(timeout * 1000)
633
    if have_events:
634
      self._notifier.read_events()
635
    self._notifier.process_events()
636
    return have_events
637

    
638
  def Close(self):
639
    """Closes underlying notifier and its file descriptor.
640

641
    """
642
    self._notifier.stop()
643

    
644

    
645
class _JobChangesWaiter(object):
646
  def __init__(self, filename):
647
    """Initializes this class.
648

649
    @type filename: string
650
    @param filename: Path to job file
651

652
    """
653
    self._filewaiter = None
654
    self._filename = filename
655

    
656
  def Wait(self, timeout):
657
    """Waits for a job to change.
658

659
    @type timeout: float
660
    @param timeout: Timeout in seconds
661
    @return: Whether there have been events
662

663
    """
664
    if self._filewaiter:
665
      return self._filewaiter.Wait(timeout)
666

    
667
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
668
    # If this point is reached, return immediately and let caller check the job
669
    # file again in case there were changes since the last check. This avoids a
670
    # race condition.
671
    self._filewaiter = _JobFileChangesWaiter(self._filename)
672

    
673
    return True
674

    
675
  def Close(self):
676
    """Closes underlying waiter.
677

678
    """
679
    if self._filewaiter:
680
      self._filewaiter.Close()
681

    
682

    
683
class _WaitForJobChangesHelper(object):
684
  """Helper class using inotify to wait for changes in a job file.
685

686
  This class takes a previous job status and serial, and alerts the client when
687
  the current job status has changed.
688

689
  """
690
  @staticmethod
691
  def _CheckForChanges(job_load_fn, check_fn):
692
    job = job_load_fn()
693
    if not job:
694
      raise errors.JobLost()
695

    
696
    result = check_fn(job)
697
    if result is None:
698
      raise utils.RetryAgain()
699

    
700
    return result
701

    
702
  def __call__(self, filename, job_load_fn,
703
               fields, prev_job_info, prev_log_serial, timeout):
704
    """Waits for changes on a job.
705

706
    @type filename: string
707
    @param filename: File on which to wait for changes
708
    @type job_load_fn: callable
709
    @param job_load_fn: Function to load job
710
    @type fields: list of strings
711
    @param fields: Which fields to check for changes
712
    @type prev_job_info: list or None
713
    @param prev_job_info: Last job information returned
714
    @type prev_log_serial: int
715
    @param prev_log_serial: Last job message serial number
716
    @type timeout: float
717
    @param timeout: maximum time to wait in seconds
718

719
    """
720
    try:
721
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
722
      waiter = _JobChangesWaiter(filename)
723
      try:
724
        return utils.Retry(compat.partial(self._CheckForChanges,
725
                                          job_load_fn, check_fn),
726
                           utils.RETRY_REMAINING_TIME, timeout,
727
                           wait_fn=waiter.Wait)
728
      finally:
729
        waiter.Close()
730
    except (errors.InotifyError, errors.JobLost):
731
      return None
732
    except utils.RetryTimeout:
733
      return constants.JOB_NOTCHANGED
734

    
735

    
736
def _EncodeOpError(err):
737
  """Encodes an error which occurred while processing an opcode.
738

739
  """
740
  if isinstance(err, errors.GenericError):
741
    to_encode = err
742
  else:
743
    to_encode = errors.OpExecError(str(err))
744

    
745
  return errors.EncodeException(to_encode)
746

    
747

    
748
class _OpExecContext:
749
  def __init__(self, op, index, log_prefix):
750
    """Initializes this class.
751

752
    """
753
    self.op = op
754
    self.index = index
755
    self.log_prefix = log_prefix
756
    self.summary = op.input.Summary()
757

    
758

    
759
class _JobProcessor(object):
760
  def __init__(self, queue, opexec_fn, job):
761
    """Initializes this class.
762

763
    """
764
    self.queue = queue
765
    self.opexec_fn = opexec_fn
766
    self.job = job
767

    
768
  @staticmethod
769
  def _FindNextOpcode(job):
770
    """Locates the next opcode to run.
771

772
    @type job: L{_QueuedJob}
773
    @param job: Job object
774

775
    """
776
    # Create some sort of a cache to speed up locating next opcode for future
777
    # lookups
778
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
779
    # pending and one for processed ops.
780
    if job.ops_iter is None:
781
      job.ops_iter = enumerate(job.ops)
782

    
783
    # Find next opcode to run
784
    while True:
785
      try:
786
        (idx, op) = job.ops_iter.next()
787
      except StopIteration:
788
        raise errors.ProgrammerError("Called for a finished job")
789

    
790
      if op.status == constants.OP_STATUS_RUNNING:
791
        # Found an opcode already marked as running
792
        raise errors.ProgrammerError("Called for job marked as running")
793

    
794
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
795

    
796
      if op.status == constants.OP_STATUS_CANCELED:
797
        # Cancelled jobs are handled by the caller
798
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
799
                              for i in job.ops[idx:])
800

    
801
      elif op.status in constants.OPS_FINALIZED:
802
        # This is a job that was partially completed before master daemon
803
        # shutdown, so it can be expected that some opcodes are already
804
        # completed successfully (if any did error out, then the whole job
805
        # should have been aborted and not resubmitted for processing).
806
        logging.info("%s: opcode %s already processed, skipping",
807
                     opctx.log_prefix, opctx.summary)
808
        continue
809

    
810
      return opctx
811

    
812
  @staticmethod
813
  def _MarkWaitlock(job, op):
814
    """Marks an opcode as waiting for locks.
815

816
    The job's start timestamp is also set if necessary.
817

818
    @type job: L{_QueuedJob}
819
    @param job: Job object
820
    @type job: L{_QueuedOpCode}
821
    @param job: Opcode object
822

823
    """
824
    assert op in job.ops
825

    
826
    op.status = constants.OP_STATUS_WAITLOCK
827
    op.result = None
828
    op.start_timestamp = TimeStampNow()
829

    
830
    if job.start_timestamp is None:
831
      job.start_timestamp = op.start_timestamp
832

    
833
  def _ExecOpCodeUnlocked(self, opctx):
834
    """Processes one opcode and returns the result.
835

836
    """
837
    op = opctx.op
838

    
839
    assert op.status == constants.OP_STATUS_WAITLOCK
840

    
841
    try:
842
      # Make sure not to hold queue lock while calling ExecOpCode
843
      result = self.opexec_fn(op.input,
844
                              _OpExecCallbacks(self.queue, self.job, op))
845
    except CancelJob:
846
      logging.exception("%s: Canceling job", opctx.log_prefix)
847
      assert op.status == constants.OP_STATUS_CANCELING
848
      return (constants.OP_STATUS_CANCELING, None)
849
    except Exception, err: # pylint: disable-msg=W0703
850
      logging.exception("%s: Caught exception in %s",
851
                        opctx.log_prefix, opctx.summary)
852
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
853
    else:
854
      logging.debug("%s: %s successful",
855
                    opctx.log_prefix, opctx.summary)
856
      return (constants.OP_STATUS_SUCCESS, result)
857

    
858
  def __call__(self):
859
    """Continues execution of a job.
860

861
    @rtype: bool
862
    @return: True if job is finished, False if processor needs to be called
863
             again
864

865
    """
866
    queue = self.queue
867
    job = self.job
868

    
869
    logging.debug("Processing job %s", job.id)
870

    
871
    queue.acquire(shared=1)
872
    try:
873
      opcount = len(job.ops)
874

    
875
      opctx = self._FindNextOpcode(job)
876
      op = opctx.op
877

    
878
      # Consistency check
879
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
880
                                     constants.OP_STATUS_CANCELED)
881
                        for i in job.ops[opctx.index:])
882

    
883
      assert op.status in (constants.OP_STATUS_QUEUED,
884
                           constants.OP_STATUS_WAITLOCK,
885
                           constants.OP_STATUS_CANCELED)
886

    
887
      if op.status != constants.OP_STATUS_CANCELED:
888
        # Prepare to start opcode
889
        self._MarkWaitlock(job, op)
890

    
891
        assert op.status == constants.OP_STATUS_WAITLOCK
892
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
893

    
894
        # Write to disk
895
        queue.UpdateJobUnlocked(job)
896

    
897
        logging.info("%s: opcode %s waiting for locks",
898
                     opctx.log_prefix, opctx.summary)
899

    
900
        queue.release()
901
        try:
902
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
903
        finally:
904
          queue.acquire(shared=1)
905

    
906
        # Finalize opcode
907
        op.end_timestamp = TimeStampNow()
908
        op.status = op_status
909
        op.result = op_result
910

    
911
        if op.status == constants.OP_STATUS_CANCELING:
912
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
913
                                for i in job.ops[opctx.index:])
914
        else:
915
          assert op.status in constants.OPS_FINALIZED
916

    
917
      # Ensure all opcodes so far have been successful
918
      assert (opctx.index == 0 or
919
              compat.all(i.status == constants.OP_STATUS_SUCCESS
920
                         for i in job.ops[:opctx.index]))
921

    
922
      if op.status == constants.OP_STATUS_SUCCESS:
923
        finalize = False
924

    
925
      elif op.status == constants.OP_STATUS_ERROR:
926
        # Ensure failed opcode has an exception as its result
927
        assert errors.GetEncodedError(job.ops[opctx.index].result)
928

    
929
        to_encode = errors.OpExecError("Preceding opcode failed")
930
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
931
                              _EncodeOpError(to_encode))
932
        finalize = True
933

    
934
        # Consistency check
935
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
936
                          errors.GetEncodedError(i.result)
937
                          for i in job.ops[opctx.index:])
938

    
939
      elif op.status == constants.OP_STATUS_CANCELING:
940
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
941
                              "Job canceled by request")
942
        finalize = True
943

    
944
      elif op.status == constants.OP_STATUS_CANCELED:
945
        finalize = True
946

    
947
      else:
948
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
949

    
950
      # Finalizing or last opcode?
951
      if finalize or opctx.index == (opcount - 1):
952
        # All opcodes have been run, finalize job
953
        job.end_timestamp = TimeStampNow()
954

    
955
      # Write to disk. If the job status is final, this is the final write
956
      # allowed. Once the file has been written, it can be archived anytime.
957
      queue.UpdateJobUnlocked(job)
958

    
959
      if finalize or opctx.index == (opcount - 1):
960
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
961
        return True
962

    
963
      return False
964
    finally:
965
      queue.release()
966

    
967

    
968
class _JobQueueWorker(workerpool.BaseWorker):
969
  """The actual job workers.
970

971
  """
972
  def RunTask(self, job): # pylint: disable-msg=W0221
973
    """Job executor.
974

975
    This functions processes a job. It is closely tied to the L{_QueuedJob} and
976
    L{_QueuedOpCode} classes.
977

978
    @type job: L{_QueuedJob}
979
    @param job: the job to be processed
980

981
    """
982
    queue = job.queue
983
    assert queue == self.pool.queue
984

    
985
    self.SetTaskName("Job%s" % job.id)
986

    
987
    proc = mcpu.Processor(queue.context, job.id)
988

    
989
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
990
      # Schedule again
991
      raise workerpool.DeferTask()
992

    
993

    
994
class _JobQueueWorkerPool(workerpool.WorkerPool):
995
  """Simple class implementing a job-processing workerpool.
996

997
  """
998
  def __init__(self, queue):
999
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
1000
                                              JOBQUEUE_THREADS,
1001
                                              _JobQueueWorker)
1002
    self.queue = queue
1003

    
1004

    
1005
def _RequireOpenQueue(fn):
1006
  """Decorator for "public" functions.
1007

1008
  This function should be used for all 'public' functions. That is,
1009
  functions usually called from other classes. Note that this should
1010
  be applied only to methods (not plain functions), since it expects
1011
  that the decorated function is called with a first argument that has
1012
  a '_queue_filelock' argument.
1013

1014
  @warning: Use this decorator only after locking.ssynchronized
1015

1016
  Example::
1017
    @locking.ssynchronized(_LOCK)
1018
    @_RequireOpenQueue
1019
    def Example(self):
1020
      pass
1021

1022
  """
1023
  def wrapper(self, *args, **kwargs):
1024
    # pylint: disable-msg=W0212
1025
    assert self._queue_filelock is not None, "Queue should be open"
1026
    return fn(self, *args, **kwargs)
1027
  return wrapper
1028

    
1029

    
1030
class JobQueue(object):
1031
  """Queue used to manage the jobs.
1032

1033
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1034

1035
  """
1036
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1037

    
1038
  def __init__(self, context):
1039
    """Constructor for JobQueue.
1040

1041
    The constructor will initialize the job queue object and then
1042
    start loading the current jobs from disk, either for starting them
1043
    (if they were queue) or for aborting them (if they were already
1044
    running).
1045

1046
    @type context: GanetiContext
1047
    @param context: the context object for access to the configuration
1048
        data and other ganeti objects
1049

1050
    """
1051
    self.context = context
1052
    self._memcache = weakref.WeakValueDictionary()
1053
    self._my_hostname = netutils.Hostname.GetSysName()
1054

    
1055
    # The Big JobQueue lock. If a code block or method acquires it in shared
1056
    # mode safe it must guarantee concurrency with all the code acquiring it in
1057
    # shared mode, including itself. In order not to acquire it at all
1058
    # concurrency must be guaranteed with all code acquiring it in shared mode
1059
    # and all code acquiring it exclusively.
1060
    self._lock = locking.SharedLock("JobQueue")
1061

    
1062
    self.acquire = self._lock.acquire
1063
    self.release = self._lock.release
1064

    
1065
    # Initialize the queue, and acquire the filelock.
1066
    # This ensures no other process is working on the job queue.
1067
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1068

    
1069
    # Read serial file
1070
    self._last_serial = jstore.ReadSerial()
1071
    assert self._last_serial is not None, ("Serial file was modified between"
1072
                                           " check in jstore and here")
1073

    
1074
    # Get initial list of nodes
1075
    self._nodes = dict((n.name, n.primary_ip)
1076
                       for n in self.context.cfg.GetAllNodesInfo().values()
1077
                       if n.master_candidate)
1078

    
1079
    # Remove master node
1080
    self._nodes.pop(self._my_hostname, None)
1081

    
1082
    # TODO: Check consistency across nodes
1083

    
1084
    self._queue_size = 0
1085
    self._UpdateQueueSizeUnlocked()
1086
    self._drained = self._IsQueueMarkedDrain()
1087

    
1088
    # Setup worker pool
1089
    self._wpool = _JobQueueWorkerPool(self)
1090
    try:
1091
      self._InspectQueue()
1092
    except:
1093
      self._wpool.TerminateWorkers()
1094
      raise
1095

    
1096
  @locking.ssynchronized(_LOCK)
1097
  @_RequireOpenQueue
1098
  def _InspectQueue(self):
1099
    """Loads the whole job queue and resumes unfinished jobs.
1100

1101
    This function needs the lock here because WorkerPool.AddTask() may start a
1102
    job while we're still doing our work.
1103

1104
    """
1105
    logging.info("Inspecting job queue")
1106

    
1107
    restartjobs = []
1108

    
1109
    all_job_ids = self._GetJobIDsUnlocked()
1110
    jobs_count = len(all_job_ids)
1111
    lastinfo = time.time()
1112
    for idx, job_id in enumerate(all_job_ids):
1113
      # Give an update every 1000 jobs or 10 seconds
1114
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1115
          idx == (jobs_count - 1)):
1116
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1117
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1118
        lastinfo = time.time()
1119

    
1120
      job = self._LoadJobUnlocked(job_id)
1121

    
1122
      # a failure in loading the job can cause 'None' to be returned
1123
      if job is None:
1124
        continue
1125

    
1126
      status = job.CalcStatus()
1127

    
1128
      if status in (constants.JOB_STATUS_QUEUED, ):
1129
        restartjobs.append(job)
1130

    
1131
      elif status in (constants.JOB_STATUS_RUNNING,
1132
                      constants.JOB_STATUS_WAITLOCK,
1133
                      constants.JOB_STATUS_CANCELING):
1134
        logging.warning("Unfinished job %s found: %s", job.id, job)
1135
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1136
                              "Unclean master daemon shutdown")
1137
        self.UpdateJobUnlocked(job)
1138

    
1139
    if restartjobs:
1140
      logging.info("Restarting %s jobs", len(restartjobs))
1141
      self._EnqueueJobs(restartjobs)
1142

    
1143
    logging.info("Job queue inspection finished")
1144

    
1145
  @locking.ssynchronized(_LOCK)
1146
  @_RequireOpenQueue
1147
  def AddNode(self, node):
1148
    """Register a new node with the queue.
1149

1150
    @type node: L{objects.Node}
1151
    @param node: the node object to be added
1152

1153
    """
1154
    node_name = node.name
1155
    assert node_name != self._my_hostname
1156

    
1157
    # Clean queue directory on added node
1158
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1159
    msg = result.fail_msg
1160
    if msg:
1161
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1162
                      node_name, msg)
1163

    
1164
    if not node.master_candidate:
1165
      # remove if existing, ignoring errors
1166
      self._nodes.pop(node_name, None)
1167
      # and skip the replication of the job ids
1168
      return
1169

    
1170
    # Upload the whole queue excluding archived jobs
1171
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1172

    
1173
    # Upload current serial file
1174
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1175

    
1176
    for file_name in files:
1177
      # Read file content
1178
      content = utils.ReadFile(file_name)
1179

    
1180
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1181
                                                  [node.primary_ip],
1182
                                                  file_name, content)
1183
      msg = result[node_name].fail_msg
1184
      if msg:
1185
        logging.error("Failed to upload file %s to node %s: %s",
1186
                      file_name, node_name, msg)
1187

    
1188
    self._nodes[node_name] = node.primary_ip
1189

    
1190
  @locking.ssynchronized(_LOCK)
1191
  @_RequireOpenQueue
1192
  def RemoveNode(self, node_name):
1193
    """Callback called when removing nodes from the cluster.
1194

1195
    @type node_name: str
1196
    @param node_name: the name of the node to remove
1197

1198
    """
1199
    self._nodes.pop(node_name, None)
1200

    
1201
  @staticmethod
1202
  def _CheckRpcResult(result, nodes, failmsg):
1203
    """Verifies the status of an RPC call.
1204

1205
    Since we aim to keep consistency should this node (the current
1206
    master) fail, we will log errors if our rpc fail, and especially
1207
    log the case when more than half of the nodes fails.
1208

1209
    @param result: the data as returned from the rpc call
1210
    @type nodes: list
1211
    @param nodes: the list of nodes we made the call to
1212
    @type failmsg: str
1213
    @param failmsg: the identifier to be used for logging
1214

1215
    """
1216
    failed = []
1217
    success = []
1218

    
1219
    for node in nodes:
1220
      msg = result[node].fail_msg
1221
      if msg:
1222
        failed.append(node)
1223
        logging.error("RPC call %s (%s) failed on node %s: %s",
1224
                      result[node].call, failmsg, node, msg)
1225
      else:
1226
        success.append(node)
1227

    
1228
    # +1 for the master node
1229
    if (len(success) + 1) < len(failed):
1230
      # TODO: Handle failing nodes
1231
      logging.error("More than half of the nodes failed")
1232

    
1233
  def _GetNodeIp(self):
1234
    """Helper for returning the node name/ip list.
1235

1236
    @rtype: (list, list)
1237
    @return: a tuple of two lists, the first one with the node
1238
        names and the second one with the node addresses
1239

1240
    """
1241
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1242
    name_list = self._nodes.keys()
1243
    addr_list = [self._nodes[name] for name in name_list]
1244
    return name_list, addr_list
1245

    
1246
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1247
    """Writes a file locally and then replicates it to all nodes.
1248

1249
    This function will replace the contents of a file on the local
1250
    node and then replicate it to all the other nodes we have.
1251

1252
    @type file_name: str
1253
    @param file_name: the path of the file to be replicated
1254
    @type data: str
1255
    @param data: the new contents of the file
1256
    @type replicate: boolean
1257
    @param replicate: whether to spread the changes to the remote nodes
1258

1259
    """
1260
    getents = runtime.GetEnts()
1261
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1262
                    gid=getents.masterd_gid)
1263

    
1264
    if replicate:
1265
      names, addrs = self._GetNodeIp()
1266
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1267
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1268

    
1269
  def _RenameFilesUnlocked(self, rename):
1270
    """Renames a file locally and then replicate the change.
1271

1272
    This function will rename a file in the local queue directory
1273
    and then replicate this rename to all the other nodes we have.
1274

1275
    @type rename: list of (old, new)
1276
    @param rename: List containing tuples mapping old to new names
1277

1278
    """
1279
    # Rename them locally
1280
    for old, new in rename:
1281
      utils.RenameFile(old, new, mkdir=True)
1282

    
1283
    # ... and on all nodes
1284
    names, addrs = self._GetNodeIp()
1285
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1286
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1287

    
1288
  @staticmethod
1289
  def _FormatJobID(job_id):
1290
    """Convert a job ID to string format.
1291

1292
    Currently this just does C{str(job_id)} after performing some
1293
    checks, but if we want to change the job id format this will
1294
    abstract this change.
1295

1296
    @type job_id: int or long
1297
    @param job_id: the numeric job id
1298
    @rtype: str
1299
    @return: the formatted job id
1300

1301
    """
1302
    if not isinstance(job_id, (int, long)):
1303
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1304
    if job_id < 0:
1305
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1306

    
1307
    return str(job_id)
1308

    
1309
  @classmethod
1310
  def _GetArchiveDirectory(cls, job_id):
1311
    """Returns the archive directory for a job.
1312

1313
    @type job_id: str
1314
    @param job_id: Job identifier
1315
    @rtype: str
1316
    @return: Directory name
1317

1318
    """
1319
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1320

    
1321
  def _NewSerialsUnlocked(self, count):
1322
    """Generates a new job identifier.
1323

1324
    Job identifiers are unique during the lifetime of a cluster.
1325

1326
    @type count: integer
1327
    @param count: how many serials to return
1328
    @rtype: str
1329
    @return: a string representing the job identifier.
1330

1331
    """
1332
    assert count > 0
1333
    # New number
1334
    serial = self._last_serial + count
1335

    
1336
    # Write to file
1337
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1338
                             "%s\n" % serial, True)
1339

    
1340
    result = [self._FormatJobID(v)
1341
              for v in range(self._last_serial, serial + 1)]
1342
    # Keep it only if we were able to write the file
1343
    self._last_serial = serial
1344

    
1345
    return result
1346

    
1347
  @staticmethod
1348
  def _GetJobPath(job_id):
1349
    """Returns the job file for a given job id.
1350

1351
    @type job_id: str
1352
    @param job_id: the job identifier
1353
    @rtype: str
1354
    @return: the path to the job file
1355

1356
    """
1357
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1358

    
1359
  @classmethod
1360
  def _GetArchivedJobPath(cls, job_id):
1361
    """Returns the archived job file for a give job id.
1362

1363
    @type job_id: str
1364
    @param job_id: the job identifier
1365
    @rtype: str
1366
    @return: the path to the archived job file
1367

1368
    """
1369
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1370
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1371

    
1372
  def _GetJobIDsUnlocked(self, sort=True):
1373
    """Return all known job IDs.
1374

1375
    The method only looks at disk because it's a requirement that all
1376
    jobs are present on disk (so in the _memcache we don't have any
1377
    extra IDs).
1378

1379
    @type sort: boolean
1380
    @param sort: perform sorting on the returned job ids
1381
    @rtype: list
1382
    @return: the list of job IDs
1383

1384
    """
1385
    jlist = []
1386
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1387
      m = self._RE_JOB_FILE.match(filename)
1388
      if m:
1389
        jlist.append(m.group(1))
1390
    if sort:
1391
      jlist = utils.NiceSort(jlist)
1392
    return jlist
1393

    
1394
  def _LoadJobUnlocked(self, job_id):
1395
    """Loads a job from the disk or memory.
1396

1397
    Given a job id, this will return the cached job object if
1398
    existing, or try to load the job from the disk. If loading from
1399
    disk, it will also add the job to the cache.
1400

1401
    @param job_id: the job id
1402
    @rtype: L{_QueuedJob} or None
1403
    @return: either None or the job object
1404

1405
    """
1406
    job = self._memcache.get(job_id, None)
1407
    if job:
1408
      logging.debug("Found job %s in memcache", job_id)
1409
      return job
1410

    
1411
    try:
1412
      job = self._LoadJobFromDisk(job_id)
1413
      if job is None:
1414
        return job
1415
    except errors.JobFileCorrupted:
1416
      old_path = self._GetJobPath(job_id)
1417
      new_path = self._GetArchivedJobPath(job_id)
1418
      if old_path == new_path:
1419
        # job already archived (future case)
1420
        logging.exception("Can't parse job %s", job_id)
1421
      else:
1422
        # non-archived case
1423
        logging.exception("Can't parse job %s, will archive.", job_id)
1424
        self._RenameFilesUnlocked([(old_path, new_path)])
1425
      return None
1426

    
1427
    self._memcache[job_id] = job
1428
    logging.debug("Added job %s to the cache", job_id)
1429
    return job
1430

    
1431
  def _LoadJobFromDisk(self, job_id):
1432
    """Load the given job file from disk.
1433

1434
    Given a job file, read, load and restore it in a _QueuedJob format.
1435

1436
    @type job_id: string
1437
    @param job_id: job identifier
1438
    @rtype: L{_QueuedJob} or None
1439
    @return: either None or the job object
1440

1441
    """
1442
    filepath = self._GetJobPath(job_id)
1443
    logging.debug("Loading job from %s", filepath)
1444
    try:
1445
      raw_data = utils.ReadFile(filepath)
1446
    except EnvironmentError, err:
1447
      if err.errno in (errno.ENOENT, ):
1448
        return None
1449
      raise
1450

    
1451
    try:
1452
      data = serializer.LoadJson(raw_data)
1453
      job = _QueuedJob.Restore(self, data)
1454
    except Exception, err: # pylint: disable-msg=W0703
1455
      raise errors.JobFileCorrupted(err)
1456

    
1457
    return job
1458

    
1459
  def SafeLoadJobFromDisk(self, job_id):
1460
    """Load the given job file from disk.
1461

1462
    Given a job file, read, load and restore it in a _QueuedJob format.
1463
    In case of error reading the job, it gets returned as None, and the
1464
    exception is logged.
1465

1466
    @type job_id: string
1467
    @param job_id: job identifier
1468
    @rtype: L{_QueuedJob} or None
1469
    @return: either None or the job object
1470

1471
    """
1472
    try:
1473
      return self._LoadJobFromDisk(job_id)
1474
    except (errors.JobFileCorrupted, EnvironmentError):
1475
      logging.exception("Can't load/parse job %s", job_id)
1476
      return None
1477

    
1478
  @staticmethod
1479
  def _IsQueueMarkedDrain():
1480
    """Check if the queue is marked from drain.
1481

1482
    This currently uses the queue drain file, which makes it a
1483
    per-node flag. In the future this can be moved to the config file.
1484

1485
    @rtype: boolean
1486
    @return: True of the job queue is marked for draining
1487

1488
    """
1489
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1490

    
1491
  def _UpdateQueueSizeUnlocked(self):
1492
    """Update the queue size.
1493

1494
    """
1495
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1496

    
1497
  @locking.ssynchronized(_LOCK)
1498
  @_RequireOpenQueue
1499
  def SetDrainFlag(self, drain_flag):
1500
    """Sets the drain flag for the queue.
1501

1502
    @type drain_flag: boolean
1503
    @param drain_flag: Whether to set or unset the drain flag
1504

1505
    """
1506
    getents = runtime.GetEnts()
1507

    
1508
    if drain_flag:
1509
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1510
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
1511
    else:
1512
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1513

    
1514
    self._drained = drain_flag
1515

    
1516
    return True
1517

    
1518
  @_RequireOpenQueue
1519
  def _SubmitJobUnlocked(self, job_id, ops):
1520
    """Create and store a new job.
1521

1522
    This enters the job into our job queue and also puts it on the new
1523
    queue, in order for it to be picked up by the queue processors.
1524

1525
    @type job_id: job ID
1526
    @param job_id: the job ID for the new job
1527
    @type ops: list
1528
    @param ops: The list of OpCodes that will become the new job.
1529
    @rtype: L{_QueuedJob}
1530
    @return: the job object to be queued
1531
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1532
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1533
    @raise errors.GenericError: If an opcode is not valid
1534

1535
    """
1536
    # Ok when sharing the big job queue lock, as the drain file is created when
1537
    # the lock is exclusive.
1538
    if self._drained:
1539
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1540

    
1541
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1542
      raise errors.JobQueueFull()
1543

    
1544
    job = _QueuedJob(self, job_id, ops)
1545

    
1546
    # Check priority
1547
    for idx, op in enumerate(job.ops):
1548
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1549
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1550
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1551
                                  " are %s" % (idx, op.priority, allowed))
1552

    
1553
    # Write to disk
1554
    self.UpdateJobUnlocked(job)
1555

    
1556
    self._queue_size += 1
1557

    
1558
    logging.debug("Adding new job %s to the cache", job_id)
1559
    self._memcache[job_id] = job
1560

    
1561
    return job
1562

    
1563
  @locking.ssynchronized(_LOCK)
1564
  @_RequireOpenQueue
1565
  def SubmitJob(self, ops):
1566
    """Create and store a new job.
1567

1568
    @see: L{_SubmitJobUnlocked}
1569

1570
    """
1571
    job_id = self._NewSerialsUnlocked(1)[0]
1572
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1573
    return job_id
1574

    
1575
  @locking.ssynchronized(_LOCK)
1576
  @_RequireOpenQueue
1577
  def SubmitManyJobs(self, jobs):
1578
    """Create and store multiple jobs.
1579

1580
    @see: L{_SubmitJobUnlocked}
1581

1582
    """
1583
    results = []
1584
    added_jobs = []
1585
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1586
    for job_id, ops in zip(all_job_ids, jobs):
1587
      try:
1588
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1589
        status = True
1590
        data = job_id
1591
      except errors.GenericError, err:
1592
        data = str(err)
1593
        status = False
1594
      results.append((status, data))
1595

    
1596
    self._EnqueueJobs(added_jobs)
1597

    
1598
    return results
1599

    
1600
  def _EnqueueJobs(self, jobs):
1601
    """Helper function to add jobs to worker pool's queue.
1602

1603
    @type jobs: list
1604
    @param jobs: List of all jobs
1605

1606
    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
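
    # The helper watches the on-disk job file for modifications and reloads it
    # through load_fn after each change, returning as soon as the requested
    # fields or the log entries differ from the previous values, or once the
    # timeout expires.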
    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)
    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
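
    # Only jobs in a finalized status (successfully finished, failed or
    # canceled) are archived; anything still queued or running is skipped.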
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't checked above whether the renames succeeded or failed,
    # we update the cached queue size from the filesystem.  Once the TODO
    # above is addressed, we can use the number of actually archived jobs to
    # fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple with the number of archived jobs and the number of jobs
        not yet examined when the timeout was reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)
    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
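      # The job's age is taken from the most specific timestamp available:
      # end of execution, then start of execution, then time of reception.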
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True
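
    # When specific job IDs were requested, a job that cannot be loaded shows
    # up as None in the result; when listing all jobs, unreadable entries are
    # silently skipped instead.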
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None