#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

JOBQUEUE_THREADS = 25


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: str
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serializes this _QueuedJob.

    @rtype: dict
    @return: the serialized state

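    Example (illustrative values only; the keys mirror the dictionary
    built below)::
      {
        "id": "42",
        "ops": [...],  # one dict per opcode, from _QueuedOpCode.Serialize
        "run_op_index": -1,
        "start_timestamp": None,
        "end_timestamp": None,
        "received_timestamp": (1230000000, 123456),
      }
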
    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all have
        succeeded, and the job status will be the same

    A few representative cases are shown in the example below.

    @return: the job status

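    Example (illustrative; each line maps a list of opcode statuses
    to the resulting job status)::
      [success, success, success]  -> success
      [success, running, queued]   -> running
      [success, error, queued]     -> error
      [queued, queued, queued]     -> queued
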
    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

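    Example (a sketch, assuming the job holds entries with serials
    1, 2 and 3)::
      job.GetLogEntries(None)  # -> all three entries
      job.GetLogEntries(1)     # -> only the entries with serials 2 and 3
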
    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

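    Example (a sketch of the majority check)::
      # nodes = ["n1", "n2", "n3"]
      # result = {"n1": True, "n2": False, "n3": False}
      # both failures are logged, but since len(success) + 1 == 2 is
      # not smaller than len(failed) == 2, the "more than half" error
      # is not logged
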
    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

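    Example (assuming numeric job ids, as matched by C{_RE_JOB_FILE})::
      JobQueue._ExtractJobID("job-123")     # -> "123"
      JobQueue._ExtractJobID("job-123.bak") # -> None
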
    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

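    Example (a sketch; op1 and op2 stand for L{opcodes.OpCode}
    instances built by the caller)::
      job_id = queue.SubmitJob([op1, op2])  # one job with two opcodes
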
    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

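    Example (a sketch: poll job "42" for up to ten seconds, with no
    previous job information and no log entries seen yet)::
      result = queue.WaitForJobChanges("42", ["status"], None, 0, 10.0)
      if result == constants.JOB_NOTCHANGED:
        pass  # timeout expired, nothing new happened
      else:
        job_info, log_entries = result
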
    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

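    Example::
      queue.AutoArchiveJobs(6 * 3600)  # archive jobs older than six hours
      queue.AutoArchiveJobs(-1)        # archive all finished jobs
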
    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

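    Example (illustrative output)::
      queue.QueryJobs(["1", "3"], ["id", "status"])
      # -> [["1", "success"], ["3", "running"]]
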
    """
1158
    jobs = []
1159

    
1160
    for job in self._GetJobsUnlocked(job_ids):
1161
      if job is None:
1162
        jobs.append(None)
1163
      else:
1164
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1165

    
1166
    return jobs
1167

    
1168
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None