#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

class CancelJob(Exception):
  """Special exception to cancel a job.

  """

def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

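# Illustrative sketch (editor-added, not from the original source): the
# (seconds, microseconds) pair serializes to JSON losslessly, unlike a
# raw float. Assuming utils.SplitTime splits a POSIX timestamp roughly
# as below:
#
#   now = 1230000000.123456
#   secs = int(now)                          # 1230000000
#   usecs = int((now - secs) * 1000000)      # ~123456
#   # TimeStampNow() then returns (secs, usecs)
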
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }

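# Illustrative sketch (editor-added): a _QueuedOpCode survives the
# Serialize()/Restore() round trip through JSON, which is exactly how
# jobs are persisted to disk; "op" stands for any opcodes.OpCode:
#
#   qop = _QueuedOpCode(op)
#   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
#   restored = _QueuedOpCode.Restore(state)
#   assert restored.status == constants.OP_STATUS_QUEUED
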
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job ID
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

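  # Illustrative example (editor-added): for a job whose opcodes have
  # statuses [SUCCESS, RUNNING, QUEUED], the loop above sets
  # status = JOB_STATUS_RUNNING and clears all_success, so CalcStatus()
  # returns "running"; a single OP_STATUS_ERROR opcode would break out
  # early and make the whole job "error".
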
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

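  # Illustrative example (editor-added): if op.log holds the entries
  # (1, ts, ELOG_MESSAGE, "step 1") and (2, ts, ELOG_MESSAGE, "step 2"),
  # then GetLogEntries(1) returns only the serial-2 entry; this is how
  # WaitForJobChanges sends clients just the output they haven't seen.
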
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _JobQueueWorker(workerpool.BaseWorker):
338
  """The actual job workers.
339

340
  """
341
  def _NotifyStart(self):
342
    """Mark the opcode as running, not lock-waiting.
343

344
    This is called from the mcpu code as a notifier function, when the
345
    LU is finally about to start the Exec() method. Of course, to have
346
    end-user visible results, the opcode must be initially (before
347
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348

349
    """
350
    assert self.queue, "Queue attribute is missing"
351
    assert self.opcode, "Opcode attribute is missing"
352

    
353
    self.queue.acquire()
354
    try:
355
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356
                                    constants.OP_STATUS_CANCELING)
357

    
358
      # Cancel here if we were asked to
359
      if self.opcode.status == constants.OP_STATUS_CANCELING:
360
        raise CancelJob()
361

    
362
      self.opcode.status = constants.OP_STATUS_RUNNING
363
    finally:
364
      self.queue.release()
365

    
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)

class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue

def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper

class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

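  # Illustrative example (editor-added): if _last_serial is 42, then
  # _NewSerialsUnlocked(2) persists "44" to the serial file and returns
  # ["43", "44"]; the range starts at _last_serial + 1 so an
  # already-used serial is never handed out twice.
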
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
845
  def _GetArchivedJobPath(cls, job_id):
846
    """Returns the archived job file for a give job id.
847

848
    @type job_id: str
849
    @param job_id: the job identifier
850
    @rtype: str
851
    @return: the path to the archived job file
852

853
    """
854
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
855
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
856

    
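  # Illustrative example (editor-added): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" falls into archive
  # directory "1" (Python 2 integer division), so its archived path is
  # JOB_QUEUE_ARCHIVE_DIR/1/job-12345; the fan-out per archive
  # directory therefore stays bounded at 10000 job files.
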
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

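  # Illustrative sketch (editor-added): draining is just the presence
  # of JOB_QUEUE_DRAIN_FILE on this node, so, given a JobQueue "queue":
  #
  #   queue.SetDrainFlag(True)        # touches the drain file
  #   queue._IsQueueMarkedDrain()     # -> True
  #
  # and any subsequent SubmitJob() call raises
  # errors.JobQueueDrainError until the flag is unset again.
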
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

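  # Illustrative sketch (editor-added), assuming "queue" is a JobQueue
  # and "op" any opcodes.OpCode instance:
  #
  #   job_id = queue.SubmitJob([op])                 # a one-opcode job
  #   results = queue.SubmitManyJobs([[op], [op]])   # [(True, id), ...]
  #
  # SubmitManyJobs reserves all serials up front, so the ids it hands
  # out are consecutive even if an individual submission fails.
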
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        If the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)

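  # Illustrative sketch (editor-added) of the polling loop a client is
  # expected to run on top of WaitForJobChanges:
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"],
  #                                      prev_info, prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                    # timeout, just poll again
  #     if result is None:
  #       break                       # job no longer exists
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]
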
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not yet started running,
    i.e. it is still queued or waiting for locks.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the soft limit, in seconds, on the total time
        spent archiving jobs
    @rtype: tuple
    @return: a tuple of the number of jobs archived and the number of
        jobs not yet examined (due to the timeout)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

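  # Illustrative example (editor-added): archive every job finalized
  # more than six hours ago, spending at most ~30 seconds in the call:
  #
  #   archived, unchecked = queue.AutoArchiveJobs(6 * 3600, 30)
  #
  # The special age -1 archives all finalized jobs regardless of age.
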
  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

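  # Illustrative example (editor-added): querying two fields for all
  # jobs; each row lists values in the order the fields were requested:
  #
  #   for row in queue.QueryJobs(None, ["id", "status"]):
  #     if row is not None:
  #       job_id, status = row
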
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None