lib / jqueue.py @ b8802cc4
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
83

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar end_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log", "priority",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

    
120
  @classmethod
121
  def Restore(cls, state):
122
    """Restore the _QueuedOpCode from the serialized form.
123

124
    @type state: dict
125
    @param state: the serialized state
126
    @rtype: _QueuedOpCode
127
    @return: a new _QueuedOpCode instance
128

129
    """
130
    obj = _QueuedOpCode.__new__(cls)
131
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132
    obj.status = state["status"]
133
    obj.result = state["result"]
134
    obj.log = state["log"]
135
    obj.start_timestamp = state.get("start_timestamp", None)
136
    obj.exec_timestamp = state.get("exec_timestamp", None)
137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139
    return obj
140

    
141
  def Serialize(self):
142
    """Serializes this _QueuedOpCode.
143

144
    @rtype: dict
145
    @return: the dictionary holding the serialized state
146

147
    """
148
    return {
149
      "input": self.input.__getstate__(),
150
      "status": self.status,
151
      "result": self.result,
152
      "log": self.log,
153
      "start_timestamp": self.start_timestamp,
154
      "exec_timestamp": self.exec_timestamp,
155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
157
      }
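
  # Illustrative sketch (editor's addition, not part of the original file):
  # Serialize() and Restore() form a plain-dict round trip, e.g.:
  #   state = _QueuedOpCode(op).Serialize()
  #   # state is a JSON-friendly dict such as
  #   # {"input": ..., "status": constants.OP_STATUS_QUEUED, "result": None,
  #   #  "log": [], "priority": constants.OP_PRIO_DEFAULT, ...}
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.status == constants.OP_STATUS_QUEUED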
158

    
159

    
160
class _QueuedJob(object):
161
  """In-memory job representation.
162

163
  This is what we use to track the user-submitted jobs. Locking must
164
  be taken care of by users of this class.
165

166
  @type queue: L{JobQueue}
167
  @ivar queue: the parent queue
168
  @ivar id: the job ID
169
  @type ops: list
170
  @ivar ops: the list of _QueuedOpCode that constitute the job
171
  @type log_serial: int
172
  @ivar log_serial: holds the index for the next log entry
173
  @ivar received_timestamp: the timestamp for when the job was received
174
  @ivar start_timestamp: the timestamp for start of execution
175
  @ivar end_timestamp: the timestamp for end of execution
176

177
  """
178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180
               "received_timestamp", "start_timestamp", "end_timestamp",
181
               "__weakref__"]
182

    
183
  def __init__(self, queue, job_id, ops):
184
    """Constructor for the _QueuedJob.
185

186
    @type queue: L{JobQueue}
187
    @param queue: our parent queue
188
    @type job_id: job_id
189
    @param job_id: our job id
190
    @type ops: list
191
    @param ops: the list of opcodes we hold, which will be encapsulated
192
        in _QueuedOpCodes
193

194
    """
195
    if not ops:
196
      raise errors.GenericError("A job needs at least one opcode")
197

    
198
    self.queue = queue
199
    self.id = job_id
200
    self.ops = [_QueuedOpCode(op) for op in ops]
201
    self.log_serial = 0
202
    self.received_timestamp = TimeStampNow()
203
    self.start_timestamp = None
204
    self.end_timestamp = None
205

    
206
    self._InitInMemory(self)
207

    
208
  @staticmethod
209
  def _InitInMemory(obj):
210
    """Initializes in-memory variables.
211

212
    """
213
    obj.ops_iter = None
214
    obj.cur_opctx = None
215

    
216
  def __repr__(self):
217
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218
              "id=%s" % self.id,
219
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220

    
221
    return "<%s at %#x>" % (" ".join(status), id(self))
222

    
223
  @classmethod
224
  def Restore(cls, queue, state):
225
    """Restore a _QueuedJob from serialized state:
226

227
    @type queue: L{JobQueue}
228
    @param queue: to which queue the restored job belongs
229
    @type state: dict
230
    @param state: the serialized state
231
    @rtype: _QueuedJob
232
    @return: the restored _QueuedJob instance
233

234
    """
235
    obj = _QueuedJob.__new__(cls)
236
    obj.queue = queue
237
    obj.id = state["id"]
238
    obj.received_timestamp = state.get("received_timestamp", None)
239
    obj.start_timestamp = state.get("start_timestamp", None)
240
    obj.end_timestamp = state.get("end_timestamp", None)
241

    
242
    obj.ops = []
243
    obj.log_serial = 0
244
    for op_state in state["ops"]:
245
      op = _QueuedOpCode.Restore(op_state)
246
      for log_entry in op.log:
247
        obj.log_serial = max(obj.log_serial, log_entry[0])
248
      obj.ops.append(op)
249

    
250
    cls._InitInMemory(obj)
251

    
252
    return obj
253

    
254
  def Serialize(self):
255
    """Serialize the _JobQueue instance.
256

257
    @rtype: dict
258
    @return: the serialized state
259

260
    """
261
    return {
262
      "id": self.id,
263
      "ops": [op.Serialize() for op in self.ops],
264
      "start_timestamp": self.start_timestamp,
265
      "end_timestamp": self.end_timestamp,
266
      "received_timestamp": self.received_timestamp,
267
      }
268

    
269
  def CalcStatus(self):
270
    """Compute the status of this job.
271

272
    This function iterates over all the _QueuedOpCodes in the job and
273
    based on their status, computes the job status.
274

275
    The algorithm is:
276
      - if we find a cancelled opcode, or one that finished with error,
277
        the job status will be the same
278
      - otherwise, the last opcode with the status one of:
279
          - waitlock
280
          - canceling
281
          - running
282

283
        will determine the job status
284

285
      - otherwise, all opcodes are either queued or successful, and the
286
        job status will be the same
287

288
    @return: the job status
289

290
    """
291
    status = constants.JOB_STATUS_QUEUED
292

    
293
    all_success = True
294
    for op in self.ops:
295
      if op.status == constants.OP_STATUS_SUCCESS:
296
        continue
297

    
298
      all_success = False
299

    
300
      if op.status == constants.OP_STATUS_QUEUED:
301
        pass
302
      elif op.status == constants.OP_STATUS_WAITLOCK:
303
        status = constants.JOB_STATUS_WAITLOCK
304
      elif op.status == constants.OP_STATUS_RUNNING:
305
        status = constants.JOB_STATUS_RUNNING
306
      elif op.status == constants.OP_STATUS_CANCELING:
307
        status = constants.JOB_STATUS_CANCELING
308
        break
309
      elif op.status == constants.OP_STATUS_ERROR:
310
        status = constants.JOB_STATUS_ERROR
311
        # The whole job fails if one opcode failed
312
        break
313
      elif op.status == constants.OP_STATUS_CANCELED:
314
        status = constants.JOB_STATUS_CANCELED
315
        break
316

    
317
    if all_success:
318
      status = constants.JOB_STATUS_SUCCESS
319

    
320
    return status
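
  # Illustrative example (editor's addition, not part of the original file):
  # for a job whose opcodes currently have the statuses
  #   (OP_STATUS_SUCCESS, OP_STATUS_RUNNING, OP_STATUS_QUEUED)
  # the loop above yields JOB_STATUS_RUNNING; all-success opcodes yield
  # JOB_STATUS_SUCCESS, and
  #   (OP_STATUS_SUCCESS, OP_STATUS_ERROR, OP_STATUS_QUEUED)
  # stops early with JOB_STATUS_ERROR.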
321

    
322
  def CalcPriority(self):
323
    """Gets the current priority for this job.
324

325
    Only unfinished opcodes are considered. When all are done, the default
326
    priority is used.
327

328
    @rtype: int
329

330
    """
331
    priorities = [op.priority for op in self.ops
332
                  if op.status not in constants.OPS_FINALIZED]
333

    
334
    if not priorities:
335
      # All opcodes are done, assume default priority
336
      return constants.OP_PRIO_DEFAULT
337

    
338
    return min(priorities)
339

    
340
  def GetLogEntries(self, newer_than):
341
    """Selectively returns the log entries.
342

343
    @type newer_than: None or int
344
    @param newer_than: if this is None, return all log entries,
345
        otherwise return only the log entries with serial higher
346
        than this value
347
    @rtype: list
348
    @return: the list of the log entries selected
349

350
    """
351
    if newer_than is None:
352
      serial = -1
353
    else:
354
      serial = newer_than
355

    
356
    entries = []
357
    for op in self.ops:
358
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359

    
360
    return entries
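
  # Illustrative example (editor's addition, not part of the original file):
  # if the opcodes hold log entries with serials 1, 2 and 3,
  # GetLogEntries(None) returns all three while GetLogEntries(2) returns only
  # the entry with serial 3.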
361

    
362
  def GetInfo(self, fields):
363
    """Returns information about a job.
364

365
    @type fields: list
366
    @param fields: names of fields to return
367
    @rtype: list
368
    @return: list with one element for each field
369
    @raise errors.OpExecError: when an invalid field
370
        has been passed
371

372
    """
373
    row = []
374
    for fname in fields:
375
      if fname == "id":
376
        row.append(self.id)
377
      elif fname == "status":
378
        row.append(self.CalcStatus())
379
      elif fname == "priority":
380
        row.append(self.CalcPriority())
381
      elif fname == "ops":
382
        row.append([op.input.__getstate__() for op in self.ops])
383
      elif fname == "opresult":
384
        row.append([op.result for op in self.ops])
385
      elif fname == "opstatus":
386
        row.append([op.status for op in self.ops])
387
      elif fname == "oplog":
388
        row.append([op.log for op in self.ops])
389
      elif fname == "opstart":
390
        row.append([op.start_timestamp for op in self.ops])
391
      elif fname == "opexec":
392
        row.append([op.exec_timestamp for op in self.ops])
393
      elif fname == "opend":
394
        row.append([op.end_timestamp for op in self.ops])
395
      elif fname == "oppriority":
396
        row.append([op.priority for op in self.ops])
397
      elif fname == "received_ts":
398
        row.append(self.received_timestamp)
399
      elif fname == "start_ts":
400
        row.append(self.start_timestamp)
401
      elif fname == "end_ts":
402
        row.append(self.end_timestamp)
403
      elif fname == "summary":
404
        row.append([op.input.Summary() for op in self.ops])
405
      else:
406
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
407
    return row
408

    
409
  def MarkUnfinishedOps(self, status, result):
410
    """Mark unfinished opcodes with a given status and result.
411

412
    This is a utility function for marking all running or waiting to
413
    be run opcodes with a given status. Opcodes which are already
414
    finalised are not changed.
415

416
    @param status: a given opcode status
417
    @param result: the opcode result
418

419
    """
420
    not_marked = True
421
    for op in self.ops:
422
      if op.status in constants.OPS_FINALIZED:
423
        assert not_marked, "Finalized opcodes found after non-finalized ones"
424
        continue
425
      op.status = status
426
      op.result = result
427
      not_marked = False
428

    
429
  def Cancel(self):
430
    """Marks job as canceled/-ing if possible.
431

432
    @rtype: tuple; (bool, string)
433
    @return: Boolean describing whether job was successfully canceled or marked
434
      as canceling and a text message
435

436
    """
437
    status = self.CalcStatus()
438

    
439
    if status == constants.JOB_STATUS_QUEUED:
440
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441
                             "Job canceled by request")
442
      return (True, "Job %s canceled" % self.id)
443

    
444
    elif status == constants.JOB_STATUS_WAITLOCK:
445
      # The worker will notice the new status and cancel the job
446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
447
      return (True, "Job %s will be canceled" % self.id)
448

    
449
    else:
450
      logging.debug("Job %s is no longer waiting in the queue", self.id)
451
      return (False, "Job %s is no longer waiting in the queue" % self.id)
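
  # Illustrative note (editor's addition, not part of the original file):
  # Cancel() therefore has three outcomes, depending on CalcStatus():
  #   queued   -> opcodes marked OP_STATUS_CANCELED,  (True, "Job N canceled")
  #   waitlock -> opcodes marked OP_STATUS_CANCELING, (True, "Job N will be canceled")
  #   otherwise (already running or finalized) -> (False, <explanation>)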
452

    
453

    
454
class _OpExecCallbacks(mcpu.OpExecCbBase):
455
  def __init__(self, queue, job, op):
456
    """Initializes this class.
457

458
    @type queue: L{JobQueue}
459
    @param queue: Job queue
460
    @type job: L{_QueuedJob}
461
    @param job: Job object
462
    @type op: L{_QueuedOpCode}
463
    @param op: OpCode
464

465
    """
466
    assert queue, "Queue is missing"
467
    assert job, "Job is missing"
468
    assert op, "Opcode is missing"
469

    
470
    self._queue = queue
471
    self._job = job
472
    self._op = op
473

    
474
  def _CheckCancel(self):
475
    """Raises an exception to cancel the job if asked to.
476

477
    """
478
    # Cancel here if we were asked to
479
    if self._op.status == constants.OP_STATUS_CANCELING:
480
      logging.debug("Canceling opcode")
481
      raise CancelJob()
482

    
483
  @locking.ssynchronized(_QUEUE, shared=1)
484
  def NotifyStart(self):
485
    """Mark the opcode as running, not lock-waiting.
486

487
    This is called from the mcpu code as a notifier function, when the LU is
488
    finally about to start the Exec() method. Of course, to have end-user
489
    visible results, the opcode must be initially (before calling into
490
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
491

492
    """
493
    assert self._op in self._job.ops
494
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
495
                               constants.OP_STATUS_CANCELING)
496

    
497
    # Cancel here if we were asked to
498
    self._CheckCancel()
499

    
500
    logging.debug("Opcode is now running")
501

    
502
    self._op.status = constants.OP_STATUS_RUNNING
503
    self._op.exec_timestamp = TimeStampNow()
504

    
505
    # And finally replicate the job status
506
    self._queue.UpdateJobUnlocked(self._job)
507

    
508
  @locking.ssynchronized(_QUEUE, shared=1)
509
  def _AppendFeedback(self, timestamp, log_type, log_msg):
510
    """Internal feedback append function, with locks
511

512
    """
513
    self._job.log_serial += 1
514
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
515
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
516

    
517
  def Feedback(self, *args):
518
    """Append a log entry.
519

520
    """
521
    assert len(args) < 3
522

    
523
    if len(args) == 1:
524
      log_type = constants.ELOG_MESSAGE
525
      log_msg = args[0]
526
    else:
527
      (log_type, log_msg) = args
528

    
529
    # The time is split to make serialization easier and not lose
530
    # precision.
531
    timestamp = utils.SplitTime(time.time())
532
    self._AppendFeedback(timestamp, log_type, log_msg)
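
  # Illustrative example (editor's addition, not part of the original file):
  # given an _OpExecCallbacks instance cb (hypothetical name), both calling
  # conventions are accepted:
  #   cb.Feedback("copying disk 0")                      # defaults to ELOG_MESSAGE
  #   cb.Feedback(constants.ELOG_MESSAGE, "copying disk 0")
  # Either form is stored as a (serial, (sec, usec), type, message) tuple in
  # the opcode's log.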
533

    
534
  def CheckCancel(self):
535
    """Check whether job has been cancelled.
536

537
    """
538
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
539
                               constants.OP_STATUS_CANCELING)
540

    
541
    # Cancel here if we were asked to
542
    self._CheckCancel()
543

    
544

    
545
class _JobChangesChecker(object):
546
  def __init__(self, fields, prev_job_info, prev_log_serial):
547
    """Initializes this class.
548

549
    @type fields: list of strings
550
    @param fields: Fields requested by LUXI client
551
    @type prev_job_info: string
552
    @param prev_job_info: previous job info, as passed by the LUXI client
553
    @type prev_log_serial: string
554
    @param prev_log_serial: previous job serial, as passed by the LUXI client
555

556
    """
557
    self._fields = fields
558
    self._prev_job_info = prev_job_info
559
    self._prev_log_serial = prev_log_serial
560

    
561
  def __call__(self, job):
562
    """Checks whether job has changed.
563

564
    @type job: L{_QueuedJob}
565
    @param job: Job object
566

567
    """
568
    status = job.CalcStatus()
569
    job_info = job.GetInfo(self._fields)
570
    log_entries = job.GetLogEntries(self._prev_log_serial)
571

    
572
    # Serializing and deserializing data can cause type changes (e.g. from
573
    # tuple to list) or precision loss. We're doing it here so that we get
574
    # the same modifications as the data received from the client. Without
575
    # this, the comparison afterwards might fail without the data being
576
    # significantly different.
577
    # TODO: we just deserialized from disk, investigate how to make sure that
578
    # the job info and log entries are compatible to avoid this further step.
579
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
580
    # efficient, though floats will be tricky
581
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
582
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
583

    
584
    # Don't even try to wait if the job is no longer running, there will be
585
    # no changes.
586
    if (status not in (constants.JOB_STATUS_QUEUED,
587
                       constants.JOB_STATUS_RUNNING,
588
                       constants.JOB_STATUS_WAITLOCK) or
589
        job_info != self._prev_job_info or
590
        (log_entries and self._prev_log_serial != log_entries[0][0])):
591
      logging.debug("Job %s changed", job.id)
592
      return (job_info, log_entries)
593

    
594
    return None
595

    
596

    
597
class _JobFileChangesWaiter(object):
598
  def __init__(self, filename):
599
    """Initializes this class.
600

601
    @type filename: string
602
    @param filename: Path to job file
603
    @raises errors.InotifyError: if the notifier cannot be setup
604

605
    """
606
    self._wm = pyinotify.WatchManager()
607
    self._inotify_handler = \
608
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
609
    self._notifier = \
610
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
611
    try:
612
      self._inotify_handler.enable()
613
    except Exception:
614
      # pyinotify doesn't close file descriptors automatically
615
      self._notifier.stop()
616
      raise
617

    
618
  def _OnInotify(self, notifier_enabled):
619
    """Callback for inotify.
620

621
    """
622
    if not notifier_enabled:
623
      self._inotify_handler.enable()
624

    
625
  def Wait(self, timeout):
626
    """Waits for the job file to change.
627

628
    @type timeout: float
629
    @param timeout: Timeout in seconds
630
    @return: Whether there have been events
631

632
    """
633
    assert timeout >= 0
634
    have_events = self._notifier.check_events(timeout * 1000)
635
    if have_events:
636
      self._notifier.read_events()
637
    self._notifier.process_events()
638
    return have_events
639

    
640
  def Close(self):
641
    """Closes underlying notifier and its file descriptor.
642

643
    """
644
    self._notifier.stop()
645

    
646

    
647
class _JobChangesWaiter(object):
648
  def __init__(self, filename):
649
    """Initializes this class.
650

651
    @type filename: string
652
    @param filename: Path to job file
653

654
    """
655
    self._filewaiter = None
656
    self._filename = filename
657

    
658
  def Wait(self, timeout):
659
    """Waits for a job to change.
660

661
    @type timeout: float
662
    @param timeout: Timeout in seconds
663
    @return: Whether there have been events
664

665
    """
666
    if self._filewaiter:
667
      return self._filewaiter.Wait(timeout)
668

    
669
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
670
    # If this point is reached, return immediately and let caller check the job
671
    # file again in case there were changes since the last check. This avoids a
672
    # race condition.
673
    self._filewaiter = _JobFileChangesWaiter(self._filename)
674

    
675
    return True
676

    
677
  def Close(self):
678
    """Closes underlying waiter.
679

680
    """
681
    if self._filewaiter:
682
      self._filewaiter.Close()
683

    
684

    
685
class _WaitForJobChangesHelper(object):
686
  """Helper class using inotify to wait for changes in a job file.
687

688
  This class takes a previous job status and serial, and alerts the client when
689
  the current job status has changed.
690

691
  """
692
  @staticmethod
693
  def _CheckForChanges(job_load_fn, check_fn):
694
    job = job_load_fn()
695
    if not job:
696
      raise errors.JobLost()
697

    
698
    result = check_fn(job)
699
    if result is None:
700
      raise utils.RetryAgain()
701

    
702
    return result
703

    
704
  def __call__(self, filename, job_load_fn,
705
               fields, prev_job_info, prev_log_serial, timeout):
706
    """Waits for changes on a job.
707

708
    @type filename: string
709
    @param filename: File on which to wait for changes
710
    @type job_load_fn: callable
711
    @param job_load_fn: Function to load job
712
    @type fields: list of strings
713
    @param fields: Which fields to check for changes
714
    @type prev_job_info: list or None
715
    @param prev_job_info: Last job information returned
716
    @type prev_log_serial: int
717
    @param prev_log_serial: Last job message serial number
718
    @type timeout: float
719
    @param timeout: maximum time to wait in seconds
720

721
    """
722
    try:
723
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724
      waiter = _JobChangesWaiter(filename)
725
      try:
726
        return utils.Retry(compat.partial(self._CheckForChanges,
727
                                          job_load_fn, check_fn),
728
                           utils.RETRY_REMAINING_TIME, timeout,
729
                           wait_fn=waiter.Wait)
730
      finally:
731
        waiter.Close()
732
    except (errors.InotifyError, errors.JobLost):
733
      return None
734
    except utils.RetryTimeout:
735
      return constants.JOB_NOTCHANGED
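
  # Illustrative note (editor's addition, not part of the original file):
  # callers therefore see one of three results: None when the job file is gone
  # or inotify cannot be set up, constants.JOB_NOTCHANGED when the timeout
  # expires without changes, or a (job_info, log_entries) tuple on change.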
736

    
737

    
738
def _EncodeOpError(err):
739
  """Encodes an error which occurred while processing an opcode.
740

741
  """
742
  if isinstance(err, errors.GenericError):
743
    to_encode = err
744
  else:
745
    to_encode = errors.OpExecError(str(err))
746

    
747
  return errors.EncodeException(to_encode)
748

    
749

    
750
class _TimeoutStrategyWrapper:
751
  def __init__(self, fn):
752
    """Initializes this class.
753

754
    """
755
    self._fn = fn
756
    self._next = None
757

    
758
  def _Advance(self):
759
    """Gets the next timeout if necessary.
760

761
    """
762
    if self._next is None:
763
      self._next = self._fn()
764

    
765
  def Peek(self):
766
    """Returns the next timeout.
767

768
    """
769
    self._Advance()
770
    return self._next
771

    
772
  def Next(self):
773
    """Returns the current timeout and advances the internal state.
774

775
    """
776
    self._Advance()
777
    result = self._next
778
    self._next = None
779
    return result
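
  # Illustrative example (editor's addition, not part of the original file):
  # with any callable producing successive timeouts, Peek() reads the next
  # value without consuming it while Next() consumes it:
  #   w = _TimeoutStrategyWrapper(iter([1.0, 2.0, None]).next)
  #   w.Peek()  # -> 1.0
  #   w.Next()  # -> 1.0
  #   w.Next()  # -> 2.0; a result of None means a blocking lock acquire.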
780

    
781

    
782
class _OpExecContext:
783
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784
    """Initializes this class.
785

786
    """
787
    self.op = op
788
    self.index = index
789
    self.log_prefix = log_prefix
790
    self.summary = op.input.Summary()
791

    
792
    self._timeout_strategy_factory = timeout_strategy_factory
793
    self._ResetTimeoutStrategy()
794

    
795
  def _ResetTimeoutStrategy(self):
796
    """Creates a new timeout strategy.
797

798
    """
799
    self._timeout_strategy = \
800
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
801

    
802
  def CheckPriorityIncrease(self):
803
    """Checks whether priority can and should be increased.
804

805
    Called when locks couldn't be acquired.
806

807
    """
808
    op = self.op
809

    
810
    # Exhausted all retries and next round should not use blocking acquire
811
    # for locks?
812
    if (self._timeout_strategy.Peek() is None and
813
        op.priority > constants.OP_PRIO_HIGHEST):
814
      logging.debug("Increasing priority")
815
      op.priority -= 1
816
      self._ResetTimeoutStrategy()
817
      return True
818

    
819
    return False
820

    
821
  def GetNextLockTimeout(self):
822
    """Returns the next lock acquire timeout.
823

824
    """
825
    return self._timeout_strategy.Next()
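
  # Illustrative note (editor's addition, not part of the original file): the
  # job processor asks for a new timeout before every lock attempt; once the
  # strategy is exhausted (Peek() returns None), CheckPriorityIncrease() bumps
  # op.priority one step towards OP_PRIO_HIGHEST and starts a fresh strategy,
  # so a starved opcode gradually gains priority and finally uses a blocking
  # (timeout None) acquire.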
826

    
827

    
828
class _JobProcessor(object):
829
  def __init__(self, queue, opexec_fn, job,
830
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831
    """Initializes this class.
832

833
    """
834
    self.queue = queue
835
    self.opexec_fn = opexec_fn
836
    self.job = job
837
    self._timeout_strategy_factory = _timeout_strategy_factory
838

    
839
  @staticmethod
840
  def _FindNextOpcode(job, timeout_strategy_factory):
841
    """Locates the next opcode to run.
842

843
    @type job: L{_QueuedJob}
844
    @param job: Job object
845
    @param timeout_strategy_factory: Callable to create new timeout strategy
846

847
    """
848
    # Create some sort of a cache to speed up locating next opcode for future
849
    # lookups
850
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
851
    # pending and one for processed ops.
852
    if job.ops_iter is None:
853
      job.ops_iter = enumerate(job.ops)
854

    
855
    # Find next opcode to run
856
    while True:
857
      try:
858
        (idx, op) = job.ops_iter.next()
859
      except StopIteration:
860
        raise errors.ProgrammerError("Called for a finished job")
861

    
862
      if op.status == constants.OP_STATUS_RUNNING:
863
        # Found an opcode already marked as running
864
        raise errors.ProgrammerError("Called for job marked as running")
865

    
866
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867
                             timeout_strategy_factory)
868

    
869
      if op.status == constants.OP_STATUS_CANCELED:
870
        # Cancelled jobs are handled by the caller
871
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872
                              for i in job.ops[idx:])
873

    
874
      elif op.status in constants.OPS_FINALIZED:
875
        # This is a job that was partially completed before master daemon
876
        # shutdown, so it can be expected that some opcodes are already
877
        # completed successfully (if any did error out, then the whole job
878
        # should have been aborted and not resubmitted for processing).
879
        logging.info("%s: opcode %s already processed, skipping",
880
                     opctx.log_prefix, opctx.summary)
881
        continue
882

    
883
      return opctx
884

    
885
  @staticmethod
886
  def _MarkWaitlock(job, op):
887
    """Marks an opcode as waiting for locks.
888

889
    The job's start timestamp is also set if necessary.
890

891
    @type job: L{_QueuedJob}
892
    @param job: Job object
893
    @type op: L{_QueuedOpCode}
894
    @param op: Opcode object
895

896
    """
897
    assert op in job.ops
898

    
899
    op.status = constants.OP_STATUS_WAITLOCK
900
    op.result = None
901
    op.start_timestamp = TimeStampNow()
902

    
903
    if job.start_timestamp is None:
904
      job.start_timestamp = op.start_timestamp
905

    
906
  def _ExecOpCodeUnlocked(self, opctx):
907
    """Processes one opcode and returns the result.
908

909
    """
910
    op = opctx.op
911

    
912
    assert op.status == constants.OP_STATUS_WAITLOCK
913

    
914
    timeout = opctx.GetNextLockTimeout()
915

    
916
    try:
917
      # Make sure not to hold queue lock while calling ExecOpCode
918
      result = self.opexec_fn(op.input,
919
                              _OpExecCallbacks(self.queue, self.job, op),
920
                              timeout=timeout, priority=op.priority)
921
    except mcpu.LockAcquireTimeout:
922
      assert timeout is not None, "Received timeout for blocking acquire"
923
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
924
      assert op.status == constants.OP_STATUS_WAITLOCK
925
      return (constants.OP_STATUS_QUEUED, None)
926
    except CancelJob:
927
      logging.exception("%s: Canceling job", opctx.log_prefix)
928
      assert op.status == constants.OP_STATUS_CANCELING
929
      return (constants.OP_STATUS_CANCELING, None)
930
    except Exception, err: # pylint: disable-msg=W0703
931
      logging.exception("%s: Caught exception in %s",
932
                        opctx.log_prefix, opctx.summary)
933
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
934
    else:
935
      logging.debug("%s: %s successful",
936
                    opctx.log_prefix, opctx.summary)
937
      return (constants.OP_STATUS_SUCCESS, result)
938

    
939
  def __call__(self, _nextop_fn=None):
940
    """Continues execution of a job.
941

942
    @param _nextop_fn: Callback function for tests
943
    @rtype: bool
944
    @return: True if job is finished, False if processor needs to be called
945
             again
946

947
    """
948
    queue = self.queue
949
    job = self.job
950

    
951
    logging.debug("Processing job %s", job.id)
952

    
953
    queue.acquire(shared=1)
954
    try:
955
      opcount = len(job.ops)
956

    
957
      # Is a previous opcode still pending?
958
      if job.cur_opctx:
959
        opctx = job.cur_opctx
960
      else:
961
        if __debug__ and _nextop_fn:
962
          _nextop_fn()
963
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
964

    
965
      op = opctx.op
966

    
967
      # Consistency check
968
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
969
                                     constants.OP_STATUS_CANCELED)
970
                        for i in job.ops[opctx.index:])
971

    
972
      assert op.status in (constants.OP_STATUS_QUEUED,
973
                           constants.OP_STATUS_WAITLOCK,
974
                           constants.OP_STATUS_CANCELED)
975

    
976
      assert (op.priority <= constants.OP_PRIO_LOWEST and
977
              op.priority >= constants.OP_PRIO_HIGHEST)
978

    
979
      if op.status != constants.OP_STATUS_CANCELED:
980
        # Prepare to start opcode
981
        self._MarkWaitlock(job, op)
982

    
983
        assert op.status == constants.OP_STATUS_WAITLOCK
984
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
985

    
986
        # Write to disk
987
        queue.UpdateJobUnlocked(job)
988

    
989
        logging.info("%s: opcode %s waiting for locks",
990
                     opctx.log_prefix, opctx.summary)
991

    
992
        queue.release()
993
        try:
994
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
995
        finally:
996
          queue.acquire(shared=1)
997

    
998
        op.status = op_status
999
        op.result = op_result
1000

    
1001
        if op.status == constants.OP_STATUS_QUEUED:
1002
          # Couldn't get locks in time
1003
          assert not op.end_timestamp
1004
        else:
1005
          # Finalize opcode
1006
          op.end_timestamp = TimeStampNow()
1007

    
1008
          if op.status == constants.OP_STATUS_CANCELING:
1009
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1010
                                  for i in job.ops[opctx.index:])
1011
          else:
1012
            assert op.status in constants.OPS_FINALIZED
1013

    
1014
      if op.status == constants.OP_STATUS_QUEUED:
1015
        finalize = False
1016

    
1017
        opctx.CheckPriorityIncrease()
1018

    
1019
        # Keep around for another round
1020
        job.cur_opctx = opctx
1021

    
1022
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1023
                op.priority >= constants.OP_PRIO_HIGHEST)
1024

    
1025
        # In no case must the status be finalized here
1026
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1027

    
1028
        queue.UpdateJobUnlocked(job)
1029

    
1030
      else:
1031
        # Ensure all opcodes so far have been successful
1032
        assert (opctx.index == 0 or
1033
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1034
                           for i in job.ops[:opctx.index]))
1035

    
1036
        # Reset context
1037
        job.cur_opctx = None
1038

    
1039
        if op.status == constants.OP_STATUS_SUCCESS:
1040
          finalize = False
1041

    
1042
        elif op.status == constants.OP_STATUS_ERROR:
1043
          # Ensure failed opcode has an exception as its result
1044
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1045

    
1046
          to_encode = errors.OpExecError("Preceding opcode failed")
1047
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1048
                                _EncodeOpError(to_encode))
1049
          finalize = True
1050

    
1051
          # Consistency check
1052
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1053
                            errors.GetEncodedError(i.result)
1054
                            for i in job.ops[opctx.index:])
1055

    
1056
        elif op.status == constants.OP_STATUS_CANCELING:
1057
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1058
                                "Job canceled by request")
1059
          finalize = True
1060

    
1061
        elif op.status == constants.OP_STATUS_CANCELED:
1062
          finalize = True
1063

    
1064
        else:
1065
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1066

    
1067
        # Finalizing or last opcode?
1068
        if finalize or opctx.index == (opcount - 1):
1069
          # All opcodes have been run, finalize job
1070
          job.end_timestamp = TimeStampNow()
1071

    
1072
        # Write to disk. If the job status is final, this is the final write
1073
        # allowed. Once the file has been written, it can be archived anytime.
1074
        queue.UpdateJobUnlocked(job)
1075

    
1076
        if finalize or opctx.index == (opcount - 1):
1077
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1078
          return True
1079

    
1080
      return False
1081
    finally:
1082
      queue.release()
1083

    
1084

    
1085
class _JobQueueWorker(workerpool.BaseWorker):
1086
  """The actual job workers.
1087

1088
  """
1089
  def RunTask(self, job): # pylint: disable-msg=W0221
1090
    """Job executor.
1091

1092
    This functions processes a job. It is closely tied to the L{_QueuedJob} and
1093
    L{_QueuedOpCode} classes.
1094

1095
    @type job: L{_QueuedJob}
1096
    @param job: the job to be processed
1097

1098
    """
1099
    queue = job.queue
1100
    assert queue == self.pool.queue
1101

    
1102
    self.SetTaskName("Job%s" % job.id)
1103

    
1104
    proc = mcpu.Processor(queue.context, job.id)
1105

    
1106
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
1107
      # Schedule again
1108
      raise workerpool.DeferTask(priority=job.CalcPriority())
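
  # Illustrative note (editor's addition, not part of the original file): a job
  # thus bounces between the worker pool and _JobProcessor: each RunTask() call
  # advances the job by at most one opcode (or one lock attempt), and DeferTask
  # re-queues it at its current priority until _JobProcessor reports the job as
  # finished.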
1109

    
1110

    
1111
class _JobQueueWorkerPool(workerpool.WorkerPool):
1112
  """Simple class implementing a job-processing workerpool.
1113

1114
  """
1115
  def __init__(self, queue):
1116
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
1117
                                              JOBQUEUE_THREADS,
1118
                                              _JobQueueWorker)
1119
    self.queue = queue
1120

    
1121

    
1122
def _RequireOpenQueue(fn):
1123
  """Decorator for "public" functions.
1124

1125
  This function should be used for all 'public' functions. That is,
1126
  functions usually called from other classes. Note that this should
1127
  be applied only to methods (not plain functions), since it expects
1128
  that the decorated function is called with a first argument that has
1129
  a '_queue_filelock' attribute.
1130

1131
  @warning: Use this decorator only after locking.ssynchronized
1132

1133
  Example::
1134
    @locking.ssynchronized(_LOCK)
1135
    @_RequireOpenQueue
1136
    def Example(self):
1137
      pass
1138

1139
  """
1140
  def wrapper(self, *args, **kwargs):
1141
    # pylint: disable-msg=W0212
1142
    assert self._queue_filelock is not None, "Queue should be open"
1143
    return fn(self, *args, **kwargs)
1144
  return wrapper
1145

    
1146

    
1147
class JobQueue(object):
1148
  """Queue used to manage the jobs.
1149

1150
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1151

1152
  """
1153
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1154

    
1155
  def __init__(self, context):
1156
    """Constructor for JobQueue.
1157

1158
    The constructor will initialize the job queue object and then
1159
    start loading the current jobs from disk, either for starting them
1160
    (if they were queued) or for aborting them (if they were already
1161
    running).
1162

1163
    @type context: GanetiContext
1164
    @param context: the context object for access to the configuration
1165
        data and other ganeti objects
1166

1167
    """
1168
    self.context = context
1169
    self._memcache = weakref.WeakValueDictionary()
1170
    self._my_hostname = netutils.Hostname.GetSysName()
1171

    
1172
    # The Big JobQueue lock. If a code block or method acquires it in shared
1173
    # mode, it must guarantee concurrency with all the code acquiring it in
1174
    # shared mode, including itself. In order not to acquire it at all,
1175
    # concurrency must be guaranteed with all code acquiring it in shared mode
1176
    # and all code acquiring it exclusively.
1177
    self._lock = locking.SharedLock("JobQueue")
1178

    
1179
    self.acquire = self._lock.acquire
1180
    self.release = self._lock.release
1181

    
1182
    # Initialize the queue, and acquire the filelock.
1183
    # This ensures no other process is working on the job queue.
1184
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1185

    
1186
    # Read serial file
1187
    self._last_serial = jstore.ReadSerial()
1188
    assert self._last_serial is not None, ("Serial file was modified between"
1189
                                           " check in jstore and here")
1190

    
1191
    # Get initial list of nodes
1192
    self._nodes = dict((n.name, n.primary_ip)
1193
                       for n in self.context.cfg.GetAllNodesInfo().values()
1194
                       if n.master_candidate)
1195

    
1196
    # Remove master node
1197
    self._nodes.pop(self._my_hostname, None)
1198

    
1199
    # TODO: Check consistency across nodes
1200

    
1201
    self._queue_size = 0
1202
    self._UpdateQueueSizeUnlocked()
1203
    self._drained = self._IsQueueMarkedDrain()
1204

    
1205
    # Setup worker pool
1206
    self._wpool = _JobQueueWorkerPool(self)
1207
    try:
1208
      self._InspectQueue()
1209
    except:
1210
      self._wpool.TerminateWorkers()
1211
      raise
1212

    
1213
  @locking.ssynchronized(_LOCK)
1214
  @_RequireOpenQueue
1215
  def _InspectQueue(self):
1216
    """Loads the whole job queue and resumes unfinished jobs.
1217

1218
    This function needs the lock here because WorkerPool.AddTask() may start a
1219
    job while we're still doing our work.
1220

1221
    """
1222
    logging.info("Inspecting job queue")
1223

    
1224
    restartjobs = []
1225

    
1226
    all_job_ids = self._GetJobIDsUnlocked()
1227
    jobs_count = len(all_job_ids)
1228
    lastinfo = time.time()
1229
    for idx, job_id in enumerate(all_job_ids):
1230
      # Give an update every 1000 jobs or 10 seconds
1231
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1232
          idx == (jobs_count - 1)):
1233
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1234
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1235
        lastinfo = time.time()
1236

    
1237
      job = self._LoadJobUnlocked(job_id)
1238

    
1239
      # a failure in loading the job can cause 'None' to be returned
1240
      if job is None:
1241
        continue
1242

    
1243
      status = job.CalcStatus()
1244

    
1245
      if status in (constants.JOB_STATUS_QUEUED, ):
1246
        restartjobs.append(job)
1247

    
1248
      elif status in (constants.JOB_STATUS_RUNNING,
1249
                      constants.JOB_STATUS_WAITLOCK,
1250
                      constants.JOB_STATUS_CANCELING):
1251
        logging.warning("Unfinished job %s found: %s", job.id, job)
1252
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1253
                              "Unclean master daemon shutdown")
1254
        self.UpdateJobUnlocked(job)
1255

    
1256
    if restartjobs:
1257
      logging.info("Restarting %s jobs", len(restartjobs))
1258
      self._EnqueueJobs(restartjobs)
1259

    
1260
    logging.info("Job queue inspection finished")
1261

    
1262
  @locking.ssynchronized(_LOCK)
1263
  @_RequireOpenQueue
1264
  def AddNode(self, node):
1265
    """Register a new node with the queue.
1266

1267
    @type node: L{objects.Node}
1268
    @param node: the node object to be added
1269

1270
    """
1271
    node_name = node.name
1272
    assert node_name != self._my_hostname
1273

    
1274
    # Clean queue directory on added node
1275
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1276
    msg = result.fail_msg
1277
    if msg:
1278
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1279
                      node_name, msg)
1280

    
1281
    if not node.master_candidate:
1282
      # remove if existing, ignoring errors
1283
      self._nodes.pop(node_name, None)
1284
      # and skip the replication of the job ids
1285
      return
1286

    
1287
    # Upload the whole queue excluding archived jobs
1288
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1289

    
1290
    # Upload current serial file
1291
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1292

    
1293
    for file_name in files:
1294
      # Read file content
1295
      content = utils.ReadFile(file_name)
1296

    
1297
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1298
                                                  [node.primary_ip],
1299
                                                  file_name, content)
1300
      msg = result[node_name].fail_msg
1301
      if msg:
1302
        logging.error("Failed to upload file %s to node %s: %s",
1303
                      file_name, node_name, msg)
1304

    
1305
    self._nodes[node_name] = node.primary_ip
1306

    
1307
  @locking.ssynchronized(_LOCK)
1308
  @_RequireOpenQueue
1309
  def RemoveNode(self, node_name):
1310
    """Callback called when removing nodes from the cluster.
1311

1312
    @type node_name: str
1313
    @param node_name: the name of the node to remove
1314

1315
    """
1316
    self._nodes.pop(node_name, None)
1317

    
1318
  @staticmethod
1319
  def _CheckRpcResult(result, nodes, failmsg):
1320
    """Verifies the status of an RPC call.
1321

1322
    Since we aim to keep consistency should this node (the current
1323
    master) fail, we will log errors if our rpc calls fail, and especially
1324
    log the case when more than half of the nodes fail.
1325

1326
    @param result: the data as returned from the rpc call
1327
    @type nodes: list
1328
    @param nodes: the list of nodes we made the call to
1329
    @type failmsg: str
1330
    @param failmsg: the identifier to be used for logging
1331

1332
    """
1333
    failed = []
1334
    success = []
1335

    
1336
    for node in nodes:
1337
      msg = result[node].fail_msg
1338
      if msg:
1339
        failed.append(node)
1340
        logging.error("RPC call %s (%s) failed on node %s: %s",
1341
                      result[node].call, failmsg, node, msg)
1342
      else:
1343
        success.append(node)
1344

    
1345
    # +1 for the master node
1346
    if (len(success) + 1) < len(failed):
1347
      # TODO: Handle failing nodes
1348
      logging.error("More than half of the nodes failed")
1349

    
1350
  def _GetNodeIp(self):
1351
    """Helper for returning the node name/ip list.
1352

1353
    @rtype: (list, list)
1354
    @return: a tuple of two lists, the first one with the node
1355
        names and the second one with the node addresses
1356

1357
    """
1358
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1359
    name_list = self._nodes.keys()
1360
    addr_list = [self._nodes[name] for name in name_list]
1361
    return name_list, addr_list
1362

    
1363
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1364
    """Writes a file locally and then replicates it to all nodes.
1365

1366
    This function will replace the contents of a file on the local
1367
    node and then replicate it to all the other nodes we have.
1368

1369
    @type file_name: str
1370
    @param file_name: the path of the file to be replicated
1371
    @type data: str
1372
    @param data: the new contents of the file
1373
    @type replicate: boolean
1374
    @param replicate: whether to spread the changes to the remote nodes
1375

1376
    """
1377
    getents = runtime.GetEnts()
1378
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1379
                    gid=getents.masterd_gid)
1380

    
1381
    if replicate:
1382
      names, addrs = self._GetNodeIp()
1383
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1384
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1385

    
1386
  def _RenameFilesUnlocked(self, rename):
1387
    """Renames a file locally and then replicate the change.
1388

1389
    This function will rename a file in the local queue directory
1390
    and then replicate this rename to all the other nodes we have.
1391

1392
    @type rename: list of (old, new)
1393
    @param rename: List containing tuples mapping old to new names
1394

1395
    """
1396
    # Rename them locally
1397
    for old, new in rename:
1398
      utils.RenameFile(old, new, mkdir=True)
1399

    
1400
    # ... and on all nodes
1401
    names, addrs = self._GetNodeIp()
1402
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1403
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1404

    
1405
  @staticmethod
1406
  def _FormatJobID(job_id):
1407
    """Convert a job ID to string format.
1408

1409
    Currently this just does C{str(job_id)} after performing some
1410
    checks, but if we want to change the job id format this will
1411
    abstract this change.
1412

1413
    @type job_id: int or long
1414
    @param job_id: the numeric job id
1415
    @rtype: str
1416
    @return: the formatted job id
1417

1418
    """
1419
    if not isinstance(job_id, (int, long)):
1420
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1421
    if job_id < 0:
1422
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1423

    
1424
    return str(job_id)
1425

    
1426
  @classmethod
1427
  def _GetArchiveDirectory(cls, job_id):
1428
    """Returns the archive directory for a job.
1429

1430
    @type job_id: str
1431
    @param job_id: Job identifier
1432
    @rtype: str
1433
    @return: Directory name
1434

1435
    """
1436
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
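
  # Illustrative example (editor's addition, not part of the original file):
  # with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job 9999 maps to directory "0"
  # while jobs 10000 and 12345 both map to directory "1".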
1437

    
1438
  def _NewSerialsUnlocked(self, count):
1439
    """Generates a new job identifier.
1440

1441
    Job identifiers are unique during the lifetime of a cluster.
1442

1443
    @type count: integer
1444
    @param count: how many serials to return
1445
    @rtype: list of str
1446
    @return: the list of formatted job identifiers
1447

1448
    """
1449
    assert count > 0
1450
    # New number
1451
    serial = self._last_serial + count
1452

    
1453
    # Write to file
1454
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1455
                             "%s\n" % serial, True)
1456

    
1457
    result = [self._FormatJobID(v)
1458
              for v in range(self._last_serial + 1, serial + 1)]
1459
    # Keep it only if we were able to write the file
1460
    self._last_serial = serial
1461

    
1462
    return result
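
  # Illustrative example (editor's addition, not part of the original file):
  # with self._last_serial == 3, _NewSerialsUnlocked(2) writes "5" to the
  # serial file and returns ["4", "5"], leaving self._last_serial at 5.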
1463

    
1464
  @staticmethod
1465
  def _GetJobPath(job_id):
1466
    """Returns the job file for a given job id.
1467

1468
    @type job_id: str
1469
    @param job_id: the job identifier
1470
    @rtype: str
1471
    @return: the path to the job file
1472

1473
    """
1474
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1475

    
1476
  @classmethod
1477
  def _GetArchivedJobPath(cls, job_id):
1478
    """Returns the archived job file for a give job id.
1479

1480
    @type job_id: str
1481
    @param job_id: the job identifier
1482
    @rtype: str
1483
    @return: the path to the archived job file
1484

1485
    """
1486
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1487
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1488

    
1489
  def _GetJobIDsUnlocked(self, sort=True):
1490
    """Return all known job IDs.
1491

1492
    The method only looks at disk because it's a requirement that all
1493
    jobs are present on disk (so in the _memcache we don't have any
1494
    extra IDs).
1495

1496
    @type sort: boolean
1497
    @param sort: perform sorting on the returned job ids
1498
    @rtype: list
1499
    @return: the list of job IDs
1500

1501
    """
1502
    jlist = []
1503
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1504
      m = self._RE_JOB_FILE.match(filename)
1505
      if m:
1506
        jlist.append(m.group(1))
1507
    if sort:
1508
      jlist = utils.NiceSort(jlist)
1509
    return jlist
1510

    
1511
  def _LoadJobUnlocked(self, job_id):
1512
    """Loads a job from the disk or memory.
1513

1514
    Given a job id, this will return the cached job object if
1515
    existing, or try to load the job from the disk. If loading from
1516
    disk, it will also add the job to the cache.
1517

1518
    @param job_id: the job id
1519
    @rtype: L{_QueuedJob} or None
1520
    @return: either None or the job object
1521

1522
    """
1523
    job = self._memcache.get(job_id, None)
1524
    if job:
1525
      logging.debug("Found job %s in memcache", job_id)
1526
      return job
1527

    
1528
    try:
1529
      job = self._LoadJobFromDisk(job_id)
1530
      if job is None:
1531
        return job
1532
    except errors.JobFileCorrupted:
1533
      old_path = self._GetJobPath(job_id)
1534
      new_path = self._GetArchivedJobPath(job_id)
1535
      if old_path == new_path:
1536
        # job already archived (future case)
1537
        logging.exception("Can't parse job %s", job_id)
1538
      else:
1539
        # non-archived case
1540
        logging.exception("Can't parse job %s, will archive.", job_id)
1541
        self._RenameFilesUnlocked([(old_path, new_path)])
1542
      return None
1543

    
1544
    self._memcache[job_id] = job
1545
    logging.debug("Added job %s to the cache", job_id)
1546
    return job
1547

    
1548
  def _LoadJobFromDisk(self, job_id):
1549
    """Load the given job file from disk.
1550

1551
    Given a job file, read, load and restore it in a _QueuedJob format.
1552

1553
    @type job_id: string
1554
    @param job_id: job identifier
1555
    @rtype: L{_QueuedJob} or None
1556
    @return: either None or the job object
1557

1558
    """
1559
    filepath = self._GetJobPath(job_id)
1560
    logging.debug("Loading job from %s", filepath)
1561
    try:
1562
      raw_data = utils.ReadFile(filepath)
1563
    except EnvironmentError, err:
1564
      if err.errno in (errno.ENOENT, ):
1565
        return None
1566
      raise
1567

    
1568
    try:
1569
      data = serializer.LoadJson(raw_data)
1570
      job = _QueuedJob.Restore(self, data)
1571
    except Exception, err: # pylint: disable-msg=W0703
1572
      raise errors.JobFileCorrupted(err)
1573

    
1574
    return job
1575

    
1576
  def SafeLoadJobFromDisk(self, job_id):
1577
    """Load the given job file from disk.
1578

1579
    Given a job file, read, load and restore it in a _QueuedJob format.
1580
    In case of error reading the job, it gets returned as None, and the
1581
    exception is logged.
1582

1583
    @type job_id: string
1584
    @param job_id: job identifier
1585
    @rtype: L{_QueuedJob} or None
1586
    @return: either None or the job object
1587

1588
    """
1589
    try:
1590
      return self._LoadJobFromDisk(job_id)
1591
    except (errors.JobFileCorrupted, EnvironmentError):
1592
      logging.exception("Can't load/parse job %s", job_id)
1593
      return None
1594

    
1595
  @staticmethod
1596
  def _IsQueueMarkedDrain():
1597
    """Check if the queue is marked from drain.
1598

1599
    This currently uses the queue drain file, which makes it a
1600
    per-node flag. In the future this can be moved to the config file.
1601

1602
    @rtype: boolean
1603
    @return: True if the job queue is marked for draining
1604

1605
    """
1606
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1607

    
1608
  def _UpdateQueueSizeUnlocked(self):
1609
    """Update the queue size.
1610

1611
    """
1612
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1613

    
1614
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

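  # Illustrative sketch, not part of the original module: given a JobQueue
  # instance ``queue``, draining rejects new submissions while already-queued
  # jobs keep running:
  #
  #   queue.SetDrainFlag(True)   # creates JOB_QUEUE_DRAIN_FILE
  #   queue.SubmitJob(ops)       # would now raise errors.JobQueueDrainError
  #   queue.SetDrainFlag(False)  # removes the flag file again
  #
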
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

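  # Illustrative sketch, not part of the original module: submission fails
  # early when any opcode carries a priority outside
  # constants.OP_PRIO_SUBMIT_VALID, before anything is written to disk.
  # Assuming ``op`` is some opcode instance:
  #
  #   op.priority = -1000    # not a member of constants.OP_PRIO_SUBMIT_VALID
  #   queue.SubmitJob([op])  # raises errors.GenericError
  #
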
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

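  # Illustrative sketch, not part of the original module: SubmitManyJobs
  # returns one (status, data) tuple per requested job, so a partially
  # failing batch might look like
  #
  #   [(True, "1234"), (False, "Opcode 0 has invalid priority ...")]
  #
  # with the new job id on success and the error text on failure.
  #
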
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to the worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on-disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients.

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

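  # Illustrative sketch, not part of the original module: clients typically
  # poll in a loop, feeding back the last state they saw:
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, timeout=10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # nothing new before the timeout expired, poll again
  #   else:
  #     (job_info, log_entries) = result
  #
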
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

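  # Illustrative sketch, not part of the original module: the caller always
  # gets a (success, msg) pair back, e.g. (False, "Job 123 not found") when
  # the job file cannot be loaded, and (True, ...) once job.Cancel() succeeded
  # and the updated job was written back to disk:
  #
  #   (success, msg) = queue.CancelJob("123")
  #
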
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered (falling back to the received
    timestamp). The special '-1' age will cause archival of all jobs
    (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time (in seconds) to spend archiving jobs

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

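  # Illustrative sketch, not part of the original module: with age=3600 only
  # finalized jobs whose relevant timestamp is more than an hour old are
  # archived, while age=-1 archives every finalized job. The result pairs the
  # number of jobs archived with the number of job files left unexamined when
  # the timeout was reached:
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(3600, 30)
  #
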
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

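  # Illustrative sketch, not part of the original module, assuming "id" and
  # "status" are among the fields known to _QueuedJob.GetInfo():
  #
  #   queue.QueryJobs(["1", "42"], ["id", "status"])
  #   # -> [["1", "success"], None]   if job 42 cannot be loaded
  #
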
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None