lib/jqueue.py @ 98ed5092

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37

    
38
try:
39
  # pylint: disable-msg=E0611
40
  from pyinotify import pyinotify
41
except ImportError:
42
  import pyinotify
43

    
44
from ganeti import asyncnotifier
45
from ganeti import constants
46
from ganeti import serializer
47
from ganeti import workerpool
48
from ganeti import locking
49
from ganeti import opcodes
50
from ganeti import errors
51
from ganeti import mcpu
52
from ganeti import utils
53
from ganeti import jstore
54
from ganeti import rpc
55
from ganeti import runtime
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log", "priority",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
    # Get initial priority (it might change during the lifetime of this opcode)
117
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118

    
119
  @classmethod
120
  def Restore(cls, state):
121
    """Restore the _QueuedOpCode from the serialized form.
122

123
    @type state: dict
124
    @param state: the serialized state
125
    @rtype: _QueuedOpCode
126
    @return: a new _QueuedOpCode instance
127

128
    """
129
    obj = _QueuedOpCode.__new__(cls)
130
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131
    obj.status = state["status"]
132
    obj.result = state["result"]
133
    obj.log = state["log"]
134
    obj.start_timestamp = state.get("start_timestamp", None)
135
    obj.exec_timestamp = state.get("exec_timestamp", None)
136
    obj.end_timestamp = state.get("end_timestamp", None)
137
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138
    return obj
139

    
140
  def Serialize(self):
141
    """Serializes this _QueuedOpCode.
142

143
    @rtype: dict
144
    @return: the dictionary holding the serialized state
145

146
    """
147
    return {
148
      "input": self.input.__getstate__(),
149
      "status": self.status,
150
      "result": self.result,
151
      "log": self.log,
152
      "start_timestamp": self.start_timestamp,
153
      "exec_timestamp": self.exec_timestamp,
154
      "end_timestamp": self.end_timestamp,
155
      "priority": self.priority,
156
      }
157

    
158

    
159
class _QueuedJob(object):
160
  """In-memory job representation.
161

162
  This is what we use to track the user-submitted jobs. Locking must
163
  be taken care of by users of this class.
164

165
  @type queue: L{JobQueue}
166
  @ivar queue: the parent queue
167
  @ivar id: the job ID
168
  @type ops: list
169
  @ivar ops: the list of _QueuedOpCode that constitute the job
170
  @type log_serial: int
171
  @ivar log_serial: holds the index for the next log entry
172
  @ivar received_timestamp: the timestamp for when the job was received
173
  @ivar start_timestamp: the timestamp for start of execution
174
  @ivar end_timestamp: the timestamp for end of execution
175

176
  """
177
  # pylint: disable-msg=W0212
178
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179
               "received_timestamp", "start_timestamp", "end_timestamp",
180
               "__weakref__"]
181

    
182
  def __init__(self, queue, job_id, ops):
183
    """Constructor for the _QueuedJob.
184

185
    @type queue: L{JobQueue}
186
    @param queue: our parent queue
187
    @type job_id: string
188
    @param job_id: our job id
189
    @type ops: list
190
    @param ops: the list of opcodes we hold, which will be encapsulated
191
        in _QueuedOpCodes
192

193
    """
194
    if not ops:
195
      raise errors.GenericError("A job needs at least one opcode")
196

    
197
    self.queue = queue
198
    self.id = job_id
199
    self.ops = [_QueuedOpCode(op) for op in ops]
200
    self.log_serial = 0
201
    self.received_timestamp = TimeStampNow()
202
    self.start_timestamp = None
203
    self.end_timestamp = None
204

    
205
    self._InitInMemory(self)
206

    
207
  @staticmethod
208
  def _InitInMemory(obj):
209
    """Initializes in-memory variables.
210

211
    """
212
    obj.ops_iter = None
213
    obj.cur_opctx = None
214

    
215
  def __repr__(self):
216
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217
              "id=%s" % self.id,
218
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219

    
220
    return "<%s at %#x>" % (" ".join(status), id(self))
221

    
222
  @classmethod
223
  def Restore(cls, queue, state):
224
    """Restore a _QueuedJob from serialized state:
225

226
    @type queue: L{JobQueue}
227
    @param queue: to which queue the restored job belongs
228
    @type state: dict
229
    @param state: the serialized state
230
    @rtype: _QueuedJob
231
    @return: the restored _QueuedJob instance
232

233
    """
234
    obj = _QueuedJob.__new__(cls)
235
    obj.queue = queue
236
    obj.id = state["id"]
237
    obj.received_timestamp = state.get("received_timestamp", None)
238
    obj.start_timestamp = state.get("start_timestamp", None)
239
    obj.end_timestamp = state.get("end_timestamp", None)
240

    
241
    obj.ops = []
242
    obj.log_serial = 0
243
    for op_state in state["ops"]:
244
      op = _QueuedOpCode.Restore(op_state)
245
      for log_entry in op.log:
246
        obj.log_serial = max(obj.log_serial, log_entry[0])
247
      obj.ops.append(op)
248

    
249
    cls._InitInMemory(obj)
250

    
251
    return obj
252

    
253
  def Serialize(self):
254
    """Serialize the _JobQueue instance.
255

256
    @rtype: dict
257
    @return: the serialized state
258

259
    """
260
    return {
261
      "id": self.id,
262
      "ops": [op.Serialize() for op in self.ops],
263
      "start_timestamp": self.start_timestamp,
264
      "end_timestamp": self.end_timestamp,
265
      "received_timestamp": self.received_timestamp,
266
      }
267

    
268
  def CalcStatus(self):
269
    """Compute the status of this job.
270

271
    This function iterates over all the _QueuedOpCodes in the job and
272
    based on their status, computes the job status.
273

274
    The algorithm is:
275
      - if we find a cancelled, or finished with error, the job
276
        status will be the same
277
      - otherwise, the last opcode with the status one of:
278
          - waitlock
279
          - canceling
280
          - running
281

282
        will determine the job status
283

284
      - otherwise, it means either all opcodes are queued, or success,
285
        and the job status will be the same
286

287
    @return: the job status
288

289
    """
290
    status = constants.JOB_STATUS_QUEUED
291

    
292
    all_success = True
293
    for op in self.ops:
294
      if op.status == constants.OP_STATUS_SUCCESS:
295
        continue
296

    
297
      all_success = False
298

    
299
      if op.status == constants.OP_STATUS_QUEUED:
300
        pass
301
      elif op.status == constants.OP_STATUS_WAITLOCK:
302
        status = constants.JOB_STATUS_WAITLOCK
303
      elif op.status == constants.OP_STATUS_RUNNING:
304
        status = constants.JOB_STATUS_RUNNING
305
      elif op.status == constants.OP_STATUS_CANCELING:
306
        status = constants.JOB_STATUS_CANCELING
307
        break
308
      elif op.status == constants.OP_STATUS_ERROR:
309
        status = constants.JOB_STATUS_ERROR
310
        # The whole job fails if one opcode failed
311
        break
312
      elif op.status == constants.OP_STATUS_CANCELED:
313
        status = constants.JOB_STATUS_CANCELED
314
        break
315

    
316
    if all_success:
317
      status = constants.JOB_STATUS_SUCCESS
318

    
319
    return status
320

    
321
  def CalcPriority(self):
322
    """Gets the current priority for this job.
323

324
    Only unfinished opcodes are considered. When all are done, the default
325
    priority is used.
326

327
    @rtype: int
328

329
    """
330
    priorities = [op.priority for op in self.ops
331
                  if op.status not in constants.OPS_FINALIZED]
332

    
333
    if not priorities:
334
      # All opcodes are done, assume default priority
335
      return constants.OP_PRIO_DEFAULT
336

    
337
    return min(priorities)
338

    
339
  def GetLogEntries(self, newer_than):
340
    """Selectively returns the log entries.
341

342
    @type newer_than: None or int
343
    @param newer_than: if this is None, return all log entries,
344
        otherwise return only the log entries with serial higher
345
        than this value
346
    @rtype: list
347
    @return: the list of the log entries selected
348

349
    """
350
    if newer_than is None:
351
      serial = -1
352
    else:
353
      serial = newer_than
354

    
355
    entries = []
356
    for op in self.ops:
357
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358

    
359
    return entries
360

    
361
  def GetInfo(self, fields):
362
    """Returns information about a job.
363

364
    @type fields: list
365
    @param fields: names of fields to return
366
    @rtype: list
367
    @return: list with one element for each field
368
    @raise errors.OpExecError: when an invalid field
369
        has been passed
370

371
    """
372
    row = []
373
    for fname in fields:
374
      if fname == "id":
375
        row.append(self.id)
376
      elif fname == "status":
377
        row.append(self.CalcStatus())
378
      elif fname == "priority":
379
        row.append(self.CalcPriority())
380
      elif fname == "ops":
381
        row.append([op.input.__getstate__() for op in self.ops])
382
      elif fname == "opresult":
383
        row.append([op.result for op in self.ops])
384
      elif fname == "opstatus":
385
        row.append([op.status for op in self.ops])
386
      elif fname == "oplog":
387
        row.append([op.log for op in self.ops])
388
      elif fname == "opstart":
389
        row.append([op.start_timestamp for op in self.ops])
390
      elif fname == "opexec":
391
        row.append([op.exec_timestamp for op in self.ops])
392
      elif fname == "opend":
393
        row.append([op.end_timestamp for op in self.ops])
394
      elif fname == "oppriority":
395
        row.append([op.priority for op in self.ops])
396
      elif fname == "received_ts":
397
        row.append(self.received_timestamp)
398
      elif fname == "start_ts":
399
        row.append(self.start_timestamp)
400
      elif fname == "end_ts":
401
        row.append(self.end_timestamp)
402
      elif fname == "summary":
403
        row.append([op.input.Summary() for op in self.ops])
404
      else:
405
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
406
    return row
407

    
408
  def MarkUnfinishedOps(self, status, result):
409
    """Mark unfinished opcodes with a given status and result.
410

411
    This is a utility function for marking all running or waiting to
412
    be run opcodes with a given status. Opcodes which are already
413
    finalised are not changed.
414

415
    @param status: a given opcode status
416
    @param result: the opcode result
417

418
    """
419
    not_marked = True
420
    for op in self.ops:
421
      if op.status in constants.OPS_FINALIZED:
422
        assert not_marked, "Finalized opcodes found after non-finalized ones"
423
        continue
424
      op.status = status
425
      op.result = result
426
      not_marked = False
427

    
428
  def Cancel(self):
429
    """Marks job as canceled/-ing if possible.
430

431
    @rtype: tuple; (bool, string)
432
    @return: Boolean describing whether job was successfully canceled or marked
433
      as canceling and a text message
434

435
    """
436
    status = self.CalcStatus()
437

    
438
    if status == constants.JOB_STATUS_QUEUED:
439
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
440
                             "Job canceled by request")
441
      return (True, "Job %s canceled" % self.id)
442

    
443
    elif status == constants.JOB_STATUS_WAITLOCK:
444
      # The worker will notice the new status and cancel the job
445
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
446
      return (True, "Job %s will be canceled" % self.id)
447

    
448
    else:
449
      logging.debug("Job %s is no longer waiting in the queue", self.id)
450
      return (False, "Job %s is no longer waiting in the queue" % self.id)
451

    
452

    
453
class _OpExecCallbacks(mcpu.OpExecCbBase):
454
  def __init__(self, queue, job, op):
455
    """Initializes this class.
456

457
    @type queue: L{JobQueue}
458
    @param queue: Job queue
459
    @type job: L{_QueuedJob}
460
    @param job: Job object
461
    @type op: L{_QueuedOpCode}
462
    @param op: OpCode
463

464
    """
465
    assert queue, "Queue is missing"
466
    assert job, "Job is missing"
467
    assert op, "Opcode is missing"
468

    
469
    self._queue = queue
470
    self._job = job
471
    self._op = op
472

    
473
  def _CheckCancel(self):
474
    """Raises an exception to cancel the job if asked to.
475

476
    """
477
    # Cancel here if we were asked to
478
    if self._op.status == constants.OP_STATUS_CANCELING:
479
      logging.debug("Canceling opcode")
480
      raise CancelJob()
481

    
482
  @locking.ssynchronized(_QUEUE, shared=1)
483
  def NotifyStart(self):
484
    """Mark the opcode as running, not lock-waiting.
485

486
    This is called from the mcpu code as a notifier function, when the LU is
487
    finally about to start the Exec() method. Of course, to have end-user
488
    visible results, the opcode must be initially (before calling into
489
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
490

491
    """
492
    assert self._op in self._job.ops
493
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
494
                               constants.OP_STATUS_CANCELING)
495

    
496
    # Cancel here if we were asked to
497
    self._CheckCancel()
498

    
499
    logging.debug("Opcode is now running")
500

    
501
    self._op.status = constants.OP_STATUS_RUNNING
502
    self._op.exec_timestamp = TimeStampNow()
503

    
504
    # And finally replicate the job status
505
    self._queue.UpdateJobUnlocked(self._job)
506

    
507
  @locking.ssynchronized(_QUEUE, shared=1)
508
  def _AppendFeedback(self, timestamp, log_type, log_msg):
509
    """Internal feedback append function, with locks
510

511
    """
512
    self._job.log_serial += 1
513
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
514
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
515

    
516
  def Feedback(self, *args):
517
    """Append a log entry.
518

519
    """
520
    assert len(args) < 3
521

    
522
    if len(args) == 1:
523
      log_type = constants.ELOG_MESSAGE
524
      log_msg = args[0]
525
    else:
526
      (log_type, log_msg) = args
527

    
528
    # The time is split to make serialization easier and not lose
529
    # precision.
530
    timestamp = utils.SplitTime(time.time())
531
    self._AppendFeedback(timestamp, log_type, log_msg)
532

    
533
  def CheckCancel(self):
534
    """Check whether job has been cancelled.
535

536
    """
537
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
538
                               constants.OP_STATUS_CANCELING)
539

    
540
    # Cancel here if we were asked to
541
    self._CheckCancel()
542

    
543

    
544
class _JobChangesChecker(object):
545
  def __init__(self, fields, prev_job_info, prev_log_serial):
546
    """Initializes this class.
547

548
    @type fields: list of strings
549
    @param fields: Fields requested by LUXI client
550
    @type prev_job_info: list or None
551
    @param prev_job_info: previous job info, as passed by the LUXI client
552
    @type prev_log_serial: int
553
    @param prev_log_serial: previous job serial, as passed by the LUXI client
554

555
    """
556
    self._fields = fields
557
    self._prev_job_info = prev_job_info
558
    self._prev_log_serial = prev_log_serial
559

    
560
  def __call__(self, job):
561
    """Checks whether job has changed.
562

563
    @type job: L{_QueuedJob}
564
    @param job: Job object
565

566
    """
567
    status = job.CalcStatus()
568
    job_info = job.GetInfo(self._fields)
569
    log_entries = job.GetLogEntries(self._prev_log_serial)
570

    
571
    # Serializing and deserializing data can cause type changes (e.g. from
572
    # tuple to list) or precision loss. We're doing it here so that we get
573
    # the same modifications as the data received from the client. Without
574
    # this, the comparison afterwards might fail without the data being
575
    # significantly different.
576
    # TODO: we just deserialized from disk, investigate how to make sure that
577
    # the job info and log entries are compatible to avoid this further step.
578
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
579
    # efficient, though floats will be tricky
580
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
581
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
582

    
583
    # Don't even try to wait if the job is no longer running, there will be
584
    # no changes.
585
    if (status not in (constants.JOB_STATUS_QUEUED,
586
                       constants.JOB_STATUS_RUNNING,
587
                       constants.JOB_STATUS_WAITLOCK) or
588
        job_info != self._prev_job_info or
589
        (log_entries and self._prev_log_serial != log_entries[0][0])):
590
      logging.debug("Job %s changed", job.id)
591
      return (job_info, log_entries)
592

    
593
    return None
594

    
595

    
596
class _JobFileChangesWaiter(object):
597
  def __init__(self, filename):
598
    """Initializes this class.
599

600
    @type filename: string
601
    @param filename: Path to job file
602
    @raise errors.InotifyError: if the notifier cannot be set up
603

604
    """
605
    self._wm = pyinotify.WatchManager()
606
    self._inotify_handler = \
607
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
608
    self._notifier = \
609
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
610
    try:
611
      self._inotify_handler.enable()
612
    except Exception:
613
      # pyinotify doesn't close file descriptors automatically
614
      self._notifier.stop()
615
      raise
616

    
617
  def _OnInotify(self, notifier_enabled):
618
    """Callback for inotify.
619

620
    """
621
    if not notifier_enabled:
622
      self._inotify_handler.enable()
623

    
624
  def Wait(self, timeout):
625
    """Waits for the job file to change.
626

627
    @type timeout: float
628
    @param timeout: Timeout in seconds
629
    @return: Whether there have been events
630

631
    """
632
    assert timeout >= 0
633
    have_events = self._notifier.check_events(timeout * 1000)
634
    if have_events:
635
      self._notifier.read_events()
636
    self._notifier.process_events()
637
    return have_events
638

    
639
  def Close(self):
640
    """Closes underlying notifier and its file descriptor.
641

642
    """
643
    self._notifier.stop()
644

    
645

    
646
class _JobChangesWaiter(object):
647
  def __init__(self, filename):
648
    """Initializes this class.
649

650
    @type filename: string
651
    @param filename: Path to job file
652

653
    """
654
    self._filewaiter = None
655
    self._filename = filename
656

    
657
  def Wait(self, timeout):
658
    """Waits for a job to change.
659

660
    @type timeout: float
661
    @param timeout: Timeout in seconds
662
    @return: Whether there have been events
663

664
    """
665
    if self._filewaiter:
666
      return self._filewaiter.Wait(timeout)
667

    
668
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
669
    # If this point is reached, return immediately and let caller check the job
670
    # file again in case there were changes since the last check. This avoids a
671
    # race condition.
672
    self._filewaiter = _JobFileChangesWaiter(self._filename)
673

    
674
    return True
675

    
676
  def Close(self):
677
    """Closes underlying waiter.
678

679
    """
680
    if self._filewaiter:
681
      self._filewaiter.Close()
682

    
683

    
684
class _WaitForJobChangesHelper(object):
685
  """Helper class using inotify to wait for changes in a job file.
686

687
  This class takes a previous job status and serial, and alerts the client when
688
  the current job status has changed.
689

690
  """
691
  @staticmethod
692
  def _CheckForChanges(job_load_fn, check_fn):
693
    job = job_load_fn()
694
    if not job:
695
      raise errors.JobLost()
696

    
697
    result = check_fn(job)
698
    if result is None:
699
      raise utils.RetryAgain()
700

    
701
    return result
702

    
703
  def __call__(self, filename, job_load_fn,
704
               fields, prev_job_info, prev_log_serial, timeout):
705
    """Waits for changes on a job.
706

707
    @type filename: string
708
    @param filename: File on which to wait for changes
709
    @type job_load_fn: callable
710
    @param job_load_fn: Function to load job
711
    @type fields: list of strings
712
    @param fields: Which fields to check for changes
713
    @type prev_job_info: list or None
714
    @param prev_job_info: Last job information returned
715
    @type prev_log_serial: int
716
    @param prev_log_serial: Last job message serial number
717
    @type timeout: float
718
    @param timeout: maximum time to wait in seconds
719

720
    """
721
    try:
722
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
723
      waiter = _JobChangesWaiter(filename)
724
      try:
725
        return utils.Retry(compat.partial(self._CheckForChanges,
726
                                          job_load_fn, check_fn),
727
                           utils.RETRY_REMAINING_TIME, timeout,
728
                           wait_fn=waiter.Wait)
729
      finally:
730
        waiter.Close()
731
    except (errors.InotifyError, errors.JobLost):
732
      return None
733
    except utils.RetryTimeout:
734
      return constants.JOB_NOTCHANGED
735

    
736

    
737
def _EncodeOpError(err):
738
  """Encodes an error which occurred while processing an opcode.
739

740
  """
741
  if isinstance(err, errors.GenericError):
742
    to_encode = err
743
  else:
744
    to_encode = errors.OpExecError(str(err))
745

    
746
  return errors.EncodeException(to_encode)
747

    
748

    
749
class _TimeoutStrategyWrapper:
750
  def __init__(self, fn):
751
    """Initializes this class.
752

753
    """
754
    self._fn = fn
755
    self._next = None
756

    
757
  def _Advance(self):
758
    """Gets the next timeout if necessary.
759

760
    """
761
    if self._next is None:
762
      self._next = self._fn()
763

    
764
  def Peek(self):
765
    """Returns the next timeout.
766

767
    """
768
    self._Advance()
769
    return self._next
770

    
771
  def Next(self):
772
    """Returns the current timeout and advances the internal state.
773

774
    """
775
    self._Advance()
776
    result = self._next
777
    self._next = None
778
    return result
779

    
780

    
781
class _OpExecContext:
782
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
783
    """Initializes this class.
784

785
    """
786
    self.op = op
787
    self.index = index
788
    self.log_prefix = log_prefix
789
    self.summary = op.input.Summary()
790

    
791
    self._timeout_strategy_factory = timeout_strategy_factory
792
    self._ResetTimeoutStrategy()
793

    
794
  def _ResetTimeoutStrategy(self):
795
    """Creates a new timeout strategy.
796

797
    """
798
    self._timeout_strategy = \
799
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
800

    
801
  def CheckPriorityIncrease(self):
802
    """Checks whether priority can and should be increased.
803

804
    Called when locks couldn't be acquired.
805

806
    """
807
    op = self.op
808

    
809
    # Exhausted all retries and next round should not use blocking acquire
810
    # for locks?
811
    if (self._timeout_strategy.Peek() is None and
812
        op.priority > constants.OP_PRIO_HIGHEST):
813
      logging.debug("Increasing priority")
814
      op.priority -= 1
815
      self._ResetTimeoutStrategy()
816
      return True
817

    
818
    return False
819

    
820
  def GetNextLockTimeout(self):
821
    """Returns the next lock acquire timeout.
822

823
    """
824
    return self._timeout_strategy.Next()
825

    
826

    
827
class _JobProcessor(object):
828
  def __init__(self, queue, opexec_fn, job,
829
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
830
    """Initializes this class.
831

832
    """
833
    self.queue = queue
834
    self.opexec_fn = opexec_fn
835
    self.job = job
836
    self._timeout_strategy_factory = _timeout_strategy_factory
837

    
838
  @staticmethod
839
  def _FindNextOpcode(job, timeout_strategy_factory):
840
    """Locates the next opcode to run.
841

842
    @type job: L{_QueuedJob}
843
    @param job: Job object
844
    @param timeout_strategy_factory: Callable to create new timeout strategy
845

846
    """
847
    # Create some sort of a cache to speed up locating next opcode for future
848
    # lookups
849
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
850
    # pending and one for processed ops.
851
    if job.ops_iter is None:
852
      job.ops_iter = enumerate(job.ops)
853

    
854
    # Find next opcode to run
855
    while True:
856
      try:
857
        (idx, op) = job.ops_iter.next()
858
      except StopIteration:
859
        raise errors.ProgrammerError("Called for a finished job")
860

    
861
      if op.status == constants.OP_STATUS_RUNNING:
862
        # Found an opcode already marked as running
863
        raise errors.ProgrammerError("Called for job marked as running")
864

    
865
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
866
                             timeout_strategy_factory)
867

    
868
      if op.status == constants.OP_STATUS_CANCELED:
869
        # Cancelled jobs are handled by the caller
870
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
871
                              for i in job.ops[idx:])
872

    
873
      elif op.status in constants.OPS_FINALIZED:
874
        # This is a job that was partially completed before master daemon
875
        # shutdown, so it can be expected that some opcodes are already
876
        # completed successfully (if any did error out, then the whole job
877
        # should have been aborted and not resubmitted for processing).
878
        logging.info("%s: opcode %s already processed, skipping",
879
                     opctx.log_prefix, opctx.summary)
880
        continue
881

    
882
      return opctx
883

    
884
  @staticmethod
885
  def _MarkWaitlock(job, op):
886
    """Marks an opcode as waiting for locks.
887

888
    The job's start timestamp is also set if necessary.
889

890
    @type job: L{_QueuedJob}
891
    @param job: Job object
892
    @type op: L{_QueuedOpCode}
893
    @param op: Opcode object
894

895
    """
896
    assert op in job.ops
897
    assert op.status in (constants.OP_STATUS_QUEUED,
898
                         constants.OP_STATUS_WAITLOCK)
899

    
900
    update = False
901

    
902
    op.result = None
903

    
904
    if op.status == constants.OP_STATUS_QUEUED:
905
      op.status = constants.OP_STATUS_WAITLOCK
906
      update = True
907

    
908
    if op.start_timestamp is None:
909
      op.start_timestamp = TimeStampNow()
910
      update = True
911

    
912
    if job.start_timestamp is None:
913
      job.start_timestamp = op.start_timestamp
914
      update = True
915

    
916
    assert op.status == constants.OP_STATUS_WAITLOCK
917

    
918
    return update
919

    
920
  def _ExecOpCodeUnlocked(self, opctx):
921
    """Processes one opcode and returns the result.
922

923
    """
924
    op = opctx.op
925

    
926
    assert op.status == constants.OP_STATUS_WAITLOCK
927

    
928
    timeout = opctx.GetNextLockTimeout()
929

    
930
    try:
931
      # Make sure not to hold queue lock while calling ExecOpCode
932
      result = self.opexec_fn(op.input,
933
                              _OpExecCallbacks(self.queue, self.job, op),
934
                              timeout=timeout, priority=op.priority)
935
    except mcpu.LockAcquireTimeout:
936
      assert timeout is not None, "Received timeout for blocking acquire"
937
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
938

    
939
      assert op.status in (constants.OP_STATUS_WAITLOCK,
940
                           constants.OP_STATUS_CANCELING)
941

    
942
      # Was job cancelled while we were waiting for the lock?
943
      if op.status == constants.OP_STATUS_CANCELING:
944
        return (constants.OP_STATUS_CANCELING, None)
945

    
946
      # Stay in waitlock while trying to re-acquire lock
947
      return (constants.OP_STATUS_WAITLOCK, None)
948
    except CancelJob:
949
      logging.exception("%s: Canceling job", opctx.log_prefix)
950
      assert op.status == constants.OP_STATUS_CANCELING
951
      return (constants.OP_STATUS_CANCELING, None)
952
    except Exception, err: # pylint: disable-msg=W0703
953
      logging.exception("%s: Caught exception in %s",
954
                        opctx.log_prefix, opctx.summary)
955
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
956
    else:
957
      logging.debug("%s: %s successful",
958
                    opctx.log_prefix, opctx.summary)
959
      return (constants.OP_STATUS_SUCCESS, result)
960

    
961
  def __call__(self, _nextop_fn=None):
962
    """Continues execution of a job.
963

964
    @param _nextop_fn: Callback function for tests
965
    @rtype: bool
966
    @return: True if job is finished, False if processor needs to be called
967
             again
968

969
    """
970
    queue = self.queue
971
    job = self.job
972

    
973
    logging.debug("Processing job %s", job.id)
974

    
975
    queue.acquire(shared=1)
976
    try:
977
      opcount = len(job.ops)
978

    
979
      # Is a previous opcode still pending?
980
      if job.cur_opctx:
981
        opctx = job.cur_opctx
982
        job.cur_opctx = None
983
      else:
984
        if __debug__ and _nextop_fn:
985
          _nextop_fn()
986
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
987

    
988
      op = opctx.op
989

    
990
      # Consistency check
991
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
992
                                     constants.OP_STATUS_CANCELING,
993
                                     constants.OP_STATUS_CANCELED)
994
                        for i in job.ops[opctx.index + 1:])
995

    
996
      assert op.status in (constants.OP_STATUS_QUEUED,
997
                           constants.OP_STATUS_WAITLOCK,
998
                           constants.OP_STATUS_CANCELING,
999
                           constants.OP_STATUS_CANCELED)
1000

    
1001
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1002
              op.priority >= constants.OP_PRIO_HIGHEST)
1003

    
1004
      if op.status not in (constants.OP_STATUS_CANCELING,
1005
                           constants.OP_STATUS_CANCELED):
1006
        assert op.status in (constants.OP_STATUS_QUEUED,
1007
                             constants.OP_STATUS_WAITLOCK)
1008

    
1009
        # Prepare to start opcode
1010
        if self._MarkWaitlock(job, op):
1011
          # Write to disk
1012
          queue.UpdateJobUnlocked(job)
1013

    
1014
        assert op.status == constants.OP_STATUS_WAITLOCK
1015
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1016
        assert job.start_timestamp and op.start_timestamp
1017

    
1018
        logging.info("%s: opcode %s waiting for locks",
1019
                     opctx.log_prefix, opctx.summary)
1020

    
1021
        queue.release()
1022
        try:
1023
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1024
        finally:
1025
          queue.acquire(shared=1)
1026

    
1027
        op.status = op_status
1028
        op.result = op_result
1029

    
1030
        if op.status == constants.OP_STATUS_WAITLOCK:
1031
          # Couldn't get locks in time
1032
          assert not op.end_timestamp
1033
        else:
1034
          # Finalize opcode
1035
          op.end_timestamp = TimeStampNow()
1036

    
1037
          if op.status == constants.OP_STATUS_CANCELING:
1038
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1039
                                  for i in job.ops[opctx.index:])
1040
          else:
1041
            assert op.status in constants.OPS_FINALIZED
1042

    
1043
      if op.status == constants.OP_STATUS_WAITLOCK:
1044
        finalize = False
1045

    
1046
        if opctx.CheckPriorityIncrease():
1047
          # Priority was changed, need to update on-disk file
1048
          queue.UpdateJobUnlocked(job)
1049

    
1050
        # Keep around for another round
1051
        job.cur_opctx = opctx
1052

    
1053
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1054
                op.priority >= constants.OP_PRIO_HIGHEST)
1055

    
1056
        # In no case must the status be finalized here
1057
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1058

    
1059
      else:
1060
        # Ensure all opcodes so far have been successful
1061
        assert (opctx.index == 0 or
1062
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1063
                           for i in job.ops[:opctx.index]))
1064

    
1065
        # Reset context
1066
        job.cur_opctx = None
1067

    
1068
        if op.status == constants.OP_STATUS_SUCCESS:
1069
          finalize = False
1070

    
1071
        elif op.status == constants.OP_STATUS_ERROR:
1072
          # Ensure failed opcode has an exception as its result
1073
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1074

    
1075
          to_encode = errors.OpExecError("Preceding opcode failed")
1076
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1077
                                _EncodeOpError(to_encode))
1078
          finalize = True
1079

    
1080
          # Consistency check
1081
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1082
                            errors.GetEncodedError(i.result)
1083
                            for i in job.ops[opctx.index:])
1084

    
1085
        elif op.status == constants.OP_STATUS_CANCELING:
1086
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1087
                                "Job canceled by request")
1088
          finalize = True
1089

    
1090
        elif op.status == constants.OP_STATUS_CANCELED:
1091
          finalize = True
1092

    
1093
        else:
1094
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095

    
1096
        # Finalizing or last opcode?
1097
        if finalize or opctx.index == (opcount - 1):
1098
          # All opcodes have been run, finalize job
1099
          job.end_timestamp = TimeStampNow()
1100

    
1101
        # Write to disk. If the job status is final, this is the final write
1102
        # allowed. Once the file has been written, it can be archived anytime.
1103
        queue.UpdateJobUnlocked(job)
1104

    
1105
        if finalize or opctx.index == (opcount - 1):
1106
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1107
          return True
1108

    
1109
      return False
1110
    finally:
1111
      queue.release()
1112

    
1113

    
1114
class _JobQueueWorker(workerpool.BaseWorker):
1115
  """The actual job workers.
1116

1117
  """
1118
  def RunTask(self, job): # pylint: disable-msg=W0221
1119
    """Job executor.
1120

1121
    This function processes a job. It is closely tied to the L{_QueuedJob} and
1122
    L{_QueuedOpCode} classes.
1123

1124
    @type job: L{_QueuedJob}
1125
    @param job: the job to be processed
1126

1127
    """
1128
    queue = job.queue
1129
    assert queue == self.pool.queue
1130

    
1131
    self.SetTaskName("Job%s" % job.id)
1132

    
1133
    proc = mcpu.Processor(queue.context, job.id)
1134

    
1135
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
1136
      # Schedule again
1137
      raise workerpool.DeferTask(priority=job.CalcPriority())
1138

    
1139

    
1140
class _JobQueueWorkerPool(workerpool.WorkerPool):
1141
  """Simple class implementing a job-processing workerpool.
1142

1143
  """
1144
  def __init__(self, queue):
1145
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
1146
                                              JOBQUEUE_THREADS,
1147
                                              _JobQueueWorker)
1148
    self.queue = queue
1149

    
1150

    
1151
def _RequireOpenQueue(fn):
1152
  """Decorator for "public" functions.
1153

1154
  This function should be used for all 'public' functions. That is,
1155
  functions usually called from other classes. Note that this should
1156
  be applied only to methods (not plain functions), since it expects
1157
  that the decorated function is called with a first argument that has
1158
  a '_queue_filelock' attribute.
1159

1160
  @warning: Use this decorator only after locking.ssynchronized
1161

1162
  Example::
1163
    @locking.ssynchronized(_LOCK)
1164
    @_RequireOpenQueue
1165
    def Example(self):
1166
      pass
1167

1168
  """
1169
  def wrapper(self, *args, **kwargs):
1170
    # pylint: disable-msg=W0212
1171
    assert self._queue_filelock is not None, "Queue should be open"
1172
    return fn(self, *args, **kwargs)
1173
  return wrapper
1174

    
1175

    
1176
class JobQueue(object):
1177
  """Queue used to manage the jobs.
1178

1179
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1180

1181
  """
1182
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1183

    
1184
  def __init__(self, context):
1185
    """Constructor for JobQueue.
1186

1187
    The constructor will initialize the job queue object and then
1188
    start loading the current jobs from disk, either for starting them
1189
    (if they were queue) or for aborting them (if they were already
1190
    running).
1191

1192
    @type context: GanetiContext
1193
    @param context: the context object for access to the configuration
1194
        data and other ganeti objects
1195

1196
    """
1197
    self.context = context
1198
    self._memcache = weakref.WeakValueDictionary()
1199
    self._my_hostname = netutils.Hostname.GetSysName()
1200

    
1201
    # The Big JobQueue lock. If a code block or method acquires it in shared
1202
    # mode, it must guarantee concurrency with all the code acquiring it in
1203
    # shared mode, including itself. In order not to acquire it at all,
1204
    # concurrency must be guaranteed with all code acquiring it in shared mode
1205
    # and all code acquiring it exclusively.
1206
    self._lock = locking.SharedLock("JobQueue")
1207

    
1208
    self.acquire = self._lock.acquire
1209
    self.release = self._lock.release
1210

    
1211
    # Initialize the queue, and acquire the filelock.
1212
    # This ensures no other process is working on the job queue.
1213
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1214

    
1215
    # Read serial file
1216
    self._last_serial = jstore.ReadSerial()
1217
    assert self._last_serial is not None, ("Serial file was modified between"
1218
                                           " check in jstore and here")
1219

    
1220
    # Get initial list of nodes
1221
    self._nodes = dict((n.name, n.primary_ip)
1222
                       for n in self.context.cfg.GetAllNodesInfo().values()
1223
                       if n.master_candidate)
1224

    
1225
    # Remove master node
1226
    self._nodes.pop(self._my_hostname, None)
1227

    
1228
    # TODO: Check consistency across nodes
1229

    
1230
    self._queue_size = 0
1231
    self._UpdateQueueSizeUnlocked()
1232
    self._drained = jstore.CheckDrainFlag()
1233

    
1234
    # Setup worker pool
1235
    self._wpool = _JobQueueWorkerPool(self)
1236
    try:
1237
      self._InspectQueue()
1238
    except:
1239
      self._wpool.TerminateWorkers()
1240
      raise
1241

    
1242
  @locking.ssynchronized(_LOCK)
1243
  @_RequireOpenQueue
1244
  def _InspectQueue(self):
1245
    """Loads the whole job queue and resumes unfinished jobs.
1246

1247
    This function needs the lock here because WorkerPool.AddTask() may start a
1248
    job while we're still doing our work.
1249

1250
    """
1251
    logging.info("Inspecting job queue")
1252

    
1253
    restartjobs = []
1254

    
1255
    all_job_ids = self._GetJobIDsUnlocked()
1256
    jobs_count = len(all_job_ids)
1257
    lastinfo = time.time()
1258
    for idx, job_id in enumerate(all_job_ids):
1259
      # Give an update every 1000 jobs or 10 seconds
1260
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1261
          idx == (jobs_count - 1)):
1262
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1263
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1264
        lastinfo = time.time()
1265

    
1266
      job = self._LoadJobUnlocked(job_id)
1267

    
1268
      # a failure in loading the job can cause 'None' to be returned
1269
      if job is None:
1270
        continue
1271

    
1272
      status = job.CalcStatus()
1273

    
1274
      if status == constants.JOB_STATUS_QUEUED:
1275
        restartjobs.append(job)
1276

    
1277
      elif status in (constants.JOB_STATUS_RUNNING,
1278
                      constants.JOB_STATUS_WAITLOCK,
1279
                      constants.JOB_STATUS_CANCELING):
1280
        logging.warning("Unfinished job %s found: %s", job.id, job)
1281

    
1282
        if status == constants.JOB_STATUS_WAITLOCK:
1283
          # Restart job
1284
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1285
          restartjobs.append(job)
1286
        else:
1287
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1288
                                "Unclean master daemon shutdown")
1289

    
1290
        self.UpdateJobUnlocked(job)
1291

    
1292
    if restartjobs:
1293
      logging.info("Restarting %s jobs", len(restartjobs))
1294
      self._EnqueueJobs(restartjobs)
1295

    
1296
    logging.info("Job queue inspection finished")
1297

    
1298
  @locking.ssynchronized(_LOCK)
1299
  @_RequireOpenQueue
1300
  def AddNode(self, node):
1301
    """Register a new node with the queue.
1302

1303
    @type node: L{objects.Node}
1304
    @param node: the node object to be added
1305

1306
    """
1307
    node_name = node.name
1308
    assert node_name != self._my_hostname
1309

    
1310
    # Clean queue directory on added node
1311
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1312
    msg = result.fail_msg
1313
    if msg:
1314
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1315
                      node_name, msg)
1316

    
1317
    if not node.master_candidate:
1318
      # remove if existing, ignoring errors
1319
      self._nodes.pop(node_name, None)
1320
      # and skip the replication of the job ids
1321
      return
1322

    
1323
    # Upload the whole queue excluding archived jobs
1324
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1325

    
1326
    # Upload current serial file
1327
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1328

    
1329
    for file_name in files:
1330
      # Read file content
1331
      content = utils.ReadFile(file_name)
1332

    
1333
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1334
                                                  [node.primary_ip],
1335
                                                  file_name, content)
1336
      msg = result[node_name].fail_msg
1337
      if msg:
1338
        logging.error("Failed to upload file %s to node %s: %s",
1339
                      file_name, node_name, msg)
1340

    
1341
    self._nodes[node_name] = node.primary_ip
1342

    
1343
  @locking.ssynchronized(_LOCK)
1344
  @_RequireOpenQueue
1345
  def RemoveNode(self, node_name):
1346
    """Callback called when removing nodes from the cluster.
1347

1348
    @type node_name: str
1349
    @param node_name: the name of the node to remove
1350

1351
    """
1352
    self._nodes.pop(node_name, None)
1353

    
1354
  @staticmethod
1355
  def _CheckRpcResult(result, nodes, failmsg):
1356
    """Verifies the status of an RPC call.
1357

1358
    Since we aim to keep consistency should this node (the current
1359
    master) fail, we will log errors if our rpc calls fail, and especially
1360
    log the case when more than half of the nodes fail.
1361

1362
    @param result: the data as returned from the rpc call
1363
    @type nodes: list
1364
    @param nodes: the list of nodes we made the call to
1365
    @type failmsg: str
1366
    @param failmsg: the identifier to be used for logging
1367

1368
    """
1369
    failed = []
1370
    success = []
1371

    
1372
    for node in nodes:
1373
      msg = result[node].fail_msg
1374
      if msg:
1375
        failed.append(node)
1376
        logging.error("RPC call %s (%s) failed on node %s: %s",
1377
                      result[node].call, failmsg, node, msg)
1378
      else:
1379
        success.append(node)
1380

    
1381
    # +1 for the master node
1382
    if (len(success) + 1) < len(failed):
1383
      # TODO: Handle failing nodes
1384
      logging.error("More than half of the nodes failed")
1385

    
1386
  def _GetNodeIp(self):
1387
    """Helper for returning the node name/ip list.
1388

1389
    @rtype: (list, list)
1390
    @return: a tuple of two lists, the first one with the node
1391
        names and the second one with the node addresses
1392

1393
    """
1394
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1395
    name_list = self._nodes.keys()
1396
    addr_list = [self._nodes[name] for name in name_list]
1397
    return name_list, addr_list
1398

    
1399
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1400
    """Writes a file locally and then replicates it to all nodes.
1401

1402
    This function will replace the contents of a file on the local
1403
    node and then replicate it to all the other nodes we have.
1404

1405
    @type file_name: str
1406
    @param file_name: the path of the file to be replicated
1407
    @type data: str
1408
    @param data: the new contents of the file
1409
    @type replicate: boolean
1410
    @param replicate: whether to spread the changes to the remote nodes
1411

1412
    """
1413
    getents = runtime.GetEnts()
1414
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1415
                    gid=getents.masterd_gid)
1416

    
1417
    if replicate:
1418
      names, addrs = self._GetNodeIp()
1419
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1420
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1421

    
1422
  def _RenameFilesUnlocked(self, rename):
1423
    """Renames a file locally and then replicate the change.
1424

1425
    This function will rename a file in the local queue directory
1426
    and then replicate this rename to all the other nodes we have.
1427

1428
    @type rename: list of (old, new)
1429
    @param rename: List containing tuples mapping old to new names
1430

1431
    """
1432
    # Rename them locally
1433
    for old, new in rename:
1434
      utils.RenameFile(old, new, mkdir=True)
1435

    
1436
    # ... and on all nodes
1437
    names, addrs = self._GetNodeIp()
1438
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1439
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1440

    
1441
  @staticmethod
1442
  def _FormatJobID(job_id):
1443
    """Convert a job ID to string format.
1444

1445
    Currently this just does C{str(job_id)} after performing some
1446
    checks, but if we want to change the job id format, this method will
1447
    abstract that change.
1448

1449
    @type job_id: int or long
1450
    @param job_id: the numeric job id
1451
    @rtype: str
1452
    @return: the formatted job id
1453

1454
    """
1455
    if not isinstance(job_id, (int, long)):
1456
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1457
    if job_id < 0:
1458
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1459

    
1460
    return str(job_id)
1461

    
1462
  @classmethod
1463
  def _GetArchiveDirectory(cls, job_id):
1464
    """Returns the archive directory for a job.
1465

1466
    @type job_id: str
1467
    @param job_id: Job identifier
1468
    @rtype: str
1469
    @return: Directory name
1470

1471
    """
1472
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1473

    
1474
  def _NewSerialsUnlocked(self, count):
1475
    """Generates a new job identifier.
1476

1477
    Job identifiers are unique during the lifetime of a cluster.
1478

1479
    @type count: integer
1480
    @param count: how many serials to return
1481
    @rtype: list of str
1482
    @return: a list of job identifiers (formatted as strings)
1483

1484
    """
1485
    assert count > 0
1486
    # New number
1487
    serial = self._last_serial + count
1488

    
1489
    # Write to file
1490
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1491
                             "%s\n" % serial, True)
1492

    
1493
    result = [self._FormatJobID(v)
1494
              for v in range(self._last_serial + 1, serial + 1)]
1495
    # Keep it only if we were able to write the file
1496
    self._last_serial = serial
1497

    
1498
    return result
1499

    
1500
  @staticmethod
1501
  def _GetJobPath(job_id):
1502
    """Returns the job file for a given job id.
1503

1504
    @type job_id: str
1505
    @param job_id: the job identifier
1506
    @rtype: str
1507
    @return: the path to the job file
1508

1509
    """
1510
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1511

    
1512
  @classmethod
1513
  def _GetArchivedJobPath(cls, job_id):
1514
    """Returns the archived job file for a give job id.
1515

1516
    @type job_id: str
1517
    @param job_id: the job identifier
1518
    @rtype: str
1519
    @return: the path to the archived job file
1520

1521
    """
1522
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1523
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1524

    
1525
  def _GetJobIDsUnlocked(self, sort=True):
1526
    """Return all known job IDs.
1527

1528
    The method only looks at disk because it's a requirement that all
1529
    jobs are present on disk (so in the _memcache we don't have any
1530
    extra IDs).
1531

1532
    @type sort: boolean
1533
    @param sort: perform sorting on the returned job ids
1534
    @rtype: list
1535
    @return: the list of job IDs
1536

1537
    """
1538
    jlist = []
1539
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1540
      m = self._RE_JOB_FILE.match(filename)
1541
      if m:
1542
        jlist.append(m.group(1))
1543
    if sort:
1544
      jlist = utils.NiceSort(jlist)
1545
    return jlist
1546

    
1547
  def _LoadJobUnlocked(self, job_id):
1548
    """Loads a job from the disk or memory.
1549

1550
    Given a job id, this will return the cached job object if
1551
    existing, or try to load the job from the disk. If loading from
1552
    disk, it will also add the job to the cache.
1553

1554
    @param job_id: the job id
1555
    @rtype: L{_QueuedJob} or None
1556
    @return: either None or the job object
1557

1558
    """
1559
    job = self._memcache.get(job_id, None)
1560
    if job:
1561
      logging.debug("Found job %s in memcache", job_id)
1562
      return job
1563

    
1564
    try:
1565
      job = self._LoadJobFromDisk(job_id)
1566
      if job is None:
1567
        return job
1568
    except errors.JobFileCorrupted:
1569
      old_path = self._GetJobPath(job_id)
1570
      new_path = self._GetArchivedJobPath(job_id)
1571
      if old_path == new_path:
1572
        # job already archived (future case)
1573
        logging.exception("Can't parse job %s", job_id)
1574
      else:
1575
        # non-archived case
1576
        logging.exception("Can't parse job %s, will archive.", job_id)
1577
        self._RenameFilesUnlocked([(old_path, new_path)])
1578
      return None
1579

    
1580
    self._memcache[job_id] = job
1581
    logging.debug("Added job %s to the cache", job_id)
1582
    return job
1583

    
1584
  def _LoadJobFromDisk(self, job_id):
1585
    """Load the given job file from disk.
1586

1587
    Given a job file, read, load and restore it in a _QueuedJob format.
1588

1589
    @type job_id: string
1590
    @param job_id: job identifier
1591
    @rtype: L{_QueuedJob} or None
1592
    @return: either None or the job object
1593

1594
    """
1595
    filepath = self._GetJobPath(job_id)
1596
    logging.debug("Loading job from %s", filepath)
1597
    try:
1598
      raw_data = utils.ReadFile(filepath)
1599
    except EnvironmentError, err:
1600
      if err.errno in (errno.ENOENT, ):
1601
        return None
1602
      raise
1603

    
1604
    try:
1605
      data = serializer.LoadJson(raw_data)
1606
      job = _QueuedJob.Restore(self, data)
1607
    except Exception, err: # pylint: disable-msg=W0703
1608
      raise errors.JobFileCorrupted(err)
1609

    
1610
    return job
1611

    
1612
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of an error while reading the job, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether the renames succeeded or
    # failed, we update the cached queue size from the filesystem. When we get
    # around to fixing the TODO above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum amount of time to spend archiving, in seconds
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not yet
        inspected due to the timeout)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp
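
        # Timestamps are (seconds, microseconds) tuples (cf. TimeStampNow),
        # so only the seconds component is compared against the requested age.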
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True
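
    # When listing all jobs, entries that disappear (e.g. get archived)
    # between listing and loading are silently dropped; for explicitly
    # requested ids a None placeholder is returned instead.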
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None