#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
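
# Illustrative example (not from the original module): TimeStampNow() splits
# the wall-clock time into a (seconds, microseconds) tuple, so a time of
# roughly 1234567890.123456 becomes (1234567890, 123456). Keeping two integer
# parts avoids float precision loss once the value is serialized to JSON.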


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }
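
  # Illustrative sketch (not part of the original code): Serialize() and
  # Restore() are intended to be inverses, i.e. for a _QueuedOpCode "op",
  # _QueuedOpCode.Restore(op.Serialize()) yields an object with the same
  # status, result, log and timestamps. The returned dict holds only
  # JSON-friendly values, so it can be passed through serializer.DumpJson()
  # and written to the job file on disk.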


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self._InitInMemory(self)

  @staticmethod
  def _InitInMemory(obj):
    """Initializes in-memory variables.

    """
    obj.ops_iter = None
    obj.cur_opctx = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued or all succeeded,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
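
  # Worked example (illustrative): for opcode statuses
  # [success, success, running, queued] the "running" opcode sets the job
  # status and the trailing "queued" one leaves it unchanged, so CalcStatus()
  # returns JOB_STATUS_RUNNING; [success, error, queued] returns
  # JOB_STATUS_ERROR because a single failed opcode fails the whole job; only
  # when every opcode succeeded does it return JOB_STATUS_SUCCESS.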

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "priority":
        row.append(self.CalcPriority())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "oppriority":
        row.append([op.priority for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling, and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)
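
  # Example (illustrative): cancelling a job that is still queued finalizes it
  # immediately, e.g. (True, "Job 42 canceled"); a job waiting for locks is
  # only flagged as canceling, e.g. (True, "Job 42 will be canceled"); a job
  # that is already running its opcodes returns (False, ...) and cannot be
  # cancelled here.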


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: list or None
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: int
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be set up

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
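
  # Outcome summary (illustrative): __call__ returns the new
  # (job_info, log_entries) tuple as soon as the checker reports a change,
  # None if the job file disappears or inotify cannot be set up, and
  # constants.JOB_NOTCHANGED if the timeout expires first.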


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _TimeoutStrategyWrapper:
  def __init__(self, fn):
    """Initializes this class.

    """
    self._fn = fn
    self._next = None

  def _Advance(self):
    """Gets the next timeout if necessary.

    """
    if self._next is None:
      self._next = self._fn()

  def Peek(self):
    """Returns the next timeout.

    """
    self._Advance()
    return self._next

  def Next(self):
    """Returns the current timeout and advances the internal state.

    """
    self._Advance()
    result = self._next
    self._next = None
    return result
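
  # Example (illustrative): wrapping an iterator such as
  # iter([1.0, 2.0, None]).next, Peek() keeps returning 1.0 without consuming
  # it, the first Next() returns 1.0 and clears the cached value, and a later
  # Peek()/Next() pair moves on to 2.0. A value of None is how the strategy
  # signals that the next lock acquisition should block.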


class _OpExecContext:
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    self._timeout_strategy = \
      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()


class _JobProcessor(object):
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory

  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])

      elif op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     opctx.log_prefix, opctx.summary)
        continue

      return opctx

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops

    op.status = constants.OP_STATUS_WAITLOCK
    op.result = None
    op.start_timestamp = TimeStampNow()

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp

  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    """
    op = opctx.op

    assert op.status == constants.OP_STATUS_WAITLOCK

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout, priority=op.priority)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      return (constants.OP_STATUS_QUEUED, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self, _nextop_fn=None):
    """Continues execution of a job.

    @param _nextop_fn: Callback function for tests
    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      # Is a previous opcode still pending?
      if job.cur_opctx:
        opctx = job.cur_opctx
      else:
        if __debug__ and _nextop_fn:
          _nextop_fn()
        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)

      op = opctx.op

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opctx.index:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELED)

      assert (op.priority <= constants.OP_PRIO_LOWEST and
              op.priority >= constants.OP_PRIO_HIGHEST)

      if op.status != constants.OP_STATUS_CANCELED:
        # Prepare to start opcode
        self._MarkWaitlock(job, op)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

        # Write to disk
        queue.UpdateJobUnlocked(job)

        logging.info("%s: opcode %s waiting for locks",
                     opctx.log_prefix, opctx.summary)

        queue.release()
        try:
          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
        finally:
          queue.acquire(shared=1)

        op.status = op_status
        op.result = op_result

        if op.status == constants.OP_STATUS_QUEUED:
          # Couldn't get locks in time
          assert not op.end_timestamp
        else:
          # Finalize opcode
          op.end_timestamp = TimeStampNow()

          if op.status == constants.OP_STATUS_CANCELING:
            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                  for i in job.ops[opctx.index:])
          else:
            assert op.status in constants.OPS_FINALIZED

      if op.status == constants.OP_STATUS_QUEUED:
        finalize = False

        opctx.CheckPriorityIncrease()

        # Keep around for another round
        job.cur_opctx = opctx

        assert (op.priority <= constants.OP_PRIO_LOWEST and
                op.priority >= constants.OP_PRIO_HIGHEST)

        # In no case must the status be finalized here
        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED

        queue.UpdateJobUnlocked(job)

      else:
        # Ensure all opcodes so far have been successful
        assert (opctx.index == 0 or
                compat.all(i.status == constants.OP_STATUS_SUCCESS
                           for i in job.ops[:opctx.index]))

        # Reset context
        job.cur_opctx = None

        if op.status == constants.OP_STATUS_SUCCESS:
          finalize = False

        elif op.status == constants.OP_STATUS_ERROR:
          # Ensure failed opcode has an exception as its result
          assert errors.GetEncodedError(job.ops[opctx.index].result)

          to_encode = errors.OpExecError("Preceding opcode failed")
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                _EncodeOpError(to_encode))
          finalize = True

          # Consistency check
          assert compat.all(i.status == constants.OP_STATUS_ERROR and
                            errors.GetEncodedError(i.result)
                            for i in job.ops[opctx.index:])

        elif op.status == constants.OP_STATUS_CANCELING:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
          finalize = True

        elif op.status == constants.OP_STATUS_CANCELED:
          finalize = True

        else:
          raise errors.ProgrammerError("Unknown status '%s'" % op.status)

        # Finalizing or last opcode?
        if finalize or opctx.index == (opcount - 1):
          # All opcodes have been run, finalize job
          job.end_timestamp = TimeStampNow()

        # Write to disk. If the job status is final, this is the final write
        # allowed. Once the file has been written, it can be archived anytime.
        queue.UpdateJobUnlocked(job)

        if finalize or opctx.index == (opcount - 1):
          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
          return True

      return False
    finally:
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the L{_QueuedJob} and
    L{_QueuedOpCode} classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    proc = mcpu.Processor(queue.context, job.id)

    if not _JobProcessor(queue, proc.ExecOpCode, job)():
      # Schedule again
      raise workerpool.DeferTask(priority=job.CalcPriority())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. To avoid acquiring it at all, concurrency
    # must be guaranteed with all code acquiring it in shared mode and with
    # all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status == constants.JOB_STATUS_QUEUED:
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)

        if status == constants.JOB_STATUS_WAITLOCK:
          # Restart job
          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
          restartjobs.append(job)
        else:
          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                "Unclean master daemon shutdown")

        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
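
  # Worked example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000,
  # job id "12345" is archived under directory "1" (12345 / 10000 using
  # integer division), while jobs "0".."9999" all map to directory "0". This
  # bounds the number of job files per archive directory.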
1451

    
1452
  def _NewSerialsUnlocked(self, count):
1453
    """Generates a new job identifier.
1454

1455
    Job identifiers are unique during the lifetime of a cluster.
1456

1457
    @type count: integer
1458
    @param count: how many serials to return
1459
    @rtype: str
1460
    @return: a string representing the job identifier.
1461

1462
    """
1463
    assert count > 0
1464
    # New number
1465
    serial = self._last_serial + count
1466

    
1467
    # Write to file
1468
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1469
                             "%s\n" % serial, True)
1470

    
1471
    result = [self._FormatJobID(v)
1472
              for v in range(self._last_serial, serial + 1)]
1473
    # Keep it only if we were able to write the file
1474
    self._last_serial = serial
1475

    
1476
    return result
1477

    
1478
  @staticmethod
1479
  def _GetJobPath(job_id):
1480
    """Returns the job file for a given job id.
1481

1482
    @type job_id: str
1483
    @param job_id: the job identifier
1484
    @rtype: str
1485
    @return: the path to the job file
1486

1487
    """
1488
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1489

    
1490
  @classmethod
1491
  def _GetArchivedJobPath(cls, job_id):
1492
    """Returns the archived job file for a give job id.
1493

1494
    @type job_id: str
1495
    @param job_id: the job identifier
1496
    @rtype: str
1497
    @return: the path to the archived job file
1498

1499
    """
1500
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1501
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1502

    
1503
  def _GetJobIDsUnlocked(self, sort=True):
1504
    """Return all known job IDs.
1505

1506
    The method only looks at disk because it's a requirement that all
1507
    jobs are present on disk (so in the _memcache we don't have any
1508
    extra IDs).
1509

1510
    @type sort: boolean
1511
    @param sort: perform sorting on the returned job ids
1512
    @rtype: list
1513
    @return: the list of job IDs
1514

1515
    """
1516
    jlist = []
1517
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1518
      m = self._RE_JOB_FILE.match(filename)
1519
      if m:
1520
        jlist.append(m.group(1))
1521
    if sort:
1522
      jlist = utils.NiceSort(jlist)
1523
    return jlist
1524

    
1525
  def _LoadJobUnlocked(self, job_id):
1526
    """Loads a job from the disk or memory.
1527

1528
    Given a job id, this will return the cached job object if
1529
    existing, or try to load the job from the disk. If loading from
1530
    disk, it will also add the job to the cache.
1531

1532
    @param job_id: the job id
1533
    @rtype: L{_QueuedJob} or None
1534
    @return: either None or the job object
1535

1536
    """
1537
    job = self._memcache.get(job_id, None)
1538
    if job:
1539
      logging.debug("Found job %s in memcache", job_id)
1540
      return job
1541

    
1542
    try:
1543
      job = self._LoadJobFromDisk(job_id)
1544
      if job is None:
1545
        return job
1546
    except errors.JobFileCorrupted:
1547
      old_path = self._GetJobPath(job_id)
1548
      new_path = self._GetArchivedJobPath(job_id)
1549
      if old_path == new_path:
1550
        # job already archived (future case)
1551
        logging.exception("Can't parse job %s", job_id)
1552
      else:
1553
        # non-archived case
1554
        logging.exception("Can't parse job %s, will archive.", job_id)
1555
        self._RenameFilesUnlocked([(old_path, new_path)])
1556
      return None
1557

    
1558
    self._memcache[job_id] = job
1559
    logging.debug("Added job %s to the cache", job_id)
1560
    return job
1561

    
1562
  def _LoadJobFromDisk(self, job_id):
1563
    """Load the given job file from disk.
1564

1565
    Given a job file, read, load and restore it in a _QueuedJob format.
1566

1567
    @type job_id: string
1568
    @param job_id: job identifier
1569
    @rtype: L{_QueuedJob} or None
1570
    @return: either None or the job object
1571

1572
    """
1573
    filepath = self._GetJobPath(job_id)
1574
    logging.debug("Loading job from %s", filepath)
1575
    try:
1576
      raw_data = utils.ReadFile(filepath)
1577
    except EnvironmentError, err:
1578
      if err.errno in (errno.ENOENT, ):
1579
        return None
1580
      raise
1581

    
1582
    try:
1583
      data = serializer.LoadJson(raw_data)
1584
      job = _QueuedJob.Restore(self, data)
1585
    except Exception, err: # pylint: disable-msg=W0703
1586
      raise errors.JobFileCorrupted(err)
1587

    
1588
    return job
1589

    
1590
  def SafeLoadJobFromDisk(self, job_id):
1591
    """Load the given job file from disk.
1592

1593
    Given a job file, read, load and restore it in a _QueuedJob format.
1594
    In case of error reading the job, it gets returned as None, and the
1595
    exception is logged.
1596

1597
    @type job_id: string
1598
    @param job_id: job identifier
1599
    @rtype: L{_QueuedJob} or None
1600
    @return: either None or the job object
1601

1602
    """
1603
    try:
1604
      return self._LoadJobFromDisk(job_id)
1605
    except (errors.JobFileCorrupted, EnvironmentError):
1606
      logging.exception("Can't load/parse job %s", job_id)
1607
      return None
1608

    
1609
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    getents = runtime.GetEnts()

    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

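  # Illustrative sketch (comment-only example, not part of the upstream
  # class): code holding a JobQueue instance, here called "queue" (an assumed
  # name), can toggle the drain flag around maintenance work; while the flag
  # is set, SubmitJob/SubmitManyJobs raise errors.JobQueueDrainError.
  #
  #   queue.SetDrainFlag(True)    # creates constants.JOB_QUEUE_DRAIN_FILE
  #   ...                         # perform maintenance
  #   queue.SetDrainFlag(False)   # removes the drain file again
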
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
    @raise errors.GenericError: If an opcode is not valid

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Check priority
    for idx, op in enumerate(job.ops):
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                  " are %s" % (idx, op.priority, allowed))

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
    return job_id

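  # Illustrative sketch (assumption, not upstream code): a caller with a
  # JobQueue instance "queue" and a list of opcodes.OpCode instances "ops"
  # (both assumed names) submits a job and gets back its ID, which can then
  # be used with QueryJobs or WaitForJobChanges:
  #
  #   job_id = queue.SubmitJob(ops)
  #   (status, ) = queue.QueryJobs([job_id], ["status"])[0]
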
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    added_jobs = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    self._EnqueueJobs(added_jobs)

    return results

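  # Illustrative sketch: SubmitManyJobs returns one (status, data) tuple per
  # requested job, where data is the new job ID on success and the error
  # message on failure ("queue", "ops1" and "ops2" are assumed names):
  #
  #   for (ok, data) in queue.SubmitManyJobs([ops1, ops2]):
  #     if ok:
  #       logging.info("Submitted job %s", data)
  #     else:
  #       logging.error("Submission failed: %s", data)
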
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    @type jobs: list
    @param jobs: List of all jobs

    """
    self._wpool.AddManyTasks([(job, ) for job in jobs],
                             priority=[job.CalcPriority() for job in jobs])

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)

    helper = _WaitForJobChangesHelper()

    return helper(self._GetJobPath(job_id), load_fn,
                  fields, prev_job_info, prev_log_serial, timeout)

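  # Illustrative sketch (assumption, error handling omitted): a client
  # typically calls WaitForJobChanges in a loop, feeding back the previously
  # returned job info and log serial, and treating constants.JOB_NOTCHANGED
  # as "timeout expired, poll again" ("queue", "prev_info" and "prev_serial"
  # are assumed names):
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, 60.0)
  #   if result != constants.JOB_NOTCHANGED:
  #     (prev_info, log_entries) = result
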
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple
    @return: (success, message), describing whether the job could be
        canceled

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    (success, msg) = job.Cancel()

    if success:
      self.UpdateJobUnlocked(job)

    return (success, msg)

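  # Illustrative sketch: cancellation reports back whether it took effect
  # ("queue" and "job_id" are assumed names):
  #
  #   (success, msg) = queue.CancelJob(job_id)
  #   if not success:
  #     logging.warning("Could not cancel job %s: %s", job_id, msg)
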
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we don't check above whether the renames succeeded or failed, we
    # update the cached queue size from the filesystem. Once the TODO above is
    # fixed, the number of actually archived jobs can be used instead.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
    @rtype: tuple
    @return: (number of archived jobs, number of jobs not yet examined)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

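  # Illustrative sketch (assumption): periodic maintenance code could archive
  # everything finalized more than six hours ago, spending at most 30 seconds
  # per run ("queue" is an assumed name):
  #
  #   (archived, remaining) = queue.AutoArchiveJobs(6 * 3600, 30)
  #   logging.info("Archived %s jobs, %s not yet examined", archived,
  #                remaining)
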
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

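  # Illustrative sketch: querying two fields for all jobs; a None entry only
  # appears when explicit job IDs were requested and one of them could not be
  # loaded ("queue" is an assumed name):
  #
  #   for row in queue.QueryJobs(None, ["id", "status"]):
  #     (job_id, status) = row
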
  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None