
lib/jqueue.py (revision 6b9b18a2)


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import errno
35
import re
36
import time
37
import weakref
38

    
39
try:
40
  # pylint: disable-msg=E0611
41
  from pyinotify import pyinotify
42
except ImportError:
43
  import pyinotify
44

    
45
from ganeti import asyncnotifier
46
from ganeti import constants
47
from ganeti import serializer
48
from ganeti import workerpool
49
from ganeti import locking
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56
from ganeti import runtime
57
from ganeti import netutils
58
from ganeti import compat
59

    
60

    
61
JOBQUEUE_THREADS = 25
62
JOBS_PER_ARCHIVE_DIRECTORY = 10000
63

    
64
# member lock names to be passed to @ssynchronized decorator
65
_LOCK = "_lock"
66
_QUEUE = "_queue"
67

    
68

    
69
class CancelJob(Exception):
70
  """Special exception to cancel a job.
71

72
  """
73

    
74

    
75
def TimeStampNow():
76
  """Returns the current timestamp.
77

78
  @rtype: tuple
79
  @return: the current time in the (seconds, microseconds) format
80

81
  """
82
  return utils.SplitTime(time.time())
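# Illustrative sketch (not part of the original module): TimeStampNow() relies
# on utils.SplitTime to split a float timestamp into whole seconds and
# microseconds, which keeps the value serializable without precision
# surprises. Roughly:
#
#   ts = TimeStampNow()          # e.g. (1270000000, 123456)
#   (seconds, micros) = ts
#   as_float = seconds + micros / 1e6
#
# The exact numbers depend on the clock; the shape of the tuple is what matters.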
83

    
84

    
85
class _QueuedOpCode(object):
86
  """Encapsulates an opcode object.
87

88
  @ivar log: holds the execution log and consists of tuples
89
  of the form C{(log_serial, timestamp, level, message)}
90
  @ivar input: the OpCode we encapsulate
91
  @ivar status: the current status
92
  @ivar result: the result of the LU execution
93
  @ivar start_timestamp: timestamp for the start of the execution
94
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95
  @ivar end_timestamp: timestamp for the end of the execution
96

97
  """
98
  __slots__ = ["input", "status", "result", "log", "priority",
99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100
               "__weakref__"]
101

    
102
  def __init__(self, op):
103
    """Constructor for the _QuededOpCode.
104

105
    @type op: L{opcodes.OpCode}
106
    @param op: the opcode we encapsulate
107

108
    """
109
    self.input = op
110
    self.status = constants.OP_STATUS_QUEUED
111
    self.result = None
112
    self.log = []
113
    self.start_timestamp = None
114
    self.exec_timestamp = None
115
    self.end_timestamp = None
116

    
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

    
120
  @classmethod
121
  def Restore(cls, state):
122
    """Restore the _QueuedOpCode from the serialized form.
123

124
    @type state: dict
125
    @param state: the serialized state
126
    @rtype: _QueuedOpCode
127
    @return: a new _QueuedOpCode instance
128

129
    """
130
    obj = _QueuedOpCode.__new__(cls)
131
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132
    obj.status = state["status"]
133
    obj.result = state["result"]
134
    obj.log = state["log"]
135
    obj.start_timestamp = state.get("start_timestamp", None)
136
    obj.exec_timestamp = state.get("exec_timestamp", None)
137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139
    return obj
140

    
141
  def Serialize(self):
142
    """Serializes this _QueuedOpCode.
143

144
    @rtype: dict
145
    @return: the dictionary holding the serialized state
146

147
    """
148
    return {
149
      "input": self.input.__getstate__(),
150
      "status": self.status,
151
      "result": self.result,
152
      "log": self.log,
153
      "start_timestamp": self.start_timestamp,
154
      "exec_timestamp": self.exec_timestamp,
155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
157
      }
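# Illustrative sketch (not part of the original module): Serialize() and
# Restore() are intended to be inverses, so a queued opcode survives a round
# trip through the dictionary form stored on disk. Assuming "op" is any
# opcodes.OpCode instance:
#
#   qop = _QueuedOpCode(op)
#   state = qop.Serialize()
#   clone = _QueuedOpCode.Restore(state)
#   assert clone.status == constants.OP_STATUS_QUEUED
#   assert clone.priority == qop.priority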
158

    
159

    
160
class _QueuedJob(object):
161
  """In-memory job representation.
162

163
  This is what we use to track the user-submitted jobs. Locking must
164
  be taken care of by users of this class.
165

166
  @type queue: L{JobQueue}
167
  @ivar queue: the parent queue
168
  @ivar id: the job ID
169
  @type ops: list
170
  @ivar ops: the list of _QueuedOpCode that constitute the job
171
  @type log_serial: int
172
  @ivar log_serial: holds the index for the next log entry
173
  @ivar received_timestamp: the timestamp for when the job was received
174
  @ivar start_timestmap: the timestamp for start of execution
175
  @ivar end_timestamp: the timestamp for end of execution
176

177
  """
178
  # pylint: disable-msg=W0212
179
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180
               "received_timestamp", "start_timestamp", "end_timestamp",
181
               "__weakref__"]
182

    
183
  def __init__(self, queue, job_id, ops):
184
    """Constructor for the _QueuedJob.
185

186
    @type queue: L{JobQueue}
187
    @param queue: our parent queue
188
    @type job_id: job_id
189
    @param job_id: our job id
190
    @type ops: list
191
    @param ops: the list of opcodes we hold, which will be encapsulated
192
        in _QueuedOpCodes
193

194
    """
195
    if not ops:
196
      raise errors.GenericError("A job needs at least one opcode")
197

    
198
    self.queue = queue
199
    self.id = job_id
200
    self.ops = [_QueuedOpCode(op) for op in ops]
201
    self.log_serial = 0
202
    self.received_timestamp = TimeStampNow()
203
    self.start_timestamp = None
204
    self.end_timestamp = None
205

    
206
    self._InitInMemory(self)
207

    
208
  @staticmethod
209
  def _InitInMemory(obj):
210
    """Initializes in-memory variables.
211

212
    """
213
    obj.ops_iter = None
214
    obj.cur_opctx = None
215

    
216
  def __repr__(self):
217
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218
              "id=%s" % self.id,
219
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220

    
221
    return "<%s at %#x>" % (" ".join(status), id(self))
222

    
223
  @classmethod
224
  def Restore(cls, queue, state):
225
    """Restore a _QueuedJob from serialized state:
226

227
    @type queue: L{JobQueue}
228
    @param queue: to which queue the restored job belongs
229
    @type state: dict
230
    @param state: the serialized state
231
    @rtype: _QueuedJob
232
    @return: the restored _QueuedJob instance
233

234
    """
235
    obj = _QueuedJob.__new__(cls)
236
    obj.queue = queue
237
    obj.id = state["id"]
238
    obj.received_timestamp = state.get("received_timestamp", None)
239
    obj.start_timestamp = state.get("start_timestamp", None)
240
    obj.end_timestamp = state.get("end_timestamp", None)
241

    
242
    obj.ops = []
243
    obj.log_serial = 0
244
    for op_state in state["ops"]:
245
      op = _QueuedOpCode.Restore(op_state)
246
      for log_entry in op.log:
247
        obj.log_serial = max(obj.log_serial, log_entry[0])
248
      obj.ops.append(op)
249

    
250
    cls._InitInMemory(obj)
251

    
252
    return obj
253

    
254
  def Serialize(self):
255
    """Serialize the _JobQueue instance.
256

257
    @rtype: dict
258
    @return: the serialized state
259

260
    """
261
    return {
262
      "id": self.id,
263
      "ops": [op.Serialize() for op in self.ops],
264
      "start_timestamp": self.start_timestamp,
265
      "end_timestamp": self.end_timestamp,
266
      "received_timestamp": self.received_timestamp,
267
      }
268

    
269
  def CalcStatus(self):
270
    """Compute the status of this job.
271

272
    This function iterates over all the _QueuedOpCodes in the job and
273
    based on their status, computes the job status.
274

275
    The algorithm is:
276
      - if we find a cancelled opcode, or one that finished with an error,
277
        the job status will be the same
278
      - otherwise, the last opcode whose status is one of:
279
          - waitlock
280
          - canceling
281
          - running
282

283
        will determine the job status
284

285
      - otherwise, it means either all opcodes are queued or all have
286
        succeeded, and the job status will be the same
287

288
    @return: the job status
289

290
    """
291
    status = constants.JOB_STATUS_QUEUED
292

    
293
    all_success = True
294
    for op in self.ops:
295
      if op.status == constants.OP_STATUS_SUCCESS:
296
        continue
297

    
298
      all_success = False
299

    
300
      if op.status == constants.OP_STATUS_QUEUED:
301
        pass
302
      elif op.status == constants.OP_STATUS_WAITLOCK:
303
        status = constants.JOB_STATUS_WAITLOCK
304
      elif op.status == constants.OP_STATUS_RUNNING:
305
        status = constants.JOB_STATUS_RUNNING
306
      elif op.status == constants.OP_STATUS_CANCELING:
307
        status = constants.JOB_STATUS_CANCELING
308
        break
309
      elif op.status == constants.OP_STATUS_ERROR:
310
        status = constants.JOB_STATUS_ERROR
311
        # The whole job fails if one opcode failed
312
        break
313
      elif op.status == constants.OP_STATUS_CANCELED:
314
        status = constants.JOB_STATUS_CANCELED
315
        break
316

    
317
    if all_success:
318
      status = constants.JOB_STATUS_SUCCESS
319

    
320
    return status
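  # Worked example (illustrative, not part of the original module): for a job
  # whose opcode statuses are [SUCCESS, RUNNING, QUEUED] the loop reaches the
  # running opcode and CalcStatus() returns JOB_STATUS_RUNNING; for
  # [SUCCESS, ERROR, QUEUED] it stops at the error and returns
  # JOB_STATUS_ERROR; only [SUCCESS, SUCCESS, SUCCESS] yields
  # JOB_STATUS_SUCCESS.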
321

    
322
  def CalcPriority(self):
323
    """Gets the current priority for this job.
324

325
    Only unfinished opcodes are considered. When all are done, the default
326
    priority is used.
327

328
    @rtype: int
329

330
    """
331
    priorities = [op.priority for op in self.ops
332
                  if op.status not in constants.OPS_FINALIZED]
333

    
334
    if not priorities:
335
      # All opcodes are done, assume default priority
336
      return constants.OP_PRIO_DEFAULT
337

    
338
    return min(priorities)
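  # Illustrative example (not part of the original module): lower numbers mean
  # higher priority, so min() picks the most urgent unfinished opcode. With
  # unfinished opcode priorities [OP_PRIO_DEFAULT, OP_PRIO_DEFAULT - 10] the
  # job priority is OP_PRIO_DEFAULT - 10; once every opcode is finalized,
  # CalcPriority() falls back to OP_PRIO_DEFAULT.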
339

    
340
  def GetLogEntries(self, newer_than):
341
    """Selectively returns the log entries.
342

343
    @type newer_than: None or int
344
    @param newer_than: if this is None, return all log entries,
345
        otherwise return only the log entries with serial higher
346
        than this value
347
    @rtype: list
348
    @return: the list of the log entries selected
349

350
    """
351
    if newer_than is None:
352
      serial = -1
353
    else:
354
      serial = newer_than
355

    
356
    entries = []
357
    for op in self.ops:
358
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359

    
360
    return entries
361

    
362
  def GetInfo(self, fields):
363
    """Returns information about a job.
364

365
    @type fields: list
366
    @param fields: names of fields to return
367
    @rtype: list
368
    @return: list with one element for each field
369
    @raise errors.OpExecError: when an invalid field
370
        has been passed
371

372
    """
373
    row = []
374
    for fname in fields:
375
      if fname == "id":
376
        row.append(self.id)
377
      elif fname == "status":
378
        row.append(self.CalcStatus())
379
      elif fname == "priority":
380
        row.append(self.CalcPriority())
381
      elif fname == "ops":
382
        row.append([op.input.__getstate__() for op in self.ops])
383
      elif fname == "opresult":
384
        row.append([op.result for op in self.ops])
385
      elif fname == "opstatus":
386
        row.append([op.status for op in self.ops])
387
      elif fname == "oplog":
388
        row.append([op.log for op in self.ops])
389
      elif fname == "opstart":
390
        row.append([op.start_timestamp for op in self.ops])
391
      elif fname == "opexec":
392
        row.append([op.exec_timestamp for op in self.ops])
393
      elif fname == "opend":
394
        row.append([op.end_timestamp for op in self.ops])
395
      elif fname == "oppriority":
396
        row.append([op.priority for op in self.ops])
397
      elif fname == "received_ts":
398
        row.append(self.received_timestamp)
399
      elif fname == "start_ts":
400
        row.append(self.start_timestamp)
401
      elif fname == "end_ts":
402
        row.append(self.end_timestamp)
403
      elif fname == "summary":
404
        row.append([op.input.Summary() for op in self.ops])
405
      else:
406
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
407
    return row
408

    
409
  def MarkUnfinishedOps(self, status, result):
410
    """Mark unfinished opcodes with a given status and result.
411

412
    This is a utility function for marking all running or waiting to
413
    be run opcodes with a given status. Opcodes which are already
414
    finalised are not changed.
415

416
    @param status: a given opcode status
417
    @param result: the opcode result
418

419
    """
420
    not_marked = True
421
    for op in self.ops:
422
      if op.status in constants.OPS_FINALIZED:
423
        assert not_marked, "Finalized opcodes found after non-finalized ones"
424
        continue
425
      op.status = status
426
      op.result = result
427
      not_marked = False
428

    
429
  def Cancel(self):
430
    """Marks job as canceled/-ing if possible.
431

432
    @rtype: tuple; (bool, string)
433
    @return: Boolean describing whether job was successfully canceled or marked
434
      as canceling and a text message
435

436
    """
437
    status = self.CalcStatus()
438

    
439
    if status == constants.JOB_STATUS_QUEUED:
440
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441
                             "Job canceled by request")
442
      return (True, "Job %s canceled" % self.id)
443

    
444
    elif status == constants.JOB_STATUS_WAITLOCK:
445
      # The worker will notice the new status and cancel the job
446
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
447
      return (True, "Job %s will be canceled" % self.id)
448

    
449
    else:
450
      logging.debug("Job %s is no longer waiting in the queue", self.id)
451
      return (False, "Job %s is no longer waiting in the queue" % self.id)
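# Illustrative sketch (not part of the original module): a caller holding the
# queue lock would use Cancel() like this; the outcome depends only on
# CalcStatus() (job id 42 is hypothetical):
#
#   (success, msg) = job.Cancel()
#   # queued job       -> (True,  "Job 42 canceled"), opcodes marked CANCELED
#   # lock-waiting job  -> (True,  "Job 42 will be canceled"), marked CANCELING
#   # running/finished  -> (False, "Job 42 is no longer waiting in the queue")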
452

    
453

    
454
class _OpExecCallbacks(mcpu.OpExecCbBase):
455
  def __init__(self, queue, job, op):
456
    """Initializes this class.
457

458
    @type queue: L{JobQueue}
459
    @param queue: Job queue
460
    @type job: L{_QueuedJob}
461
    @param job: Job object
462
    @type op: L{_QueuedOpCode}
463
    @param op: OpCode
464

465
    """
466
    assert queue, "Queue is missing"
467
    assert job, "Job is missing"
468
    assert op, "Opcode is missing"
469

    
470
    self._queue = queue
471
    self._job = job
472
    self._op = op
473

    
474
  def _CheckCancel(self):
475
    """Raises an exception to cancel the job if asked to.
476

477
    """
478
    # Cancel here if we were asked to
479
    if self._op.status == constants.OP_STATUS_CANCELING:
480
      logging.debug("Canceling opcode")
481
      raise CancelJob()
482

    
483
  @locking.ssynchronized(_QUEUE, shared=1)
484
  def NotifyStart(self):
485
    """Mark the opcode as running, not lock-waiting.
486

487
    This is called from the mcpu code as a notifier function, when the LU is
488
    finally about to start the Exec() method. Of course, to have end-user
489
    visible results, the opcode must be initially (before calling into
490
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
491

492
    """
493
    assert self._op in self._job.ops
494
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
495
                               constants.OP_STATUS_CANCELING)
496

    
497
    # Cancel here if we were asked to
498
    self._CheckCancel()
499

    
500
    logging.debug("Opcode is now running")
501

    
502
    self._op.status = constants.OP_STATUS_RUNNING
503
    self._op.exec_timestamp = TimeStampNow()
504

    
505
    # And finally replicate the job status
506
    self._queue.UpdateJobUnlocked(self._job)
507

    
508
  @locking.ssynchronized(_QUEUE, shared=1)
509
  def _AppendFeedback(self, timestamp, log_type, log_msg):
510
    """Internal feedback append function, with locks
511

512
    """
513
    self._job.log_serial += 1
514
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
515
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
516

    
517
  def Feedback(self, *args):
518
    """Append a log entry.
519

520
    """
521
    assert len(args) < 3
522

    
523
    if len(args) == 1:
524
      log_type = constants.ELOG_MESSAGE
525
      log_msg = args[0]
526
    else:
527
      (log_type, log_msg) = args
528

    
529
    # The time is split to make serialization easier and not lose
530
    # precision.
531
    timestamp = utils.SplitTime(time.time())
532
    self._AppendFeedback(timestamp, log_type, log_msg)
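  # Illustrative sketch (not part of the original module): the LU calls back
  # into Feedback() with either a bare message or a (log type, message) pair:
  #
  #   cbs = _OpExecCallbacks(queue, job, op)    # assuming suitable objects
  #   cbs.Feedback("creating disks")                          # ELOG_MESSAGE
  #   cbs.Feedback(constants.ELOG_MESSAGE, "creating disks")  # explicit type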
533

    
534
  def CheckCancel(self):
535
    """Check whether job has been cancelled.
536

537
    """
538
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
539
                               constants.OP_STATUS_CANCELING)
540

    
541
    # Cancel here if we were asked to
542
    self._CheckCancel()
543

    
544

    
545
class _JobChangesChecker(object):
546
  def __init__(self, fields, prev_job_info, prev_log_serial):
547
    """Initializes this class.
548

549
    @type fields: list of strings
550
    @param fields: Fields requested by LUXI client
551
    @type prev_job_info: list or None
552
    @param prev_job_info: previous job info, as passed by the LUXI client
553
    @type prev_log_serial: int
554
    @param prev_log_serial: previous job serial, as passed by the LUXI client
555

556
    """
557
    self._fields = fields
558
    self._prev_job_info = prev_job_info
559
    self._prev_log_serial = prev_log_serial
560

    
561
  def __call__(self, job):
562
    """Checks whether job has changed.
563

564
    @type job: L{_QueuedJob}
565
    @param job: Job object
566

567
    """
568
    status = job.CalcStatus()
569
    job_info = job.GetInfo(self._fields)
570
    log_entries = job.GetLogEntries(self._prev_log_serial)
571

    
572
    # Serializing and deserializing data can cause type changes (e.g. from
573
    # tuple to list) or precision loss. We're doing it here so that we get
574
    # the same modifications as the data received from the client. Without
575
    # this, the comparison afterwards might fail without the data being
576
    # significantly different.
577
    # TODO: we just deserialized from disk, investigate how to make sure that
578
    # the job info and log entries are compatible to avoid this further step.
579
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
580
    # efficient, though floats will be tricky
581
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
582
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
583

    
584
    # Don't even try to wait if the job is no longer running, there will be
585
    # no changes.
586
    if (status not in (constants.JOB_STATUS_QUEUED,
587
                       constants.JOB_STATUS_RUNNING,
588
                       constants.JOB_STATUS_WAITLOCK) or
589
        job_info != self._prev_job_info or
590
        (log_entries and self._prev_log_serial != log_entries[0][0])):
591
      logging.debug("Job %s changed", job.id)
592
      return (job_info, log_entries)
593

    
594
    return None
595

    
596

    
597
class _JobFileChangesWaiter(object):
598
  def __init__(self, filename):
599
    """Initializes this class.
600

601
    @type filename: string
602
    @param filename: Path to job file
603
    @raise errors.InotifyError: if the notifier cannot be set up
604

605
    """
606
    self._wm = pyinotify.WatchManager()
607
    self._inotify_handler = \
608
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
609
    self._notifier = \
610
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
611
    try:
612
      self._inotify_handler.enable()
613
    except Exception:
614
      # pyinotify doesn't close file descriptors automatically
615
      self._notifier.stop()
616
      raise
617

    
618
  def _OnInotify(self, notifier_enabled):
619
    """Callback for inotify.
620

621
    """
622
    if not notifier_enabled:
623
      self._inotify_handler.enable()
624

    
625
  def Wait(self, timeout):
626
    """Waits for the job file to change.
627

628
    @type timeout: float
629
    @param timeout: Timeout in seconds
630
    @return: Whether there have been events
631

632
    """
633
    assert timeout >= 0
634
    have_events = self._notifier.check_events(timeout * 1000)
635
    if have_events:
636
      self._notifier.read_events()
637
    self._notifier.process_events()
638
    return have_events
639

    
640
  def Close(self):
641
    """Closes underlying notifier and its file descriptor.
642

643
    """
644
    self._notifier.stop()
645

    
646

    
647
class _JobChangesWaiter(object):
648
  def __init__(self, filename):
649
    """Initializes this class.
650

651
    @type filename: string
652
    @param filename: Path to job file
653

654
    """
655
    self._filewaiter = None
656
    self._filename = filename
657

    
658
  def Wait(self, timeout):
659
    """Waits for a job to change.
660

661
    @type timeout: float
662
    @param timeout: Timeout in seconds
663
    @return: Whether there have been events
664

665
    """
666
    if self._filewaiter:
667
      return self._filewaiter.Wait(timeout)
668

    
669
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
670
    # If this point is reached, return immediately and let caller check the job
671
    # file again in case there were changes since the last check. This avoids a
672
    # race condition.
673
    self._filewaiter = _JobFileChangesWaiter(self._filename)
674

    
675
    return True
676

    
677
  def Close(self):
678
    """Closes underlying waiter.
679

680
    """
681
    if self._filewaiter:
682
      self._filewaiter.Close()
683

    
684

    
685
class _WaitForJobChangesHelper(object):
686
  """Helper class using inotify to wait for changes in a job file.
687

688
  This class takes a previous job status and serial, and alerts the client when
689
  the current job status has changed.
690

691
  """
692
  @staticmethod
693
  def _CheckForChanges(job_load_fn, check_fn):
694
    job = job_load_fn()
695
    if not job:
696
      raise errors.JobLost()
697

    
698
    result = check_fn(job)
699
    if result is None:
700
      raise utils.RetryAgain()
701

    
702
    return result
703

    
704
  def __call__(self, filename, job_load_fn,
705
               fields, prev_job_info, prev_log_serial, timeout):
706
    """Waits for changes on a job.
707

708
    @type filename: string
709
    @param filename: File on which to wait for changes
710
    @type job_load_fn: callable
711
    @param job_load_fn: Function to load job
712
    @type fields: list of strings
713
    @param fields: Which fields to check for changes
714
    @type prev_job_info: list or None
715
    @param prev_job_info: Last job information returned
716
    @type prev_log_serial: int
717
    @param prev_log_serial: Last job message serial number
718
    @type timeout: float
719
    @param timeout: maximum time to wait in seconds
720

721
    """
722
    try:
723
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724
      waiter = _JobChangesWaiter(filename)
725
      try:
726
        return utils.Retry(compat.partial(self._CheckForChanges,
727
                                          job_load_fn, check_fn),
728
                           utils.RETRY_REMAINING_TIME, timeout,
729
                           wait_fn=waiter.Wait)
730
      finally:
731
        waiter.Close()
732
    except (errors.InotifyError, errors.JobLost):
733
      return None
734
    except utils.RetryTimeout:
735
      return constants.JOB_NOTCHANGED
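# Illustrative sketch (not part of the original module): the LUXI layer is
# expected to invoke the helper roughly like this, reloading the job from disk
# on every retry until something changed or the timeout expired (the argument
# names are placeholders):
#
#   helper = _WaitForJobChangesHelper()
#   result = helper(filename, load_fn, ["status"], prev_info, prev_serial, 10.0)
#   # result is (job_info, log_entries), None if the job vanished, or
#   # constants.JOB_NOTCHANGED on timeout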
736

    
737

    
738
def _EncodeOpError(err):
739
  """Encodes an error which occurred while processing an opcode.
740

741
  """
742
  if isinstance(err, errors.GenericError):
743
    to_encode = err
744
  else:
745
    to_encode = errors.OpExecError(str(err))
746

    
747
  return errors.EncodeException(to_encode)
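# Illustrative example (not part of the original module): exceptions that are
# not Ganeti errors get wrapped in OpExecError before encoding, so the result
# can always be shipped over LUXI and re-raised on the client side:
#
#   _EncodeOpError(errors.OpPrereqError("missing node"))  # encoded as-is
#   _EncodeOpError(ValueError("bad value"))     # wrapped in errors.OpExecError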
748

    
749

    
750
class _TimeoutStrategyWrapper:
751
  def __init__(self, fn):
752
    """Initializes this class.
753

754
    """
755
    self._fn = fn
756
    self._next = None
757

    
758
  def _Advance(self):
759
    """Gets the next timeout if necessary.
760

761
    """
762
    if self._next is None:
763
      self._next = self._fn()
764

    
765
  def Peek(self):
766
    """Returns the next timeout.
767

768
    """
769
    self._Advance()
770
    return self._next
771

    
772
  def Next(self):
773
    """Returns the current timeout and advances the internal state.
774

775
    """
776
    self._Advance()
777
    result = self._next
778
    self._next = None
779
    return result
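# Illustrative sketch (not part of the original module): Peek() keeps returning
# the same pending value until Next() consumes it. With a hypothetical iterator
# of timeouts:
#
#   timeouts = iter([1.0, 2.0, None])
#   wrapper = _TimeoutStrategyWrapper(timeouts.next)
#   wrapper.Peek()   # -> 1.0
#   wrapper.Peek()   # -> 1.0 (still pending)
#   wrapper.Next()   # -> 1.0 (consumed)
#   wrapper.Next()   # -> 2.0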
780

    
781

    
782
class _OpExecContext:
783
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784
    """Initializes this class.
785

786
    """
787
    self.op = op
788
    self.index = index
789
    self.log_prefix = log_prefix
790
    self.summary = op.input.Summary()
791

    
792
    self._timeout_strategy_factory = timeout_strategy_factory
793
    self._ResetTimeoutStrategy()
794

    
795
  def _ResetTimeoutStrategy(self):
796
    """Creates a new timeout strategy.
797

798
    """
799
    self._timeout_strategy = \
800
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
801

    
802
  def CheckPriorityIncrease(self):
803
    """Checks whether priority can and should be increased.
804

805
    Called when locks couldn't be acquired.
806

807
    """
808
    op = self.op
809

    
810
    # Exhausted all retries and next round should not use blocking acquire
811
    # for locks?
812
    if (self._timeout_strategy.Peek() is None and
813
        op.priority > constants.OP_PRIO_HIGHEST):
814
      logging.debug("Increasing priority")
815
      op.priority -= 1
816
      self._ResetTimeoutStrategy()
817
      return True
818

    
819
    return False
820

    
821
  def GetNextLockTimeout(self):
822
    """Returns the next lock acquire timeout.
823

824
    """
825
    return self._timeout_strategy.Next()
826

    
827

    
828
class _JobProcessor(object):
829
  def __init__(self, queue, opexec_fn, job,
830
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831
    """Initializes this class.
832

833
    """
834
    self.queue = queue
835
    self.opexec_fn = opexec_fn
836
    self.job = job
837
    self._timeout_strategy_factory = _timeout_strategy_factory
838

    
839
  @staticmethod
840
  def _FindNextOpcode(job, timeout_strategy_factory):
841
    """Locates the next opcode to run.
842

843
    @type job: L{_QueuedJob}
844
    @param job: Job object
845
    @param timeout_strategy_factory: Callable to create new timeout strategy
846

847
    """
848
    # Create some sort of a cache to speed up locating next opcode for future
849
    # lookups
850
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
851
    # pending and one for processed ops.
852
    if job.ops_iter is None:
853
      job.ops_iter = enumerate(job.ops)
854

    
855
    # Find next opcode to run
856
    while True:
857
      try:
858
        (idx, op) = job.ops_iter.next()
859
      except StopIteration:
860
        raise errors.ProgrammerError("Called for a finished job")
861

    
862
      if op.status == constants.OP_STATUS_RUNNING:
863
        # Found an opcode already marked as running
864
        raise errors.ProgrammerError("Called for job marked as running")
865

    
866
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867
                             timeout_strategy_factory)
868

    
869
      if op.status == constants.OP_STATUS_CANCELED:
870
        # Cancelled jobs are handled by the caller
871
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872
                              for i in job.ops[idx:])
873

    
874
      elif op.status in constants.OPS_FINALIZED:
875
        # This is a job that was partially completed before master daemon
876
        # shutdown, so it can be expected that some opcodes are already
877
        # completed successfully (if any did error out, then the whole job
878
        # should have been aborted and not resubmitted for processing).
879
        logging.info("%s: opcode %s already processed, skipping",
880
                     opctx.log_prefix, opctx.summary)
881
        continue
882

    
883
      return opctx
884

    
885
  @staticmethod
886
  def _MarkWaitlock(job, op):
887
    """Marks an opcode as waiting for locks.
888

889
    The job's start timestamp is also set if necessary.
890

891
    @type job: L{_QueuedJob}
892
    @param job: Job object
893
    @type op: L{_QueuedOpCode}
894
    @param op: Opcode object
895

896
    """
897
    assert op in job.ops
898
    assert op.status in (constants.OP_STATUS_QUEUED,
899
                         constants.OP_STATUS_WAITLOCK)
900

    
901
    update = False
902

    
903
    op.result = None
904

    
905
    if op.status == constants.OP_STATUS_QUEUED:
906
      op.status = constants.OP_STATUS_WAITLOCK
907
      update = True
908

    
909
    if op.start_timestamp is None:
910
      op.start_timestamp = TimeStampNow()
911
      update = True
912

    
913
    if job.start_timestamp is None:
914
      job.start_timestamp = op.start_timestamp
915
      update = True
916

    
917
    assert op.status == constants.OP_STATUS_WAITLOCK
918

    
919
    return update
920

    
921
  def _ExecOpCodeUnlocked(self, opctx):
922
    """Processes one opcode and returns the result.
923

924
    """
925
    op = opctx.op
926

    
927
    assert op.status == constants.OP_STATUS_WAITLOCK
928

    
929
    timeout = opctx.GetNextLockTimeout()
930

    
931
    try:
932
      # Make sure not to hold queue lock while calling ExecOpCode
933
      result = self.opexec_fn(op.input,
934
                              _OpExecCallbacks(self.queue, self.job, op),
935
                              timeout=timeout, priority=op.priority)
936
    except mcpu.LockAcquireTimeout:
937
      assert timeout is not None, "Received timeout for blocking acquire"
938
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
939

    
940
      assert op.status in (constants.OP_STATUS_WAITLOCK,
941
                           constants.OP_STATUS_CANCELING)
942

    
943
      # Was job cancelled while we were waiting for the lock?
944
      if op.status == constants.OP_STATUS_CANCELING:
945
        return (constants.OP_STATUS_CANCELING, None)
946

    
947
      # Stay in waitlock while trying to re-acquire lock
948
      return (constants.OP_STATUS_WAITLOCK, None)
949
    except CancelJob:
950
      logging.exception("%s: Canceling job", opctx.log_prefix)
951
      assert op.status == constants.OP_STATUS_CANCELING
952
      return (constants.OP_STATUS_CANCELING, None)
953
    except Exception, err: # pylint: disable-msg=W0703
954
      logging.exception("%s: Caught exception in %s",
955
                        opctx.log_prefix, opctx.summary)
956
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
957
    else:
958
      logging.debug("%s: %s successful",
959
                    opctx.log_prefix, opctx.summary)
960
      return (constants.OP_STATUS_SUCCESS, result)
961

    
962
  def __call__(self, _nextop_fn=None):
963
    """Continues execution of a job.
964

965
    @param _nextop_fn: Callback function for tests
966
    @rtype: bool
967
    @return: True if job is finished, False if processor needs to be called
968
             again
969

970
    """
971
    queue = self.queue
972
    job = self.job
973

    
974
    logging.debug("Processing job %s", job.id)
975

    
976
    queue.acquire(shared=1)
977
    try:
978
      opcount = len(job.ops)
979

    
980
      # Is a previous opcode still pending?
981
      if job.cur_opctx:
982
        opctx = job.cur_opctx
983
        job.cur_opctx = None
984
      else:
985
        if __debug__ and _nextop_fn:
986
          _nextop_fn()
987
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
988

    
989
      op = opctx.op
990

    
991
      # Consistency check
992
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
993
                                     constants.OP_STATUS_CANCELING,
994
                                     constants.OP_STATUS_CANCELED)
995
                        for i in job.ops[opctx.index + 1:])
996

    
997
      assert op.status in (constants.OP_STATUS_QUEUED,
998
                           constants.OP_STATUS_WAITLOCK,
999
                           constants.OP_STATUS_CANCELING,
1000
                           constants.OP_STATUS_CANCELED)
1001

    
1002
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1003
              op.priority >= constants.OP_PRIO_HIGHEST)
1004

    
1005
      if op.status not in (constants.OP_STATUS_CANCELING,
1006
                           constants.OP_STATUS_CANCELED):
1007
        assert op.status in (constants.OP_STATUS_QUEUED,
1008
                             constants.OP_STATUS_WAITLOCK)
1009

    
1010
        # Prepare to start opcode
1011
        if self._MarkWaitlock(job, op):
1012
          # Write to disk
1013
          queue.UpdateJobUnlocked(job)
1014

    
1015
        assert op.status == constants.OP_STATUS_WAITLOCK
1016
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1017
        assert job.start_timestamp and op.start_timestamp
1018

    
1019
        logging.info("%s: opcode %s waiting for locks",
1020
                     opctx.log_prefix, opctx.summary)
1021

    
1022
        queue.release()
1023
        try:
1024
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1025
        finally:
1026
          queue.acquire(shared=1)
1027

    
1028
        op.status = op_status
1029
        op.result = op_result
1030

    
1031
        if op.status == constants.OP_STATUS_WAITLOCK:
1032
          # Couldn't get locks in time
1033
          assert not op.end_timestamp
1034
        else:
1035
          # Finalize opcode
1036
          op.end_timestamp = TimeStampNow()
1037

    
1038
          if op.status == constants.OP_STATUS_CANCELING:
1039
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1040
                                  for i in job.ops[opctx.index:])
1041
          else:
1042
            assert op.status in constants.OPS_FINALIZED
1043

    
1044
      if op.status == constants.OP_STATUS_WAITLOCK:
1045
        finalize = False
1046

    
1047
        if opctx.CheckPriorityIncrease():
1048
          # Priority was changed, need to update on-disk file
1049
          queue.UpdateJobUnlocked(job)
1050

    
1051
        # Keep around for another round
1052
        job.cur_opctx = opctx
1053

    
1054
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1055
                op.priority >= constants.OP_PRIO_HIGHEST)
1056

    
1057
        # In no case must the status be finalized here
1058
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1059

    
1060
      else:
1061
        # Ensure all opcodes so far have been successful
1062
        assert (opctx.index == 0 or
1063
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1064
                           for i in job.ops[:opctx.index]))
1065

    
1066
        # Reset context
1067
        job.cur_opctx = None
1068

    
1069
        if op.status == constants.OP_STATUS_SUCCESS:
1070
          finalize = False
1071

    
1072
        elif op.status == constants.OP_STATUS_ERROR:
1073
          # Ensure failed opcode has an exception as its result
1074
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1075

    
1076
          to_encode = errors.OpExecError("Preceding opcode failed")
1077
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1078
                                _EncodeOpError(to_encode))
1079
          finalize = True
1080

    
1081
          # Consistency check
1082
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1083
                            errors.GetEncodedError(i.result)
1084
                            for i in job.ops[opctx.index:])
1085

    
1086
        elif op.status == constants.OP_STATUS_CANCELING:
1087
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1088
                                "Job canceled by request")
1089
          finalize = True
1090

    
1091
        elif op.status == constants.OP_STATUS_CANCELED:
1092
          finalize = True
1093

    
1094
        else:
1095
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1096

    
1097
        # Finalizing or last opcode?
1098
        if finalize or opctx.index == (opcount - 1):
1099
          # All opcodes have been run, finalize job
1100
          job.end_timestamp = TimeStampNow()
1101

    
1102
        # Write to disk. If the job status is final, this is the final write
1103
        # allowed. Once the file has been written, it can be archived anytime.
1104
        queue.UpdateJobUnlocked(job)
1105

    
1106
        if finalize or opctx.index == (opcount - 1):
1107
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1108
          return True
1109

    
1110
      return False
1111
    finally:
1112
      queue.release()
1113

    
1114

    
1115
class _JobQueueWorker(workerpool.BaseWorker):
1116
  """The actual job workers.
1117

1118
  """
1119
  def RunTask(self, job): # pylint: disable-msg=W0221
1120
    """Job executor.
1121

1122
    This function processes a job. It is closely tied to the L{_QueuedJob} and
1123
    L{_QueuedOpCode} classes.
1124

1125
    @type job: L{_QueuedJob}
1126
    @param job: the job to be processed
1127

1128
    """
1129
    queue = job.queue
1130
    assert queue == self.pool.queue
1131

    
1132
    self.SetTaskName("Job%s" % job.id)
1133

    
1134
    proc = mcpu.Processor(queue.context, job.id)
1135

    
1136
    if not _JobProcessor(queue, proc.ExecOpCode, job)():
1137
      # Schedule again
1138
      raise workerpool.DeferTask(priority=job.CalcPriority())
1139

    
1140

    
1141
class _JobQueueWorkerPool(workerpool.WorkerPool):
1142
  """Simple class implementing a job-processing workerpool.
1143

1144
  """
1145
  def __init__(self, queue):
1146
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
1147
                                              JOBQUEUE_THREADS,
1148
                                              _JobQueueWorker)
1149
    self.queue = queue
1150

    
1151

    
1152
def _RequireOpenQueue(fn):
1153
  """Decorator for "public" functions.
1154

1155
  This function should be used for all 'public' functions. That is,
1156
  functions usually called from other classes. Note that this should
1157
  be applied only to methods (not plain functions), since it expects
1158
  that the decorated function is called with a first argument that has
1159
  a '_queue_filelock' attribute.
1160

1161
  @warning: Use this decorator only after locking.ssynchronized
1162

1163
  Example::
1164
    @locking.ssynchronized(_LOCK)
1165
    @_RequireOpenQueue
1166
    def Example(self):
1167
      pass
1168

1169
  """
1170
  def wrapper(self, *args, **kwargs):
1171
    # pylint: disable-msg=W0212
1172
    assert self._queue_filelock is not None, "Queue should be open"
1173
    return fn(self, *args, **kwargs)
1174
  return wrapper
1175

    
1176

    
1177
class JobQueue(object):
1178
  """Queue used to manage the jobs.
1179

1180
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1181

1182
  """
1183
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1184

    
1185
  def __init__(self, context):
1186
    """Constructor for JobQueue.
1187

1188
    The constructor will initialize the job queue object and then
1189
    start loading the current jobs from disk, either for starting them
1190
    (if they were queued) or for aborting them (if they were already
1191
    running).
1192

1193
    @type context: GanetiContext
1194
    @param context: the context object for access to the configuration
1195
        data and other ganeti objects
1196

1197
    """
1198
    self.context = context
1199
    self._memcache = weakref.WeakValueDictionary()
1200
    self._my_hostname = netutils.Hostname.GetSysName()
1201

    
1202
    # The Big JobQueue lock. If a code block or method acquires it in shared
1203
    # mode, it must guarantee concurrency with all the code acquiring it in
1204
    # shared mode, including itself. In order not to acquire it at all,
1205
    # concurrency must be guaranteed with all code acquiring it in shared mode
1206
    # and all code acquiring it exclusively.
1207
    self._lock = locking.SharedLock("JobQueue")
1208

    
1209
    self.acquire = self._lock.acquire
1210
    self.release = self._lock.release
1211

    
1212
    # Initialize the queue, and acquire the filelock.
1213
    # This ensures no other process is working on the job queue.
1214
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1215

    
1216
    # Read serial file
1217
    self._last_serial = jstore.ReadSerial()
1218
    assert self._last_serial is not None, ("Serial file was modified between"
1219
                                           " check in jstore and here")
1220

    
1221
    # Get initial list of nodes
1222
    self._nodes = dict((n.name, n.primary_ip)
1223
                       for n in self.context.cfg.GetAllNodesInfo().values()
1224
                       if n.master_candidate)
1225

    
1226
    # Remove master node
1227
    self._nodes.pop(self._my_hostname, None)
1228

    
1229
    # TODO: Check consistency across nodes
1230

    
1231
    self._queue_size = 0
1232
    self._UpdateQueueSizeUnlocked()
1233
    self._drained = self._IsQueueMarkedDrain()
1234

    
1235
    # Setup worker pool
1236
    self._wpool = _JobQueueWorkerPool(self)
1237
    try:
1238
      self._InspectQueue()
1239
    except:
1240
      self._wpool.TerminateWorkers()
1241
      raise
1242

    
1243
  @locking.ssynchronized(_LOCK)
1244
  @_RequireOpenQueue
1245
  def _InspectQueue(self):
1246
    """Loads the whole job queue and resumes unfinished jobs.
1247

1248
    This function needs the lock here because WorkerPool.AddTask() may start a
1249
    job while we're still doing our work.
1250

1251
    """
1252
    logging.info("Inspecting job queue")
1253

    
1254
    restartjobs = []
1255

    
1256
    all_job_ids = self._GetJobIDsUnlocked()
1257
    jobs_count = len(all_job_ids)
1258
    lastinfo = time.time()
1259
    for idx, job_id in enumerate(all_job_ids):
1260
      # Give an update every 1000 jobs or 10 seconds
1261
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1262
          idx == (jobs_count - 1)):
1263
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1264
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1265
        lastinfo = time.time()
1266

    
1267
      job = self._LoadJobUnlocked(job_id)
1268

    
1269
      # a failure in loading the job can cause 'None' to be returned
1270
      if job is None:
1271
        continue
1272

    
1273
      status = job.CalcStatus()
1274

    
1275
      if status == constants.JOB_STATUS_QUEUED:
1276
        restartjobs.append(job)
1277

    
1278
      elif status in (constants.JOB_STATUS_RUNNING,
1279
                      constants.JOB_STATUS_WAITLOCK,
1280
                      constants.JOB_STATUS_CANCELING):
1281
        logging.warning("Unfinished job %s found: %s", job.id, job)
1282

    
1283
        if status == constants.JOB_STATUS_WAITLOCK:
1284
          # Restart job
1285
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1286
          restartjobs.append(job)
1287
        else:
1288
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1289
                                "Unclean master daemon shutdown")
1290

    
1291
        self.UpdateJobUnlocked(job)
1292

    
1293
    if restartjobs:
1294
      logging.info("Restarting %s jobs", len(restartjobs))
1295
      self._EnqueueJobs(restartjobs)
1296

    
1297
    logging.info("Job queue inspection finished")
1298

    
1299
  @locking.ssynchronized(_LOCK)
1300
  @_RequireOpenQueue
1301
  def AddNode(self, node):
1302
    """Register a new node with the queue.
1303

1304
    @type node: L{objects.Node}
1305
    @param node: the node object to be added
1306

1307
    """
1308
    node_name = node.name
1309
    assert node_name != self._my_hostname
1310

    
1311
    # Clean queue directory on added node
1312
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1313
    msg = result.fail_msg
1314
    if msg:
1315
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1316
                      node_name, msg)
1317

    
1318
    if not node.master_candidate:
1319
      # remove if existing, ignoring errors
1320
      self._nodes.pop(node_name, None)
1321
      # and skip the replication of the job ids
1322
      return
1323

    
1324
    # Upload the whole queue excluding archived jobs
1325
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1326

    
1327
    # Upload current serial file
1328
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1329

    
1330
    for file_name in files:
1331
      # Read file content
1332
      content = utils.ReadFile(file_name)
1333

    
1334
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1335
                                                  [node.primary_ip],
1336
                                                  file_name, content)
1337
      msg = result[node_name].fail_msg
1338
      if msg:
1339
        logging.error("Failed to upload file %s to node %s: %s",
1340
                      file_name, node_name, msg)
1341

    
1342
    self._nodes[node_name] = node.primary_ip
1343

    
1344
  @locking.ssynchronized(_LOCK)
1345
  @_RequireOpenQueue
1346
  def RemoveNode(self, node_name):
1347
    """Callback called when removing nodes from the cluster.
1348

1349
    @type node_name: str
1350
    @param node_name: the name of the node to remove
1351

1352
    """
1353
    self._nodes.pop(node_name, None)
1354

    
1355
  @staticmethod
1356
  def _CheckRpcResult(result, nodes, failmsg):
1357
    """Verifies the status of an RPC call.
1358

1359
    Since we aim to keep consistency should this node (the current
1360
    master) fail, we will log errors if our rpc calls fail, and especially
1361
    log the case when more than half of the nodes fail.
1362

1363
    @param result: the data as returned from the rpc call
1364
    @type nodes: list
1365
    @param nodes: the list of nodes we made the call to
1366
    @type failmsg: str
1367
    @param failmsg: the identifier to be used for logging
1368

1369
    """
1370
    failed = []
1371
    success = []
1372

    
1373
    for node in nodes:
1374
      msg = result[node].fail_msg
1375
      if msg:
1376
        failed.append(node)
1377
        logging.error("RPC call %s (%s) failed on node %s: %s",
1378
                      result[node].call, failmsg, node, msg)
1379
      else:
1380
        success.append(node)
1381

    
1382
    # +1 for the master node
1383
    if (len(success) + 1) < len(failed):
1384
      # TODO: Handle failing nodes
1385
      logging.error("More than half of the nodes failed")
1386

    
1387
  def _GetNodeIp(self):
1388
    """Helper for returning the node name/ip list.
1389

1390
    @rtype: (list, list)
1391
    @return: a tuple of two lists, the first one with the node
1392
        names and the second one with the node addresses
1393

1394
    """
1395
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1396
    name_list = self._nodes.keys()
1397
    addr_list = [self._nodes[name] for name in name_list]
1398
    return name_list, addr_list
1399

    
1400
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1401
    """Writes a file locally and then replicates it to all nodes.
1402

1403
    This function will replace the contents of a file on the local
1404
    node and then replicate it to all the other nodes we have.
1405

1406
    @type file_name: str
1407
    @param file_name: the path of the file to be replicated
1408
    @type data: str
1409
    @param data: the new contents of the file
1410
    @type replicate: boolean
1411
    @param replicate: whether to spread the changes to the remote nodes
1412

1413
    """
1414
    getents = runtime.GetEnts()
1415
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1416
                    gid=getents.masterd_gid)
1417

    
1418
    if replicate:
1419
      names, addrs = self._GetNodeIp()
1420
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1421
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1422

    
1423
  def _RenameFilesUnlocked(self, rename):
1424
    """Renames a file locally and then replicate the change.
1425

1426
    This function will rename a file in the local queue directory
1427
    and then replicate this rename to all the other nodes we have.
1428

1429
    @type rename: list of (old, new)
1430
    @param rename: List containing tuples mapping old to new names
1431

1432
    """
1433
    # Rename them locally
1434
    for old, new in rename:
1435
      utils.RenameFile(old, new, mkdir=True)
1436

    
1437
    # ... and on all nodes
1438
    names, addrs = self._GetNodeIp()
1439
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1440
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1441

    
1442
  @staticmethod
1443
  def _FormatJobID(job_id):
1444
    """Convert a job ID to string format.
1445

1446
    Currently this just does C{str(job_id)} after performing some
1447
    checks, but if we want to change the job id format this will
1448
    abstract this change.
1449

1450
    @type job_id: int or long
1451
    @param job_id: the numeric job id
1452
    @rtype: str
1453
    @return: the formatted job id
1454

1455
    """
1456
    if not isinstance(job_id, (int, long)):
1457
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1458
    if job_id < 0:
1459
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1460

    
1461
    return str(job_id)
1462

    
1463
  @classmethod
1464
  def _GetArchiveDirectory(cls, job_id):
1465
    """Returns the archive directory for a job.
1466

1467
    @type job_id: str
1468
    @param job_id: Job identifier
1469
    @rtype: str
1470
    @return: Directory name
1471

1472
    """
1473
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1474

    
1475
  def _NewSerialsUnlocked(self, count):
1476
    """Generates a new job identifier.
1477

1478
    Job identifiers are unique during the lifetime of a cluster.
1479

1480
    @type count: integer
1481
    @param count: how many serials to return
1482
    @rtype: list of str
1483
    @return: a list of strings representing the new job identifiers.
1484

1485
    """
1486
    assert count > 0
1487
    # New number
1488
    serial = self._last_serial + count
1489

    
1490
    # Write to file
1491
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1492
                             "%s\n" % serial, True)
1493

    
1494
    result = [self._FormatJobID(v)
1495
              for v in range(self._last_serial + 1, serial + 1)]
1496
    # Keep it only if we were able to write the file
1497
    self._last_serial = serial
1498

    
1499
    return result
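  # Worked example (illustrative, not part of the original module), using the
  # corrected range above: with _last_serial == 5 and count == 3 the serial
  # written to the serial file is 8 and the returned job IDs are
  # ["6", "7", "8"]; _last_serial is only bumped after the file write
  # succeeded.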
1500

    
1501
  @staticmethod
1502
  def _GetJobPath(job_id):
1503
    """Returns the job file for a given job id.
1504

1505
    @type job_id: str
1506
    @param job_id: the job identifier
1507
    @rtype: str
1508
    @return: the path to the job file
1509

1510
    """
1511
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1512

    
1513
  @classmethod
1514
  def _GetArchivedJobPath(cls, job_id):
1515
    """Returns the archived job file for a give job id.
1516

1517
    @type job_id: str
1518
    @param job_id: the job identifier
1519
    @rtype: str
1520
    @return: the path to the archived job file
1521

1522
    """
1523
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1524
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
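  # Illustrative example (not part of the original module): archived jobs are
  # sharded into directories of JOBS_PER_ARCHIVE_DIRECTORY (10000) jobs each,
  # so job 12345 ends up under directory "1":
  #
  #   JobQueue._GetArchiveDirectory("12345")   # -> "1"
  #   JobQueue._GetArchivedJobPath("12345")
  #   # -> <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345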
1525

    
1526
  def _GetJobIDsUnlocked(self, sort=True):
1527
    """Return all known job IDs.
1528

1529
    The method only looks at disk because it's a requirement that all
1530
    jobs are present on disk (so in the _memcache we don't have any
1531
    extra IDs).
1532

1533
    @type sort: boolean
1534
    @param sort: perform sorting on the returned job ids
1535
    @rtype: list
1536
    @return: the list of job IDs
1537

1538
    """
1539
    jlist = []
1540
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1541
      m = self._RE_JOB_FILE.match(filename)
1542
      if m:
1543
        jlist.append(m.group(1))
1544
    if sort:
1545
      jlist = utils.NiceSort(jlist)
1546
    return jlist
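  # Illustrative example (not part of the original module): only file names
  # matching _RE_JOB_FILE count as job files, so a queue directory containing
  # ["job-1", "job-10", "job-2", "lock", "serial"] yields the NiceSort-ed ID
  # list ["1", "2", "10"], while unrelated files are ignored.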
1547

    
1548
  def _LoadJobUnlocked(self, job_id):
1549
    """Loads a job from the disk or memory.
1550

1551
    Given a job id, this will return the cached job object if
1552
    existing, or try to load the job from the disk. If loading from
1553
    disk, it will also add the job to the cache.
1554

1555
    @param job_id: the job id
1556
    @rtype: L{_QueuedJob} or None
1557
    @return: either None or the job object
1558

1559
    """
1560
    job = self._memcache.get(job_id, None)
1561
    if job:
1562
      logging.debug("Found job %s in memcache", job_id)
1563
      return job
1564

    
1565
    try:
1566
      job = self._LoadJobFromDisk(job_id)
1567
      if job is None:
1568
        return job
1569
    except errors.JobFileCorrupted:
1570
      old_path = self._GetJobPath(job_id)
1571
      new_path = self._GetArchivedJobPath(job_id)
1572
      if old_path == new_path:
1573
        # job already archived (future case)
1574
        logging.exception("Can't parse job %s", job_id)
1575
      else:
1576
        # non-archived case
1577
        logging.exception("Can't parse job %s, will archive.", job_id)
1578
        self._RenameFilesUnlocked([(old_path, new_path)])
1579
      return None
1580

    
1581
    self._memcache[job_id] = job
1582
    logging.debug("Added job %s to the cache", job_id)
1583
    return job
1584

    
1585
  def _LoadJobFromDisk(self, job_id):
1586
    """Load the given job file from disk.
1587

1588
    Given a job file, read, load and restore it in a _QueuedJob format.
1589

1590
    @type job_id: string
1591
    @param job_id: job identifier
1592
    @rtype: L{_QueuedJob} or None
1593
    @return: either None or the job object
1594

1595
    """
1596
    filepath = self._GetJobPath(job_id)
1597
    logging.debug("Loading job from %s", filepath)
1598
    try:
1599
      raw_data = utils.ReadFile(filepath)
1600
    except EnvironmentError, err:
1601
      if err.errno in (errno.ENOENT, ):
1602
        return None
1603
      raise
1604

    
1605
    try:
1606
      data = serializer.LoadJson(raw_data)
1607
      job = _QueuedJob.Restore(self, data)
1608
    except Exception, err: # pylint: disable-msg=W0703
1609
      raise errors.JobFileCorrupted(err)
1610

    
1611
    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job id, this reads, parses and restores the corresponding
    job file into a L{_QueuedJob} object. If the job cannot be read or
    parsed, the exception is logged and None is returned instead.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True
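
  # Usage sketch (hypothetical caller, assuming "queue" is a JobQueue
  # instance): draining only affects new submissions, which are refused in
  # _SubmitJobUnlocked() below while the flag is set.
  #
  #   queue.SetDrainFlag(True)    # creates JOB_QUEUE_DRAIN_FILE; submissions
  #                               # now raise errors.JobQueueDrainError
  #   queue.SetDrainFlag(False)   # removes the file and accepts jobs again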

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id
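
  # Usage sketch (hypothetical caller; the opcode and its fields are only an
  # example and may differ between Ganeti versions):
  #
  #   op = opcodes.OpTestDelay(duration=5.0, on_master=True, on_nodes=[])
  #   job_id = queue.SubmitJob([op])
  #
  # The returned job_id can then be passed to WaitForJobChanges() or
  # QueryJobs() below to follow the job's progress.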

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results
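
  # Result shape sketch for SubmitManyJobs(): one (status, data) tuple per
  # requested job, where "data" is the new job ID on success and the error
  # message otherwise; the concrete values below are illustrative only:
  #
  #   [(True, "42"), (True, "43"), (False, "Opcode 0 has invalid priority ...")]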

  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        the special value L{constants.JOB_NOTCHANGED} is returned
        instead, and clients should interpret it as such.

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)
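
  # Caller sketch (hypothetical): a client typically calls this in a loop,
  # feeding back the last job information and log serial it has seen.
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_job_info,
  #                                    prev_log_serial, timeout=10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     ...  # nothing new within the timeout, poll again
  #   else:
  #     (job_info, log_entries) = result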

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)
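
  # Result sketch: CancelJob() returns a (success, message) pair; the message
  # texts below are illustrative only:
  #
  #   (True, "...")                   # job was still queued and got cancelled
  #   (False, "Job 123 not found")    # unknown or already archived job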

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time in seconds to spend archiving jobs
    @rtype: tuple
    @return: a tuple with the number of archived jobs and the number of
        jobs left unexamined when the timeout expired

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
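
  # Return value sketch: AutoArchiveJobs(age, timeout) reports both how much
  # work was done and how much is left, e.g.:
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(3600 * 24, timeout=30)
  #   # archived  -> number of jobs actually moved to the archive
  #   # remaining -> job IDs not yet examined because the timeout expired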

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs
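
  # Usage sketch (the field names and values shown are hypothetical; the
  # supported fields are defined by _QueuedJob.GetInfo()):
  #
  #   queue.QueryJobs(["1", "2"], ["id", "status"])
  #     -> [["1", "success"], None]   # None: job "2" could not be loaded
  #   queue.QueryJobs(None, ["id"])
  #     -> one entry per job currently on disk, unloadable jobs skipped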

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None