#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

JOBQUEUE_THREADS = 25


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
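
# Illustrative note (not part of the original module): utils.SplitTime
# converts a float timestamp into an integer (seconds, microseconds)
# pair, so a value such as 1218448917.481 would roughly become:
#
#   >>> TimeStampNow()
#   (1218448917, 481000)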


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
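
  # Illustrative sketch (not part of the original module; `some_op`
  # stands for any opcodes.OpCode instance): the serialized form is a
  # plain dict that round-trips through Restore(), roughly:
  #
  #   op = _QueuedOpCode(some_op)
  #   state = op.Serialize()
  #   # state == {"input": {...}, "status": "queued", "result": None,
  #   #           "log": [], "start_timestamp": None,
  #   #           "end_timestamp": None}
  #   op2 = _QueuedOpCode.Restore(state)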


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: str
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }
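
  # Illustrative sketch (not part of the original module): Serialize()
  # and Restore() are symmetric, which is what the on-disk job files
  # rely on:
  #
  #   state = job.Serialize()
  #   same_job = _QueuedJob.Restore(job.queue, state)
  #   # same_job.id == job.id, and same_job.log_serial is recomputed
  #   # from the highest log entry serial found in the opcodes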

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued or all are
        successful, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
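
  # Illustrative example (not part of the original module): for a job
  # whose opcode statuses are (success, running, queued), the loop above
  # leaves status == JOB_STATUS_RUNNING; for (success, error, queued) it
  # breaks early with JOB_STATUS_ERROR, regardless of later opcodes.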

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
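
  # Illustrative example (not part of the original module): with log
  # entries of the form (serial, timestamp, type, message), a call such
  # as job.GetLogEntries(2) returns only the entries whose serial is 3
  # or higher, across all of the job's opcodes.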


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
            jobs_count = len(all_job_ids)
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
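
  # Illustrative example (not part of the original module): if
  # _last_serial is 41, _NewSerialUnlocked() writes "42\n" to the serial
  # file, replicates it to all nodes, and returns the string "42", which
  # _GetJobPath() then maps to a file named "job-42" in the queue
  # directory.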

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
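
  # Illustrative example (not part of the original module): with the
  # anchored _RE_JOB_FILE regex, _ExtractJobID("job-42") returns "42",
  # while names that don't match the job file pattern (e.g.
  # "job-42.bak") return None.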

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
889

    
890
  @utils.LockedMethod
891
  @_RequireOpenQueue
892
  def SubmitJob(self, ops):
893
    """Create and store a new job.
894

895
    This enters the job into our job queue and also puts it on the new
896
    queue, in order for it to be picked up by the queue processors.
897

898
    @type ops: list
899
    @param ops: The list of OpCodes that will become the new job.
900
    @rtype: job ID
901
    @return: the job ID of the newly created job
902
    @raise errors.JobQueueDrainError: if the job is marked for draining
903

904
    """
905
    if self._IsQueueMarkedDrain():
906
      raise errors.JobQueueDrainError()
907
    # Get job identifier
908
    job_id = self._NewSerialUnlocked()
909
    job = _QueuedJob(self, job_id, ops)
910

    
911
    # Write to disk
912
    self.UpdateJobUnlocked(job)
913

    
914
    logging.debug("Adding new job %s to the cache", job_id)
915
    self._memcache[job_id] = job
916

    
917
    # Add to worker pool
918
    self._wpool.AddTask(job)
919

    
920
    return job.id
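
  # Illustrative usage sketch (an assumption, not from the original
  # source; `context` stands for a GanetiContext and `my_op` for any
  # opcodes.OpCode instance):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([my_op])
  #   # the job is now on disk, replicated, and queued for a worker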

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
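
  # Illustrative client-side sketch (an assumption, not from the
  # original source): a caller would typically poll in a loop, feeding
  # the previous results back in until the job reaches a final status:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
  #     # stop once prev_info shows a finalized job status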

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
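
  # Illustrative example (not part of the original module): querying two
  # jobs for two fields returns one row per job, e.g. roughly:
  #
  #   queue.QueryJobs(["1", "2"], ["id", "status"])
  #   # -> [["1", "success"], ["2", "running"]]
  #
  # with None in place of any job that could not be loaded.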

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None