lib/jqueue.py @ 16714921
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

JOBQUEUE_THREADS = 25


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
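
# Illustrative note: utils.SplitTime turns a float timestamp into an
# integer (seconds, microseconds) pair, e.g. 1230000000.25 becomes
# (1230000000, 250000); keeping integers avoids float precision loss
# when timestamps are serialized to JSON.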


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
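
  # Round-trip sketch (illustrative; assumes a trivial opcode such as
  # opcodes.OpTestDelay is available):
  #   op = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
  #   state = serializer.LoadJson(serializer.DumpJson(op.Serialize()))
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.status == constants.OP_STATUS_QUEUED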


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with error,
        the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
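
  # Worked example: a job whose opcode statuses are (success, running,
  # queued) reports JOB_STATUS_RUNNING from the loop above, while
  # (success, error, queued) stops at the error and reports
  # JOB_STATUS_ERROR for the whole job.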

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
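
  # For instance, GetLogEntries(2) on a job whose opcodes have logged
  # serials 1 through 5 returns only the entries with serial 3 and up,
  # while GetLogEntries(None) returns all five.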


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
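
  # Note: the pool's workers call RunTask() above for every job added via
  # AddTask, so up to JOBQUEUE_THREADS jobs can be processed concurrently.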


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        for job in self._GetJobsUnlocked(None):
          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
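
  # Sketch of the serial mechanism: if the serial file holds "42", the
  # next submission writes "43\n", replicates it to all nodes, and only
  # then adopts 43 locally, so a failed write does not consume a job id.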

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
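
  # Naming scheme illustration: job id "123" maps to
  # <QUEUE_DIR>/job-123 (or <JOB_QUEUE_ARCHIVE_DIR>/job-123 once
  # archived), so _ExtractJobID("job-123") returns "123" while a
  # non-matching name returns None.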

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
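
  # e.g. SetDrainFlag(True) creates JOB_QUEUE_DRAIN_FILE so that
  # subsequent SubmitJob calls raise errors.JobQueueDrainError, and
  # SetDrainFlag(False) removes it again.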

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
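
  # Typical caller pattern, as a sketch (the names are hypothetical):
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], job_info,
  #                                      log_serial, timeout)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     (job_info, log_entries) = result
  #     # ...consume log_entries, track the highest serial seen, and stop
  #     # once the reported status is no longer queued/waitlock/running.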

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
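
  # Example: AutoArchiveJobs(3600) archives every finished job whose most
  # recent timestamp (end, else start, else received) is more than an
  # hour old, while AutoArchiveJobs(-1) archives all finished jobs
  # regardless of age.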

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
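
  # For example, QueryJobs(None, ["id", "status", "summary"]) returns one
  # [id, status, summary] row per known job (None meaning all jobs), with
  # None in place of any job that could not be loaded.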

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None