Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 194c8ca4

History | View | Annotate | Download (59.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37

    
38
try:
39
  # pylint: disable-msg=E0611
40
  from pyinotify import pyinotify
41
except ImportError:
42
  import pyinotify
43

    
44
from ganeti import asyncnotifier
45
from ganeti import constants
46
from ganeti import serializer
47
from ganeti import workerpool
48
from ganeti import locking
49
from ganeti import opcodes
50
from ganeti import errors
51
from ganeti import mcpu
52
from ganeti import utils
53
from ganeti import jstore
54
from ganeti import rpc
55
from ganeti import runtime
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar stop_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log", "priority",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
    # Get initial priority (it might change during the lifetime of this opcode)
117
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118

    
119
  @classmethod
120
  def Restore(cls, state):
121
    """Restore the _QueuedOpCode from the serialized form.
122

123
    @type state: dict
124
    @param state: the serialized state
125
    @rtype: _QueuedOpCode
126
    @return: a new _QueuedOpCode instance
127

128
    """
129
    obj = _QueuedOpCode.__new__(cls)
130
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131
    obj.status = state["status"]
132
    obj.result = state["result"]
133
    obj.log = state["log"]
134
    obj.start_timestamp = state.get("start_timestamp", None)
135
    obj.exec_timestamp = state.get("exec_timestamp", None)
136
    obj.end_timestamp = state.get("end_timestamp", None)
137
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138
    return obj
139

    
140
  def Serialize(self):
141
    """Serializes this _QueuedOpCode.
142

143
    @rtype: dict
144
    @return: the dictionary holding the serialized state
145

146
    """
147
    return {
148
      "input": self.input.__getstate__(),
149
      "status": self.status,
150
      "result": self.result,
151
      "log": self.log,
152
      "start_timestamp": self.start_timestamp,
153
      "exec_timestamp": self.exec_timestamp,
154
      "end_timestamp": self.end_timestamp,
155
      "priority": self.priority,
156
      }
157

    
158

    
159
class _QueuedJob(object):
160
  """In-memory job representation.
161

162
  This is what we use to track the user-submitted jobs. Locking must
163
  be taken care of by users of this class.
164

165
  @type queue: L{JobQueue}
166
  @ivar queue: the parent queue
167
  @ivar id: the job ID
168
  @type ops: list
169
  @ivar ops: the list of _QueuedOpCode that constitute the job
170
  @type log_serial: int
171
  @ivar log_serial: holds the index for the next log entry
172
  @ivar received_timestamp: the timestamp for when the job was received
173
  @ivar start_timestmap: the timestamp for start of execution
174
  @ivar end_timestamp: the timestamp for end of execution
175

176
  """
177
  # pylint: disable-msg=W0212
178
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179
               "received_timestamp", "start_timestamp", "end_timestamp",
180
               "__weakref__"]
181

    
182
  def __init__(self, queue, job_id, ops):
183
    """Constructor for the _QueuedJob.
184

185
    @type queue: L{JobQueue}
186
    @param queue: our parent queue
187
    @type job_id: job_id
188
    @param job_id: our job id
189
    @type ops: list
190
    @param ops: the list of opcodes we hold, which will be encapsulated
191
        in _QueuedOpCodes
192

193
    """
194
    if not ops:
195
      raise errors.GenericError("A job needs at least one opcode")
196

    
197
    self.queue = queue
198
    self.id = job_id
199
    self.ops = [_QueuedOpCode(op) for op in ops]
200
    self.log_serial = 0
201
    self.received_timestamp = TimeStampNow()
202
    self.start_timestamp = None
203
    self.end_timestamp = None
204

    
205
    self._InitInMemory(self)
206

    
207
  @staticmethod
208
  def _InitInMemory(obj):
209
    """Initializes in-memory variables.
210

211
    """
212
    obj.ops_iter = None
213
    obj.cur_opctx = None
214

    
215
  def __repr__(self):
216
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217
              "id=%s" % self.id,
218
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219

    
220
    return "<%s at %#x>" % (" ".join(status), id(self))
221

    
222
  @classmethod
223
  def Restore(cls, queue, state):
224
    """Restore a _QueuedJob from serialized state:
225

226
    @type queue: L{JobQueue}
227
    @param queue: to which queue the restored job belongs
228
    @type state: dict
229
    @param state: the serialized state
230
    @rtype: _JobQueue
231
    @return: the restored _JobQueue instance
232

233
    """
234
    obj = _QueuedJob.__new__(cls)
235
    obj.queue = queue
236
    obj.id = state["id"]
237
    obj.received_timestamp = state.get("received_timestamp", None)
238
    obj.start_timestamp = state.get("start_timestamp", None)
239
    obj.end_timestamp = state.get("end_timestamp", None)
240

    
241
    obj.ops = []
242
    obj.log_serial = 0
243
    for op_state in state["ops"]:
244
      op = _QueuedOpCode.Restore(op_state)
245
      for log_entry in op.log:
246
        obj.log_serial = max(obj.log_serial, log_entry[0])
247
      obj.ops.append(op)
248

    
249
    cls._InitInMemory(obj)
250

    
251
    return obj
252

    
253
  def Serialize(self):
254
    """Serialize the _JobQueue instance.
255

256
    @rtype: dict
257
    @return: the serialized state
258

259
    """
260
    return {
261
      "id": self.id,
262
      "ops": [op.Serialize() for op in self.ops],
263
      "start_timestamp": self.start_timestamp,
264
      "end_timestamp": self.end_timestamp,
265
      "received_timestamp": self.received_timestamp,
266
      }
267

    
268
  def CalcStatus(self):
269
    """Compute the status of this job.
270

271
    This function iterates over all the _QueuedOpCodes in the job and
272
    based on their status, computes the job status.
273

274
    The algorithm is:
275
      - if we find a cancelled, or finished with error, the job
276
        status will be the same
277
      - otherwise, the last opcode with the status one of:
278
          - waitlock
279
          - canceling
280
          - running
281

282
        will determine the job status
283

284
      - otherwise, it means either all opcodes are queued, or success,
285
        and the job status will be the same
286

287
    @return: the job status
288

289
    """
290
    status = constants.JOB_STATUS_QUEUED
291

    
292
    all_success = True
293
    for op in self.ops:
294
      if op.status == constants.OP_STATUS_SUCCESS:
295
        continue
296

    
297
      all_success = False
298

    
299
      if op.status == constants.OP_STATUS_QUEUED:
300
        pass
301
      elif op.status == constants.OP_STATUS_WAITLOCK:
302
        status = constants.JOB_STATUS_WAITLOCK
303
      elif op.status == constants.OP_STATUS_RUNNING:
304
        status = constants.JOB_STATUS_RUNNING
305
      elif op.status == constants.OP_STATUS_CANCELING:
306
        status = constants.JOB_STATUS_CANCELING
307
        break
308
      elif op.status == constants.OP_STATUS_ERROR:
309
        status = constants.JOB_STATUS_ERROR
310
        # The whole job fails if one opcode failed
311
        break
312
      elif op.status == constants.OP_STATUS_CANCELED:
313
        status = constants.OP_STATUS_CANCELED
314
        break
315

    
316
    if all_success:
317
      status = constants.JOB_STATUS_SUCCESS
318

    
319
    return status
320

    
321
  def CalcPriority(self):
322
    """Gets the current priority for this job.
323

324
    Only unfinished opcodes are considered. When all are done, the default
325
    priority is used.
326

327
    @rtype: int
328

329
    """
330
    priorities = [op.priority for op in self.ops
331
                  if op.status not in constants.OPS_FINALIZED]
332

    
333
    if not priorities:
334
      # All opcodes are done, assume default priority
335
      return constants.OP_PRIO_DEFAULT
336

    
337
    return min(priorities)
338

    
339
  def GetLogEntries(self, newer_than):
340
    """Selectively returns the log entries.
341

342
    @type newer_than: None or int
343
    @param newer_than: if this is None, return all log entries,
344
        otherwise return only the log entries with serial higher
345
        than this value
346
    @rtype: list
347
    @return: the list of the log entries selected
348

349
    """
350
    if newer_than is None:
351
      serial = -1
352
    else:
353
      serial = newer_than
354

    
355
    entries = []
356
    for op in self.ops:
357
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358

    
359
    return entries
360

    
361
  def GetInfo(self, fields):
362
    """Returns information about a job.
363

364
    @type fields: list
365
    @param fields: names of fields to return
366
    @rtype: list
367
    @return: list with one element for each field
368
    @raise errors.OpExecError: when an invalid field
369
        has been passed
370

371
    """
372
    row = []
373
    for fname in fields:
374
      if fname == "id":
375
        row.append(self.id)
376
      elif fname == "status":
377
        row.append(self.CalcStatus())
378
      elif fname == "priority":
379
        row.append(self.CalcPriority())
380
      elif fname == "ops":
381
        row.append([op.input.__getstate__() for op in self.ops])
382
      elif fname == "opresult":
383
        row.append([op.result for op in self.ops])
384
      elif fname == "opstatus":
385
        row.append([op.status for op in self.ops])
386
      elif fname == "oplog":
387
        row.append([op.log for op in self.ops])
388
      elif fname == "opstart":
389
        row.append([op.start_timestamp for op in self.ops])
390
      elif fname == "opexec":
391
        row.append([op.exec_timestamp for op in self.ops])
392
      elif fname == "opend":
393
        row.append([op.end_timestamp for op in self.ops])
394
      elif fname == "oppriority":
395
        row.append([op.priority for op in self.ops])
396
      elif fname == "received_ts":
397
        row.append(self.received_timestamp)
398
      elif fname == "start_ts":
399
        row.append(self.start_timestamp)
400
      elif fname == "end_ts":
401
        row.append(self.end_timestamp)
402
      elif fname == "summary":
403
        row.append([op.input.Summary() for op in self.ops])
404
      else:
405
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
406
    return row
407

    
408
  def MarkUnfinishedOps(self, status, result):
409
    """Mark unfinished opcodes with a given status and result.
410

411
    This is an utility function for marking all running or waiting to
412
    be run opcodes with a given status. Opcodes which are already
413
    finalised are not changed.
414

415
    @param status: a given opcode status
416
    @param result: the opcode result
417

418
    """
419
    not_marked = True
420
    for op in self.ops:
421
      if op.status in constants.OPS_FINALIZED:
422
        assert not_marked, "Finalized opcodes found after non-finalized ones"
423
        continue
424
      op.status = status
425
      op.result = result
426
      not_marked = False
427

    
428
  def Finalize(self):
429
    """Marks the job as finalized.
430

431
    """
432
    self.end_timestamp = TimeStampNow()
433

    
434
  def Cancel(self):
435
    """Marks job as canceled/-ing if possible.
436

437
    @rtype: tuple; (bool, string)
438
    @return: Boolean describing whether job was successfully canceled or marked
439
      as canceling and a text message
440

441
    """
442
    status = self.CalcStatus()
443

    
444
    if status == constants.JOB_STATUS_QUEUED:
445
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446
                             "Job canceled by request")
447
      self.Finalize()
448
      return (True, "Job %s canceled" % self.id)
449

    
450
    elif status == constants.JOB_STATUS_WAITLOCK:
451
      # The worker will notice the new status and cancel the job
452
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453
      return (True, "Job %s will be canceled" % self.id)
454

    
455
    else:
456
      logging.debug("Job %s is no longer waiting in the queue", self.id)
457
      return (False, "Job %s is no longer waiting in the queue" % self.id)
458

    
459

    
460
class _OpExecCallbacks(mcpu.OpExecCbBase):
461
  def __init__(self, queue, job, op):
462
    """Initializes this class.
463

464
    @type queue: L{JobQueue}
465
    @param queue: Job queue
466
    @type job: L{_QueuedJob}
467
    @param job: Job object
468
    @type op: L{_QueuedOpCode}
469
    @param op: OpCode
470

471
    """
472
    assert queue, "Queue is missing"
473
    assert job, "Job is missing"
474
    assert op, "Opcode is missing"
475

    
476
    self._queue = queue
477
    self._job = job
478
    self._op = op
479

    
480
  def _CheckCancel(self):
481
    """Raises an exception to cancel the job if asked to.
482

483
    """
484
    # Cancel here if we were asked to
485
    if self._op.status == constants.OP_STATUS_CANCELING:
486
      logging.debug("Canceling opcode")
487
      raise CancelJob()
488

    
489
  @locking.ssynchronized(_QUEUE, shared=1)
490
  def NotifyStart(self):
491
    """Mark the opcode as running, not lock-waiting.
492

493
    This is called from the mcpu code as a notifier function, when the LU is
494
    finally about to start the Exec() method. Of course, to have end-user
495
    visible results, the opcode must be initially (before calling into
496
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
497

498
    """
499
    assert self._op in self._job.ops
500
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
501
                               constants.OP_STATUS_CANCELING)
502

    
503
    # Cancel here if we were asked to
504
    self._CheckCancel()
505

    
506
    logging.debug("Opcode is now running")
507

    
508
    self._op.status = constants.OP_STATUS_RUNNING
509
    self._op.exec_timestamp = TimeStampNow()
510

    
511
    # And finally replicate the job status
512
    self._queue.UpdateJobUnlocked(self._job)
513

    
514
  @locking.ssynchronized(_QUEUE, shared=1)
515
  def _AppendFeedback(self, timestamp, log_type, log_msg):
516
    """Internal feedback append function, with locks
517

518
    """
519
    self._job.log_serial += 1
520
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
521
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
522

    
523
  def Feedback(self, *args):
524
    """Append a log entry.
525

526
    """
527
    assert len(args) < 3
528

    
529
    if len(args) == 1:
530
      log_type = constants.ELOG_MESSAGE
531
      log_msg = args[0]
532
    else:
533
      (log_type, log_msg) = args
534

    
535
    # The time is split to make serialization easier and not lose
536
    # precision.
537
    timestamp = utils.SplitTime(time.time())
538
    self._AppendFeedback(timestamp, log_type, log_msg)
539

    
540
  def CheckCancel(self):
541
    """Check whether job has been cancelled.
542

543
    """
544
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
545
                               constants.OP_STATUS_CANCELING)
546

    
547
    # Cancel here if we were asked to
548
    self._CheckCancel()
549

    
550
  def SubmitManyJobs(self, jobs):
551
    """Submits jobs for processing.
552

553
    See L{JobQueue.SubmitManyJobs}.
554

555
    """
556
    # Locking is done in job queue
557
    return self._queue.SubmitManyJobs(jobs)
558

    
559

    
560
class _JobChangesChecker(object):
561
  def __init__(self, fields, prev_job_info, prev_log_serial):
562
    """Initializes this class.
563

564
    @type fields: list of strings
565
    @param fields: Fields requested by LUXI client
566
    @type prev_job_info: string
567
    @param prev_job_info: previous job info, as passed by the LUXI client
568
    @type prev_log_serial: string
569
    @param prev_log_serial: previous job serial, as passed by the LUXI client
570

571
    """
572
    self._fields = fields
573
    self._prev_job_info = prev_job_info
574
    self._prev_log_serial = prev_log_serial
575

    
576
  def __call__(self, job):
577
    """Checks whether job has changed.
578

579
    @type job: L{_QueuedJob}
580
    @param job: Job object
581

582
    """
583
    status = job.CalcStatus()
584
    job_info = job.GetInfo(self._fields)
585
    log_entries = job.GetLogEntries(self._prev_log_serial)
586

    
587
    # Serializing and deserializing data can cause type changes (e.g. from
588
    # tuple to list) or precision loss. We're doing it here so that we get
589
    # the same modifications as the data received from the client. Without
590
    # this, the comparison afterwards might fail without the data being
591
    # significantly different.
592
    # TODO: we just deserialized from disk, investigate how to make sure that
593
    # the job info and log entries are compatible to avoid this further step.
594
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
595
    # efficient, though floats will be tricky
596
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
597
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
598

    
599
    # Don't even try to wait if the job is no longer running, there will be
600
    # no changes.
601
    if (status not in (constants.JOB_STATUS_QUEUED,
602
                       constants.JOB_STATUS_RUNNING,
603
                       constants.JOB_STATUS_WAITLOCK) or
604
        job_info != self._prev_job_info or
605
        (log_entries and self._prev_log_serial != log_entries[0][0])):
606
      logging.debug("Job %s changed", job.id)
607
      return (job_info, log_entries)
608

    
609
    return None
610

    
611

    
612
class _JobFileChangesWaiter(object):
613
  def __init__(self, filename):
614
    """Initializes this class.
615

616
    @type filename: string
617
    @param filename: Path to job file
618
    @raises errors.InotifyError: if the notifier cannot be setup
619

620
    """
621
    self._wm = pyinotify.WatchManager()
622
    self._inotify_handler = \
623
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
624
    self._notifier = \
625
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
626
    try:
627
      self._inotify_handler.enable()
628
    except Exception:
629
      # pyinotify doesn't close file descriptors automatically
630
      self._notifier.stop()
631
      raise
632

    
633
  def _OnInotify(self, notifier_enabled):
634
    """Callback for inotify.
635

636
    """
637
    if not notifier_enabled:
638
      self._inotify_handler.enable()
639

    
640
  def Wait(self, timeout):
641
    """Waits for the job file to change.
642

643
    @type timeout: float
644
    @param timeout: Timeout in seconds
645
    @return: Whether there have been events
646

647
    """
648
    assert timeout >= 0
649
    have_events = self._notifier.check_events(timeout * 1000)
650
    if have_events:
651
      self._notifier.read_events()
652
    self._notifier.process_events()
653
    return have_events
654

    
655
  def Close(self):
656
    """Closes underlying notifier and its file descriptor.
657

658
    """
659
    self._notifier.stop()
660

    
661

    
662
class _JobChangesWaiter(object):
663
  def __init__(self, filename):
664
    """Initializes this class.
665

666
    @type filename: string
667
    @param filename: Path to job file
668

669
    """
670
    self._filewaiter = None
671
    self._filename = filename
672

    
673
  def Wait(self, timeout):
674
    """Waits for a job to change.
675

676
    @type timeout: float
677
    @param timeout: Timeout in seconds
678
    @return: Whether there have been events
679

680
    """
681
    if self._filewaiter:
682
      return self._filewaiter.Wait(timeout)
683

    
684
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
685
    # If this point is reached, return immediately and let caller check the job
686
    # file again in case there were changes since the last check. This avoids a
687
    # race condition.
688
    self._filewaiter = _JobFileChangesWaiter(self._filename)
689

    
690
    return True
691

    
692
  def Close(self):
693
    """Closes underlying waiter.
694

695
    """
696
    if self._filewaiter:
697
      self._filewaiter.Close()
698

    
699

    
700
class _WaitForJobChangesHelper(object):
701
  """Helper class using inotify to wait for changes in a job file.
702

703
  This class takes a previous job status and serial, and alerts the client when
704
  the current job status has changed.
705

706
  """
707
  @staticmethod
708
  def _CheckForChanges(job_load_fn, check_fn):
709
    job = job_load_fn()
710
    if not job:
711
      raise errors.JobLost()
712

    
713
    result = check_fn(job)
714
    if result is None:
715
      raise utils.RetryAgain()
716

    
717
    return result
718

    
719
  def __call__(self, filename, job_load_fn,
720
               fields, prev_job_info, prev_log_serial, timeout):
721
    """Waits for changes on a job.
722

723
    @type filename: string
724
    @param filename: File on which to wait for changes
725
    @type job_load_fn: callable
726
    @param job_load_fn: Function to load job
727
    @type fields: list of strings
728
    @param fields: Which fields to check for changes
729
    @type prev_job_info: list or None
730
    @param prev_job_info: Last job information returned
731
    @type prev_log_serial: int
732
    @param prev_log_serial: Last job message serial number
733
    @type timeout: float
734
    @param timeout: maximum time to wait in seconds
735

736
    """
737
    try:
738
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
739
      waiter = _JobChangesWaiter(filename)
740
      try:
741
        return utils.Retry(compat.partial(self._CheckForChanges,
742
                                          job_load_fn, check_fn),
743
                           utils.RETRY_REMAINING_TIME, timeout,
744
                           wait_fn=waiter.Wait)
745
      finally:
746
        waiter.Close()
747
    except (errors.InotifyError, errors.JobLost):
748
      return None
749
    except utils.RetryTimeout:
750
      return constants.JOB_NOTCHANGED
751

    
752

    
753
def _EncodeOpError(err):
754
  """Encodes an error which occurred while processing an opcode.
755

756
  """
757
  if isinstance(err, errors.GenericError):
758
    to_encode = err
759
  else:
760
    to_encode = errors.OpExecError(str(err))
761

    
762
  return errors.EncodeException(to_encode)
763

    
764

    
765
class _TimeoutStrategyWrapper:
766
  def __init__(self, fn):
767
    """Initializes this class.
768

769
    """
770
    self._fn = fn
771
    self._next = None
772

    
773
  def _Advance(self):
774
    """Gets the next timeout if necessary.
775

776
    """
777
    if self._next is None:
778
      self._next = self._fn()
779

    
780
  def Peek(self):
781
    """Returns the next timeout.
782

783
    """
784
    self._Advance()
785
    return self._next
786

    
787
  def Next(self):
788
    """Returns the current timeout and advances the internal state.
789

790
    """
791
    self._Advance()
792
    result = self._next
793
    self._next = None
794
    return result
795

    
796

    
797
class _OpExecContext:
798
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
799
    """Initializes this class.
800

801
    """
802
    self.op = op
803
    self.index = index
804
    self.log_prefix = log_prefix
805
    self.summary = op.input.Summary()
806

    
807
    self._timeout_strategy_factory = timeout_strategy_factory
808
    self._ResetTimeoutStrategy()
809

    
810
  def _ResetTimeoutStrategy(self):
811
    """Creates a new timeout strategy.
812

813
    """
814
    self._timeout_strategy = \
815
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
816

    
817
  def CheckPriorityIncrease(self):
818
    """Checks whether priority can and should be increased.
819

820
    Called when locks couldn't be acquired.
821

822
    """
823
    op = self.op
824

    
825
    # Exhausted all retries and next round should not use blocking acquire
826
    # for locks?
827
    if (self._timeout_strategy.Peek() is None and
828
        op.priority > constants.OP_PRIO_HIGHEST):
829
      logging.debug("Increasing priority")
830
      op.priority -= 1
831
      self._ResetTimeoutStrategy()
832
      return True
833

    
834
    return False
835

    
836
  def GetNextLockTimeout(self):
837
    """Returns the next lock acquire timeout.
838

839
    """
840
    return self._timeout_strategy.Next()
841

    
842

    
843
class _JobProcessor(object):
844
  def __init__(self, queue, opexec_fn, job,
845
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
846
    """Initializes this class.
847

848
    """
849
    self.queue = queue
850
    self.opexec_fn = opexec_fn
851
    self.job = job
852
    self._timeout_strategy_factory = _timeout_strategy_factory
853

    
854
  @staticmethod
855
  def _FindNextOpcode(job, timeout_strategy_factory):
856
    """Locates the next opcode to run.
857

858
    @type job: L{_QueuedJob}
859
    @param job: Job object
860
    @param timeout_strategy_factory: Callable to create new timeout strategy
861

862
    """
863
    # Create some sort of a cache to speed up locating next opcode for future
864
    # lookups
865
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
866
    # pending and one for processed ops.
867
    if job.ops_iter is None:
868
      job.ops_iter = enumerate(job.ops)
869

    
870
    # Find next opcode to run
871
    while True:
872
      try:
873
        (idx, op) = job.ops_iter.next()
874
      except StopIteration:
875
        raise errors.ProgrammerError("Called for a finished job")
876

    
877
      if op.status == constants.OP_STATUS_RUNNING:
878
        # Found an opcode already marked as running
879
        raise errors.ProgrammerError("Called for job marked as running")
880

    
881
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
882
                             timeout_strategy_factory)
883

    
884
      if op.status not in constants.OPS_FINALIZED:
885
        return opctx
886

    
887
      # This is a job that was partially completed before master daemon
888
      # shutdown, so it can be expected that some opcodes are already
889
      # completed successfully (if any did error out, then the whole job
890
      # should have been aborted and not resubmitted for processing).
891
      logging.info("%s: opcode %s already processed, skipping",
892
                   opctx.log_prefix, opctx.summary)
893

    
894
  @staticmethod
895
  def _MarkWaitlock(job, op):
896
    """Marks an opcode as waiting for locks.
897

898
    The job's start timestamp is also set if necessary.
899

900
    @type job: L{_QueuedJob}
901
    @param job: Job object
902
    @type op: L{_QueuedOpCode}
903
    @param op: Opcode object
904

905
    """
906
    assert op in job.ops
907
    assert op.status in (constants.OP_STATUS_QUEUED,
908
                         constants.OP_STATUS_WAITLOCK)
909

    
910
    update = False
911

    
912
    op.result = None
913

    
914
    if op.status == constants.OP_STATUS_QUEUED:
915
      op.status = constants.OP_STATUS_WAITLOCK
916
      update = True
917

    
918
    if op.start_timestamp is None:
919
      op.start_timestamp = TimeStampNow()
920
      update = True
921

    
922
    if job.start_timestamp is None:
923
      job.start_timestamp = op.start_timestamp
924
      update = True
925

    
926
    assert op.status == constants.OP_STATUS_WAITLOCK
927

    
928
    return update
929

    
930
  def _ExecOpCodeUnlocked(self, opctx):
931
    """Processes one opcode and returns the result.
932

933
    """
934
    op = opctx.op
935

    
936
    assert op.status == constants.OP_STATUS_WAITLOCK
937

    
938
    timeout = opctx.GetNextLockTimeout()
939

    
940
    try:
941
      # Make sure not to hold queue lock while calling ExecOpCode
942
      result = self.opexec_fn(op.input,
943
                              _OpExecCallbacks(self.queue, self.job, op),
944
                              timeout=timeout, priority=op.priority)
945
    except mcpu.LockAcquireTimeout:
946
      assert timeout is not None, "Received timeout for blocking acquire"
947
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
948

    
949
      assert op.status in (constants.OP_STATUS_WAITLOCK,
950
                           constants.OP_STATUS_CANCELING)
951

    
952
      # Was job cancelled while we were waiting for the lock?
953
      if op.status == constants.OP_STATUS_CANCELING:
954
        return (constants.OP_STATUS_CANCELING, None)
955

    
956
      # Stay in waitlock while trying to re-acquire lock
957
      return (constants.OP_STATUS_WAITLOCK, None)
958
    except CancelJob:
959
      logging.exception("%s: Canceling job", opctx.log_prefix)
960
      assert op.status == constants.OP_STATUS_CANCELING
961
      return (constants.OP_STATUS_CANCELING, None)
962
    except Exception, err: # pylint: disable-msg=W0703
963
      logging.exception("%s: Caught exception in %s",
964
                        opctx.log_prefix, opctx.summary)
965
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
966
    else:
967
      logging.debug("%s: %s successful",
968
                    opctx.log_prefix, opctx.summary)
969
      return (constants.OP_STATUS_SUCCESS, result)
970

    
971
  def __call__(self, _nextop_fn=None):
972
    """Continues execution of a job.
973

974
    @param _nextop_fn: Callback function for tests
975
    @rtype: bool
976
    @return: True if job is finished, False if processor needs to be called
977
             again
978

979
    """
980
    queue = self.queue
981
    job = self.job
982

    
983
    logging.debug("Processing job %s", job.id)
984

    
985
    queue.acquire(shared=1)
986
    try:
987
      opcount = len(job.ops)
988

    
989
      # Don't do anything for finalized jobs
990
      if job.CalcStatus() in constants.JOBS_FINALIZED:
991
        return True
992

    
993
      # Is a previous opcode still pending?
994
      if job.cur_opctx:
995
        opctx = job.cur_opctx
996
        job.cur_opctx = None
997
      else:
998
        if __debug__ and _nextop_fn:
999
          _nextop_fn()
1000
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1001

    
1002
      op = opctx.op
1003

    
1004
      # Consistency check
1005
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1006
                                     constants.OP_STATUS_CANCELING)
1007
                        for i in job.ops[opctx.index + 1:])
1008

    
1009
      assert op.status in (constants.OP_STATUS_QUEUED,
1010
                           constants.OP_STATUS_WAITLOCK,
1011
                           constants.OP_STATUS_CANCELING)
1012

    
1013
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1014
              op.priority >= constants.OP_PRIO_HIGHEST)
1015

    
1016
      if op.status != constants.OP_STATUS_CANCELING:
1017
        assert op.status in (constants.OP_STATUS_QUEUED,
1018
                             constants.OP_STATUS_WAITLOCK)
1019

    
1020
        # Prepare to start opcode
1021
        if self._MarkWaitlock(job, op):
1022
          # Write to disk
1023
          queue.UpdateJobUnlocked(job)
1024

    
1025
        assert op.status == constants.OP_STATUS_WAITLOCK
1026
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1027
        assert job.start_timestamp and op.start_timestamp
1028

    
1029
        logging.info("%s: opcode %s waiting for locks",
1030
                     opctx.log_prefix, opctx.summary)
1031

    
1032
        queue.release()
1033
        try:
1034
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1035
        finally:
1036
          queue.acquire(shared=1)
1037

    
1038
        op.status = op_status
1039
        op.result = op_result
1040

    
1041
        if op.status == constants.OP_STATUS_WAITLOCK:
1042
          # Couldn't get locks in time
1043
          assert not op.end_timestamp
1044
        else:
1045
          # Finalize opcode
1046
          op.end_timestamp = TimeStampNow()
1047

    
1048
          if op.status == constants.OP_STATUS_CANCELING:
1049
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1050
                                  for i in job.ops[opctx.index:])
1051
          else:
1052
            assert op.status in constants.OPS_FINALIZED
1053

    
1054
      if op.status == constants.OP_STATUS_WAITLOCK:
1055
        finalize = False
1056

    
1057
        if opctx.CheckPriorityIncrease():
1058
          # Priority was changed, need to update on-disk file
1059
          queue.UpdateJobUnlocked(job)
1060

    
1061
        # Keep around for another round
1062
        job.cur_opctx = opctx
1063

    
1064
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1065
                op.priority >= constants.OP_PRIO_HIGHEST)
1066

    
1067
        # In no case must the status be finalized here
1068
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1069

    
1070
      else:
1071
        # Ensure all opcodes so far have been successful
1072
        assert (opctx.index == 0 or
1073
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1074
                           for i in job.ops[:opctx.index]))
1075

    
1076
        # Reset context
1077
        job.cur_opctx = None
1078

    
1079
        if op.status == constants.OP_STATUS_SUCCESS:
1080
          finalize = False
1081

    
1082
        elif op.status == constants.OP_STATUS_ERROR:
1083
          # Ensure failed opcode has an exception as its result
1084
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1085

    
1086
          to_encode = errors.OpExecError("Preceding opcode failed")
1087
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1088
                                _EncodeOpError(to_encode))
1089
          finalize = True
1090

    
1091
          # Consistency check
1092
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1093
                            errors.GetEncodedError(i.result)
1094
                            for i in job.ops[opctx.index:])
1095

    
1096
        elif op.status == constants.OP_STATUS_CANCELING:
1097
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1098
                                "Job canceled by request")
1099
          finalize = True
1100

    
1101
        else:
1102
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1103

    
1104
        if opctx.index == (opcount - 1):
1105
          # Finalize on last opcode
1106
          finalize = True
1107

    
1108
        if finalize:
1109
          # All opcodes have been run, finalize job
1110
          job.Finalize()
1111

    
1112
        # Write to disk. If the job status is final, this is the final write
1113
        # allowed. Once the file has been written, it can be archived anytime.
1114
        queue.UpdateJobUnlocked(job)
1115

    
1116
        if finalize:
1117
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1118
          return True
1119

    
1120
      return False
1121
    finally:
1122
      queue.release()
1123

    
1124

    
1125
class _JobQueueWorker(workerpool.BaseWorker):
1126
  """The actual job workers.
1127

1128
  """
1129
  def RunTask(self, job): # pylint: disable-msg=W0221
1130
    """Job executor.
1131

1132
    This functions processes a job. It is closely tied to the L{_QueuedJob} and
1133
    L{_QueuedOpCode} classes.
1134

1135
    @type job: L{_QueuedJob}
1136
    @param job: the job to be processed
1137

1138
    """
1139
    queue = job.queue
1140
    assert queue == self.pool.queue
1141

    
1142
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1143
    setname_fn(None)
1144

    
1145
    proc = mcpu.Processor(queue.context, job.id)
1146

    
1147
    # Create wrapper for setting thread name
1148
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1149
                                    proc.ExecOpCode)
1150

    
1151
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1152
      # Schedule again
1153
      raise workerpool.DeferTask(priority=job.CalcPriority())
1154

    
1155
  @staticmethod
1156
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1157
    """Updates the worker thread name to include a short summary of the opcode.
1158

1159
    @param setname_fn: Callable setting worker thread name
1160
    @param execop_fn: Callable for executing opcode (usually
1161
                      L{mcpu.Processor.ExecOpCode})
1162

1163
    """
1164
    setname_fn(op)
1165
    try:
1166
      return execop_fn(op, *args, **kwargs)
1167
    finally:
1168
      setname_fn(None)
1169

    
1170
  @staticmethod
1171
  def _GetWorkerName(job, op):
1172
    """Sets the worker thread name.
1173

1174
    @type job: L{_QueuedJob}
1175
    @type op: L{opcodes.OpCode}
1176

1177
    """
1178
    parts = ["Job%s" % job.id]
1179

    
1180
    if op:
1181
      parts.append(op.TinySummary())
1182

    
1183
    return "/".join(parts)
1184

    
1185

    
1186
class _JobQueueWorkerPool(workerpool.WorkerPool):
1187
  """Simple class implementing a job-processing workerpool.
1188

1189
  """
1190
  def __init__(self, queue):
1191
    super(_JobQueueWorkerPool, self).__init__("Jq",
1192
                                              JOBQUEUE_THREADS,
1193
                                              _JobQueueWorker)
1194
    self.queue = queue
1195

    
1196

    
1197
def _RequireOpenQueue(fn):
1198
  """Decorator for "public" functions.
1199

1200
  This function should be used for all 'public' functions. That is,
1201
  functions usually called from other classes. Note that this should
1202
  be applied only to methods (not plain functions), since it expects
1203
  that the decorated function is called with a first argument that has
1204
  a '_queue_filelock' argument.
1205

1206
  @warning: Use this decorator only after locking.ssynchronized
1207

1208
  Example::
1209
    @locking.ssynchronized(_LOCK)
1210
    @_RequireOpenQueue
1211
    def Example(self):
1212
      pass
1213

1214
  """
1215
  def wrapper(self, *args, **kwargs):
1216
    # pylint: disable-msg=W0212
1217
    assert self._queue_filelock is not None, "Queue should be open"
1218
    return fn(self, *args, **kwargs)
1219
  return wrapper
1220

    
1221

    
1222
class JobQueue(object):
1223
  """Queue used to manage the jobs.
1224

1225
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1226

1227
  """
1228
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1229

    
1230
  def __init__(self, context):
1231
    """Constructor for JobQueue.
1232

1233
    The constructor will initialize the job queue object and then
1234
    start loading the current jobs from disk, either for starting them
1235
    (if they were queue) or for aborting them (if they were already
1236
    running).
1237

1238
    @type context: GanetiContext
1239
    @param context: the context object for access to the configuration
1240
        data and other ganeti objects
1241

1242
    """
1243
    self.context = context
1244
    self._memcache = weakref.WeakValueDictionary()
1245
    self._my_hostname = netutils.Hostname.GetSysName()
1246

    
1247
    # The Big JobQueue lock. If a code block or method acquires it in shared
1248
    # mode safe it must guarantee concurrency with all the code acquiring it in
1249
    # shared mode, including itself. In order not to acquire it at all
1250
    # concurrency must be guaranteed with all code acquiring it in shared mode
1251
    # and all code acquiring it exclusively.
1252
    self._lock = locking.SharedLock("JobQueue")
1253

    
1254
    self.acquire = self._lock.acquire
1255
    self.release = self._lock.release
1256

    
1257
    # Initialize the queue, and acquire the filelock.
1258
    # This ensures no other process is working on the job queue.
1259
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1260

    
1261
    # Read serial file
1262
    self._last_serial = jstore.ReadSerial()
1263
    assert self._last_serial is not None, ("Serial file was modified between"
1264
                                           " check in jstore and here")
1265

    
1266
    # Get initial list of nodes
1267
    self._nodes = dict((n.name, n.primary_ip)
1268
                       for n in self.context.cfg.GetAllNodesInfo().values()
1269
                       if n.master_candidate)
1270

    
1271
    # Remove master node
1272
    self._nodes.pop(self._my_hostname, None)
1273

    
1274
    # TODO: Check consistency across nodes
1275

    
1276
    self._queue_size = 0
1277
    self._UpdateQueueSizeUnlocked()
1278
    self._drained = jstore.CheckDrainFlag()
1279

    
1280
    # Setup worker pool
1281
    self._wpool = _JobQueueWorkerPool(self)
1282
    try:
1283
      self._InspectQueue()
1284
    except:
1285
      self._wpool.TerminateWorkers()
1286
      raise
1287

    
1288
  @locking.ssynchronized(_LOCK)
1289
  @_RequireOpenQueue
1290
  def _InspectQueue(self):
1291
    """Loads the whole job queue and resumes unfinished jobs.
1292

1293
    This function needs the lock here because WorkerPool.AddTask() may start a
1294
    job while we're still doing our work.
1295

1296
    """
1297
    logging.info("Inspecting job queue")
1298

    
1299
    restartjobs = []
1300

    
1301
    all_job_ids = self._GetJobIDsUnlocked()
1302
    jobs_count = len(all_job_ids)
1303
    lastinfo = time.time()
1304
    for idx, job_id in enumerate(all_job_ids):
1305
      # Give an update every 1000 jobs or 10 seconds
1306
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1307
          idx == (jobs_count - 1)):
1308
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1309
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1310
        lastinfo = time.time()
1311

    
1312
      job = self._LoadJobUnlocked(job_id)
1313

    
1314
      # a failure in loading the job can cause 'None' to be returned
1315
      if job is None:
1316
        continue
1317

    
1318
      status = job.CalcStatus()
1319

    
1320
      if status == constants.JOB_STATUS_QUEUED:
1321
        restartjobs.append(job)
1322

    
1323
      elif status in (constants.JOB_STATUS_RUNNING,
1324
                      constants.JOB_STATUS_WAITLOCK,
1325
                      constants.JOB_STATUS_CANCELING):
1326
        logging.warning("Unfinished job %s found: %s", job.id, job)
1327

    
1328
        if status == constants.JOB_STATUS_WAITLOCK:
1329
          # Restart job
1330
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1331
          restartjobs.append(job)
1332
        else:
1333
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1334
                                "Unclean master daemon shutdown")
1335

    
1336
        self.UpdateJobUnlocked(job)
1337

    
1338
    if restartjobs:
1339
      logging.info("Restarting %s jobs", len(restartjobs))
1340
      self._EnqueueJobs(restartjobs)
1341

    
1342
    logging.info("Job queue inspection finished")
1343

    
1344
  @locking.ssynchronized(_LOCK)
1345
  @_RequireOpenQueue
1346
  def AddNode(self, node):
1347
    """Register a new node with the queue.
1348

1349
    @type node: L{objects.Node}
1350
    @param node: the node object to be added
1351

1352
    """
1353
    node_name = node.name
1354
    assert node_name != self._my_hostname
1355

    
1356
    # Clean queue directory on added node
1357
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1358
    msg = result.fail_msg
1359
    if msg:
1360
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1361
                      node_name, msg)
1362

    
1363
    if not node.master_candidate:
1364
      # remove if existing, ignoring errors
1365
      self._nodes.pop(node_name, None)
1366
      # and skip the replication of the job ids
1367
      return
1368

    
1369
    # Upload the whole queue excluding archived jobs
1370
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1371

    
1372
    # Upload current serial file
1373
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1374

    
1375
    for file_name in files:
1376
      # Read file content
1377
      content = utils.ReadFile(file_name)
1378

    
1379
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1380
                                                  [node.primary_ip],
1381
                                                  file_name, content)
1382
      msg = result[node_name].fail_msg
1383
      if msg:
1384
        logging.error("Failed to upload file %s to node %s: %s",
1385
                      file_name, node_name, msg)
1386

    
1387
    self._nodes[node_name] = node.primary_ip
1388

    
1389
  @locking.ssynchronized(_LOCK)
1390
  @_RequireOpenQueue
1391
  def RemoveNode(self, node_name):
1392
    """Callback called when removing nodes from the cluster.
1393

1394
    @type node_name: str
1395
    @param node_name: the name of the node to remove
1396

1397
    """
1398
    self._nodes.pop(node_name, None)
1399

    
1400
  @staticmethod
1401
  def _CheckRpcResult(result, nodes, failmsg):
1402
    """Verifies the status of an RPC call.
1403

1404
    Since we aim to keep consistency should this node (the current
1405
    master) fail, we will log errors if our rpc fail, and especially
1406
    log the case when more than half of the nodes fails.
1407

1408
    @param result: the data as returned from the rpc call
1409
    @type nodes: list
1410
    @param nodes: the list of nodes we made the call to
1411
    @type failmsg: str
1412
    @param failmsg: the identifier to be used for logging
1413

1414
    """
1415
    failed = []
1416
    success = []
1417

    
1418
    for node in nodes:
1419
      msg = result[node].fail_msg
1420
      if msg:
1421
        failed.append(node)
1422
        logging.error("RPC call %s (%s) failed on node %s: %s",
1423
                      result[node].call, failmsg, node, msg)
1424
      else:
1425
        success.append(node)
1426

    
1427
    # +1 for the master node
1428
    if (len(success) + 1) < len(failed):
1429
      # TODO: Handle failing nodes
1430
      logging.error("More than half of the nodes failed")
1431

    
1432
  def _GetNodeIp(self):
1433
    """Helper for returning the node name/ip list.
1434

1435
    @rtype: (list, list)
1436
    @return: a tuple of two lists, the first one with the node
1437
        names and the second one with the node addresses
1438

1439
    """
1440
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1441
    name_list = self._nodes.keys()
1442
    addr_list = [self._nodes[name] for name in name_list]
1443
    return name_list, addr_list
1444

    
1445
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1446
    """Writes a file locally and then replicates it to all nodes.
1447

1448
    This function will replace the contents of a file on the local
1449
    node and then replicate it to all the other nodes we have.
1450

1451
    @type file_name: str
1452
    @param file_name: the path of the file to be replicated
1453
    @type data: str
1454
    @param data: the new contents of the file
1455
    @type replicate: boolean
1456
    @param replicate: whether to spread the changes to the remote nodes
1457

1458
    """
1459
    getents = runtime.GetEnts()
1460
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1461
                    gid=getents.masterd_gid)
1462

    
1463
    if replicate:
1464
      names, addrs = self._GetNodeIp()
1465
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1466
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1467

    
1468
  def _RenameFilesUnlocked(self, rename):
1469
    """Renames a file locally and then replicate the change.
1470

1471
    This function will rename a file in the local queue directory
1472
    and then replicate this rename to all the other nodes we have.
1473

1474
    @type rename: list of (old, new)
1475
    @param rename: List containing tuples mapping old to new names
1476

1477
    """
1478
    # Rename them locally
1479
    for old, new in rename:
1480
      utils.RenameFile(old, new, mkdir=True)
1481

    
1482
    # ... and on all nodes
1483
    names, addrs = self._GetNodeIp()
1484
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1485
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1486

    
1487
  @staticmethod
1488
  def _FormatJobID(job_id):
1489
    """Convert a job ID to string format.
1490

1491
    Currently this just does C{str(job_id)} after performing some
1492
    checks, but if we want to change the job id format this will
1493
    abstract this change.
1494

1495
    @type job_id: int or long
1496
    @param job_id: the numeric job id
1497
    @rtype: str
1498
    @return: the formatted job id
1499

1500
    """
1501
    if not isinstance(job_id, (int, long)):
1502
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1503
    if job_id < 0:
1504
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1505

    
1506
    return str(job_id)
1507

    
1508
  @classmethod
1509
  def _GetArchiveDirectory(cls, job_id):
1510
    """Returns the archive directory for a job.
1511

1512
    @type job_id: str
1513
    @param job_id: Job identifier
1514
    @rtype: str
1515
    @return: Directory name
1516

1517
    """
1518
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1519

    
1520
  def _NewSerialsUnlocked(self, count):
1521
    """Generates a new job identifier.
1522

1523
    Job identifiers are unique during the lifetime of a cluster.
1524

1525
    @type count: integer
1526
    @param count: how many serials to return
1527
    @rtype: str
1528
    @return: a string representing the job identifier.
1529

1530
    """
1531
    assert count > 0
1532
    # New number
1533
    serial = self._last_serial + count
1534

    
1535
    # Write to file
1536
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1537
                             "%s\n" % serial, True)
1538

    
1539
    result = [self._FormatJobID(v)
1540
              for v in range(self._last_serial, serial + 1)]
1541
    # Keep it only if we were able to write the file
1542
    self._last_serial = serial
1543

    
1544
    return result
1545

    
1546
  @staticmethod
1547
  def _GetJobPath(job_id):
1548
    """Returns the job file for a given job id.
1549

1550
    @type job_id: str
1551
    @param job_id: the job identifier
1552
    @rtype: str
1553
    @return: the path to the job file
1554

1555
    """
1556
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1557

    
1558
  @classmethod
1559
  def _GetArchivedJobPath(cls, job_id):
1560
    """Returns the archived job file for a give job id.
1561

1562
    @type job_id: str
1563
    @param job_id: the job identifier
1564
    @rtype: str
1565
    @return: the path to the archived job file
1566

1567
    """
1568
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1569
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1570

    
1571
  def _GetJobIDsUnlocked(self, sort=True):
1572
    """Return all known job IDs.
1573

1574
    The method only looks at disk because it's a requirement that all
1575
    jobs are present on disk (so in the _memcache we don't have any
1576
    extra IDs).
1577

1578
    @type sort: boolean
1579
    @param sort: perform sorting on the returned job ids
1580
    @rtype: list
1581
    @return: the list of job IDs
1582

1583
    """
1584
    jlist = []
1585
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1586
      m = self._RE_JOB_FILE.match(filename)
1587
      if m:
1588
        jlist.append(m.group(1))
1589
    if sort:
1590
      jlist = utils.NiceSort(jlist)
1591
    return jlist
1592

    
1593
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [self._GetJobPath]

    if try_archived:
      path_functions.append(self._GetArchivedJobPath)

    raw_data = None

    for fn in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If the job cannot be read or parsed, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

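  # Example (illustrative; "queue" stands for a JobQueue instance and "ops"
  # for a list of opcodes):
  #   queue.SetDrainFlag(True)
  #   queue.SubmitJob(ops)    # raises errors.JobQueueDrainError
  #   queue.SetDrainFlag(False)
  #   queue.SubmitJob(ops)    # accepted again
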
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

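  # Example (illustrative; "queue" is a JobQueue instance, "ops" a list of
  # opcodes.OpCode instances): submit a single job and read back its status
  # via QueryJobs:
  #   job_id = queue.SubmitJob(ops)
  #   ((status, ), ) = queue.QueryJobs([job_id], ["status"])
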
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

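  # Example (illustrative; "ops1" and "ops2" are hypothetical opcode lists):
  # SubmitManyJobs returns one (status, data) tuple per job, where "data" is
  # the new job ID on success or an error message on failure:
  #   for (ok, data) in queue.SubmitManyJobs([ops1, ops2]):
  #     if not ok:
  #       logging.error("Job submission failed: %s", data)
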
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

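  # Example (illustrative): a polling loop built on WaitForJobChanges. The
  # previously seen job info and log serial are passed back in so that only
  # new changes are reported; constants.JOB_NOTCHANGED means the timeout
  # expired without changes:
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
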
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

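  # Example (illustrative): CancelJob reports failure via its (success,
  # message) return value rather than by raising, so callers check the flag:
  #   (ok, msg) = queue.CancelJob(job_id)
  #   if not ok:
  #     logging.warning("Could not cancel job %s: %s", job_id, msg)
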
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time in seconds to spend archiving
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not
        examined before the timeout expired)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

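  # Example (illustrative): archive every finalized job older than a week,
  # spending at most 30 seconds on the operation. Job timestamps are
  # (seconds, microseconds) tuples, so only the seconds part is compared:
  #   (archived, not_examined) = queue.AutoArchiveJobs(7 * 24 * 3600, 30.0)
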
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

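  # Example (illustrative): with explicit job IDs, archived jobs are also
  # tried and missing jobs show up as None placeholders; with job_ids=None,
  # all jobs currently in the queue directory are listed and unloadable
  # entries are silently skipped:
  #   rows = queue.QueryJobs([job_id], ["id", "status"])
  #   all_rows = queue.QueryJobs(None, ["id", "status"])
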
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None