root / lib / jqueue.py @ 45df0793


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37

    
38
try:
39
  # pylint: disable-msg=E0611
40
  from pyinotify import pyinotify
41
except ImportError:
42
  import pyinotify
43

    
44
from ganeti import asyncnotifier
45
from ganeti import constants
46
from ganeti import serializer
47
from ganeti import workerpool
48
from ganeti import locking
49
from ganeti import opcodes
50
from ganeti import errors
51
from ganeti import mcpu
52
from ganeti import utils
53
from ganeti import jstore
54
from ganeti import rpc
55
from ganeti import runtime
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar end_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log", "priority",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
    # Get initial priority (it might change during the lifetime of this opcode)
117
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118

    
119
  @classmethod
120
  def Restore(cls, state):
121
    """Restore the _QueuedOpCode from the serialized form.
122

123
    @type state: dict
124
    @param state: the serialized state
125
    @rtype: _QueuedOpCode
126
    @return: a new _QueuedOpCode instance
127

128
    """
129
    obj = _QueuedOpCode.__new__(cls)
130
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131
    obj.status = state["status"]
132
    obj.result = state["result"]
133
    obj.log = state["log"]
134
    obj.start_timestamp = state.get("start_timestamp", None)
135
    obj.exec_timestamp = state.get("exec_timestamp", None)
136
    obj.end_timestamp = state.get("end_timestamp", None)
137
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138
    return obj
139

    
140
  def Serialize(self):
141
    """Serializes this _QueuedOpCode.
142

143
    @rtype: dict
144
    @return: the dictionary holding the serialized state
145

146
    """
147
    return {
148
      "input": self.input.__getstate__(),
149
      "status": self.status,
150
      "result": self.result,
151
      "log": self.log,
152
      "start_timestamp": self.start_timestamp,
153
      "exec_timestamp": self.exec_timestamp,
154
      "end_timestamp": self.end_timestamp,
155
      "priority": self.priority,
156
      }
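  # Illustrative sketch, not part of the original module: Serialize() and
  # Restore() are meant to be inverses, so a queued opcode survives a round
  # trip through the on-disk JSON format. Assuming `op` is any
  # opcodes.OpCode instance:
  #
  #   qop = _QueuedOpCode(op)
  #   state = qop.Serialize()
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.status == constants.OP_STATUS_QUEUED
  #   assert clone.priority == qop.priority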
157

    
158

    
159
class _QueuedJob(object):
160
  """In-memory job representation.
161

162
  This is what we use to track the user-submitted jobs. Locking must
163
  be taken care of by users of this class.
164

165
  @type queue: L{JobQueue}
166
  @ivar queue: the parent queue
167
  @ivar id: the job ID
168
  @type ops: list
169
  @ivar ops: the list of _QueuedOpCode that constitute the job
170
  @type log_serial: int
171
  @ivar log_serial: holds the index for the next log entry
172
  @ivar received_timestamp: the timestamp for when the job was received
173
  @ivar start_timestamp: the timestamp for start of execution
174
  @ivar end_timestamp: the timestamp for end of execution
175

176
  """
177
  # pylint: disable-msg=W0212
178
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179
               "received_timestamp", "start_timestamp", "end_timestamp",
180
               "__weakref__"]
181

    
182
  def __init__(self, queue, job_id, ops):
183
    """Constructor for the _QueuedJob.
184

185
    @type queue: L{JobQueue}
186
    @param queue: our parent queue
187
    @type job_id: job_id
188
    @param job_id: our job id
189
    @type ops: list
190
    @param ops: the list of opcodes we hold, which will be encapsulated
191
        in _QueuedOpCodes
192

193
    """
194
    if not ops:
195
      raise errors.GenericError("A job needs at least one opcode")
196

    
197
    self.queue = queue
198
    self.id = job_id
199
    self.ops = [_QueuedOpCode(op) for op in ops]
200
    self.log_serial = 0
201
    self.received_timestamp = TimeStampNow()
202
    self.start_timestamp = None
203
    self.end_timestamp = None
204

    
205
    self._InitInMemory(self)
206

    
207
  @staticmethod
208
  def _InitInMemory(obj):
209
    """Initializes in-memory variables.
210

211
    """
212
    obj.ops_iter = None
213
    obj.cur_opctx = None
214

    
215
  def __repr__(self):
216
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217
              "id=%s" % self.id,
218
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219

    
220
    return "<%s at %#x>" % (" ".join(status), id(self))
221

    
222
  @classmethod
223
  def Restore(cls, queue, state):
224
    """Restore a _QueuedJob from serialized state:
225

226
    @type queue: L{JobQueue}
227
    @param queue: to which queue the restored job belongs
228
    @type state: dict
229
    @param state: the serialized state
230
    @rtype: _QueuedJob
231
    @return: the restored _QueuedJob instance
232

233
    """
234
    obj = _QueuedJob.__new__(cls)
235
    obj.queue = queue
236
    obj.id = state["id"]
237
    obj.received_timestamp = state.get("received_timestamp", None)
238
    obj.start_timestamp = state.get("start_timestamp", None)
239
    obj.end_timestamp = state.get("end_timestamp", None)
240

    
241
    obj.ops = []
242
    obj.log_serial = 0
243
    for op_state in state["ops"]:
244
      op = _QueuedOpCode.Restore(op_state)
245
      for log_entry in op.log:
246
        obj.log_serial = max(obj.log_serial, log_entry[0])
247
      obj.ops.append(op)
248

    
249
    cls._InitInMemory(obj)
250

    
251
    return obj
252

    
253
  def Serialize(self):
254
    """Serialize the _JobQueue instance.
255

256
    @rtype: dict
257
    @return: the serialized state
258

259
    """
260
    return {
261
      "id": self.id,
262
      "ops": [op.Serialize() for op in self.ops],
263
      "start_timestamp": self.start_timestamp,
264
      "end_timestamp": self.end_timestamp,
265
      "received_timestamp": self.received_timestamp,
266
      }
267

    
268
  def CalcStatus(self):
269
    """Compute the status of this job.
270

271
    This function iterates over all the _QueuedOpCodes in the job and
272
    based on their status, computes the job status.
273

274
    The algorithm is:
275
      - if we find a canceled opcode, or one that finished with an
276
        error, the job status will be the same
277
      - otherwise, the last opcode with the status one of:
278
          - waitlock
279
          - canceling
280
          - running
281

282
        will determine the job status
283

284
      - otherwise, all opcodes are either queued or successful, and the
285
        job status will be the same
286

287
    @return: the job status
288

289
    """
290
    status = constants.JOB_STATUS_QUEUED
291

    
292
    all_success = True
293
    for op in self.ops:
294
      if op.status == constants.OP_STATUS_SUCCESS:
295
        continue
296

    
297
      all_success = False
298

    
299
      if op.status == constants.OP_STATUS_QUEUED:
300
        pass
301
      elif op.status == constants.OP_STATUS_WAITLOCK:
302
        status = constants.JOB_STATUS_WAITLOCK
303
      elif op.status == constants.OP_STATUS_RUNNING:
304
        status = constants.JOB_STATUS_RUNNING
305
      elif op.status == constants.OP_STATUS_CANCELING:
306
        status = constants.JOB_STATUS_CANCELING
307
        break
308
      elif op.status == constants.OP_STATUS_ERROR:
309
        status = constants.JOB_STATUS_ERROR
310
        # The whole job fails if one opcode failed
311
        break
312
      elif op.status == constants.OP_STATUS_CANCELED:
313
        status = constants.JOB_STATUS_CANCELED
314
        break
315

    
316
    if all_success:
317
      status = constants.JOB_STATUS_SUCCESS
318

    
319
    return status
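  # Illustrative sketch, not part of the original module: how per-opcode
  # statuses map to a job status under the algorithm above, for a
  # hypothetical three-opcode job:
  #
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (first error wins)
  #   [QUEUED, QUEUED, QUEUED]    -> JOB_STATUS_QUEUED
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS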
320

    
321
  def CalcPriority(self):
322
    """Gets the current priority for this job.
323

324
    Only unfinished opcodes are considered. When all are done, the default
325
    priority is used.
326

327
    @rtype: int
328

329
    """
330
    priorities = [op.priority for op in self.ops
331
                  if op.status not in constants.OPS_FINALIZED]
332

    
333
    if not priorities:
334
      # All opcodes are done, assume default priority
335
      return constants.OP_PRIO_DEFAULT
336

    
337
    return min(priorities)
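  # Illustrative sketch, not part of the original module: lower numbers mean
  # higher priority, so min() picks the most urgent unfinished opcode. With
  # opcode priorities [OP_PRIO_LOWEST, OP_PRIO_HIGHEST, OP_PRIO_DEFAULT] and
  # only the first opcode finalized, CalcPriority() returns OP_PRIO_HIGHEST.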
338

    
339
  def GetLogEntries(self, newer_than):
340
    """Selectively returns the log entries.
341

342
    @type newer_than: None or int
343
    @param newer_than: if this is None, return all log entries,
344
        otherwise return only the log entries with serial higher
345
        than this value
346
    @rtype: list
347
    @return: the list of the log entries selected
348

349
    """
350
    if newer_than is None:
351
      serial = -1
352
    else:
353
      serial = newer_than
354

    
355
    entries = []
356
    for op in self.ops:
357
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358

    
359
    return entries
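  # Illustrative sketch, not part of the original module: log entries are
  # (log_serial, timestamp, level, message) tuples, so with entries serial
  # 1..5 already recorded, GetLogEntries(3) returns only the entries with
  # serial 4 and 5, while GetLogEntries(None) returns all five.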
360

    
361
  def GetInfo(self, fields):
362
    """Returns information about a job.
363

364
    @type fields: list
365
    @param fields: names of fields to return
366
    @rtype: list
367
    @return: list with one element for each field
368
    @raise errors.OpExecError: when an invalid field
369
        has been passed
370

371
    """
372
    row = []
373
    for fname in fields:
374
      if fname == "id":
375
        row.append(self.id)
376
      elif fname == "status":
377
        row.append(self.CalcStatus())
378
      elif fname == "priority":
379
        row.append(self.CalcPriority())
380
      elif fname == "ops":
381
        row.append([op.input.__getstate__() for op in self.ops])
382
      elif fname == "opresult":
383
        row.append([op.result for op in self.ops])
384
      elif fname == "opstatus":
385
        row.append([op.status for op in self.ops])
386
      elif fname == "oplog":
387
        row.append([op.log for op in self.ops])
388
      elif fname == "opstart":
389
        row.append([op.start_timestamp for op in self.ops])
390
      elif fname == "opexec":
391
        row.append([op.exec_timestamp for op in self.ops])
392
      elif fname == "opend":
393
        row.append([op.end_timestamp for op in self.ops])
394
      elif fname == "oppriority":
395
        row.append([op.priority for op in self.ops])
396
      elif fname == "received_ts":
397
        row.append(self.received_timestamp)
398
      elif fname == "start_ts":
399
        row.append(self.start_timestamp)
400
      elif fname == "end_ts":
401
        row.append(self.end_timestamp)
402
      elif fname == "summary":
403
        row.append([op.input.Summary() for op in self.ops])
404
      else:
405
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
406
    return row
407

    
408
  def MarkUnfinishedOps(self, status, result):
409
    """Mark unfinished opcodes with a given status and result.
410

411
    This is a utility function for marking all running or waiting to
412
    be run opcodes with a given status. Opcodes which are already
413
    finalised are not changed.
414

415
    @param status: a given opcode status
416
    @param result: the opcode result
417

418
    """
419
    not_marked = True
420
    for op in self.ops:
421
      if op.status in constants.OPS_FINALIZED:
422
        assert not_marked, "Finalized opcodes found after non-finalized ones"
423
        continue
424
      op.status = status
425
      op.result = result
426
      not_marked = False
427

    
428
  def Finalize(self):
429
    """Marks the job as finalized.
430

431
    """
432
    self.end_timestamp = TimeStampNow()
433

    
434
  def Cancel(self):
435
    """Marks job as canceled/-ing if possible.
436

437
    @rtype: tuple; (bool, string)
438
    @return: Boolean describing whether job was successfully canceled or marked
439
      as canceling and a text message
440

441
    """
442
    status = self.CalcStatus()
443

    
444
    if status == constants.JOB_STATUS_QUEUED:
445
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446
                             "Job canceled by request")
447
      self.Finalize()
448
      return (True, "Job %s canceled" % self.id)
449

    
450
    elif status == constants.JOB_STATUS_WAITLOCK:
451
      # The worker will notice the new status and cancel the job
452
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453
      return (True, "Job %s will be canceled" % self.id)
454

    
455
    else:
456
      logging.debug("Job %s is no longer waiting in the queue", self.id)
457
      return (False, "Job %s is no longer waiting in the queue" % self.id)
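  # Illustrative sketch, not part of the original module: a caller acts on
  # the (success, message) tuple, e.g.:
  #
  #   (success, msg) = job.Cancel()
  #   if success:
  #     queue.UpdateJobUnlocked(job)  # persist the new opcode statuses
  #   else:
  #     logging.info(msg)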
458

    
459

    
460
class _OpExecCallbacks(mcpu.OpExecCbBase):
461
  def __init__(self, queue, job, op):
462
    """Initializes this class.
463

464
    @type queue: L{JobQueue}
465
    @param queue: Job queue
466
    @type job: L{_QueuedJob}
467
    @param job: Job object
468
    @type op: L{_QueuedOpCode}
469
    @param op: OpCode
470

471
    """
472
    assert queue, "Queue is missing"
473
    assert job, "Job is missing"
474
    assert op, "Opcode is missing"
475

    
476
    self._queue = queue
477
    self._job = job
478
    self._op = op
479

    
480
  def _CheckCancel(self):
481
    """Raises an exception to cancel the job if asked to.
482

483
    """
484
    # Cancel here if we were asked to
485
    if self._op.status == constants.OP_STATUS_CANCELING:
486
      logging.debug("Canceling opcode")
487
      raise CancelJob()
488

    
489
  @locking.ssynchronized(_QUEUE, shared=1)
490
  def NotifyStart(self):
491
    """Mark the opcode as running, not lock-waiting.
492

493
    This is called from the mcpu code as a notifier function, when the LU is
494
    finally about to start the Exec() method. Of course, to have end-user
495
    visible results, the opcode must be initially (before calling into
496
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
497

498
    """
499
    assert self._op in self._job.ops
500
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
501
                               constants.OP_STATUS_CANCELING)
502

    
503
    # Cancel here if we were asked to
504
    self._CheckCancel()
505

    
506
    logging.debug("Opcode is now running")
507

    
508
    self._op.status = constants.OP_STATUS_RUNNING
509
    self._op.exec_timestamp = TimeStampNow()
510

    
511
    # And finally replicate the job status
512
    self._queue.UpdateJobUnlocked(self._job)
513

    
514
  @locking.ssynchronized(_QUEUE, shared=1)
515
  def _AppendFeedback(self, timestamp, log_type, log_msg):
516
    """Internal feedback append function, with locks
517

518
    """
519
    self._job.log_serial += 1
520
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
521
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
522

    
523
  def Feedback(self, *args):
524
    """Append a log entry.
525

526
    """
527
    assert len(args) < 3
528

    
529
    if len(args) == 1:
530
      log_type = constants.ELOG_MESSAGE
531
      log_msg = args[0]
532
    else:
533
      (log_type, log_msg) = args
534

    
535
    # The time is split to make serialization easier and not lose
536
    # precision.
537
    timestamp = utils.SplitTime(time.time())
538
    self._AppendFeedback(timestamp, log_type, log_msg)
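  # Illustrative sketch, not part of the original module: Feedback() accepts
  # either a bare message or an explicit (type, message) pair, e.g. for an
  # _OpExecCallbacks instance `cbs`:
  #
  #   cbs.Feedback("copying disk data")                           # defaults
  #   cbs.Feedback(constants.ELOG_MESSAGE, "copying disk data")   # explicit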
539

    
540
  def CheckCancel(self):
541
    """Check whether job has been cancelled.
542

543
    """
544
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
545
                               constants.OP_STATUS_CANCELING)
546

    
547
    # Cancel here if we were asked to
548
    self._CheckCancel()
549

    
550
  def SubmitManyJobs(self, jobs):
551
    """Submits jobs for processing.
552

553
    See L{JobQueue.SubmitManyJobs}.
554

555
    """
556
    # Locking is done in job queue
557
    return self._queue.SubmitManyJobs(jobs)
558

    
559

    
560
class _JobChangesChecker(object):
561
  def __init__(self, fields, prev_job_info, prev_log_serial):
562
    """Initializes this class.
563

564
    @type fields: list of strings
565
    @param fields: Fields requested by LUXI client
566
    @type prev_job_info: string
567
    @param prev_job_info: previous job info, as passed by the LUXI client
568
    @type prev_log_serial: string
569
    @param prev_log_serial: previous job serial, as passed by the LUXI client
570

571
    """
572
    self._fields = fields
573
    self._prev_job_info = prev_job_info
574
    self._prev_log_serial = prev_log_serial
575

    
576
  def __call__(self, job):
577
    """Checks whether job has changed.
578

579
    @type job: L{_QueuedJob}
580
    @param job: Job object
581

582
    """
583
    status = job.CalcStatus()
584
    job_info = job.GetInfo(self._fields)
585
    log_entries = job.GetLogEntries(self._prev_log_serial)
586

    
587
    # Serializing and deserializing data can cause type changes (e.g. from
588
    # tuple to list) or precision loss. We're doing it here so that we get
589
    # the same modifications as the data received from the client. Without
590
    # this, the comparison afterwards might fail without the data being
591
    # significantly different.
592
    # TODO: we just deserialized from disk, investigate how to make sure that
593
    # the job info and log entries are compatible to avoid this further step.
594
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
595
    # efficient, though floats will be tricky
596
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
597
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
598

    
599
    # Don't even try to wait if the job is no longer running, there will be
600
    # no changes.
601
    if (status not in (constants.JOB_STATUS_QUEUED,
602
                       constants.JOB_STATUS_RUNNING,
603
                       constants.JOB_STATUS_WAITLOCK) or
604
        job_info != self._prev_job_info or
605
        (log_entries and self._prev_log_serial != log_entries[0][0])):
606
      logging.debug("Job %s changed", job.id)
607
      return (job_info, log_entries)
608

    
609
    return None
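  # Illustrative sketch, not part of the original module: the JSON round trip
  # above exists purely to normalize types, e.g.
  #
  #   serializer.LoadJson(serializer.DumpJson((1, 2)))  ->  [1, 2]
  #
  # so that values reloaded from disk compare equal to what the LUXI client
  # previously received over the wire.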
610

    
611

    
612
class _JobFileChangesWaiter(object):
613
  def __init__(self, filename):
614
    """Initializes this class.
615

616
    @type filename: string
617
    @param filename: Path to job file
618
    @raise errors.InotifyError: if the notifier cannot be set up
619

620
    """
621
    self._wm = pyinotify.WatchManager()
622
    self._inotify_handler = \
623
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
624
    self._notifier = \
625
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
626
    try:
627
      self._inotify_handler.enable()
628
    except Exception:
629
      # pyinotify doesn't close file descriptors automatically
630
      self._notifier.stop()
631
      raise
632

    
633
  def _OnInotify(self, notifier_enabled):
634
    """Callback for inotify.
635

636
    """
637
    if not notifier_enabled:
638
      self._inotify_handler.enable()
639

    
640
  def Wait(self, timeout):
641
    """Waits for the job file to change.
642

643
    @type timeout: float
644
    @param timeout: Timeout in seconds
645
    @return: Whether there have been events
646

647
    """
648
    assert timeout >= 0
649
    have_events = self._notifier.check_events(timeout * 1000)
650
    if have_events:
651
      self._notifier.read_events()
652
    self._notifier.process_events()
653
    return have_events
654

    
655
  def Close(self):
656
    """Closes underlying notifier and its file descriptor.
657

658
    """
659
    self._notifier.stop()
660

    
661

    
662
class _JobChangesWaiter(object):
663
  def __init__(self, filename):
664
    """Initializes this class.
665

666
    @type filename: string
667
    @param filename: Path to job file
668

669
    """
670
    self._filewaiter = None
671
    self._filename = filename
672

    
673
  def Wait(self, timeout):
674
    """Waits for a job to change.
675

676
    @type timeout: float
677
    @param timeout: Timeout in seconds
678
    @return: Whether there have been events
679

680
    """
681
    if self._filewaiter:
682
      return self._filewaiter.Wait(timeout)
683

    
684
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
685
    # If this point is reached, return immediately and let caller check the job
686
    # file again in case there were changes since the last check. This avoids a
687
    # race condition.
688
    self._filewaiter = _JobFileChangesWaiter(self._filename)
689

    
690
    return True
691

    
692
  def Close(self):
693
    """Closes underlying waiter.
694

695
    """
696
    if self._filewaiter:
697
      self._filewaiter.Close()
698

    
699

    
700
class _WaitForJobChangesHelper(object):
701
  """Helper class using inotify to wait for changes in a job file.
702

703
  This class takes a previous job status and serial, and alerts the client when
704
  the current job status has changed.
705

706
  """
707
  @staticmethod
708
  def _CheckForChanges(job_load_fn, check_fn):
709
    job = job_load_fn()
710
    if not job:
711
      raise errors.JobLost()
712

    
713
    result = check_fn(job)
714
    if result is None:
715
      raise utils.RetryAgain()
716

    
717
    return result
718

    
719
  def __call__(self, filename, job_load_fn,
720
               fields, prev_job_info, prev_log_serial, timeout):
721
    """Waits for changes on a job.
722

723
    @type filename: string
724
    @param filename: File on which to wait for changes
725
    @type job_load_fn: callable
726
    @param job_load_fn: Function to load job
727
    @type fields: list of strings
728
    @param fields: Which fields to check for changes
729
    @type prev_job_info: list or None
730
    @param prev_job_info: Last job information returned
731
    @type prev_log_serial: int
732
    @param prev_log_serial: Last job message serial number
733
    @type timeout: float
734
    @param timeout: maximum time to wait in seconds
735

736
    """
737
    try:
738
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
739
      waiter = _JobChangesWaiter(filename)
740
      try:
741
        return utils.Retry(compat.partial(self._CheckForChanges,
742
                                          job_load_fn, check_fn),
743
                           utils.RETRY_REMAINING_TIME, timeout,
744
                           wait_fn=waiter.Wait)
745
      finally:
746
        waiter.Close()
747
    except (errors.InotifyError, errors.JobLost):
748
      return None
749
    except utils.RetryTimeout:
750
      return constants.JOB_NOTCHANGED
751

    
752

    
753
def _EncodeOpError(err):
754
  """Encodes an error which occurred while processing an opcode.
755

756
  """
757
  if isinstance(err, errors.GenericError):
758
    to_encode = err
759
  else:
760
    to_encode = errors.OpExecError(str(err))
761

    
762
  return errors.EncodeException(to_encode)
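# Illustrative sketch, not part of the original module: non-Ganeti exceptions
# are wrapped before encoding, so both calls below yield an encoded
# errors.OpExecError suitable for storing as an opcode result:
#
#   _EncodeOpError(ValueError("bad value"))
#   _EncodeOpError(errors.OpExecError("bad value"))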
763

    
764

    
765
class _TimeoutStrategyWrapper:
766
  def __init__(self, fn):
767
    """Initializes this class.
768

769
    """
770
    self._fn = fn
771
    self._next = None
772

    
773
  def _Advance(self):
774
    """Gets the next timeout if necessary.
775

776
    """
777
    if self._next is None:
778
      self._next = self._fn()
779

    
780
  def Peek(self):
781
    """Returns the next timeout.
782

783
    """
784
    self._Advance()
785
    return self._next
786

    
787
  def Next(self):
788
    """Returns the current timeout and advances the internal state.
789

790
    """
791
    self._Advance()
792
    result = self._next
793
    self._next = None
794
    return result
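  # Illustrative sketch, not part of the original module: Peek() is
  # idempotent while Next() consumes the cached value. With a strategy
  # callable `fn` yielding 1.0 and then 2.0:
  #
  #   w = _TimeoutStrategyWrapper(fn)
  #   w.Peek()  # -> 1.0
  #   w.Peek()  # -> 1.0 (unchanged)
  #   w.Next()  # -> 1.0 (consumed)
  #   w.Next()  # -> 2.0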
795

    
796

    
797
class _OpExecContext:
798
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
799
    """Initializes this class.
800

801
    """
802
    self.op = op
803
    self.index = index
804
    self.log_prefix = log_prefix
805
    self.summary = op.input.Summary()
806

    
807
    self._timeout_strategy_factory = timeout_strategy_factory
808
    self._ResetTimeoutStrategy()
809

    
810
  def _ResetTimeoutStrategy(self):
811
    """Creates a new timeout strategy.
812

813
    """
814
    self._timeout_strategy = \
815
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
816

    
817
  def CheckPriorityIncrease(self):
818
    """Checks whether priority can and should be increased.
819

820
    Called when locks couldn't be acquired.
821

822
    """
823
    op = self.op
824

    
825
    # Exhausted all retries and next round should not use blocking acquire
826
    # for locks?
827
    if (self._timeout_strategy.Peek() is None and
828
        op.priority > constants.OP_PRIO_HIGHEST):
829
      logging.debug("Increasing priority")
830
      op.priority -= 1
831
      self._ResetTimeoutStrategy()
832
      return True
833

    
834
    return False
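  # Illustrative sketch, not part of the original module: priorities are
  # nice-style values, so "increasing" the priority means decrementing the
  # number. Each time an opcode exhausts its lock acquire timeouts, it moves
  # one step closer to constants.OP_PRIO_HIGHEST and gets a fresh timeout
  # strategy.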
835

    
836
  def GetNextLockTimeout(self):
837
    """Returns the next lock acquire timeout.
838

839
    """
840
    return self._timeout_strategy.Next()
841

    
842

    
843
class _JobProcessor(object):
844
  def __init__(self, queue, opexec_fn, job,
845
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
846
    """Initializes this class.
847

848
    """
849
    self.queue = queue
850
    self.opexec_fn = opexec_fn
851
    self.job = job
852
    self._timeout_strategy_factory = _timeout_strategy_factory
853

    
854
  @staticmethod
855
  def _FindNextOpcode(job, timeout_strategy_factory):
856
    """Locates the next opcode to run.
857

858
    @type job: L{_QueuedJob}
859
    @param job: Job object
860
    @param timeout_strategy_factory: Callable to create new timeout strategy
861

862
    """
863
    # Create some sort of a cache to speed up locating next opcode for future
864
    # lookups
865
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
866
    # pending and one for processed ops.
867
    if job.ops_iter is None:
868
      job.ops_iter = enumerate(job.ops)
869

    
870
    # Find next opcode to run
871
    while True:
872
      try:
873
        (idx, op) = job.ops_iter.next()
874
      except StopIteration:
875
        raise errors.ProgrammerError("Called for a finished job")
876

    
877
      if op.status == constants.OP_STATUS_RUNNING:
878
        # Found an opcode already marked as running
879
        raise errors.ProgrammerError("Called for job marked as running")
880

    
881
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
882
                             timeout_strategy_factory)
883

    
884
      if op.status not in constants.OPS_FINALIZED:
885
        return opctx
886

    
887
      # This is a job that was partially completed before master daemon
888
      # shutdown, so it can be expected that some opcodes are already
889
      # completed successfully (if any did error out, then the whole job
890
      # should have been aborted and not resubmitted for processing).
891
      logging.info("%s: opcode %s already processed, skipping",
892
                   opctx.log_prefix, opctx.summary)
893

    
894
  @staticmethod
895
  def _MarkWaitlock(job, op):
896
    """Marks an opcode as waiting for locks.
897

898
    The job's start timestamp is also set if necessary.
899

900
    @type job: L{_QueuedJob}
901
    @param job: Job object
902
    @type op: L{_QueuedOpCode}
903
    @param op: Opcode object
904

905
    """
906
    assert op in job.ops
907
    assert op.status in (constants.OP_STATUS_QUEUED,
908
                         constants.OP_STATUS_WAITLOCK)
909

    
910
    update = False
911

    
912
    op.result = None
913

    
914
    if op.status == constants.OP_STATUS_QUEUED:
915
      op.status = constants.OP_STATUS_WAITLOCK
916
      update = True
917

    
918
    if op.start_timestamp is None:
919
      op.start_timestamp = TimeStampNow()
920
      update = True
921

    
922
    if job.start_timestamp is None:
923
      job.start_timestamp = op.start_timestamp
924
      update = True
925

    
926
    assert op.status == constants.OP_STATUS_WAITLOCK
927

    
928
    return update
929

    
930
  def _ExecOpCodeUnlocked(self, opctx):
931
    """Processes one opcode and returns the result.
932

933
    """
934
    op = opctx.op
935

    
936
    assert op.status == constants.OP_STATUS_WAITLOCK
937

    
938
    timeout = opctx.GetNextLockTimeout()
939

    
940
    try:
941
      # Make sure not to hold queue lock while calling ExecOpCode
942
      result = self.opexec_fn(op.input,
943
                              _OpExecCallbacks(self.queue, self.job, op),
944
                              timeout=timeout, priority=op.priority)
945
    except mcpu.LockAcquireTimeout:
946
      assert timeout is not None, "Received timeout for blocking acquire"
947
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
948

    
949
      assert op.status in (constants.OP_STATUS_WAITLOCK,
950
                           constants.OP_STATUS_CANCELING)
951

    
952
      # Was job cancelled while we were waiting for the lock?
953
      if op.status == constants.OP_STATUS_CANCELING:
954
        return (constants.OP_STATUS_CANCELING, None)
955

    
956
      # Stay in waitlock while trying to re-acquire lock
957
      return (constants.OP_STATUS_WAITLOCK, None)
958
    except CancelJob:
959
      logging.exception("%s: Canceling job", opctx.log_prefix)
960
      assert op.status == constants.OP_STATUS_CANCELING
961
      return (constants.OP_STATUS_CANCELING, None)
962
    except Exception, err: # pylint: disable-msg=W0703
963
      logging.exception("%s: Caught exception in %s",
964
                        opctx.log_prefix, opctx.summary)
965
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
966
    else:
967
      logging.debug("%s: %s successful",
968
                    opctx.log_prefix, opctx.summary)
969
      return (constants.OP_STATUS_SUCCESS, result)
970

    
971
  def __call__(self, _nextop_fn=None):
972
    """Continues execution of a job.
973

974
    @param _nextop_fn: Callback function for tests
975
    @rtype: bool
976
    @return: True if job is finished, False if processor needs to be called
977
             again
978

979
    """
980
    queue = self.queue
981
    job = self.job
982

    
983
    logging.debug("Processing job %s", job.id)
984

    
985
    queue.acquire(shared=1)
986
    try:
987
      opcount = len(job.ops)
988

    
989
      # Don't do anything for finalized jobs
990
      if job.CalcStatus() in constants.JOBS_FINALIZED:
991
        return True
992

    
993
      # Is a previous opcode still pending?
994
      if job.cur_opctx:
995
        opctx = job.cur_opctx
996
        job.cur_opctx = None
997
      else:
998
        if __debug__ and _nextop_fn:
999
          _nextop_fn()
1000
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1001

    
1002
      op = opctx.op
1003

    
1004
      # Consistency check
1005
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1006
                                     constants.OP_STATUS_CANCELING)
1007
                        for i in job.ops[opctx.index + 1:])
1008

    
1009
      assert op.status in (constants.OP_STATUS_QUEUED,
1010
                           constants.OP_STATUS_WAITLOCK,
1011
                           constants.OP_STATUS_CANCELING)
1012

    
1013
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1014
              op.priority >= constants.OP_PRIO_HIGHEST)
1015

    
1016
      if op.status != constants.OP_STATUS_CANCELING:
1017
        assert op.status in (constants.OP_STATUS_QUEUED,
1018
                             constants.OP_STATUS_WAITLOCK)
1019

    
1020
        # Prepare to start opcode
1021
        if self._MarkWaitlock(job, op):
1022
          # Write to disk
1023
          queue.UpdateJobUnlocked(job)
1024

    
1025
        assert op.status == constants.OP_STATUS_WAITLOCK
1026
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1027
        assert job.start_timestamp and op.start_timestamp
1028

    
1029
        logging.info("%s: opcode %s waiting for locks",
1030
                     opctx.log_prefix, opctx.summary)
1031

    
1032
        queue.release()
1033
        try:
1034
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1035
        finally:
1036
          queue.acquire(shared=1)
1037

    
1038
        op.status = op_status
1039
        op.result = op_result
1040

    
1041
        if op.status == constants.OP_STATUS_WAITLOCK:
1042
          # Couldn't get locks in time
1043
          assert not op.end_timestamp
1044
        else:
1045
          # Finalize opcode
1046
          op.end_timestamp = TimeStampNow()
1047

    
1048
          if op.status == constants.OP_STATUS_CANCELING:
1049
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1050
                                  for i in job.ops[opctx.index:])
1051
          else:
1052
            assert op.status in constants.OPS_FINALIZED
1053

    
1054
      if op.status == constants.OP_STATUS_WAITLOCK:
1055
        finalize = False
1056

    
1057
        if opctx.CheckPriorityIncrease():
1058
          # Priority was changed, need to update on-disk file
1059
          queue.UpdateJobUnlocked(job)
1060

    
1061
        # Keep around for another round
1062
        job.cur_opctx = opctx
1063

    
1064
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1065
                op.priority >= constants.OP_PRIO_HIGHEST)
1066

    
1067
        # In no case must the status be finalized here
1068
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1069

    
1070
      else:
1071
        # Ensure all opcodes so far have been successful
1072
        assert (opctx.index == 0 or
1073
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1074
                           for i in job.ops[:opctx.index]))
1075

    
1076
        # Reset context
1077
        job.cur_opctx = None
1078

    
1079
        if op.status == constants.OP_STATUS_SUCCESS:
1080
          finalize = False
1081

    
1082
        elif op.status == constants.OP_STATUS_ERROR:
1083
          # Ensure failed opcode has an exception as its result
1084
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1085

    
1086
          to_encode = errors.OpExecError("Preceding opcode failed")
1087
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1088
                                _EncodeOpError(to_encode))
1089
          finalize = True
1090

    
1091
          # Consistency check
1092
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1093
                            errors.GetEncodedError(i.result)
1094
                            for i in job.ops[opctx.index:])
1095

    
1096
        elif op.status == constants.OP_STATUS_CANCELING:
1097
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1098
                                "Job canceled by request")
1099
          finalize = True
1100

    
1101
        else:
1102
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1103

    
1104
        if opctx.index == (opcount - 1):
1105
          # Finalize on last opcode
1106
          finalize = True
1107

    
1108
        if finalize:
1109
          # All opcodes have been run, finalize job
1110
          job.Finalize()
1111

    
1112
        # Write to disk. If the job status is final, this is the final write
1113
        # allowed. Once the file has been written, it can be archived anytime.
1114
        queue.UpdateJobUnlocked(job)
1115

    
1116
        if finalize:
1117
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1118
          return True
1119

    
1120
      return False
1121
    finally:
1122
      queue.release()
1123

    
1124

    
1125
class _JobQueueWorker(workerpool.BaseWorker):
1126
  """The actual job workers.
1127

1128
  """
1129
  def RunTask(self, job): # pylint: disable-msg=W0221
1130
    """Job executor.
1131

1132
    This function processes a job. It is closely tied to the L{_QueuedJob} and
1133
    L{_QueuedOpCode} classes.
1134

1135
    @type job: L{_QueuedJob}
1136
    @param job: the job to be processed
1137

1138
    """
1139
    queue = job.queue
1140
    assert queue == self.pool.queue
1141

    
1142
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1143
    setname_fn(None)
1144

    
1145
    proc = mcpu.Processor(queue.context, job.id)
1146

    
1147
    # Create wrapper for setting thread name
1148
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1149
                                    proc.ExecOpCode)
1150

    
1151
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1152
      # Schedule again
1153
      raise workerpool.DeferTask(priority=job.CalcPriority())
1154

    
1155
  @staticmethod
1156
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1157
    """Updates the worker thread name to include a short summary of the opcode.
1158

1159
    @param setname_fn: Callable setting worker thread name
1160
    @param execop_fn: Callable for executing opcode (usually
1161
                      L{mcpu.Processor.ExecOpCode})
1162

1163
    """
1164
    setname_fn(op)
1165
    try:
1166
      return execop_fn(op, *args, **kwargs)
1167
    finally:
1168
      setname_fn(None)
1169

    
1170
  @staticmethod
1171
  def _GetWorkerName(job, op):
1172
    """Sets the worker thread name.
1173

1174
    @type job: L{_QueuedJob}
1175
    @type op: L{opcodes.OpCode}
1176

1177
    """
1178
    parts = ["Job%s" % job.id]
1179

    
1180
    if op:
1181
      parts.append(op.TinySummary())
1182

    
1183
    return "/".join(parts)
1184

    
1185

    
1186
class _JobQueueWorkerPool(workerpool.WorkerPool):
1187
  """Simple class implementing a job-processing workerpool.
1188

1189
  """
1190
  def __init__(self, queue):
1191
    super(_JobQueueWorkerPool, self).__init__("Jq",
1192
                                              JOBQUEUE_THREADS,
1193
                                              _JobQueueWorker)
1194
    self.queue = queue
1195

    
1196

    
1197
def _RequireOpenQueue(fn):
1198
  """Decorator for "public" functions.
1199

1200
  This function should be used for all 'public' functions. That is,
1201
  functions usually called from other classes. Note that this should
1202
  be applied only to methods (not plain functions), since it expects
1203
  that the decorated function is called with a first argument that has
1204
  a '_queue_filelock' attribute.
1205

1206
  @warning: Use this decorator only after locking.ssynchronized
1207

1208
  Example::
1209
    @locking.ssynchronized(_LOCK)
1210
    @_RequireOpenQueue
1211
    def Example(self):
1212
      pass
1213

1214
  """
1215
  def wrapper(self, *args, **kwargs):
1216
    # pylint: disable-msg=W0212
1217
    assert self._queue_filelock is not None, "Queue should be open"
1218
    return fn(self, *args, **kwargs)
1219
  return wrapper
1220

    
1221

    
1222
class JobQueue(object):
1223
  """Queue used to manage the jobs.
1224

1225
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1226

1227
  """
1228
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1229

    
1230
  def __init__(self, context):
1231
    """Constructor for JobQueue.
1232

1233
    The constructor will initialize the job queue object and then
1234
    start loading the current jobs from disk, either for starting them
1235
    (if they were queued) or for aborting them (if they were already
1236
    running).
1237

1238
    @type context: GanetiContext
1239
    @param context: the context object for access to the configuration
1240
        data and other ganeti objects
1241

1242
    """
1243
    self.context = context
1244
    self._memcache = weakref.WeakValueDictionary()
1245
    self._my_hostname = netutils.Hostname.GetSysName()
1246

    
1247
    # The Big JobQueue lock. A code block or method acquiring it in shared
1248
    # mode must be safe to run concurrently with all other code acquiring it
1249
    # in shared mode, including other instances of itself. Code that does not
1250
    # acquire the lock at all must be safe to run concurrently with any code
1251
    # acquiring it in shared mode and with any code acquiring it exclusively.
1252
    self._lock = locking.SharedLock("JobQueue")
1253

    
1254
    self.acquire = self._lock.acquire
1255
    self.release = self._lock.release
1256

    
1257
    # Initialize the queue, and acquire the filelock.
1258
    # This ensures no other process is working on the job queue.
1259
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1260

    
1261
    # Read serial file
1262
    self._last_serial = jstore.ReadSerial()
1263
    assert self._last_serial is not None, ("Serial file was modified between"
1264
                                           " check in jstore and here")
1265

    
1266
    # Get initial list of nodes
1267
    self._nodes = dict((n.name, n.primary_ip)
1268
                       for n in self.context.cfg.GetAllNodesInfo().values()
1269
                       if n.master_candidate)
1270

    
1271
    # Remove master node
1272
    self._nodes.pop(self._my_hostname, None)
1273

    
1274
    # TODO: Check consistency across nodes
1275

    
1276
    self._queue_size = 0
1277
    self._UpdateQueueSizeUnlocked()
1278
    self._drained = jstore.CheckDrainFlag()
1279

    
1280
    # Setup worker pool
1281
    self._wpool = _JobQueueWorkerPool(self)
1282
    try:
1283
      self._InspectQueue()
1284
    except:
1285
      self._wpool.TerminateWorkers()
1286
      raise
1287

    
1288
  @locking.ssynchronized(_LOCK)
1289
  @_RequireOpenQueue
1290
  def _InspectQueue(self):
1291
    """Loads the whole job queue and resumes unfinished jobs.
1292

1293
    This function needs the lock here because WorkerPool.AddTask() may start a
1294
    job while we're still doing our work.
1295

1296
    """
1297
    logging.info("Inspecting job queue")
1298

    
1299
    restartjobs = []
1300

    
1301
    all_job_ids = self._GetJobIDsUnlocked()
1302
    jobs_count = len(all_job_ids)
1303
    lastinfo = time.time()
1304
    for idx, job_id in enumerate(all_job_ids):
1305
      # Give an update every 1000 jobs or 10 seconds
1306
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1307
          idx == (jobs_count - 1)):
1308
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1309
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1310
        lastinfo = time.time()
1311

    
1312
      job = self._LoadJobUnlocked(job_id)
1313

    
1314
      # a failure in loading the job can cause 'None' to be returned
1315
      if job is None:
1316
        continue
1317

    
1318
      status = job.CalcStatus()
1319

    
1320
      if status == constants.JOB_STATUS_QUEUED:
1321
        restartjobs.append(job)
1322

    
1323
      elif status in (constants.JOB_STATUS_RUNNING,
1324
                      constants.JOB_STATUS_WAITLOCK,
1325
                      constants.JOB_STATUS_CANCELING):
1326
        logging.warning("Unfinished job %s found: %s", job.id, job)
1327

    
1328
        if status == constants.JOB_STATUS_WAITLOCK:
1329
          # Restart job
1330
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1331
          restartjobs.append(job)
1332
        else:
1333
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1334
                                "Unclean master daemon shutdown")
1335
          job.Finalize()
1336

    
1337
        self.UpdateJobUnlocked(job)
1338

    
1339
    if restartjobs:
1340
      logging.info("Restarting %s jobs", len(restartjobs))
1341
      self._EnqueueJobs(restartjobs)
1342

    
1343
    logging.info("Job queue inspection finished")
1344

    
1345
  @locking.ssynchronized(_LOCK)
1346
  @_RequireOpenQueue
1347
  def AddNode(self, node):
1348
    """Register a new node with the queue.
1349

1350
    @type node: L{objects.Node}
1351
    @param node: the node object to be added
1352

1353
    """
1354
    node_name = node.name
1355
    assert node_name != self._my_hostname
1356

    
1357
    # Clean queue directory on added node
1358
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1359
    msg = result.fail_msg
1360
    if msg:
1361
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1362
                      node_name, msg)
1363

    
1364
    if not node.master_candidate:
1365
      # remove if existing, ignoring errors
1366
      self._nodes.pop(node_name, None)
1367
      # and skip the replication of the job ids
1368
      return
1369

    
1370
    # Upload the whole queue excluding archived jobs
1371
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1372

    
1373
    # Upload current serial file
1374
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1375

    
1376
    for file_name in files:
1377
      # Read file content
1378
      content = utils.ReadFile(file_name)
1379

    
1380
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1381
                                                  [node.primary_ip],
1382
                                                  file_name, content)
1383
      msg = result[node_name].fail_msg
1384
      if msg:
1385
        logging.error("Failed to upload file %s to node %s: %s",
1386
                      file_name, node_name, msg)
1387

    
1388
    self._nodes[node_name] = node.primary_ip
1389

    
1390
  @locking.ssynchronized(_LOCK)
1391
  @_RequireOpenQueue
1392
  def RemoveNode(self, node_name):
1393
    """Callback called when removing nodes from the cluster.
1394

1395
    @type node_name: str
1396
    @param node_name: the name of the node to remove
1397

1398
    """
1399
    self._nodes.pop(node_name, None)
1400

    
1401
  @staticmethod
1402
  def _CheckRpcResult(result, nodes, failmsg):
1403
    """Verifies the status of an RPC call.
1404

1405
    Since we aim to keep consistency should this node (the current
1406
    master) fail, we will log errors if our RPC calls fail, and
1407
    especially log the case when more than half of the nodes fail.
1408

1409
    @param result: the data as returned from the rpc call
1410
    @type nodes: list
1411
    @param nodes: the list of nodes we made the call to
1412
    @type failmsg: str
1413
    @param failmsg: the identifier to be used for logging
1414

1415
    """
1416
    failed = []
1417
    success = []
1418

    
1419
    for node in nodes:
1420
      msg = result[node].fail_msg
1421
      if msg:
1422
        failed.append(node)
1423
        logging.error("RPC call %s (%s) failed on node %s: %s",
1424
                      result[node].call, failmsg, node, msg)
1425
      else:
1426
        success.append(node)
1427

    
1428
    # +1 for the master node
1429
    if (len(success) + 1) < len(failed):
1430
      # TODO: Handle failing nodes
1431
      logging.error("More than half of the nodes failed")
1432

    
1433
  def _GetNodeIp(self):
1434
    """Helper for returning the node name/ip list.
1435

1436
    @rtype: (list, list)
1437
    @return: a tuple of two lists, the first one with the node
1438
        names and the second one with the node addresses
1439

1440
    """
1441
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1442
    name_list = self._nodes.keys()
1443
    addr_list = [self._nodes[name] for name in name_list]
1444
    return name_list, addr_list
1445

    
1446
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1447
    """Writes a file locally and then replicates it to all nodes.
1448

1449
    This function will replace the contents of a file on the local
1450
    node and then replicate it to all the other nodes we have.
1451

1452
    @type file_name: str
1453
    @param file_name: the path of the file to be replicated
1454
    @type data: str
1455
    @param data: the new contents of the file
1456
    @type replicate: boolean
1457
    @param replicate: whether to spread the changes to the remote nodes
1458

1459
    """
1460
    getents = runtime.GetEnts()
1461
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1462
                    gid=getents.masterd_gid)
1463

    
1464
    if replicate:
1465
      names, addrs = self._GetNodeIp()
1466
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1467
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1468

    
1469
  def _RenameFilesUnlocked(self, rename):
1470
    """Renames a file locally and then replicate the change.
1471

1472
    This function will rename a file in the local queue directory
1473
    and then replicate this rename to all the other nodes we have.
1474

1475
    @type rename: list of (old, new)
1476
    @param rename: List containing tuples mapping old to new names
1477

1478
    """
1479
    # Rename them locally
1480
    for old, new in rename:
1481
      utils.RenameFile(old, new, mkdir=True)
1482

    
1483
    # ... and on all nodes
1484
    names, addrs = self._GetNodeIp()
1485
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1486
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1487

    
1488
  @staticmethod
1489
  def _FormatJobID(job_id):
1490
    """Convert a job ID to string format.
1491

1492
    Currently this just does C{str(job_id)} after performing some
1493
    checks, but if we want to change the job id format this will
1494
    abstract this change.
1495

1496
    @type job_id: int or long
1497
    @param job_id: the numeric job id
1498
    @rtype: str
1499
    @return: the formatted job id
1500

1501
    """
1502
    if not isinstance(job_id, (int, long)):
1503
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1504
    if job_id < 0:
1505
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1506

    
1507
    return str(job_id)
1508

    
1509
  @classmethod
1510
  def _GetArchiveDirectory(cls, job_id):
1511
    """Returns the archive directory for a job.
1512

1513
    @type job_id: str
1514
    @param job_id: Job identifier
1515
    @rtype: str
1516
    @return: Directory name
1517

1518
    """
1519
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
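  # Illustrative sketch, not part of the original module: with
  # JOBS_PER_ARCHIVE_DIRECTORY at 10000, archived jobs are bucketed by
  # integer division, e.g. job 9999 ends up in directory "0" and job 24001
  # in directory "2".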
1520

    
1521
  def _NewSerialsUnlocked(self, count):
1522
    """Generates a new job identifier.
1523

1524
    Job identifiers are unique during the lifetime of a cluster.
1525

1526
    @type count: integer
1527
    @param count: how many serials to return
1528
    @rtype: str
1529
    @return: a string representing the job identifier.
1530

1531
    """
1532
    assert count > 0
1533
    # New number
1534
    serial = self._last_serial + count
1535

    
1536
    # Write to file
1537
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1538
                             "%s\n" % serial, True)
1539

    
1540
    result = [self._FormatJobID(v)
1541
              for v in range(self._last_serial + 1, serial + 1)]
1542

    
1543
    # Keep it only if we were able to write the file
1544
    self._last_serial = serial
1545

    
1546
    assert len(result) == count
1547

    
1548
    return result
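  # Illustrative sketch, not part of the original module: with
  # self._last_serial at 41, _NewSerialsUnlocked(3) writes 44 to the serial
  # file and returns the formatted job IDs ["42", "43", "44"].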
1549

    
1550
  @staticmethod
1551
  def _GetJobPath(job_id):
1552
    """Returns the job file for a given job id.
1553

1554
    @type job_id: str
1555
    @param job_id: the job identifier
1556
    @rtype: str
1557
    @return: the path to the job file
1558

1559
    """
1560
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1561

    
1562
  @classmethod
1563
  def _GetArchivedJobPath(cls, job_id):
1564
    """Returns the archived job file for a give job id.
1565

1566
    @type job_id: str
1567
    @param job_id: the job identifier
1568
    @rtype: str
1569
    @return: the path to the archived job file
1570

1571
    """
1572
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1573
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1574

    
1575
  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id, False)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id, try_archived):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    path_functions = [self._GetJobPath]

    if try_archived:
      path_functions.append(self._GetArchivedJobPath)

    raw_data = None

    for fn in path_functions:
      filepath = fn(job_id)
      logging.debug("Loading job from %s", filepath)
      try:
        raw_data = utils.ReadFile(filepath)
      except EnvironmentError, err:
        if err.errno != errno.ENOENT:
          raise
      else:
        break

    if not raw_data:
      return None

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id, try_archived):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @type try_archived: bool
    @param try_archived: Whether to try loading an archived job
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id, try_archived)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

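  # Note: while the drain flag is set, _SubmitJobUnlocked below refuses new
  # job submissions with errors.JobQueueDrainError; clearing the flag
  # re-enables submissions.
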
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

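  # Usage sketch (illustrative; "queue" stands for a JobQueue instance owned
  # by the master daemon and "ops" for a list of opcodes.OpCode objects):
  #
  #   job_id = queue.SubmitJob(ops)
  #   # ... later, once the job has finished ...
  #   queue.ArchiveJob(job_id)
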
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

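  # Return value example for SubmitManyJobs (illustrative): one (status, data)
  # tuple per submitted job, e.g.
  #   [(True, "42"), (False, "Job queue is drained, refusing job; opcodes ...")]
  # where data is the new job ID on success and an error description otherwise.
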
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

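  # Polling sketch (illustrative): a client typically calls WaitForJobChanges
  # in a loop, feeding back the job info and log serial returned by the
  # previous call; constants.JOB_NOTCHANGED signals that the timeout expired
  # without a change, and the loop usually ends once the returned job status
  # is finalized.
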
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered; failing that, the received timestamp
    is used. The special '-1' age will cause archival of all jobs (that
    are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple with the number of jobs archived and the number of jobs
        that were not examined (e.g. because the timeout expired)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

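  # Result shape example for QueryJobs (illustrative): with
  # fields == ["id", "status"], a possible return value is
  #   [["42", constants.JOB_STATUS_SUCCESS], None]
  # where None marks an explicitly requested job that could not be loaded.
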
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None