lib/jqueue.py @ 26d3fd2f

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67
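# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# _LOCK and _QUEUE above are attribute *names*: locking.ssynchronized is
# applied with such a name and looks the actual lock up on "self" at call
# time.  A minimal stand-in for that pattern (the real decorator lives in
# ganeti.locking and additionally supports shared acquires):
import threading

def _sketch_synchronized(lock_attr_name):
  """Decorator factory resolving the named lock attribute on each call."""
  def decorator(fn):
    def wrapper(self, *args, **kwargs):
      lock = getattr(self, lock_attr_name)
      lock.acquire()
      try:
        return fn(self, *args, **kwargs)
      finally:
        lock.release()
    return wrapper
  return decorator

class _SketchQueue(object):
  def __init__(self):
    self._lock = threading.Lock()

  @_sketch_synchronized("_lock")
  def Example(self):
    return "runs while holding self._lock"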

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
83
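# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# Timestamps are kept as a (seconds, microseconds) pair so they survive JSON
# serialization without precision surprises.  utils.SplitTime is assumed to
# behave roughly like this stand-in:
def _sketch_split_time(value):
  """Splits a float timestamp into (seconds, microseconds)."""
  seconds = int(value)
  microseconds = int(round((value - seconds) * 1000000))
  return (seconds, microseconds)

# Example: _sketch_split_time(1300000000.25) == (1300000000, 250000)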

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar end_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log", "priority",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

    
120
  @classmethod
121
  def Restore(cls, state):
122
    """Restore the _QueuedOpCode from the serialized form.
123

124
    @type state: dict
125
    @param state: the serialized state
126
    @rtype: _QueuedOpCode
127
    @return: a new _QueuedOpCode instance
128

129
    """
130
    obj = _QueuedOpCode.__new__(cls)
131
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132
    obj.status = state["status"]
133
    obj.result = state["result"]
134
    obj.log = state["log"]
135
    obj.start_timestamp = state.get("start_timestamp", None)
136
    obj.exec_timestamp = state.get("exec_timestamp", None)
137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139
    return obj
140

    
141
  def Serialize(self):
142
    """Serializes this _QueuedOpCode.
143

144
    @rtype: dict
145
    @return: the dictionary holding the serialized state
146

147
    """
148
    return {
149
      "input": self.input.__getstate__(),
150
      "status": self.status,
151
      "result": self.result,
152
      "log": self.log,
153
      "start_timestamp": self.start_timestamp,
154
      "exec_timestamp": self.exec_timestamp,
155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
157
      }
158

    
159

    
160
class _QueuedJob(object):
161
  """In-memory job representation.
162

163
  This is what we use to track the user-submitted jobs. Locking must
164
  be taken care of by users of this class.
165

166
  @type queue: L{JobQueue}
167
  @ivar queue: the parent queue
168
  @ivar id: the job ID
169
  @type ops: list
170
  @ivar ops: the list of _QueuedOpCode that constitute the job
171
  @type log_serial: int
172
  @ivar log_serial: holds the index for the next log entry
173
  @ivar received_timestamp: the timestamp for when the job was received
174
  @ivar start_timestamp: the timestamp for start of execution
175
  @ivar end_timestamp: the timestamp for end of execution
176

177
  """
178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180
               "received_timestamp", "start_timestamp", "end_timestamp",
181
               "__weakref__"]
182

    
183
  def __init__(self, queue, job_id, ops):
184
    """Constructor for the _QueuedJob.
185

186
    @type queue: L{JobQueue}
187
    @param queue: our parent queue
188
    @type job_id: job_id
189
    @param job_id: our job id
190
    @type ops: list
191
    @param ops: the list of opcodes we hold, which will be encapsulated
192
        in _QueuedOpCodes
193

194
    """
195
    if not ops:
196
      raise errors.GenericError("A job needs at least one opcode")
197

    
198
    self.queue = queue
199
    self.id = job_id
200
    self.ops = [_QueuedOpCode(op) for op in ops]
201
    self.log_serial = 0
202
    self.received_timestamp = TimeStampNow()
203
    self.start_timestamp = None
204
    self.end_timestamp = None
205

    
206
    self._InitInMemory(self)
207

    
208
  @staticmethod
209
  def _InitInMemory(obj):
210
    """Initializes in-memory variables.
211

212
    """
213
    obj.ops_iter = None
214
    obj.cur_opctx = None
215

    
216
  def __repr__(self):
217
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218
              "id=%s" % self.id,
219
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220

    
221
    return "<%s at %#x>" % (" ".join(status), id(self))
222

    
223
  @classmethod
224
  def Restore(cls, queue, state):
225
    """Restore a _QueuedJob from serialized state:
226

227
    @type queue: L{JobQueue}
228
    @param queue: to which queue the restored job belongs
229
    @type state: dict
230
    @param state: the serialized state
231
    @rtype: _QueuedJob
232
    @return: the restored _QueuedJob instance
233

234
    """
235
    obj = _QueuedJob.__new__(cls)
236
    obj.queue = queue
237
    obj.id = state["id"]
238
    obj.received_timestamp = state.get("received_timestamp", None)
239
    obj.start_timestamp = state.get("start_timestamp", None)
240
    obj.end_timestamp = state.get("end_timestamp", None)
241

    
242
    obj.ops = []
243
    obj.log_serial = 0
244
    for op_state in state["ops"]:
245
      op = _QueuedOpCode.Restore(op_state)
246
      for log_entry in op.log:
247
        obj.log_serial = max(obj.log_serial, log_entry[0])
248
      obj.ops.append(op)
249

    
250
    cls._InitInMemory(obj)
251

    
252
    return obj
253

    
254
  def Serialize(self):
255
    """Serialize the _JobQueue instance.
256

257
    @rtype: dict
258
    @return: the serialized state
259

260
    """
261
    return {
262
      "id": self.id,
263
      "ops": [op.Serialize() for op in self.ops],
264
      "start_timestamp": self.start_timestamp,
265
      "end_timestamp": self.end_timestamp,
266
      "received_timestamp": self.received_timestamp,
267
      }
268

    
269
  def CalcStatus(self):
270
    """Compute the status of this job.
271

272
    This function iterates over all the _QueuedOpCodes in the job and
273
    based on their status, computes the job status.
274

275
    The algorithm is:
276
      - if we find a cancelled opcode, or one that finished with error,
277
        the job status will be the same
278
      - otherwise, the last opcode whose status is one of:
279
          - waitlock
280
          - canceling
281
          - running
282

283
        will determine the job status
284

285
      - otherwise, it means either all opcodes are queued, or success,
286
        and the job status will be the same
287

288
    @return: the job status
289

290
    """
291
    status = constants.JOB_STATUS_QUEUED
292

    
293
    all_success = True
294
    for op in self.ops:
295
      if op.status == constants.OP_STATUS_SUCCESS:
296
        continue
297

    
298
      all_success = False
299

    
300
      if op.status == constants.OP_STATUS_QUEUED:
301
        pass
302
      elif op.status == constants.OP_STATUS_WAITLOCK:
303
        status = constants.JOB_STATUS_WAITLOCK
304
      elif op.status == constants.OP_STATUS_RUNNING:
305
        status = constants.JOB_STATUS_RUNNING
306
      elif op.status == constants.OP_STATUS_CANCELING:
307
        status = constants.JOB_STATUS_CANCELING
308
        break
309
      elif op.status == constants.OP_STATUS_ERROR:
310
        status = constants.JOB_STATUS_ERROR
311
        # The whole job fails if one opcode failed
312
        break
313
      elif op.status == constants.OP_STATUS_CANCELED:
314
        status = constants.JOB_STATUS_CANCELED
315
        break
316

    
317
    if all_success:
318
      status = constants.JOB_STATUS_SUCCESS
319

    
320
    return status
321
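  # Editor's note (illustrative, not part of the original jqueue.py): for a
  # job whose opcode statuses are (SUCCESS, ERROR, QUEUED) the loop above
  # stops at the failed opcode and the whole job reports JOB_STATUS_ERROR;
  # for (SUCCESS, RUNNING, QUEUED) it reports JOB_STATUS_RUNNING; only when
  # every opcode is SUCCESS does the job become JOB_STATUS_SUCCESS.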

    
322
  def CalcPriority(self):
323
    """Gets the current priority for this job.
324

325
    Only unfinished opcodes are considered. When all are done, the default
326
    priority is used.
327

328
    @rtype: int
329

330
    """
331
    priorities = [op.priority for op in self.ops
332
                  if op.status not in constants.OPS_FINALIZED]
333

    
334
    if not priorities:
335
      # All opcodes are done, assume default priority
336
      return constants.OP_PRIO_DEFAULT
337

    
338
    return min(priorities)
339
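  # Editor's note (illustrative, not part of the original jqueue.py): opcode
  # priorities follow the "lower number means more urgent" convention used by
  # CheckPriorityIncrease() further down, so min() picks the most urgent
  # pending opcode.  E.g. for (status, priority) pairs
  # [(SUCCESS, 0), (QUEUED, -10), (QUEUED, 0)] the job priority is -10; once
  # all opcodes are finalized, OP_PRIO_DEFAULT is returned instead.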

    
340
  def GetLogEntries(self, newer_than):
341
    """Selectively returns the log entries.
342

343
    @type newer_than: None or int
344
    @param newer_than: if this is None, return all log entries,
345
        otherwise return only the log entries with serial higher
346
        than this value
347
    @rtype: list
348
    @return: the list of the log entries selected
349

350
    """
351
    if newer_than is None:
352
      serial = -1
353
    else:
354
      serial = newer_than
355

    
356
    entries = []
357
    for op in self.ops:
358
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359

    
360
    return entries
361

    
362
  def GetInfo(self, fields):
363
    """Returns information about a job.
364

365
    @type fields: list
366
    @param fields: names of fields to return
367
    @rtype: list
368
    @return: list with one element for each field
369
    @raise errors.OpExecError: when an invalid field
370
        has been passed
371

372
    """
373
    row = []
374
    for fname in fields:
375
      if fname == "id":
376
        row.append(self.id)
377
      elif fname == "status":
378
        row.append(self.CalcStatus())
379
      elif fname == "ops":
380
        row.append([op.input.__getstate__() for op in self.ops])
381
      elif fname == "opresult":
382
        row.append([op.result for op in self.ops])
383
      elif fname == "opstatus":
384
        row.append([op.status for op in self.ops])
385
      elif fname == "oplog":
386
        row.append([op.log for op in self.ops])
387
      elif fname == "opstart":
388
        row.append([op.start_timestamp for op in self.ops])
389
      elif fname == "opexec":
390
        row.append([op.exec_timestamp for op in self.ops])
391
      elif fname == "opend":
392
        row.append([op.end_timestamp for op in self.ops])
393
      elif fname == "received_ts":
394
        row.append(self.received_timestamp)
395
      elif fname == "start_ts":
396
        row.append(self.start_timestamp)
397
      elif fname == "end_ts":
398
        row.append(self.end_timestamp)
399
      elif fname == "summary":
400
        row.append([op.input.Summary() for op in self.ops])
401
      else:
402
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
403
    return row
404

    
405
  def MarkUnfinishedOps(self, status, result):
406
    """Mark unfinished opcodes with a given status and result.
407

408
    This is a utility function for marking all running or waiting to
409
    be run opcodes with a given status. Opcodes which are already
410
    finalised are not changed.
411

412
    @param status: a given opcode status
413
    @param result: the opcode result
414

415
    """
416
    not_marked = True
417
    for op in self.ops:
418
      if op.status in constants.OPS_FINALIZED:
419
        assert not_marked, "Finalized opcodes found after non-finalized ones"
420
        continue
421
      op.status = status
422
      op.result = result
423
      not_marked = False
424

    
425
  def Cancel(self):
426
    """Marks job as canceled/-ing if possible.
427

428
    @rtype: tuple; (bool, string)
429
    @return: Boolean describing whether job was successfully canceled or marked
430
      as canceling and a text message
431

432
    """
433
    status = self.CalcStatus()
434

    
435
    if status not in (constants.JOB_STATUS_QUEUED,
436
                      constants.JOB_STATUS_WAITLOCK):
437
      logging.debug("Job %s is no longer waiting in the queue", self.id)
438
      return (False, "Job %s is no longer waiting in the queue" % self.id)
439

    
440
    if status == constants.JOB_STATUS_QUEUED:
441
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
442
                             "Job canceled by request")
443
      msg = "Job %s canceled" % self.id
444

    
445
    elif status == constants.JOB_STATUS_WAITLOCK:
446
      # The worker will notice the new status and cancel the job
447
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
448
      msg = "Job %s will be canceled" % self.id
449

    
450
    return (True, msg)
451
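# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# _QueuedJob.Cancel() above is a small decision table.  A self-contained model
# of that logic, with job statuses reduced to plain strings for the sketch:
def _sketch_cancel_decision(job_status):
  """Returns (success, action) for a cancellation request."""
  if job_status == "queued":
    # Nothing has run yet: every unfinished opcode can be marked CANCELED now.
    return (True, "mark all unfinished opcodes CANCELED")
  if job_status == "waiting-for-locks":
    # Opcodes are waiting for locks: flag them CANCELING and let the worker
    # notice the new status and finish the cancellation.
    return (True, "mark all unfinished opcodes CANCELING")
  # Running or already finalized jobs can no longer be canceled here.
  return (False, "job is no longer waiting in the queue")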

    
452

    
453
class _OpExecCallbacks(mcpu.OpExecCbBase):
454
  def __init__(self, queue, job, op):
455
    """Initializes this class.
456

457
    @type queue: L{JobQueue}
458
    @param queue: Job queue
459
    @type job: L{_QueuedJob}
460
    @param job: Job object
461
    @type op: L{_QueuedOpCode}
462
    @param op: OpCode
463

464
    """
465
    assert queue, "Queue is missing"
466
    assert job, "Job is missing"
467
    assert op, "Opcode is missing"
468

    
469
    self._queue = queue
470
    self._job = job
471
    self._op = op
472

    
473
  def _CheckCancel(self):
474
    """Raises an exception to cancel the job if asked to.
475

476
    """
477
    # Cancel here if we were asked to
478
    if self._op.status == constants.OP_STATUS_CANCELING:
479
      logging.debug("Canceling opcode")
480
      raise CancelJob()
481

    
482
  @locking.ssynchronized(_QUEUE, shared=1)
483
  def NotifyStart(self):
484
    """Mark the opcode as running, not lock-waiting.
485

486
    This is called from the mcpu code as a notifier function, when the LU is
487
    finally about to start the Exec() method. Of course, to have end-user
488
    visible results, the opcode must be initially (before calling into
489
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
490

491
    """
492
    assert self._op in self._job.ops
493
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
494
                               constants.OP_STATUS_CANCELING)
495

    
496
    # Cancel here if we were asked to
497
    self._CheckCancel()
498

    
499
    logging.debug("Opcode is now running")
500

    
501
    self._op.status = constants.OP_STATUS_RUNNING
502
    self._op.exec_timestamp = TimeStampNow()
503

    
504
    # And finally replicate the job status
505
    self._queue.UpdateJobUnlocked(self._job)
506

    
507
  @locking.ssynchronized(_QUEUE, shared=1)
508
  def _AppendFeedback(self, timestamp, log_type, log_msg):
509
    """Internal feedback append function, with locks
510

511
    """
512
    self._job.log_serial += 1
513
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
514
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
515

    
516
  def Feedback(self, *args):
517
    """Append a log entry.
518

519
    """
520
    assert len(args) < 3
521

    
522
    if len(args) == 1:
523
      log_type = constants.ELOG_MESSAGE
524
      log_msg = args[0]
525
    else:
526
      (log_type, log_msg) = args
527

    
528
    # The time is split to make serialization easier and not lose
529
    # precision.
530
    timestamp = utils.SplitTime(time.time())
531
    self._AppendFeedback(timestamp, log_type, log_msg)
532
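  # Editor's note (illustrative, not part of the original jqueue.py):
  # Feedback() accepts either Feedback("message") or Feedback(type, "message");
  # the one-argument form defaults the type to ELOG_MESSAGE.  Every call ends
  # up as one (log_serial, (seconds, microseconds), type, message) tuple in
  # the opcode's log, with log_serial increasing monotonically per job.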

    
533
  def CheckCancel(self):
534
    """Check whether job has been cancelled.
535

536
    """
537
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
538
                               constants.OP_STATUS_CANCELING)
539

    
540
    # Cancel here if we were asked to
541
    self._CheckCancel()
542

    
543

    
544
class _JobChangesChecker(object):
545
  def __init__(self, fields, prev_job_info, prev_log_serial):
546
    """Initializes this class.
547

548
    @type fields: list of strings
549
    @param fields: Fields requested by LUXI client
550
    @type prev_job_info: string
551
    @param prev_job_info: previous job info, as passed by the LUXI client
552
    @type prev_log_serial: string
553
    @param prev_log_serial: previous job serial, as passed by the LUXI client
554

555
    """
556
    self._fields = fields
557
    self._prev_job_info = prev_job_info
558
    self._prev_log_serial = prev_log_serial
559

    
560
  def __call__(self, job):
561
    """Checks whether job has changed.
562

563
    @type job: L{_QueuedJob}
564
    @param job: Job object
565

566
    """
567
    status = job.CalcStatus()
568
    job_info = job.GetInfo(self._fields)
569
    log_entries = job.GetLogEntries(self._prev_log_serial)
570

    
571
    # Serializing and deserializing data can cause type changes (e.g. from
572
    # tuple to list) or precision loss. We're doing it here so that we get
573
    # the same modifications as the data received from the client. Without
574
    # this, the comparison afterwards might fail without the data being
575
    # significantly different.
576
    # TODO: we just deserialized from disk, investigate how to make sure that
577
    # the job info and log entries are compatible to avoid this further step.
578
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
579
    # efficient, though floats will be tricky
580
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
581
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
582

    
583
    # Don't even try to wait if the job is no longer running, there will be
584
    # no changes.
585
    if (status not in (constants.JOB_STATUS_QUEUED,
586
                       constants.JOB_STATUS_RUNNING,
587
                       constants.JOB_STATUS_WAITLOCK) or
588
        job_info != self._prev_job_info or
589
        (log_entries and self._prev_log_serial != log_entries[0][0])):
590
      logging.debug("Job %s changed", job.id)
591
      return (job_info, log_entries)
592

    
593
    return None
594
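# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# The DumpJson/LoadJson round trip above normalizes values into the form the
# LUXI client would see (tuples become lists, etc.), so the comparison against
# the client-supplied previous state is meaningful.  With the standard json
# module the effect looks like this:
import json

def _sketch_json_roundtrip(value):
  """Returns the value as it would look after a JSON encode/decode cycle."""
  return json.loads(json.dumps(value))

# _sketch_json_roundtrip((1, (1300000000, 250000), "message", "hello"))
# returns [1, [1300000000, 250000], u"message", u"hello"]: lists, not tuples.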

    
595

    
596
class _JobFileChangesWaiter(object):
597
  def __init__(self, filename):
598
    """Initializes this class.
599

600
    @type filename: string
601
    @param filename: Path to job file
602
    @raises errors.InotifyError: if the notifier cannot be setup
603

604
    """
605
    self._wm = pyinotify.WatchManager()
606
    self._inotify_handler = \
607
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
608
    self._notifier = \
609
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
610
    try:
611
      self._inotify_handler.enable()
612
    except Exception:
613
      # pyinotify doesn't close file descriptors automatically
614
      self._notifier.stop()
615
      raise
616

    
617
  def _OnInotify(self, notifier_enabled):
618
    """Callback for inotify.
619

620
    """
621
    if not notifier_enabled:
622
      self._inotify_handler.enable()
623

    
624
  def Wait(self, timeout):
625
    """Waits for the job file to change.
626

627
    @type timeout: float
628
    @param timeout: Timeout in seconds
629
    @return: Whether there have been events
630

631
    """
632
    assert timeout >= 0
633
    have_events = self._notifier.check_events(timeout * 1000)
634
    if have_events:
635
      self._notifier.read_events()
636
    self._notifier.process_events()
637
    return have_events
638

    
639
  def Close(self):
640
    """Closes underlying notifier and its file descriptor.
641

642
    """
643
    self._notifier.stop()
644

    
645

    
646
class _JobChangesWaiter(object):
647
  def __init__(self, filename):
648
    """Initializes this class.
649

650
    @type filename: string
651
    @param filename: Path to job file
652

653
    """
654
    self._filewaiter = None
655
    self._filename = filename
656

    
657
  def Wait(self, timeout):
658
    """Waits for a job to change.
659

660
    @type timeout: float
661
    @param timeout: Timeout in seconds
662
    @return: Whether there have been events
663

664
    """
665
    if self._filewaiter:
666
      return self._filewaiter.Wait(timeout)
667

    
668
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
669
    # If this point is reached, return immediately and let caller check the job
670
    # file again in case there were changes since the last check. This avoids a
671
    # race condition.
672
    self._filewaiter = _JobFileChangesWaiter(self._filename)
673

    
674
    return True
675

    
676
  def Close(self):
677
    """Closes underlying waiter.
678

679
    """
680
    if self._filewaiter:
681
      self._filewaiter.Close()
682

    
683

    
684
class _WaitForJobChangesHelper(object):
685
  """Helper class using inotify to wait for changes in a job file.
686

687
  This class takes a previous job status and serial, and alerts the client when
688
  the current job status has changed.
689

690
  """
691
  @staticmethod
692
  def _CheckForChanges(job_load_fn, check_fn):
693
    job = job_load_fn()
694
    if not job:
695
      raise errors.JobLost()
696

    
697
    result = check_fn(job)
698
    if result is None:
699
      raise utils.RetryAgain()
700

    
701
    return result
702

    
703
  def __call__(self, filename, job_load_fn,
704
               fields, prev_job_info, prev_log_serial, timeout):
705
    """Waits for changes on a job.
706

707
    @type filename: string
708
    @param filename: File on which to wait for changes
709
    @type job_load_fn: callable
710
    @param job_load_fn: Function to load job
711
    @type fields: list of strings
712
    @param fields: Which fields to check for changes
713
    @type prev_job_info: list or None
714
    @param prev_job_info: Last job information returned
715
    @type prev_log_serial: int
716
    @param prev_log_serial: Last job message serial number
717
    @type timeout: float
718
    @param timeout: maximum time to wait in seconds
719

720
    """
721
    try:
722
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
723
      waiter = _JobChangesWaiter(filename)
724
      try:
725
        return utils.Retry(compat.partial(self._CheckForChanges,
726
                                          job_load_fn, check_fn),
727
                           utils.RETRY_REMAINING_TIME, timeout,
728
                           wait_fn=waiter.Wait)
729
      finally:
730
        waiter.Close()
731
    except (errors.InotifyError, errors.JobLost):
732
      return None
733
    except utils.RetryTimeout:
734
      return constants.JOB_NOTCHANGED
735
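# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# The utils.Retry(..., wait_fn=waiter.Wait) call above boils down to the loop
# sketched here: poll the job, and while nothing has changed yet, sleep on
# inotify events (or the remaining time) and try again until the deadline.
def _sketch_wait_for_change(check_fn, wait_fn, timeout):
  """check_fn() returns a result or None; wait_fn(t) blocks up to t seconds."""
  deadline = time.time() + timeout
  while True:
    result = check_fn()
    if result is not None:
      return result
    remaining = deadline - time.time()
    if remaining <= 0:
      return "nochange"  # stands in for constants.JOB_NOTCHANGED
    wait_fn(remaining)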

    
736

    
737
def _EncodeOpError(err):
738
  """Encodes an error which occurred while processing an opcode.
739

740
  """
741
  if isinstance(err, errors.GenericError):
742
    to_encode = err
743
  else:
744
    to_encode = errors.OpExecError(str(err))
745

    
746
  return errors.EncodeException(to_encode)
747

    
748

    
749
class _TimeoutStrategyWrapper:
750
  def __init__(self, fn):
751
    """Initializes this class.
752

753
    """
754
    self._fn = fn
755
    self._next = None
756

    
757
  def _Advance(self):
758
    """Gets the next timeout if necessary.
759

760
    """
761
    if self._next is None:
762
      self._next = self._fn()
763

    
764
  def Peek(self):
765
    """Returns the next timeout.
766

767
    """
768
    self._Advance()
769
    return self._next
770

    
771
  def Next(self):
772
    """Returns the current timeout and advances the internal state.
773

774
    """
775
    self._Advance()
776
    result = self._next
777
    self._next = None
778
    return result
779
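# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# _TimeoutStrategyWrapper caches one value from the wrapped callable so that
# Peek() can be called repeatedly without consuming it, while Next() consumes
# it.  With a toy strategy returning 1.0, 2.0, 3.0, ...:
def _sketch_counting_strategy():
  state = {"n": 0}
  def _next_timeout():
    state["n"] += 1
    return float(state["n"])
  return _next_timeout

# wrapper = _TimeoutStrategyWrapper(_sketch_counting_strategy())
# wrapper.Peek() -> 1.0   (cached, not consumed)
# wrapper.Peek() -> 1.0
# wrapper.Next() -> 1.0   (consumed)
# wrapper.Next() -> 2.0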

    
780

    
781
class _OpExecContext:
782
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
783
    """Initializes this class.
784

785
    """
786
    self.op = op
787
    self.index = index
788
    self.log_prefix = log_prefix
789
    self.summary = op.input.Summary()
790

    
791
    self._timeout_strategy_factory = timeout_strategy_factory
792
    self._ResetTimeoutStrategy()
793

    
794
  def _ResetTimeoutStrategy(self):
795
    """Creates a new timeout strategy.
796

797
    """
798
    self._timeout_strategy = \
799
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
800

    
801
  def CheckPriorityIncrease(self):
802
    """Checks whether priority can and should be increased.
803

804
    Called when locks couldn't be acquired.
805

806
    """
807
    op = self.op
808

    
809
    # Exhausted all retries and next round should not use blocking acquire
810
    # for locks?
811
    if (self._timeout_strategy.Peek() is None and
812
        op.priority > constants.OP_PRIO_HIGHEST):
813
      logging.debug("Increasing priority")
814
      op.priority -= 1
815
      self._ResetTimeoutStrategy()
816
      return True
817

    
818
    return False
819

    
820
  def GetNextLockTimeout(self):
821
    """Returns the next lock acquire timeout.
822

823
    """
824
    return self._timeout_strategy.Next()
825

    
826

    
827
class _JobProcessor(object):
828
  def __init__(self, queue, opexec_fn, job,
829
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
830
    """Initializes this class.
831

832
    """
833
    self.queue = queue
834
    self.opexec_fn = opexec_fn
835
    self.job = job
836
    self._timeout_strategy_factory = _timeout_strategy_factory
837

    
838
  @staticmethod
839
  def _FindNextOpcode(job, timeout_strategy_factory):
840
    """Locates the next opcode to run.
841

842
    @type job: L{_QueuedJob}
843
    @param job: Job object
844
    @param timeout_strategy_factory: Callable to create new timeout strategy
845

846
    """
847
    # Create some sort of a cache to speed up locating next opcode for future
848
    # lookups
849
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
850
    # pending and one for processed ops.
851
    if job.ops_iter is None:
852
      job.ops_iter = enumerate(job.ops)
853

    
854
    # Find next opcode to run
855
    while True:
856
      try:
857
        (idx, op) = job.ops_iter.next()
858
      except StopIteration:
859
        raise errors.ProgrammerError("Called for a finished job")
860

    
861
      if op.status == constants.OP_STATUS_RUNNING:
862
        # Found an opcode already marked as running
863
        raise errors.ProgrammerError("Called for job marked as running")
864

    
865
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
866
                             timeout_strategy_factory)
867

    
868
      if op.status == constants.OP_STATUS_CANCELED:
869
        # Cancelled jobs are handled by the caller
870
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
871
                              for i in job.ops[idx:])
872

    
873
      elif op.status in constants.OPS_FINALIZED:
874
        # This is a job that was partially completed before master daemon
875
        # shutdown, so it can be expected that some opcodes are already
876
        # completed successfully (if any did error out, then the whole job
877
        # should have been aborted and not resubmitted for processing).
878
        logging.info("%s: opcode %s already processed, skipping",
879
                     opctx.log_prefix, opctx.summary)
880
        continue
881

    
882
      return opctx
883

    
884
  @staticmethod
885
  def _MarkWaitlock(job, op):
886
    """Marks an opcode as waiting for locks.
887

888
    The job's start timestamp is also set if necessary.
889

890
    @type job: L{_QueuedJob}
891
    @param job: Job object
892
    @type op: L{_QueuedOpCode}
893
    @param op: Opcode object
894

895
    """
896
    assert op in job.ops
897

    
898
    op.status = constants.OP_STATUS_WAITLOCK
899
    op.result = None
900
    op.start_timestamp = TimeStampNow()
901

    
902
    if job.start_timestamp is None:
903
      job.start_timestamp = op.start_timestamp
904

    
905
  def _ExecOpCodeUnlocked(self, opctx):
906
    """Processes one opcode and returns the result.
907

908
    """
909
    op = opctx.op
910

    
911
    assert op.status == constants.OP_STATUS_WAITLOCK
912

    
913
    timeout = opctx.GetNextLockTimeout()
914

    
915
    try:
916
      # Make sure not to hold queue lock while calling ExecOpCode
917
      result = self.opexec_fn(op.input,
918
                              _OpExecCallbacks(self.queue, self.job, op),
919
                              timeout=timeout)
920
    except mcpu.LockAcquireTimeout:
921
      assert timeout is not None, "Received timeout for blocking acquire"
922
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
923
      assert op.status == constants.OP_STATUS_WAITLOCK
924
      return (constants.OP_STATUS_QUEUED, None)
925
    except CancelJob:
926
      logging.exception("%s: Canceling job", opctx.log_prefix)
927
      assert op.status == constants.OP_STATUS_CANCELING
928
      return (constants.OP_STATUS_CANCELING, None)
929
    except Exception, err: # pylint: disable-msg=W0703
930
      logging.exception("%s: Caught exception in %s",
931
                        opctx.log_prefix, opctx.summary)
932
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
933
    else:
934
      logging.debug("%s: %s successful",
935
                    opctx.log_prefix, opctx.summary)
936
      return (constants.OP_STATUS_SUCCESS, result)
937

    
938
  def __call__(self, _nextop_fn=None):
939
    """Continues execution of a job.
940

941
    @param _nextop_fn: Callback function for tests
942
    @rtype: bool
943
    @return: True if job is finished, False if processor needs to be called
944
             again
945

946
    """
947
    queue = self.queue
948
    job = self.job
949

    
950
    logging.debug("Processing job %s", job.id)
951

    
952
    queue.acquire(shared=1)
953
    try:
954
      opcount = len(job.ops)
955

    
956
      # Is a previous opcode still pending?
957
      if job.cur_opctx:
958
        opctx = job.cur_opctx
959
      else:
960
        if __debug__ and _nextop_fn:
961
          _nextop_fn()
962
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
963

    
964
      op = opctx.op
965

    
966
      # Consistency check
967
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
968
                                     constants.OP_STATUS_CANCELED)
969
                        for i in job.ops[opctx.index:])
970

    
971
      assert op.status in (constants.OP_STATUS_QUEUED,
972
                           constants.OP_STATUS_WAITLOCK,
973
                           constants.OP_STATUS_CANCELED)
974

    
975
      assert (op.priority <= constants.OP_PRIO_LOWEST and
976
              op.priority >= constants.OP_PRIO_HIGHEST)
977

    
978
      if op.status != constants.OP_STATUS_CANCELED:
979
        # Prepare to start opcode
980
        self._MarkWaitlock(job, op)
981

    
982
        assert op.status == constants.OP_STATUS_WAITLOCK
983
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
984

    
985
        # Write to disk
986
        queue.UpdateJobUnlocked(job)
987

    
988
        logging.info("%s: opcode %s waiting for locks",
989
                     opctx.log_prefix, opctx.summary)
990

    
991
        queue.release()
992
        try:
993
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
994
        finally:
995
          queue.acquire(shared=1)
996

    
997
        op.status = op_status
998
        op.result = op_result
999

    
1000
        if op.status == constants.OP_STATUS_QUEUED:
1001
          # Couldn't get locks in time
1002
          assert not op.end_timestamp
1003
        else:
1004
          # Finalize opcode
1005
          op.end_timestamp = TimeStampNow()
1006

    
1007
          if op.status == constants.OP_STATUS_CANCELING:
1008
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1009
                                  for i in job.ops[opctx.index:])
1010
          else:
1011
            assert op.status in constants.OPS_FINALIZED
1012

    
1013
      if op.status == constants.OP_STATUS_QUEUED:
1014
        finalize = False
1015

    
1016
        opctx.CheckPriorityIncrease()
1017

    
1018
        # Keep around for another round
1019
        job.cur_opctx = opctx
1020

    
1021
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1022
                op.priority >= constants.OP_PRIO_HIGHEST)
1023

    
1024
        # In no case must the status be finalized here
1025
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1026

    
1027
        queue.UpdateJobUnlocked(job)
1028

    
1029
      else:
1030
        # Ensure all opcodes so far have been successful
1031
        assert (opctx.index == 0 or
1032
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1033
                           for i in job.ops[:opctx.index]))
1034

    
1035
        # Reset context
1036
        job.cur_opctx = None
1037

    
1038
        if op.status == constants.OP_STATUS_SUCCESS:
1039
          finalize = False
1040

    
1041
        elif op.status == constants.OP_STATUS_ERROR:
1042
          # Ensure failed opcode has an exception as its result
1043
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1044

    
1045
          to_encode = errors.OpExecError("Preceding opcode failed")
1046
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1047
                                _EncodeOpError(to_encode))
1048
          finalize = True
1049

    
1050
          # Consistency check
1051
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1052
                            errors.GetEncodedError(i.result)
1053
                            for i in job.ops[opctx.index:])
1054

    
1055
        elif op.status == constants.OP_STATUS_CANCELING:
1056
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1057
                                "Job canceled by request")
1058
          finalize = True
1059

    
1060
        elif op.status == constants.OP_STATUS_CANCELED:
1061
          finalize = True
1062

    
1063
        else:
1064
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1065

    
1066
        # Finalizing or last opcode?
1067
        if finalize or opctx.index == (opcount - 1):
1068
          # All opcodes have been run, finalize job
1069
          job.end_timestamp = TimeStampNow()
1070

    
1071
        # Write to disk. If the job status is final, this is the final write
1072
        # allowed. Once the file has been written, it can be archived anytime.
1073
        queue.UpdateJobUnlocked(job)
1074

    
1075
        if finalize or opctx.index == (opcount - 1):
1076
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1077
          return True
1078

    
1079
      return False
1080
    finally:
1081
      queue.release()
1082
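# --- Editor's illustrative sketch (not part of the original jqueue.py) ---
# _JobProcessor.__call__ returns True only once the job is finished; False
# means "call me again" (for example after a lock acquire timed out and the
# opcode went back to QUEUED, possibly at an increased priority).
# _JobQueueWorker below drives it via DeferTask(); conceptually that is:
def _sketch_drive_job(processor):
  """Invokes the processor until it reports the job as finished."""
  rounds = 0
  while not processor():
    # The real worker raises workerpool.DeferTask() here instead of looping,
    # so other queued jobs get a chance to run in between attempts.
    rounds += 1
  return rounds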

    
1083

    
1084
class _JobQueueWorker(workerpool.BaseWorker):
1085
  """The actual job workers.
1086

1087
  """
1088
  def RunTask(self, job): # pylint: disable-msg=W0221
1089
    """Job executor.
1090

1091
    This function processes a job. It is closely tied to the L{_QueuedJob} and
1092
    L{_QueuedOpCode} classes.
1093

1094
    @type job: L{_QueuedJob}
1095
    @param job: the job to be processed
1096

1097
    """
1098
    queue = job.queue
1099
    assert queue == self.pool.queue
1100

    
1101
    self.SetTaskName("Job%s" % job.id)
1102

    
1103
    proc = mcpu.Processor(queue.context, job.id)
1104

    
1105
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
1106
      # Schedule again
1107
      raise workerpool.DeferTask(priority=job.CalcPriority())
1108

    
1109

    
1110
class _JobQueueWorkerPool(workerpool.WorkerPool):
1111
  """Simple class implementing a job-processing workerpool.
1112

1113
  """
1114
  def __init__(self, queue):
1115
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
1116
                                              JOBQUEUE_THREADS,
1117
                                              _JobQueueWorker)
1118
    self.queue = queue
1119

    
1120

    
1121
def _RequireOpenQueue(fn):
1122
  """Decorator for "public" functions.
1123

1124
  This function should be used for all 'public' functions. That is,
1125
  functions usually called from other classes. Note that this should
1126
  be applied only to methods (not plain functions), since it expects
1127
  that the decorated function is called with a first argument that has
1128
  a '_queue_filelock' attribute.
1129

1130
  @warning: Use this decorator only after locking.ssynchronized
1131

1132
  Example::
1133
    @locking.ssynchronized(_LOCK)
1134
    @_RequireOpenQueue
1135
    def Example(self):
1136
      pass
1137

1138
  """
1139
  def wrapper(self, *args, **kwargs):
1140
    # pylint: disable-msg=W0212
1141
    assert self._queue_filelock is not None, "Queue should be open"
1142
    return fn(self, *args, **kwargs)
1143
  return wrapper
1144

    
1145

    
1146
class JobQueue(object):
1147
  """Queue used to manage the jobs.
1148

1149
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1150

1151
  """
1152
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1153

    
1154
  def __init__(self, context):
1155
    """Constructor for JobQueue.
1156

1157
    The constructor will initialize the job queue object and then
1158
    start loading the current jobs from disk, either for starting them
1159
    (if they were queue) or for aborting them (if they were already
1160
    running).
1161

1162
    @type context: GanetiContext
1163
    @param context: the context object for access to the configuration
1164
        data and other ganeti objects
1165

1166
    """
1167
    self.context = context
1168
    self._memcache = weakref.WeakValueDictionary()
1169
    self._my_hostname = netutils.Hostname.GetSysName()
1170

    
1171
    # The Big JobQueue lock. If a code block or method acquires it in shared
1172
    # mode, it must guarantee concurrency with all the code acquiring it in
1173
    # shared mode, including itself. In order not to acquire it at all
1174
    # concurrency must be guaranteed with all code acquiring it in shared mode
1175
    # and all code acquiring it exclusively.
1176
    self._lock = locking.SharedLock("JobQueue")
1177

    
1178
    self.acquire = self._lock.acquire
1179
    self.release = self._lock.release
1180

    
1181
    # Initialize the queue, and acquire the filelock.
1182
    # This ensures no other process is working on the job queue.
1183
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1184

    
1185
    # Read serial file
1186
    self._last_serial = jstore.ReadSerial()
1187
    assert self._last_serial is not None, ("Serial file was modified between"
1188
                                           " check in jstore and here")
1189

    
1190
    # Get initial list of nodes
1191
    self._nodes = dict((n.name, n.primary_ip)
1192
                       for n in self.context.cfg.GetAllNodesInfo().values()
1193
                       if n.master_candidate)
1194

    
1195
    # Remove master node
1196
    self._nodes.pop(self._my_hostname, None)
1197

    
1198
    # TODO: Check consistency across nodes
1199

    
1200
    self._queue_size = 0
1201
    self._UpdateQueueSizeUnlocked()
1202
    self._drained = self._IsQueueMarkedDrain()
1203

    
1204
    # Setup worker pool
1205
    self._wpool = _JobQueueWorkerPool(self)
1206
    try:
1207
      self._InspectQueue()
1208
    except:
1209
      self._wpool.TerminateWorkers()
1210
      raise
1211

    
1212
  @locking.ssynchronized(_LOCK)
1213
  @_RequireOpenQueue
1214
  def _InspectQueue(self):
1215
    """Loads the whole job queue and resumes unfinished jobs.
1216

1217
    This function needs the lock here because WorkerPool.AddTask() may start a
1218
    job while we're still doing our work.
1219

1220
    """
1221
    logging.info("Inspecting job queue")
1222

    
1223
    restartjobs = []
1224

    
1225
    all_job_ids = self._GetJobIDsUnlocked()
1226
    jobs_count = len(all_job_ids)
1227
    lastinfo = time.time()
1228
    for idx, job_id in enumerate(all_job_ids):
1229
      # Give an update every 1000 jobs or 10 seconds
1230
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1231
          idx == (jobs_count - 1)):
1232
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1233
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1234
        lastinfo = time.time()
1235

    
1236
      job = self._LoadJobUnlocked(job_id)
1237

    
1238
      # a failure in loading the job can cause 'None' to be returned
1239
      if job is None:
1240
        continue
1241

    
1242
      status = job.CalcStatus()
1243

    
1244
      if status in (constants.JOB_STATUS_QUEUED, ):
1245
        restartjobs.append(job)
1246

    
1247
      elif status in (constants.JOB_STATUS_RUNNING,
1248
                      constants.JOB_STATUS_WAITLOCK,
1249
                      constants.JOB_STATUS_CANCELING):
1250
        logging.warning("Unfinished job %s found: %s", job.id, job)
1251
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1252
                              "Unclean master daemon shutdown")
1253
        self.UpdateJobUnlocked(job)
1254

    
1255
    if restartjobs:
1256
      logging.info("Restarting %s jobs", len(restartjobs))
1257
      self._EnqueueJobs(restartjobs)
1258

    
1259
    logging.info("Job queue inspection finished")
1260

    
1261
  @locking.ssynchronized(_LOCK)
1262
  @_RequireOpenQueue
1263
  def AddNode(self, node):
1264
    """Register a new node with the queue.
1265

1266
    @type node: L{objects.Node}
1267
    @param node: the node object to be added
1268

1269
    """
1270
    node_name = node.name
1271
    assert node_name != self._my_hostname
1272

    
1273
    # Clean queue directory on added node
1274
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1275
    msg = result.fail_msg
1276
    if msg:
1277
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1278
                      node_name, msg)
1279

    
1280
    if not node.master_candidate:
1281
      # remove if existing, ignoring errors
1282
      self._nodes.pop(node_name, None)
1283
      # and skip the replication of the job ids
1284
      return
1285

    
1286
    # Upload the whole queue excluding archived jobs
1287
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1288

    
1289
    # Upload current serial file
1290
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1291

    
1292
    for file_name in files:
1293
      # Read file content
1294
      content = utils.ReadFile(file_name)
1295

    
1296
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1297
                                                  [node.primary_ip],
1298
                                                  file_name, content)
1299
      msg = result[node_name].fail_msg
1300
      if msg:
1301
        logging.error("Failed to upload file %s to node %s: %s",
1302
                      file_name, node_name, msg)
1303

    
1304
    self._nodes[node_name] = node.primary_ip
1305

    
1306
  @locking.ssynchronized(_LOCK)
1307
  @_RequireOpenQueue
1308
  def RemoveNode(self, node_name):
1309
    """Callback called when removing nodes from the cluster.
1310

1311
    @type node_name: str
1312
    @param node_name: the name of the node to remove
1313

1314
    """
1315
    self._nodes.pop(node_name, None)
1316

    
1317
  @staticmethod
1318
  def _CheckRpcResult(result, nodes, failmsg):
1319
    """Verifies the status of an RPC call.
1320

1321
    Since we aim to keep consistency should this node (the current
1322
    master) fail, we will log errors if our rpc calls fail, and especially
1323
    log the case when more than half of the nodes fail.
1324

1325
    @param result: the data as returned from the rpc call
1326
    @type nodes: list
1327
    @param nodes: the list of nodes we made the call to
1328
    @type failmsg: str
1329
    @param failmsg: the identifier to be used for logging
1330

1331
    """
1332
    failed = []
1333
    success = []
1334

    
1335
    for node in nodes:
1336
      msg = result[node].fail_msg
1337
      if msg:
1338
        failed.append(node)
1339
        logging.error("RPC call %s (%s) failed on node %s: %s",
1340
                      result[node].call, failmsg, node, msg)
1341
      else:
1342
        success.append(node)
1343

    
1344
    # +1 for the master node
1345
    if (len(success) + 1) < len(failed):
1346
      # TODO: Handle failing nodes
1347
      logging.error("More than half of the nodes failed")
1348

    
1349
  def _GetNodeIp(self):
1350
    """Helper for returning the node name/ip list.
1351

1352
    @rtype: (list, list)
1353
    @return: a tuple of two lists, the first one with the node
1354
        names and the second one with the node addresses
1355

1356
    """
1357
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1358
    name_list = self._nodes.keys()
1359
    addr_list = [self._nodes[name] for name in name_list]
1360
    return name_list, addr_list
1361

    
1362
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1363
    """Writes a file locally and then replicates it to all nodes.
1364

1365
    This function will replace the contents of a file on the local
1366
    node and then replicate it to all the other nodes we have.
1367

1368
    @type file_name: str
1369
    @param file_name: the path of the file to be replicated
1370
    @type data: str
1371
    @param data: the new contents of the file
1372
    @type replicate: boolean
1373
    @param replicate: whether to spread the changes to the remote nodes
1374

1375
    """
1376
    getents = runtime.GetEnts()
1377
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1378
                    gid=getents.masterd_gid)
1379

    
1380
    if replicate:
1381
      names, addrs = self._GetNodeIp()
1382
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1383
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1384

    
1385
  def _RenameFilesUnlocked(self, rename):
1386
    """Renames a file locally and then replicate the change.
1387

1388
    This function will rename a file in the local queue directory
1389
    and then replicate this rename to all the other nodes we have.
1390

1391
    @type rename: list of (old, new)
1392
    @param rename: List containing tuples mapping old to new names
1393

1394
    """
1395
    # Rename them locally
1396
    for old, new in rename:
1397
      utils.RenameFile(old, new, mkdir=True)
1398

    
1399
    # ... and on all nodes
1400
    names, addrs = self._GetNodeIp()
1401
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1402
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1403

    
1404
  @staticmethod
1405
  def _FormatJobID(job_id):
1406
    """Convert a job ID to string format.
1407

1408
    Currently this just does C{str(job_id)} after performing some
1409
    checks, but if we want to change the job id format this will
1410
    abstract this change.
1411

1412
    @type job_id: int or long
1413
    @param job_id: the numeric job id
1414
    @rtype: str
1415
    @return: the formatted job id
1416

1417
    """
1418
    if not isinstance(job_id, (int, long)):
1419
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1420
    if job_id < 0:
1421
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1422

    
1423
    return str(job_id)
1424

    
1425
  @classmethod
1426
  def _GetArchiveDirectory(cls, job_id):
1427
    """Returns the archive directory for a job.
1428

1429
    @type job_id: str
1430
    @param job_id: Job identifier
1431
    @rtype: str
1432
    @return: Directory name
1433

1434
    """
1435
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1436
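  # Editor's note (illustrative, not part of the original jqueue.py): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, jobs 0..9999 are archived under
  # directory "0", jobs 10000..19999 under "1", and so on; e.g. job "12345"
  # maps to archive directory "1".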

    
1437
  def _NewSerialsUnlocked(self, count):
1438
    """Generates a new job identifier.
1439

1440
    Job identifiers are unique during the lifetime of a cluster.
1441

1442
    @type count: integer
1443
    @param count: how many serials to return
1444
    @rtype: list of str
1445
    @return: a list of job identifiers (as strings)
1446

1447
    """
1448
    assert count > 0
1449
    # New number
1450
    serial = self._last_serial + count
1451

    
1452
    # Write to file
1453
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1454
                             "%s\n" % serial, True)
1455

    
1456
    result = [self._FormatJobID(v)
1457
              for v in range(self._last_serial + 1, serial + 1)]
1458
    # Keep it only if we were able to write the file
1459
    self._last_serial = serial
1460

    
1461
    return result
1462
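  # Editor's note (illustrative, not part of the original jqueue.py): with
  # _last_serial == 7 and count == 3, the serial file is updated to 10 and
  # the job IDs "8", "9" and "10" are returned; a later call continues from
  # there, so identifiers stay unique for the lifetime of the cluster.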

    
1463
  @staticmethod
1464
  def _GetJobPath(job_id):
1465
    """Returns the job file for a given job id.
1466

1467
    @type job_id: str
1468
    @param job_id: the job identifier
1469
    @rtype: str
1470
    @return: the path to the job file
1471

1472
    """
1473
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1474

    
1475
  @classmethod
1476
  def _GetArchivedJobPath(cls, job_id):
1477
    """Returns the archived job file for a give job id.
1478

1479
    @type job_id: str
1480
    @param job_id: the job identifier
1481
    @rtype: str
1482
    @return: the path to the archived job file
1483

1484
    """
1485
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1486
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1487

    
1488
  def _GetJobIDsUnlocked(self, sort=True):
1489
    """Return all known job IDs.
1490

1491
    The method only looks at disk because it's a requirement that all
1492
    jobs are present on disk (so in the _memcache we don't have any
1493
    extra IDs).
1494

1495
    @type sort: boolean
1496
    @param sort: perform sorting on the returned job ids
1497
    @rtype: list
1498
    @return: the list of job IDs
1499

1500
    """
1501
    jlist = []
1502
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1503
      m = self._RE_JOB_FILE.match(filename)
1504
      if m:
1505
        jlist.append(m.group(1))
1506
    if sort:
1507
      jlist = utils.NiceSort(jlist)
1508
    return jlist
1509

    
1510
  def _LoadJobUnlocked(self, job_id):
1511
    """Loads a job from the disk or memory.
1512

1513
    Given a job id, this will return the cached job object if
1514
    existing, or try to load the job from the disk. If loading from
1515
    disk, it will also add the job to the cache.
1516

1517
    @param job_id: the job id
1518
    @rtype: L{_QueuedJob} or None
1519
    @return: either None or the job object
1520

1521
    """
1522
    job = self._memcache.get(job_id, None)
1523
    if job:
1524
      logging.debug("Found job %s in memcache", job_id)
1525
      return job
1526

    
1527
    try:
1528
      job = self._LoadJobFromDisk(job_id)
1529
      if job is None:
1530
        return job
1531
    except errors.JobFileCorrupted:
1532
      old_path = self._GetJobPath(job_id)
1533
      new_path = self._GetArchivedJobPath(job_id)
1534
      if old_path == new_path:
1535
        # job already archived (future case)
1536
        logging.exception("Can't parse job %s", job_id)
1537
      else:
1538
        # non-archived case
1539
        logging.exception("Can't parse job %s, will archive.", job_id)
1540
        self._RenameFilesUnlocked([(old_path, new_path)])
1541
      return None
1542

    
1543
    self._memcache[job_id] = job
1544
    logging.debug("Added job %s to the cache", job_id)
1545
    return job
1546

    
1547
  def _LoadJobFromDisk(self, job_id):
1548
    """Load the given job file from disk.
1549

1550
    Given a job file, read, load and restore it in a _QueuedJob format.
1551

1552
    @type job_id: string
1553
    @param job_id: job identifier
1554
    @rtype: L{_QueuedJob} or None
1555
    @return: either None or the job object
1556

1557
    """
1558
    filepath = self._GetJobPath(job_id)
1559
    logging.debug("Loading job from %s", filepath)
1560
    try:
1561
      raw_data = utils.ReadFile(filepath)
1562
    except EnvironmentError, err:
1563
      if err.errno in (errno.ENOENT, ):
1564
        return None
1565
      raise
1566

    
1567
    try:
1568
      data = serializer.LoadJson(raw_data)
1569
      job = _QueuedJob.Restore(self, data)
1570
    except Exception, err: # pylint: disable-msg=W0703
1571
      raise errors.JobFileCorrupted(err)
1572

    
1573
    return job
1574

    
1575
  def SafeLoadJobFromDisk(self, job_id):
1576
    """Load the given job file from disk.
1577

1578
    Given a job file, read, load and restore it in a _QueuedJob format.
1579
    In case of error reading the job, it gets returned as None, and the
1580
    exception is logged.
1581

1582
    @type job_id: string
1583
    @param job_id: job identifier
1584
    @rtype: L{_QueuedJob} or None
1585
    @return: either None or the job object
1586

1587
    """
1588
    try:
1589
      return self._LoadJobFromDisk(job_id)
1590
    except (errors.JobFileCorrupted, EnvironmentError):
1591
      logging.exception("Can't load/parse job %s", job_id)
1592
      return None
1593

    
1594
  @staticmethod
1595
  def _IsQueueMarkedDrain():
1596
    """Check if the queue is marked from drain.
1597

1598
    This currently uses the queue drain file, which makes it a
1599
    per-node flag. In the future this can be moved to the config file.
1600

1601
    @rtype: boolean
1602
    @return: True if the job queue is marked for draining
1603

1604
    """
1605
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1606

    
1607
  def _UpdateQueueSizeUnlocked(self):
1608
    """Update the queue size.
1609

1610
    """
1611
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1612

    
1613
  @locking.ssynchronized(_LOCK)
1614
  @_RequireOpenQueue
1615
  def SetDrainFlag(self, drain_flag):
1616
    """Sets the drain flag for the queue.
1617

1618
    @type drain_flag: boolean
1619
    @param drain_flag: Whether to set or unset the drain flag
1620

1621
    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)
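
    # job.Cancel() returns a (success, message) tuple; the job file is only
    # rewritten when the cancellation succeeded.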
    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) this archival run is
        allowed to take

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp
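
        # Timestamps are (seconds, microseconds) tuples, hence the [0] below
        # when comparing the job's age against the limit.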
        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)
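
    # Returns (number of jobs archived, number of jobs that were left
    # unexamined, e.g. because the timeout expired).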
    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list with
        the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True
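
    # With an explicit list of job IDs, a job that cannot be loaded shows up
    # as None in the result; when listing all jobs, unloadable entries are
    # silently skipped.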
    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None