lib/jqueue.py @ fbf0262f

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
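    # The condition shares the queue lock: waiters (see
    # JobQueue.WaitForJobChanges) release the queue lock while blocking on
    # it, and are woken up by the notifyAll() calls in UpdateJobUnlocked
    # and in the per-opcode _Log feedback function.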

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished-with-error opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS
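    # For example, opcode statuses (SUCCESS, RUNNING, QUEUED) yield
    # JOB_STATUS_RUNNING, (SUCCESS, ERROR, QUEUED) yields JOB_STATUS_ERROR
    # (the error breaks out of the loop above), and all-SUCCESS yields
    # JOB_STATUS_SUCCESS.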

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
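    # _NotifyStart (passed as callback to ExecOpCode below) relies on
    # self.queue and self.opcode; the latter is (re)assigned right before
    # each opcode is executed.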
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
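    # Note: _memcache holds only weak references, so a job stays cached
    # while something else (e.g. the worker pool) still references it and
    # is dropped automatically afterwards; the on-disk copy remains the
    # authoritative version (see _GetJobIDsUnlocked).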
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
            jobs_count = len(all_job_ids)
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp
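      # job_age is a (seconds, microseconds) tuple as produced by
      # TimeStampNow(), so only the seconds part is compared against the
      # age limit below.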

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None