Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 0aeeb6e3

History | View | Annotate | Download (59.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import logging
33
import errno
34
import re
35
import time
36
import weakref
37

    
38
try:
39
  # pylint: disable-msg=E0611
40
  from pyinotify import pyinotify
41
except ImportError:
42
  import pyinotify
43

    
44
from ganeti import asyncnotifier
45
from ganeti import constants
46
from ganeti import serializer
47
from ganeti import workerpool
48
from ganeti import locking
49
from ganeti import opcodes
50
from ganeti import errors
51
from ganeti import mcpu
52
from ganeti import utils
53
from ganeti import jstore
54
from ganeti import rpc
55
from ganeti import runtime
56
from ganeti import netutils
57
from ganeti import compat
58

    
59

    
60
JOBQUEUE_THREADS = 25
61
JOBS_PER_ARCHIVE_DIRECTORY = 10000
62

    
63
# member lock names to be passed to @ssynchronized decorator
64
_LOCK = "_lock"
65
_QUEUE = "_queue"
66

    
67

    
68
class CancelJob(Exception):
69
  """Special exception to cancel a job.
70

71
  """
72

    
73

    
74
def TimeStampNow():
75
  """Returns the current timestamp.
76

77
  @rtype: tuple
78
  @return: the current time in the (seconds, microseconds) format
79

80
  """
81
  return utils.SplitTime(time.time())
82

    
83

    
84
class _QueuedOpCode(object):
85
  """Encapsulates an opcode object.
86

87
  @ivar log: holds the execution log and consists of tuples
88
  of the form C{(log_serial, timestamp, level, message)}
89
  @ivar input: the OpCode we encapsulate
90
  @ivar status: the current status
91
  @ivar result: the result of the LU execution
92
  @ivar start_timestamp: timestamp for the start of the execution
93
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94
  @ivar stop_timestamp: timestamp for the end of the execution
95

96
  """
97
  __slots__ = ["input", "status", "result", "log", "priority",
98
               "start_timestamp", "exec_timestamp", "end_timestamp",
99
               "__weakref__"]
100

    
101
  def __init__(self, op):
102
    """Constructor for the _QuededOpCode.
103

104
    @type op: L{opcodes.OpCode}
105
    @param op: the opcode we encapsulate
106

107
    """
108
    self.input = op
109
    self.status = constants.OP_STATUS_QUEUED
110
    self.result = None
111
    self.log = []
112
    self.start_timestamp = None
113
    self.exec_timestamp = None
114
    self.end_timestamp = None
115

    
116
    # Get initial priority (it might change during the lifetime of this opcode)
117
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118

    
119
  @classmethod
120
  def Restore(cls, state):
121
    """Restore the _QueuedOpCode from the serialized form.
122

123
    @type state: dict
124
    @param state: the serialized state
125
    @rtype: _QueuedOpCode
126
    @return: a new _QueuedOpCode instance
127

128
    """
129
    obj = _QueuedOpCode.__new__(cls)
130
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131
    obj.status = state["status"]
132
    obj.result = state["result"]
133
    obj.log = state["log"]
134
    obj.start_timestamp = state.get("start_timestamp", None)
135
    obj.exec_timestamp = state.get("exec_timestamp", None)
136
    obj.end_timestamp = state.get("end_timestamp", None)
137
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138
    return obj
139

    
140
  def Serialize(self):
141
    """Serializes this _QueuedOpCode.
142

143
    @rtype: dict
144
    @return: the dictionary holding the serialized state
145

146
    """
147
    return {
148
      "input": self.input.__getstate__(),
149
      "status": self.status,
150
      "result": self.result,
151
      "log": self.log,
152
      "start_timestamp": self.start_timestamp,
153
      "exec_timestamp": self.exec_timestamp,
154
      "end_timestamp": self.end_timestamp,
155
      "priority": self.priority,
156
      }
157

    
158

    
159
class _QueuedJob(object):
160
  """In-memory job representation.
161

162
  This is what we use to track the user-submitted jobs. Locking must
163
  be taken care of by users of this class.
164

165
  @type queue: L{JobQueue}
166
  @ivar queue: the parent queue
167
  @ivar id: the job ID
168
  @type ops: list
169
  @ivar ops: the list of _QueuedOpCode that constitute the job
170
  @type log_serial: int
171
  @ivar log_serial: holds the index for the next log entry
172
  @ivar received_timestamp: the timestamp for when the job was received
173
  @ivar start_timestmap: the timestamp for start of execution
174
  @ivar end_timestamp: the timestamp for end of execution
175

176
  """
177
  # pylint: disable-msg=W0212
178
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179
               "received_timestamp", "start_timestamp", "end_timestamp",
180
               "__weakref__"]
181

    
182
  def __init__(self, queue, job_id, ops):
183
    """Constructor for the _QueuedJob.
184

185
    @type queue: L{JobQueue}
186
    @param queue: our parent queue
187
    @type job_id: job_id
188
    @param job_id: our job id
189
    @type ops: list
190
    @param ops: the list of opcodes we hold, which will be encapsulated
191
        in _QueuedOpCodes
192

193
    """
194
    if not ops:
195
      raise errors.GenericError("A job needs at least one opcode")
196

    
197
    self.queue = queue
198
    self.id = job_id
199
    self.ops = [_QueuedOpCode(op) for op in ops]
200
    self.log_serial = 0
201
    self.received_timestamp = TimeStampNow()
202
    self.start_timestamp = None
203
    self.end_timestamp = None
204

    
205
    self._InitInMemory(self)
206

    
207
  @staticmethod
208
  def _InitInMemory(obj):
209
    """Initializes in-memory variables.
210

211
    """
212
    obj.ops_iter = None
213
    obj.cur_opctx = None
214

    
215
  def __repr__(self):
216
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217
              "id=%s" % self.id,
218
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219

    
220
    return "<%s at %#x>" % (" ".join(status), id(self))
221

    
222
  @classmethod
223
  def Restore(cls, queue, state):
224
    """Restore a _QueuedJob from serialized state:
225

226
    @type queue: L{JobQueue}
227
    @param queue: to which queue the restored job belongs
228
    @type state: dict
229
    @param state: the serialized state
230
    @rtype: _JobQueue
231
    @return: the restored _JobQueue instance
232

233
    """
234
    obj = _QueuedJob.__new__(cls)
235
    obj.queue = queue
236
    obj.id = state["id"]
237
    obj.received_timestamp = state.get("received_timestamp", None)
238
    obj.start_timestamp = state.get("start_timestamp", None)
239
    obj.end_timestamp = state.get("end_timestamp", None)
240

    
241
    obj.ops = []
242
    obj.log_serial = 0
243
    for op_state in state["ops"]:
244
      op = _QueuedOpCode.Restore(op_state)
245
      for log_entry in op.log:
246
        obj.log_serial = max(obj.log_serial, log_entry[0])
247
      obj.ops.append(op)
248

    
249
    cls._InitInMemory(obj)
250

    
251
    return obj
252

    
253
  def Serialize(self):
254
    """Serialize the _JobQueue instance.
255

256
    @rtype: dict
257
    @return: the serialized state
258

259
    """
260
    return {
261
      "id": self.id,
262
      "ops": [op.Serialize() for op in self.ops],
263
      "start_timestamp": self.start_timestamp,
264
      "end_timestamp": self.end_timestamp,
265
      "received_timestamp": self.received_timestamp,
266
      }
267

    
268
  def CalcStatus(self):
269
    """Compute the status of this job.
270

271
    This function iterates over all the _QueuedOpCodes in the job and
272
    based on their status, computes the job status.
273

274
    The algorithm is:
275
      - if we find a cancelled, or finished with error, the job
276
        status will be the same
277
      - otherwise, the last opcode with the status one of:
278
          - waitlock
279
          - canceling
280
          - running
281

282
        will determine the job status
283

284
      - otherwise, it means either all opcodes are queued, or success,
285
        and the job status will be the same
286

287
    @return: the job status
288

289
    """
290
    status = constants.JOB_STATUS_QUEUED
291

    
292
    all_success = True
293
    for op in self.ops:
294
      if op.status == constants.OP_STATUS_SUCCESS:
295
        continue
296

    
297
      all_success = False
298

    
299
      if op.status == constants.OP_STATUS_QUEUED:
300
        pass
301
      elif op.status == constants.OP_STATUS_WAITLOCK:
302
        status = constants.JOB_STATUS_WAITLOCK
303
      elif op.status == constants.OP_STATUS_RUNNING:
304
        status = constants.JOB_STATUS_RUNNING
305
      elif op.status == constants.OP_STATUS_CANCELING:
306
        status = constants.JOB_STATUS_CANCELING
307
        break
308
      elif op.status == constants.OP_STATUS_ERROR:
309
        status = constants.JOB_STATUS_ERROR
310
        # The whole job fails if one opcode failed
311
        break
312
      elif op.status == constants.OP_STATUS_CANCELED:
313
        status = constants.OP_STATUS_CANCELED
314
        break
315

    
316
    if all_success:
317
      status = constants.JOB_STATUS_SUCCESS
318

    
319
    return status
320

    
321
  def CalcPriority(self):
322
    """Gets the current priority for this job.
323

324
    Only unfinished opcodes are considered. When all are done, the default
325
    priority is used.
326

327
    @rtype: int
328

329
    """
330
    priorities = [op.priority for op in self.ops
331
                  if op.status not in constants.OPS_FINALIZED]
332

    
333
    if not priorities:
334
      # All opcodes are done, assume default priority
335
      return constants.OP_PRIO_DEFAULT
336

    
337
    return min(priorities)
338

    
339
  def GetLogEntries(self, newer_than):
340
    """Selectively returns the log entries.
341

342
    @type newer_than: None or int
343
    @param newer_than: if this is None, return all log entries,
344
        otherwise return only the log entries with serial higher
345
        than this value
346
    @rtype: list
347
    @return: the list of the log entries selected
348

349
    """
350
    if newer_than is None:
351
      serial = -1
352
    else:
353
      serial = newer_than
354

    
355
    entries = []
356
    for op in self.ops:
357
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358

    
359
    return entries
360

    
361
  def GetInfo(self, fields):
362
    """Returns information about a job.
363

364
    @type fields: list
365
    @param fields: names of fields to return
366
    @rtype: list
367
    @return: list with one element for each field
368
    @raise errors.OpExecError: when an invalid field
369
        has been passed
370

371
    """
372
    row = []
373
    for fname in fields:
374
      if fname == "id":
375
        row.append(self.id)
376
      elif fname == "status":
377
        row.append(self.CalcStatus())
378
      elif fname == "priority":
379
        row.append(self.CalcPriority())
380
      elif fname == "ops":
381
        row.append([op.input.__getstate__() for op in self.ops])
382
      elif fname == "opresult":
383
        row.append([op.result for op in self.ops])
384
      elif fname == "opstatus":
385
        row.append([op.status for op in self.ops])
386
      elif fname == "oplog":
387
        row.append([op.log for op in self.ops])
388
      elif fname == "opstart":
389
        row.append([op.start_timestamp for op in self.ops])
390
      elif fname == "opexec":
391
        row.append([op.exec_timestamp for op in self.ops])
392
      elif fname == "opend":
393
        row.append([op.end_timestamp for op in self.ops])
394
      elif fname == "oppriority":
395
        row.append([op.priority for op in self.ops])
396
      elif fname == "received_ts":
397
        row.append(self.received_timestamp)
398
      elif fname == "start_ts":
399
        row.append(self.start_timestamp)
400
      elif fname == "end_ts":
401
        row.append(self.end_timestamp)
402
      elif fname == "summary":
403
        row.append([op.input.Summary() for op in self.ops])
404
      else:
405
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
406
    return row
407

    
408
  def MarkUnfinishedOps(self, status, result):
409
    """Mark unfinished opcodes with a given status and result.
410

411
    This is an utility function for marking all running or waiting to
412
    be run opcodes with a given status. Opcodes which are already
413
    finalised are not changed.
414

415
    @param status: a given opcode status
416
    @param result: the opcode result
417

418
    """
419
    not_marked = True
420
    for op in self.ops:
421
      if op.status in constants.OPS_FINALIZED:
422
        assert not_marked, "Finalized opcodes found after non-finalized ones"
423
        continue
424
      op.status = status
425
      op.result = result
426
      not_marked = False
427

    
428
  def Cancel(self):
429
    """Marks job as canceled/-ing if possible.
430

431
    @rtype: tuple; (bool, string)
432
    @return: Boolean describing whether job was successfully canceled or marked
433
      as canceling and a text message
434

435
    """
436
    status = self.CalcStatus()
437

    
438
    if status == constants.JOB_STATUS_QUEUED:
439
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
440
                             "Job canceled by request")
441
      return (True, "Job %s canceled" % self.id)
442

    
443
    elif status == constants.JOB_STATUS_WAITLOCK:
444
      # The worker will notice the new status and cancel the job
445
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
446
      return (True, "Job %s will be canceled" % self.id)
447

    
448
    else:
449
      logging.debug("Job %s is no longer waiting in the queue", self.id)
450
      return (False, "Job %s is no longer waiting in the queue" % self.id)
451

    
452

    
453
class _OpExecCallbacks(mcpu.OpExecCbBase):
454
  def __init__(self, queue, job, op):
455
    """Initializes this class.
456

457
    @type queue: L{JobQueue}
458
    @param queue: Job queue
459
    @type job: L{_QueuedJob}
460
    @param job: Job object
461
    @type op: L{_QueuedOpCode}
462
    @param op: OpCode
463

464
    """
465
    assert queue, "Queue is missing"
466
    assert job, "Job is missing"
467
    assert op, "Opcode is missing"
468

    
469
    self._queue = queue
470
    self._job = job
471
    self._op = op
472

    
473
  def _CheckCancel(self):
474
    """Raises an exception to cancel the job if asked to.
475

476
    """
477
    # Cancel here if we were asked to
478
    if self._op.status == constants.OP_STATUS_CANCELING:
479
      logging.debug("Canceling opcode")
480
      raise CancelJob()
481

    
482
  @locking.ssynchronized(_QUEUE, shared=1)
483
  def NotifyStart(self):
484
    """Mark the opcode as running, not lock-waiting.
485

486
    This is called from the mcpu code as a notifier function, when the LU is
487
    finally about to start the Exec() method. Of course, to have end-user
488
    visible results, the opcode must be initially (before calling into
489
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
490

491
    """
492
    assert self._op in self._job.ops
493
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
494
                               constants.OP_STATUS_CANCELING)
495

    
496
    # Cancel here if we were asked to
497
    self._CheckCancel()
498

    
499
    logging.debug("Opcode is now running")
500

    
501
    self._op.status = constants.OP_STATUS_RUNNING
502
    self._op.exec_timestamp = TimeStampNow()
503

    
504
    # And finally replicate the job status
505
    self._queue.UpdateJobUnlocked(self._job)
506

    
507
  @locking.ssynchronized(_QUEUE, shared=1)
508
  def _AppendFeedback(self, timestamp, log_type, log_msg):
509
    """Internal feedback append function, with locks
510

511
    """
512
    self._job.log_serial += 1
513
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
514
    self._queue.UpdateJobUnlocked(self._job, replicate=False)
515

    
516
  def Feedback(self, *args):
517
    """Append a log entry.
518

519
    """
520
    assert len(args) < 3
521

    
522
    if len(args) == 1:
523
      log_type = constants.ELOG_MESSAGE
524
      log_msg = args[0]
525
    else:
526
      (log_type, log_msg) = args
527

    
528
    # The time is split to make serialization easier and not lose
529
    # precision.
530
    timestamp = utils.SplitTime(time.time())
531
    self._AppendFeedback(timestamp, log_type, log_msg)
532

    
533
  def CheckCancel(self):
534
    """Check whether job has been cancelled.
535

536
    """
537
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
538
                               constants.OP_STATUS_CANCELING)
539

    
540
    # Cancel here if we were asked to
541
    self._CheckCancel()
542

    
543
  def SubmitManyJobs(self, jobs):
544
    """Submits jobs for processing.
545

546
    See L{JobQueue.SubmitManyJobs}.
547

548
    """
549
    # Locking is done in job queue
550
    return self._queue.SubmitManyJobs(jobs)
551

    
552

    
553
class _JobChangesChecker(object):
554
  def __init__(self, fields, prev_job_info, prev_log_serial):
555
    """Initializes this class.
556

557
    @type fields: list of strings
558
    @param fields: Fields requested by LUXI client
559
    @type prev_job_info: string
560
    @param prev_job_info: previous job info, as passed by the LUXI client
561
    @type prev_log_serial: string
562
    @param prev_log_serial: previous job serial, as passed by the LUXI client
563

564
    """
565
    self._fields = fields
566
    self._prev_job_info = prev_job_info
567
    self._prev_log_serial = prev_log_serial
568

    
569
  def __call__(self, job):
570
    """Checks whether job has changed.
571

572
    @type job: L{_QueuedJob}
573
    @param job: Job object
574

575
    """
576
    status = job.CalcStatus()
577
    job_info = job.GetInfo(self._fields)
578
    log_entries = job.GetLogEntries(self._prev_log_serial)
579

    
580
    # Serializing and deserializing data can cause type changes (e.g. from
581
    # tuple to list) or precision loss. We're doing it here so that we get
582
    # the same modifications as the data received from the client. Without
583
    # this, the comparison afterwards might fail without the data being
584
    # significantly different.
585
    # TODO: we just deserialized from disk, investigate how to make sure that
586
    # the job info and log entries are compatible to avoid this further step.
587
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
588
    # efficient, though floats will be tricky
589
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
590
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
591

    
592
    # Don't even try to wait if the job is no longer running, there will be
593
    # no changes.
594
    if (status not in (constants.JOB_STATUS_QUEUED,
595
                       constants.JOB_STATUS_RUNNING,
596
                       constants.JOB_STATUS_WAITLOCK) or
597
        job_info != self._prev_job_info or
598
        (log_entries and self._prev_log_serial != log_entries[0][0])):
599
      logging.debug("Job %s changed", job.id)
600
      return (job_info, log_entries)
601

    
602
    return None
603

    
604

    
605
class _JobFileChangesWaiter(object):
606
  def __init__(self, filename):
607
    """Initializes this class.
608

609
    @type filename: string
610
    @param filename: Path to job file
611
    @raises errors.InotifyError: if the notifier cannot be setup
612

613
    """
614
    self._wm = pyinotify.WatchManager()
615
    self._inotify_handler = \
616
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
617
    self._notifier = \
618
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
619
    try:
620
      self._inotify_handler.enable()
621
    except Exception:
622
      # pyinotify doesn't close file descriptors automatically
623
      self._notifier.stop()
624
      raise
625

    
626
  def _OnInotify(self, notifier_enabled):
627
    """Callback for inotify.
628

629
    """
630
    if not notifier_enabled:
631
      self._inotify_handler.enable()
632

    
633
  def Wait(self, timeout):
634
    """Waits for the job file to change.
635

636
    @type timeout: float
637
    @param timeout: Timeout in seconds
638
    @return: Whether there have been events
639

640
    """
641
    assert timeout >= 0
642
    have_events = self._notifier.check_events(timeout * 1000)
643
    if have_events:
644
      self._notifier.read_events()
645
    self._notifier.process_events()
646
    return have_events
647

    
648
  def Close(self):
649
    """Closes underlying notifier and its file descriptor.
650

651
    """
652
    self._notifier.stop()
653

    
654

    
655
class _JobChangesWaiter(object):
656
  def __init__(self, filename):
657
    """Initializes this class.
658

659
    @type filename: string
660
    @param filename: Path to job file
661

662
    """
663
    self._filewaiter = None
664
    self._filename = filename
665

    
666
  def Wait(self, timeout):
667
    """Waits for a job to change.
668

669
    @type timeout: float
670
    @param timeout: Timeout in seconds
671
    @return: Whether there have been events
672

673
    """
674
    if self._filewaiter:
675
      return self._filewaiter.Wait(timeout)
676

    
677
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
678
    # If this point is reached, return immediately and let caller check the job
679
    # file again in case there were changes since the last check. This avoids a
680
    # race condition.
681
    self._filewaiter = _JobFileChangesWaiter(self._filename)
682

    
683
    return True
684

    
685
  def Close(self):
686
    """Closes underlying waiter.
687

688
    """
689
    if self._filewaiter:
690
      self._filewaiter.Close()
691

    
692

    
693
class _WaitForJobChangesHelper(object):
694
  """Helper class using inotify to wait for changes in a job file.
695

696
  This class takes a previous job status and serial, and alerts the client when
697
  the current job status has changed.
698

699
  """
700
  @staticmethod
701
  def _CheckForChanges(job_load_fn, check_fn):
702
    job = job_load_fn()
703
    if not job:
704
      raise errors.JobLost()
705

    
706
    result = check_fn(job)
707
    if result is None:
708
      raise utils.RetryAgain()
709

    
710
    return result
711

    
712
  def __call__(self, filename, job_load_fn,
713
               fields, prev_job_info, prev_log_serial, timeout):
714
    """Waits for changes on a job.
715

716
    @type filename: string
717
    @param filename: File on which to wait for changes
718
    @type job_load_fn: callable
719
    @param job_load_fn: Function to load job
720
    @type fields: list of strings
721
    @param fields: Which fields to check for changes
722
    @type prev_job_info: list or None
723
    @param prev_job_info: Last job information returned
724
    @type prev_log_serial: int
725
    @param prev_log_serial: Last job message serial number
726
    @type timeout: float
727
    @param timeout: maximum time to wait in seconds
728

729
    """
730
    try:
731
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
732
      waiter = _JobChangesWaiter(filename)
733
      try:
734
        return utils.Retry(compat.partial(self._CheckForChanges,
735
                                          job_load_fn, check_fn),
736
                           utils.RETRY_REMAINING_TIME, timeout,
737
                           wait_fn=waiter.Wait)
738
      finally:
739
        waiter.Close()
740
    except (errors.InotifyError, errors.JobLost):
741
      return None
742
    except utils.RetryTimeout:
743
      return constants.JOB_NOTCHANGED
744

    
745

    
746
def _EncodeOpError(err):
747
  """Encodes an error which occurred while processing an opcode.
748

749
  """
750
  if isinstance(err, errors.GenericError):
751
    to_encode = err
752
  else:
753
    to_encode = errors.OpExecError(str(err))
754

    
755
  return errors.EncodeException(to_encode)
756

    
757

    
758
class _TimeoutStrategyWrapper:
759
  def __init__(self, fn):
760
    """Initializes this class.
761

762
    """
763
    self._fn = fn
764
    self._next = None
765

    
766
  def _Advance(self):
767
    """Gets the next timeout if necessary.
768

769
    """
770
    if self._next is None:
771
      self._next = self._fn()
772

    
773
  def Peek(self):
774
    """Returns the next timeout.
775

776
    """
777
    self._Advance()
778
    return self._next
779

    
780
  def Next(self):
781
    """Returns the current timeout and advances the internal state.
782

783
    """
784
    self._Advance()
785
    result = self._next
786
    self._next = None
787
    return result
788

    
789

    
790
class _OpExecContext:
791
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
792
    """Initializes this class.
793

794
    """
795
    self.op = op
796
    self.index = index
797
    self.log_prefix = log_prefix
798
    self.summary = op.input.Summary()
799

    
800
    self._timeout_strategy_factory = timeout_strategy_factory
801
    self._ResetTimeoutStrategy()
802

    
803
  def _ResetTimeoutStrategy(self):
804
    """Creates a new timeout strategy.
805

806
    """
807
    self._timeout_strategy = \
808
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
809

    
810
  def CheckPriorityIncrease(self):
811
    """Checks whether priority can and should be increased.
812

813
    Called when locks couldn't be acquired.
814

815
    """
816
    op = self.op
817

    
818
    # Exhausted all retries and next round should not use blocking acquire
819
    # for locks?
820
    if (self._timeout_strategy.Peek() is None and
821
        op.priority > constants.OP_PRIO_HIGHEST):
822
      logging.debug("Increasing priority")
823
      op.priority -= 1
824
      self._ResetTimeoutStrategy()
825
      return True
826

    
827
    return False
828

    
829
  def GetNextLockTimeout(self):
830
    """Returns the next lock acquire timeout.
831

832
    """
833
    return self._timeout_strategy.Next()
834

    
835

    
836
class _JobProcessor(object):
837
  def __init__(self, queue, opexec_fn, job,
838
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
839
    """Initializes this class.
840

841
    """
842
    self.queue = queue
843
    self.opexec_fn = opexec_fn
844
    self.job = job
845
    self._timeout_strategy_factory = _timeout_strategy_factory
846

    
847
  @staticmethod
848
  def _FindNextOpcode(job, timeout_strategy_factory):
849
    """Locates the next opcode to run.
850

851
    @type job: L{_QueuedJob}
852
    @param job: Job object
853
    @param timeout_strategy_factory: Callable to create new timeout strategy
854

855
    """
856
    # Create some sort of a cache to speed up locating next opcode for future
857
    # lookups
858
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
859
    # pending and one for processed ops.
860
    if job.ops_iter is None:
861
      job.ops_iter = enumerate(job.ops)
862

    
863
    # Find next opcode to run
864
    while True:
865
      try:
866
        (idx, op) = job.ops_iter.next()
867
      except StopIteration:
868
        raise errors.ProgrammerError("Called for a finished job")
869

    
870
      if op.status == constants.OP_STATUS_RUNNING:
871
        # Found an opcode already marked as running
872
        raise errors.ProgrammerError("Called for job marked as running")
873

    
874
      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
875
                             timeout_strategy_factory)
876

    
877
      if op.status == constants.OP_STATUS_CANCELED:
878
        # Cancelled jobs are handled by the caller
879
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
880
                              for i in job.ops[idx:])
881

    
882
      elif op.status in constants.OPS_FINALIZED:
883
        # This is a job that was partially completed before master daemon
884
        # shutdown, so it can be expected that some opcodes are already
885
        # completed successfully (if any did error out, then the whole job
886
        # should have been aborted and not resubmitted for processing).
887
        logging.info("%s: opcode %s already processed, skipping",
888
                     opctx.log_prefix, opctx.summary)
889
        continue
890

    
891
      return opctx
892

    
893
  @staticmethod
894
  def _MarkWaitlock(job, op):
895
    """Marks an opcode as waiting for locks.
896

897
    The job's start timestamp is also set if necessary.
898

899
    @type job: L{_QueuedJob}
900
    @param job: Job object
901
    @type op: L{_QueuedOpCode}
902
    @param op: Opcode object
903

904
    """
905
    assert op in job.ops
906
    assert op.status in (constants.OP_STATUS_QUEUED,
907
                         constants.OP_STATUS_WAITLOCK)
908

    
909
    update = False
910

    
911
    op.result = None
912

    
913
    if op.status == constants.OP_STATUS_QUEUED:
914
      op.status = constants.OP_STATUS_WAITLOCK
915
      update = True
916

    
917
    if op.start_timestamp is None:
918
      op.start_timestamp = TimeStampNow()
919
      update = True
920

    
921
    if job.start_timestamp is None:
922
      job.start_timestamp = op.start_timestamp
923
      update = True
924

    
925
    assert op.status == constants.OP_STATUS_WAITLOCK
926

    
927
    return update
928

    
929
  def _ExecOpCodeUnlocked(self, opctx):
930
    """Processes one opcode and returns the result.
931

932
    """
933
    op = opctx.op
934

    
935
    assert op.status == constants.OP_STATUS_WAITLOCK
936

    
937
    timeout = opctx.GetNextLockTimeout()
938

    
939
    try:
940
      # Make sure not to hold queue lock while calling ExecOpCode
941
      result = self.opexec_fn(op.input,
942
                              _OpExecCallbacks(self.queue, self.job, op),
943
                              timeout=timeout, priority=op.priority)
944
    except mcpu.LockAcquireTimeout:
945
      assert timeout is not None, "Received timeout for blocking acquire"
946
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
947

    
948
      assert op.status in (constants.OP_STATUS_WAITLOCK,
949
                           constants.OP_STATUS_CANCELING)
950

    
951
      # Was job cancelled while we were waiting for the lock?
952
      if op.status == constants.OP_STATUS_CANCELING:
953
        return (constants.OP_STATUS_CANCELING, None)
954

    
955
      # Stay in waitlock while trying to re-acquire lock
956
      return (constants.OP_STATUS_WAITLOCK, None)
957
    except CancelJob:
958
      logging.exception("%s: Canceling job", opctx.log_prefix)
959
      assert op.status == constants.OP_STATUS_CANCELING
960
      return (constants.OP_STATUS_CANCELING, None)
961
    except Exception, err: # pylint: disable-msg=W0703
962
      logging.exception("%s: Caught exception in %s",
963
                        opctx.log_prefix, opctx.summary)
964
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
965
    else:
966
      logging.debug("%s: %s successful",
967
                    opctx.log_prefix, opctx.summary)
968
      return (constants.OP_STATUS_SUCCESS, result)
969

    
970
  def __call__(self, _nextop_fn=None):
971
    """Continues execution of a job.
972

973
    @param _nextop_fn: Callback function for tests
974
    @rtype: bool
975
    @return: True if job is finished, False if processor needs to be called
976
             again
977

978
    """
979
    queue = self.queue
980
    job = self.job
981

    
982
    logging.debug("Processing job %s", job.id)
983

    
984
    queue.acquire(shared=1)
985
    try:
986
      opcount = len(job.ops)
987

    
988
      # Is a previous opcode still pending?
989
      if job.cur_opctx:
990
        opctx = job.cur_opctx
991
        job.cur_opctx = None
992
      else:
993
        if __debug__ and _nextop_fn:
994
          _nextop_fn()
995
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
996

    
997
      op = opctx.op
998

    
999
      # Consistency check
1000
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1001
                                     constants.OP_STATUS_CANCELING,
1002
                                     constants.OP_STATUS_CANCELED)
1003
                        for i in job.ops[opctx.index + 1:])
1004

    
1005
      assert op.status in (constants.OP_STATUS_QUEUED,
1006
                           constants.OP_STATUS_WAITLOCK,
1007
                           constants.OP_STATUS_CANCELING,
1008
                           constants.OP_STATUS_CANCELED)
1009

    
1010
      assert (op.priority <= constants.OP_PRIO_LOWEST and
1011
              op.priority >= constants.OP_PRIO_HIGHEST)
1012

    
1013
      if op.status not in (constants.OP_STATUS_CANCELING,
1014
                           constants.OP_STATUS_CANCELED):
1015
        assert op.status in (constants.OP_STATUS_QUEUED,
1016
                             constants.OP_STATUS_WAITLOCK)
1017

    
1018
        # Prepare to start opcode
1019
        if self._MarkWaitlock(job, op):
1020
          # Write to disk
1021
          queue.UpdateJobUnlocked(job)
1022

    
1023
        assert op.status == constants.OP_STATUS_WAITLOCK
1024
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1025
        assert job.start_timestamp and op.start_timestamp
1026

    
1027
        logging.info("%s: opcode %s waiting for locks",
1028
                     opctx.log_prefix, opctx.summary)
1029

    
1030
        queue.release()
1031
        try:
1032
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1033
        finally:
1034
          queue.acquire(shared=1)
1035

    
1036
        op.status = op_status
1037
        op.result = op_result
1038

    
1039
        if op.status == constants.OP_STATUS_WAITLOCK:
1040
          # Couldn't get locks in time
1041
          assert not op.end_timestamp
1042
        else:
1043
          # Finalize opcode
1044
          op.end_timestamp = TimeStampNow()
1045

    
1046
          if op.status == constants.OP_STATUS_CANCELING:
1047
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1048
                                  for i in job.ops[opctx.index:])
1049
          else:
1050
            assert op.status in constants.OPS_FINALIZED
1051

    
1052
      if op.status == constants.OP_STATUS_WAITLOCK:
1053
        finalize = False
1054

    
1055
        if opctx.CheckPriorityIncrease():
1056
          # Priority was changed, need to update on-disk file
1057
          queue.UpdateJobUnlocked(job)
1058

    
1059
        # Keep around for another round
1060
        job.cur_opctx = opctx
1061

    
1062
        assert (op.priority <= constants.OP_PRIO_LOWEST and
1063
                op.priority >= constants.OP_PRIO_HIGHEST)
1064

    
1065
        # In no case must the status be finalized here
1066
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1067

    
1068
      else:
1069
        # Ensure all opcodes so far have been successful
1070
        assert (opctx.index == 0 or
1071
                compat.all(i.status == constants.OP_STATUS_SUCCESS
1072
                           for i in job.ops[:opctx.index]))
1073

    
1074
        # Reset context
1075
        job.cur_opctx = None
1076

    
1077
        if op.status == constants.OP_STATUS_SUCCESS:
1078
          finalize = False
1079

    
1080
        elif op.status == constants.OP_STATUS_ERROR:
1081
          # Ensure failed opcode has an exception as its result
1082
          assert errors.GetEncodedError(job.ops[opctx.index].result)
1083

    
1084
          to_encode = errors.OpExecError("Preceding opcode failed")
1085
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1086
                                _EncodeOpError(to_encode))
1087
          finalize = True
1088

    
1089
          # Consistency check
1090
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
1091
                            errors.GetEncodedError(i.result)
1092
                            for i in job.ops[opctx.index:])
1093

    
1094
        elif op.status == constants.OP_STATUS_CANCELING:
1095
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1096
                                "Job canceled by request")
1097
          finalize = True
1098

    
1099
        elif op.status == constants.OP_STATUS_CANCELED:
1100
          finalize = True
1101

    
1102
        else:
1103
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1104

    
1105
        # Finalizing or last opcode?
1106
        if finalize or opctx.index == (opcount - 1):
1107
          # All opcodes have been run, finalize job
1108
          job.end_timestamp = TimeStampNow()
1109

    
1110
        # Write to disk. If the job status is final, this is the final write
1111
        # allowed. Once the file has been written, it can be archived anytime.
1112
        queue.UpdateJobUnlocked(job)
1113

    
1114
        if finalize or opctx.index == (opcount - 1):
1115
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1116
          return True
1117

    
1118
      return False
1119
    finally:
1120
      queue.release()
1121

    
1122

    
1123
class _JobQueueWorker(workerpool.BaseWorker):
1124
  """The actual job workers.
1125

1126
  """
1127
  def RunTask(self, job): # pylint: disable-msg=W0221
1128
    """Job executor.
1129

1130
    This functions processes a job. It is closely tied to the L{_QueuedJob} and
1131
    L{_QueuedOpCode} classes.
1132

1133
    @type job: L{_QueuedJob}
1134
    @param job: the job to be processed
1135

1136
    """
1137
    queue = job.queue
1138
    assert queue == self.pool.queue
1139

    
1140
    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1141
    setname_fn(None)
1142

    
1143
    proc = mcpu.Processor(queue.context, job.id)
1144

    
1145
    # Create wrapper for setting thread name
1146
    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1147
                                    proc.ExecOpCode)
1148

    
1149
    if not _JobProcessor(queue, wrap_execop_fn, job)():
1150
      # Schedule again
1151
      raise workerpool.DeferTask(priority=job.CalcPriority())
1152

    
1153
  @staticmethod
1154
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1155
    """Updates the worker thread name to include a short summary of the opcode.
1156

1157
    @param setname_fn: Callable setting worker thread name
1158
    @param execop_fn: Callable for executing opcode (usually
1159
                      L{mcpu.Processor.ExecOpCode})
1160

1161
    """
1162
    setname_fn(op)
1163
    try:
1164
      return execop_fn(op, *args, **kwargs)
1165
    finally:
1166
      setname_fn(None)
1167

    
1168
  @staticmethod
1169
  def _GetWorkerName(job, op):
1170
    """Sets the worker thread name.
1171

1172
    @type job: L{_QueuedJob}
1173
    @type op: L{opcodes.OpCode}
1174

1175
    """
1176
    parts = ["Job%s" % job.id]
1177

    
1178
    if op:
1179
      parts.append(op.TinySummary())
1180

    
1181
    return "/".join(parts)
1182

    
1183

    
1184
class _JobQueueWorkerPool(workerpool.WorkerPool):
1185
  """Simple class implementing a job-processing workerpool.
1186

1187
  """
1188
  def __init__(self, queue):
1189
    super(_JobQueueWorkerPool, self).__init__("Jq",
1190
                                              JOBQUEUE_THREADS,
1191
                                              _JobQueueWorker)
1192
    self.queue = queue
1193

    
1194

    
1195
def _RequireOpenQueue(fn):
1196
  """Decorator for "public" functions.
1197

1198
  This function should be used for all 'public' functions. That is,
1199
  functions usually called from other classes. Note that this should
1200
  be applied only to methods (not plain functions), since it expects
1201
  that the decorated function is called with a first argument that has
1202
  a '_queue_filelock' argument.
1203

1204
  @warning: Use this decorator only after locking.ssynchronized
1205

1206
  Example::
1207
    @locking.ssynchronized(_LOCK)
1208
    @_RequireOpenQueue
1209
    def Example(self):
1210
      pass
1211

1212
  """
1213
  def wrapper(self, *args, **kwargs):
1214
    # pylint: disable-msg=W0212
1215
    assert self._queue_filelock is not None, "Queue should be open"
1216
    return fn(self, *args, **kwargs)
1217
  return wrapper
1218

    
1219

    
1220
class JobQueue(object):
1221
  """Queue used to manage the jobs.
1222

1223
  @cvar _RE_JOB_FILE: regex matching the valid job file names
1224

1225
  """
1226
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1227

    
1228
  def __init__(self, context):
1229
    """Constructor for JobQueue.
1230

1231
    The constructor will initialize the job queue object and then
1232
    start loading the current jobs from disk, either for starting them
1233
    (if they were queue) or for aborting them (if they were already
1234
    running).
1235

1236
    @type context: GanetiContext
1237
    @param context: the context object for access to the configuration
1238
        data and other ganeti objects
1239

1240
    """
1241
    self.context = context
1242
    self._memcache = weakref.WeakValueDictionary()
1243
    self._my_hostname = netutils.Hostname.GetSysName()
1244

    
1245
    # The Big JobQueue lock. If a code block or method acquires it in shared
1246
    # mode safe it must guarantee concurrency with all the code acquiring it in
1247
    # shared mode, including itself. In order not to acquire it at all
1248
    # concurrency must be guaranteed with all code acquiring it in shared mode
1249
    # and all code acquiring it exclusively.
1250
    self._lock = locking.SharedLock("JobQueue")
1251

    
1252
    self.acquire = self._lock.acquire
1253
    self.release = self._lock.release
1254

    
1255
    # Initialize the queue, and acquire the filelock.
1256
    # This ensures no other process is working on the job queue.
1257
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1258

    
1259
    # Read serial file
1260
    self._last_serial = jstore.ReadSerial()
1261
    assert self._last_serial is not None, ("Serial file was modified between"
1262
                                           " check in jstore and here")
1263

    
1264
    # Get initial list of nodes
1265
    self._nodes = dict((n.name, n.primary_ip)
1266
                       for n in self.context.cfg.GetAllNodesInfo().values()
1267
                       if n.master_candidate)
1268

    
1269
    # Remove master node
1270
    self._nodes.pop(self._my_hostname, None)
1271

    
1272
    # TODO: Check consistency across nodes
1273

    
1274
    self._queue_size = 0
1275
    self._UpdateQueueSizeUnlocked()
1276
    self._drained = jstore.CheckDrainFlag()
1277

    
1278
    # Setup worker pool
1279
    self._wpool = _JobQueueWorkerPool(self)
1280
    try:
1281
      self._InspectQueue()
1282
    except:
1283
      self._wpool.TerminateWorkers()
1284
      raise
1285

    
1286
  @locking.ssynchronized(_LOCK)
1287
  @_RequireOpenQueue
1288
  def _InspectQueue(self):
1289
    """Loads the whole job queue and resumes unfinished jobs.
1290

1291
    This function needs the lock here because WorkerPool.AddTask() may start a
1292
    job while we're still doing our work.
1293

1294
    """
1295
    logging.info("Inspecting job queue")
1296

    
1297
    restartjobs = []
1298

    
1299
    all_job_ids = self._GetJobIDsUnlocked()
1300
    jobs_count = len(all_job_ids)
1301
    lastinfo = time.time()
1302
    for idx, job_id in enumerate(all_job_ids):
1303
      # Give an update every 1000 jobs or 10 seconds
1304
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1305
          idx == (jobs_count - 1)):
1306
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1307
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1308
        lastinfo = time.time()
1309

    
1310
      job = self._LoadJobUnlocked(job_id)
1311

    
1312
      # a failure in loading the job can cause 'None' to be returned
1313
      if job is None:
1314
        continue
1315

    
1316
      status = job.CalcStatus()
1317

    
1318
      if status == constants.JOB_STATUS_QUEUED:
1319
        restartjobs.append(job)
1320

    
1321
      elif status in (constants.JOB_STATUS_RUNNING,
1322
                      constants.JOB_STATUS_WAITLOCK,
1323
                      constants.JOB_STATUS_CANCELING):
1324
        logging.warning("Unfinished job %s found: %s", job.id, job)
1325

    
1326
        if status == constants.JOB_STATUS_WAITLOCK:
1327
          # Restart job
1328
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1329
          restartjobs.append(job)
1330
        else:
1331
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1332
                                "Unclean master daemon shutdown")
1333

    
1334
        self.UpdateJobUnlocked(job)
1335

    
1336
    if restartjobs:
1337
      logging.info("Restarting %s jobs", len(restartjobs))
1338
      self._EnqueueJobs(restartjobs)
1339

    
1340
    logging.info("Job queue inspection finished")
1341

    
1342
  @locking.ssynchronized(_LOCK)
1343
  @_RequireOpenQueue
1344
  def AddNode(self, node):
1345
    """Register a new node with the queue.
1346

1347
    @type node: L{objects.Node}
1348
    @param node: the node object to be added
1349

1350
    """
1351
    node_name = node.name
1352
    assert node_name != self._my_hostname
1353

    
1354
    # Clean queue directory on added node
1355
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1356
    msg = result.fail_msg
1357
    if msg:
1358
      logging.warning("Cannot cleanup queue directory on node %s: %s",
1359
                      node_name, msg)
1360

    
1361
    if not node.master_candidate:
1362
      # remove if existing, ignoring errors
1363
      self._nodes.pop(node_name, None)
1364
      # and skip the replication of the job ids
1365
      return
1366

    
1367
    # Upload the whole queue excluding archived jobs
1368
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1369

    
1370
    # Upload current serial file
1371
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1372

    
1373
    for file_name in files:
1374
      # Read file content
1375
      content = utils.ReadFile(file_name)
1376

    
1377
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1378
                                                  [node.primary_ip],
1379
                                                  file_name, content)
1380
      msg = result[node_name].fail_msg
1381
      if msg:
1382
        logging.error("Failed to upload file %s to node %s: %s",
1383
                      file_name, node_name, msg)
1384

    
1385
    self._nodes[node_name] = node.primary_ip
1386

    
1387
  @locking.ssynchronized(_LOCK)
1388
  @_RequireOpenQueue
1389
  def RemoveNode(self, node_name):
1390
    """Callback called when removing nodes from the cluster.
1391

1392
    @type node_name: str
1393
    @param node_name: the name of the node to remove
1394

1395
    """
1396
    self._nodes.pop(node_name, None)
1397

    
1398
  @staticmethod
1399
  def _CheckRpcResult(result, nodes, failmsg):
1400
    """Verifies the status of an RPC call.
1401

1402
    Since we aim to keep consistency should this node (the current
1403
    master) fail, we will log errors if our rpc fail, and especially
1404
    log the case when more than half of the nodes fails.
1405

1406
    @param result: the data as returned from the rpc call
1407
    @type nodes: list
1408
    @param nodes: the list of nodes we made the call to
1409
    @type failmsg: str
1410
    @param failmsg: the identifier to be used for logging
1411

1412
    """
1413
    failed = []
1414
    success = []
1415

    
1416
    for node in nodes:
1417
      msg = result[node].fail_msg
1418
      if msg:
1419
        failed.append(node)
1420
        logging.error("RPC call %s (%s) failed on node %s: %s",
1421
                      result[node].call, failmsg, node, msg)
1422
      else:
1423
        success.append(node)
1424

    
1425
    # +1 for the master node
1426
    if (len(success) + 1) < len(failed):
1427
      # TODO: Handle failing nodes
1428
      logging.error("More than half of the nodes failed")
1429

    
1430
  def _GetNodeIp(self):
1431
    """Helper for returning the node name/ip list.
1432

1433
    @rtype: (list, list)
1434
    @return: a tuple of two lists, the first one with the node
1435
        names and the second one with the node addresses
1436

1437
    """
1438
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1439
    name_list = self._nodes.keys()
1440
    addr_list = [self._nodes[name] for name in name_list]
1441
    return name_list, addr_list
1442

    
1443
  def _UpdateJobQueueFile(self, file_name, data, replicate):
1444
    """Writes a file locally and then replicates it to all nodes.
1445

1446
    This function will replace the contents of a file on the local
1447
    node and then replicate it to all the other nodes we have.
1448

1449
    @type file_name: str
1450
    @param file_name: the path of the file to be replicated
1451
    @type data: str
1452
    @param data: the new contents of the file
1453
    @type replicate: boolean
1454
    @param replicate: whether to spread the changes to the remote nodes
1455

1456
    """
1457
    getents = runtime.GetEnts()
1458
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1459
                    gid=getents.masterd_gid)
1460

    
1461
    if replicate:
1462
      names, addrs = self._GetNodeIp()
1463
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1464
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1465

    
1466
  def _RenameFilesUnlocked(self, rename):
1467
    """Renames a file locally and then replicate the change.
1468

1469
    This function will rename a file in the local queue directory
1470
    and then replicate this rename to all the other nodes we have.
1471

1472
    @type rename: list of (old, new)
1473
    @param rename: List containing tuples mapping old to new names
1474

1475
    """
1476
    # Rename them locally
1477
    for old, new in rename:
1478
      utils.RenameFile(old, new, mkdir=True)
1479

    
1480
    # ... and on all nodes
1481
    names, addrs = self._GetNodeIp()
1482
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1483
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1484

    
1485
  @staticmethod
1486
  def _FormatJobID(job_id):
1487
    """Convert a job ID to string format.
1488

1489
    Currently this just does C{str(job_id)} after performing some
1490
    checks, but if we want to change the job id format this will
1491
    abstract this change.
1492

1493
    @type job_id: int or long
1494
    @param job_id: the numeric job id
1495
    @rtype: str
1496
    @return: the formatted job id
1497

1498
    """
1499
    if not isinstance(job_id, (int, long)):
1500
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1501
    if job_id < 0:
1502
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1503

    
1504
    return str(job_id)
1505

    
1506
  @classmethod
1507
  def _GetArchiveDirectory(cls, job_id):
1508
    """Returns the archive directory for a job.
1509

1510
    @type job_id: str
1511
    @param job_id: Job identifier
1512
    @rtype: str
1513
    @return: Directory name
1514

1515
    """
1516
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1517

    
1518
  def _NewSerialsUnlocked(self, count):
1519
    """Generates a new job identifier.
1520

1521
    Job identifiers are unique during the lifetime of a cluster.
1522

1523
    @type count: integer
1524
    @param count: how many serials to return
1525
    @rtype: str
1526
    @return: a string representing the job identifier.
1527

1528
    """
1529
    assert count > 0
1530
    # New number
1531
    serial = self._last_serial + count
1532

    
1533
    # Write to file
1534
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1535
                             "%s\n" % serial, True)
1536

    
1537
    result = [self._FormatJobID(v)
1538
              for v in range(self._last_serial, serial + 1)]
1539
    # Keep it only if we were able to write the file
1540
    self._last_serial = serial
1541

    
1542
    return result
1543

    
1544
  @staticmethod
1545
  def _GetJobPath(job_id):
1546
    """Returns the job file for a given job id.
1547

1548
    @type job_id: str
1549
    @param job_id: the job identifier
1550
    @rtype: str
1551
    @return: the path to the job file
1552

1553
    """
1554
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1555

    
1556
  @classmethod
1557
  def _GetArchivedJobPath(cls, job_id):
1558
    """Returns the archived job file for a give job id.
1559

1560
    @type job_id: str
1561
    @param job_id: the job identifier
1562
    @rtype: str
1563
    @return: the path to the archived job file
1564

1565
    """
1566
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1567
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1568

    
1569
  def _GetJobIDsUnlocked(self, sort=True):
1570
    """Return all known job IDs.
1571

1572
    The method only looks at disk because it's a requirement that all
1573
    jobs are present on disk (so in the _memcache we don't have any
1574
    extra IDs).
1575

1576
    @type sort: boolean
1577
    @param sort: perform sorting on the returned job ids
1578
    @rtype: list
1579
    @return: the list of job IDs
1580

1581
    """
1582
    jlist = []
1583
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1584
      m = self._RE_JOB_FILE.match(filename)
1585
      if m:
1586
        jlist.append(m.group(1))
1587
    if sort:
1588
      jlist = utils.NiceSort(jlist)
1589
    return jlist
1590

    
1591
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
      if job is None:
        return job
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

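  # Usage sketch (illustrative only): callers that must not fail on a broken
  # or unreadable job file, e.g. QueryJobs below, use the Safe* variant:
  #   job = queue.SafeLoadJobFromDisk(job_id)
  #   if job is None:
  #     ...  # missing or corrupted, already logged
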
  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    jstore.SetDrainFlag(drain_flag)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

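  # Submission failure modes, as a quick reference for callers (an illustrative
  # summary of the checks above):
  #   queue drained                         -> errors.JobQueueDrainError
  #   queue at JOB_QUEUE_SIZE_HARD_LIMIT    -> errors.JobQueueFull
  #   opcode priority not submit-valid      -> errors.GenericError
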
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = ("%s; opcodes %s" %
                (err, utils.CommaJoin(op.Summary() for op in ops)))
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

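  # Usage sketch (illustrative only; opcode construction not shown): each
  # entry of SubmitManyJobs' result mirrors one submitted job:
  #   for (status, data) in queue.SubmitManyJobs([[op_a], [op_b, op_c]]):
  #     if status:
  #       ...  # data is the new job ID
  #     else:
  #       ...  # data is an error message including the opcode summaries
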
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

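  # Client-side polling sketch (illustrative only; the prev_info/prev_serial
  # bookkeeping between calls is elided):
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, timeout)
  #   if result == constants.JOB_NOTCHANGED:
  #     ...  # timed out without changes, poll again
  #   else:
  #     (job_info, log_entries) = result
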
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

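  # Usage sketch (illustrative only):
  #   (success, msg) = queue.CancelJob(job_id)
  # success is False both for unknown jobs and for jobs that have already
  # started; only a successful cancellation is written back to disk.
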
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time in seconds to spend archiving jobs
    @rtype: tuple
    @return: a tuple of the number of jobs archived and the number of
        jobs not yet considered due to the timeout

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

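  # A worked example (illustrative only): job timestamps are
  # (seconds, microseconds) tuples, so only the seconds part (job_age[0]) is
  # compared against the cut-off.  A typical invocation archiving jobs older
  # than six hours, spending at most roughly 30 seconds:
  #   (archived, remaining) = queue.AutoArchiveJobs(6 * 3600, 30)
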
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

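  # Usage sketch (illustrative only; field names are whatever
  # _QueuedJob.GetInfo accepts):
  #   rows = queue.QueryJobs(["1", "17"], ["id", "status"])
  # Jobs that cannot be loaded show up as None when explicit IDs were given,
  # and are silently skipped when querying all jobs.
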
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None