root / lib / jqueue.py @ 66bd7445


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
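
# For example, a wall-clock time of 1234567890.5 would come back from
# TimeStampNow() as the tuple (1234567890, 500000), assuming utils.SplitTime
# splits a float timestamp into whole seconds and microseconds as the
# docstring above describes.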


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled or failed opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
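
  # As a concrete reading of the algorithm above: opcode statuses
  # (success, running, queued) yield a running job, (success, error, queued)
  # yields an error job, and (success, success, success) a successful one;
  # a canceling or canceled opcode stops the scan early in the same way an
  # error does.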

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
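
  # For instance, calling Cancel() on a job that is still queued finalizes it
  # on the spot and returns (True, "Job <id> canceled"); on a job waiting for
  # locks it only flips the remaining opcodes to "canceling" and returns
  # (True, "Job <id> will be canceled"); in any other state (already running
  # or finished) it returns (False, ...) and leaves the job untouched.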


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
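
  # E.g. a logical unit may call Feedback("creating disks") for a plain
  # message, in which case the type defaults to constants.ELOG_MESSAGE, or
  # pass the type explicitly as in Feedback(constants.ELOG_MESSAGE,
  # "creating disks"); either way the entry is stored as one
  # (serial, timestamp, type, message) tuple in the opcode log.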

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
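
  # Intended use, as a sketch: Peek() reports the upcoming timeout without
  # consuming it, while Next() consumes it, so a caller can do
  #   timeout = strategy.Next()     # value used for one lock attempt
  #   if strategy.Peek() is None:   # the strategy is exhausted
  #     ...
  # where the wrapped function returns None once no further attempts remain.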


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops
    assert op.status in (constants.OP_STATUS_QUEUED,
                         constants.OP_STATUS_WAITLOCK)

    update = False

    op.result = None

    if op.status == constants.OP_STATUS_QUEUED:
      op.status = constants.OP_STATUS_WAITLOCK
      update = True

    if op.start_timestamp is None:
      op.start_timestamp = TimeStampNow()
      update = True

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp
      update = True

    assert op.status == constants.OP_STATUS_WAITLOCK

    return update

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITLOCK, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
      return (constants.OP_STATUS_SUCCESS, result)
962

    
963
  def __call__(self, _nextop_fn=None):
964
    """Continues execution of a job.
965

966
    @param _nextop_fn: Callback function for tests
967
    @rtype: bool
968
    @return: True if job is finished, False if processor needs to be called
969
             again
970

971
    """
972
    queue = self.queue
973
    job = self.job
974

    
975
    logging.debug("Processing job %s", job.id)
976

    
977
    queue.acquire(shared=1)
978
    try:
979
      opcount = len(job.ops)
980

    
981
      # Don't do anything for finalized jobs
982
      if job.CalcStatus() in constants.JOBS_FINALIZED:
983
        return True
984

    
985
      # Is a previous opcode still pending?
986
      if job.cur_opctx:
987
        opctx = job.cur_opctx
988
        job.cur_opctx = None
989
      else:
990
        if __debug__ and _nextop_fn:
991
          _nextop_fn()
992
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
993

    
994
      op = opctx.op
995

    
996
      # Consistency check
997
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998
                                     constants.OP_STATUS_CANCELING)
999
                        for i in job.ops[opctx.index + 1:])
1000

    
1001
      assert op.status in (constants.OP_STATUS_QUEUED,
1002
                           constants.OP_STATUS_WAITLOCK,
1003
                           constants.OP_STATUS_CANCELING)
1004

    
1005
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1006
              op.priority >= constants.OP_PRIO_HIGHEST)
1007

    
1008
      if op.status != constants.OP_STATUS_CANCELING:
1009
        assert op.status in (constants.OP_STATUS_QUEUED,
1010
                             constants.OP_STATUS_WAITLOCK)
1011

    
1012
        # Prepare to start opcode
1013
        if self._MarkWaitlock(job, op):
1014
          # Write to disk
1015
          queue.UpdateJobUnlocked(job)
1016

    
1017
        assert op.status == constants.OP_STATUS_WAITLOCK
1018
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1019
        assert job.start_timestamp and op.start_timestamp
1020

    
1021
        logging.info("%s: opcode %s waiting for locks",
1022
                     opctx.log_prefix, opctx.summary)
1023

    
1024
        queue.release()
1025
        try:
1026
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1027
        finally:
1028
          queue.acquire(shared=1)
1029

    
1030
        op.status = op_status
1031
        op.result = op_result
1032

    
1033
        if op.status == constants.OP_STATUS_WAITLOCK:
1034
          # Couldn't get locks in time
1035
          assert not op.end_timestamp
1036
        else:
1037
          # Finalize opcode
1038
          op.end_timestamp = TimeStampNow()
1039

    
1040
          if op.status == constants.OP_STATUS_CANCELING:
1041
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1042
                                  for i in job.ops[opctx.index:])
1043
          else:
1044
            assert op.status in constants.OPS_FINALIZED
1045

    
1046
      if op.status == constants.OP_STATUS_WAITLOCK:
1047
        finalize = False
1048

    
1049
        if opctx.CheckPriorityIncrease():
1050
          # Priority was changed, need to update on-disk file
1051
          queue.UpdateJobUnlocked(job)
1052

    
1053
        # Keep around for another round
1054
        job.cur_opctx = opctx
1055

    
1056
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1057
                op.priority >= constants.OP_PRIO_HIGHEST)
1058

    
1059
        # In no case must the status be finalized here
1060
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1061

    
1062
      else:
1063
        # Ensure all opcodes so far have been successful
1064
        assert (opctx.index == 0 or
1065
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1066
                           for i in job.ops[:opctx.index]))
1067

    
1068
        # Reset context
1069
        job.cur_opctx = None
1070

    
1071
        if op.status == constants.OP_STATUS_SUCCESS:
1072
          finalize = False
1073

    
1074
        elif op.status == constants.OP_STATUS_ERROR:
1075
          # Ensure failed opcode has an exception as its result
1076
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1077

    
1078
          to_encode = errors.OpExecError("Preceding opcode failed")
1079
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1080
                                _EncodeOpError(to_encode))
1081
          finalize = True
1082

    
1083
          # Consistency check
1084
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1085
                            errors.GetEncodedError(i.result)
1086
                            for i in job.ops[opctx.index:])
1087

    
1088
        elif op.status == constants.OP_STATUS_CANCELING:
1089
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1090
                                "Job canceled by request")
1091
          finalize = True
1092

    
1093
        else:
1094
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095

    
1096
        if opctx.index == (opcount - 1):
1097
          # Finalize on last opcode
1098
          finalize = True
1099

    
1100
        if finalize:
1101
          # All opcodes have been run, finalize job
1102
          job.Finalize()
1103

    
1104
        # Write to disk. If the job status is final, this is the final write
1105
        # allowed. Once the file has been written, it can be archived anytime.
1106
        queue.UpdateJobUnlocked(job)
1107

    
1108
        if finalize:
1109
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1110
          return True
1111

    
1112
      return False
1113
    finally:
1114
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the L{_QueuedJob} and
    L{_QueuedOpCode} classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    proc = mcpu.Processor(queue.context, job.id)

    if not _JobProcessor(queue, proc.ExecOpCode, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITLOCK:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
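
  # With JOBS_PER_ARCHIVE_DIRECTORY set to 10000 above, this simply groups
  # jobs by their serial: jobs 0..9999 end up in archive directory "0",
  # job 12345 in "1", job 54321 in "5", and so on.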

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list
    @return: a list of job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist
1549

    
1550
  def _LoadJobUnlocked(self, job_id):
1551
    """Loads a job from the disk or memory.
1552

1553
    Given a job id, this will return the cached job object if
1554
    existing, or try to load the job from the disk. If loading from
1555
    disk, it will also add the job to the cache.
1556

1557
    @param job_id: the job id
1558
    @rtype: L{_QueuedJob} or None
1559
    @return: either None or the job object
1560

1561
    """
1562
    job = self._memcache.get(job_id, None)
1563
    if job:
1564
      logging.debug("Found job %s in memcache", job_id)
1565
      return job
1566

    
1567
    try:
1568
      job = self._LoadJobFromDisk(job_id)
1569
      if job is None:
1570
        return job
1571
    except errors.JobFileCorrupted:
1572
      old_path = self._GetJobPath(job_id)
1573
      new_path = self._GetArchivedJobPath(job_id)
1574
      if old_path == new_path:
1575
        # job already archived (future case)
1576
        logging.exception("Can't parse job %s", job_id)
1577
      else:
1578
        # non-archived case
1579
        logging.exception("Can't parse job %s, will archive.", job_id)
1580
        self._RenameFilesUnlocked([(old_path, new_path)])
1581
      return None
1582

    
1583
    self._memcache[job_id] = job
1584
    logging.debug("Added job %s to the cache", job_id)
1585
    return job
1586

    
1587
  def _LoadJobFromDisk(self, job_id):
1588
    """Load the given job file from disk.
1589

1590
    Given a job file, read, load and restore it in a _QueuedJob format.
1591

1592
    @type job_id: string
1593
    @param job_id: job identifier
1594
    @rtype: L{_QueuedJob} or None
1595
    @return: either None or the job object
1596

1597
    """
1598
    filepath = self._GetJobPath(job_id)
1599
    logging.debug("Loading job from %s", filepath)
1600
    try:
1601
      raw_data = utils.ReadFile(filepath)
1602
    except EnvironmentError, err:
1603
      if err.errno in (errno.ENOENT, ):
1604
        return None
1605
      raise
1606

    
1607
    try:
1608
      data = serializer.LoadJson(raw_data)
1609
      job = _QueuedJob.Restore(self, data)
1610
    except Exception, err: # pylint: disable-msg=W0703
1611
      raise errors.JobFileCorrupted(err)
1612

    
1613
    return job
1614

    
1615
  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

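  # Illustrative usage sketch, not part of the original module: callers that
  # must not propagate queue corruption (e.g. query code) use this "safe"
  # variant, while internal callers use _LoadJobFromDisk and handle
  # errors.JobFileCorrupted themselves:
  #
  #   >>> job = queue.SafeLoadJobFromDisk("1234")    # `queue`: a JobQueue
  #   >>> if job is None:
  #   ...   pass  # job missing or unparseable; the error is already logged
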
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

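  # Illustrative usage sketch, not part of the original module: once the drain
  # flag is set, submissions are refused until the flag is cleared again:
  #
  #   >>> queue.SetDrainFlag(True)                   # `queue`: a JobQueue
  #   >>> try:
  #   ...   queue.SubmitJob(ops)                     # `ops`: a list of opcodes
  #   ... except errors.JobQueueDrainError:
  #   ...   pass                                     # expected while drained
  #   >>> queue.SetDrainFlag(False)
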
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

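  # Illustrative sketch, not part of the original module: only the priorities
  # listed in constants.OP_PRIO_SUBMIT_VALID are accepted at submit time, so a
  # bogus or internal-only priority is rejected before anything is written to
  # disk:
  #
  #   >>> op.priority = -12345                       # hypothetical value
  #   >>> op.priority in constants.OP_PRIO_SUBMIT_VALID
  #   False                                          # submission would raise
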
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

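  # Illustrative usage sketch, not part of the original module: callers (for
  # example the master daemon handling a luxi request) only get the new job ID
  # back; execution happens asynchronously in the worker pool:
  #
  #   >>> job_id = queue.SubmitJob([op1, op2])       # hypothetical opcodes
  #   >>> queue.QueryJobs([job_id], ["status"])      # typically still queued
  #   [[u'queued']]
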
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

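  # Illustrative usage sketch, not part of the original module: the result
  # pairs a success flag with either the new job ID or the error text, in the
  # same order as the submitted job definitions:
  #
  #   >>> results = queue.SubmitManyJobs([[op_a], [op_b]])
  #   >>> for (status, data) in results:
  #   ...   if status:
  #   ...     print "submitted as job", data
  #   ...   else:
  #   ...     print "rejected:", data
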
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

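  # Illustrative sketch, not part of the original module: each job lives in a
  # single JSON document, so the write above can be replicated verbatim to the
  # other nodes and loaded back via _QueuedJob.Restore:
  #
  #   >>> data = serializer.DumpJson(job.Serialize(), indent=False)
  #   >>> restored = _QueuedJob.Restore(queue, serializer.LoadJson(data))
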
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

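  # Illustrative usage sketch, not part of the original module: a client
  # waiting on a job typically loops, feeding back the last values it saw and
  # treating the L{constants.JOB_NOTCHANGED} sentinel as "nothing new, ask
  # again":
  #
  #   >>> prev_info, prev_serial = None, -1
  #   >>> while True:
  #   ...   result = queue.WaitForJobChanges("1234", ["status"],
  #   ...                                    prev_info, prev_serial, 10.0)
  #   ...   if result == constants.JOB_NOTCHANGED:
  #   ...     continue                               # timeout, nothing changed
  #   ...   (prev_info, log_entries) = result
  #   ...   # ...consume log_entries, update prev_serial, and break once
  #   ...   # prev_info reports a finalized status...
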
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

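  # Illustrative usage sketch, not part of the original module: cancellation
  # reports both whether it worked and a human-readable reason, so callers can
  # pass the message straight back to the user:
  #
  #   >>> (success, msg) = queue.CancelJob("1234")
  #   >>> if not success:
  #   ...   print "cancel failed:", msg              # e.g. job already running
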
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, whether the renames succeeded or
    # failed, we update the cached queue size from the filesystem. When we get
    # around to fixing the TODO above, we can use the number of actually
    # archived jobs instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple of the number of jobs archived and the number of jobs
        not yet examined when the timeout was reached

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

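  # Illustrative sketch, not part of the original module: job timestamps are
  # (seconds, microseconds) tuples, so only the seconds component is compared
  # against the age threshold above:
  #
  #   >>> now = time.time()
  #   >>> job_age = (now - 7200, 0)                  # hypothetical: 2h old
  #   >>> age = 3600
  #   >>> age == -1 or now - job_age[0] > age
  #   True                                           # job would be archived
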
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

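  # Illustrative usage sketch, not part of the original module: each result
  # row lists the requested fields in order; an unknown job ID yields None,
  # unless all jobs were listed implicitly (job_ids=None), in which case
  # vanished jobs are simply skipped:
  #
  #   >>> queue.QueryJobs(["1234", "9999"], ["id", "status"])
  #   [[u'1234', u'success'], None]                  # hypothetical output
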
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None