#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000
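# Note (added, illustrative): JOBQUEUE_THREADS bounds how many jobs can run
# concurrently, while JOBS_PER_ARCHIVE_DIRECTORY is the bucket size used by
# JobQueue._GetArchiveDirectory() below when placing finished jobs into
# archive subdirectories.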


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
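  # Example (illustrative): utils.SplitTime(1234567890.5) yields a
  # (seconds, microseconds) pair such as (1234567890, 500000); this is the
  # format used for all timestamps stored with jobs and opcodes below.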


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
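    # Note (illustrative): Serialize() and Restore() are symmetric; "input"
    # holds the opcode's own __getstate__() dictionary, so
    # _QueuedOpCode.Restore(op.Serialize()) rebuilds an equivalent object.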


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

    
241
  def CalcStatus(self):
242
    """Compute the status of this job.
243

244
    This function iterates over all the _QueuedOpCodes in the job and
245
    based on their status, computes the job status.
246

247
    The algorithm is:
248
      - if we find a cancelled, or finished with error, the job
249
        status will be the same
250
      - otherwise, the last opcode with the status one of:
251
          - waitlock
252
          - canceling
253
          - running
254

255
        will determine the job status
256

257
      - otherwise, it means either all opcodes are queued, or success,
258
        and the job status will be the same
259

260
    @return: the job status
261

262
    """
263
    status = constants.JOB_STATUS_QUEUED
264

    
265
    all_success = True
266
    for op in self.ops:
267
      if op.status == constants.OP_STATUS_SUCCESS:
268
        continue
269

    
270
      all_success = False
271

    
272
      if op.status == constants.OP_STATUS_QUEUED:
273
        pass
274
      elif op.status == constants.OP_STATUS_WAITLOCK:
275
        status = constants.JOB_STATUS_WAITLOCK
276
      elif op.status == constants.OP_STATUS_RUNNING:
277
        status = constants.JOB_STATUS_RUNNING
278
      elif op.status == constants.OP_STATUS_CANCELING:
279
        status = constants.JOB_STATUS_CANCELING
280
        break
281
      elif op.status == constants.OP_STATUS_ERROR:
282
        status = constants.JOB_STATUS_ERROR
283
        # The whole job fails if one opcode failed
284
        break
285
      elif op.status == constants.OP_STATUS_CANCELED:
286
        status = constants.OP_STATUS_CANCELED
287
        break
288

    
289
    if all_success:
290
      status = constants.JOB_STATUS_SUCCESS
291

    
292
    return status
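    # Worked example (illustrative): opcode statuses [success, error, queued]
    # stop the loop at the error and yield JOB_STATUS_ERROR, while
    # [success, running, queued] yield JOB_STATUS_RUNNING.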

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
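    # Example (illustrative): with log serials 1..5 recorded and
    # newer_than=3, only the entries with serials 4 and 5 are returned;
    # newer_than=None returns all of them.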

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
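    # Note (illustrative): within this module this helper is used to flip
    # pending opcodes to OP_STATUS_CANCELING/CANCELED on cancellation, and to
    # OP_STATUS_ERROR when an unclean master shutdown is detected at startup.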


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()
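    # Note (illustrative): within this module this is the only place an
    # opcode moves from OP_STATUS_WAITLOCK to OP_STATUS_RUNNING, and the last
    # point where a pending cancellation can still stop it before Exec() runs.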

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)
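    # Summary (illustrative): each opcode above normally goes through
    # QUEUED -> WAITLOCK -> RUNNING -> SUCCESS (or ERROR/CANCELED); the
    # WAITLOCK, SUCCESS and ERROR transitions are persisted and replicated
    # via UpdateJobUnlocked(), so waiters see intermediate progress.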


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise
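    # Recovery summary (illustrative): on master daemon startup the loop
    # above re-queues jobs still in JOB_STATUS_QUEUED and marks jobs found
    # running/waiting/canceling as errored ("Unclean master daemon
    # shutdown"), since their in-memory execution state has been lost.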

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.RemoteFailMsg()
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].RemoteFailMsg()
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].RemoteFailMsg()
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
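    # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id
    # "12345" maps to archive directory "1" and job id "123" maps to "0".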

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
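    # Note (illustrative): the serial file is written and replicated before
    # self._last_serial is bumped, so a failed write does not consume the new
    # serial; this is what keeps job identifiers unique (see docstring above).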

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
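    # Example (illustrative): for job id "12345" this returns a path of the
    # form <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345, the "1" coming from
    # _GetArchiveDirectory() above.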

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
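    # Note (illustrative): self._memcache is a weakref.WeakValueDictionary
    # (see __init__), so cached jobs disappear automatically once nothing
    # references them any more and the next load falls back to disk.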

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
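    # Usage sketch (illustrative, assuming a JobQueue instance "queue"):
    #   queue.SetDrainFlag(True)   # new submissions raise JobQueueDrainError
    #   queue.SetDrainFlag(False)  # submissions are accepted again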

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    return self._SubmitJobUnlocked(ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    for ops in jobs:
      try:
        data = self._SubmitJobUnlocked(ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
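    # Note (illustrative): the result is one (success, data) tuple per
    # requested job, where data is the new job id on success or the error
    # message otherwise, e.g. [(True, "42"), (False, "<error message>")].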


  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)
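    # Client sketch (illustrative, not part of this module): callers loop,
    # feeding back the previously returned info and highest log serial, and
    # treat constants.JOB_NOTCHANGED as "timed out, ask again", e.g.:
    #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
    #                                    prev_serial, timeout)
    #   if result != constants.JOB_NOTCHANGED and result is not None:
    #     prev_info, log_entries = result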

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
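    # Note (illustrative): the first element is the number of jobs archived;
    # the second is the number of job ids that were never considered because
    # the timeout budget ran out before the loop reached them.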

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
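    # Example (illustrative): fields=["id", "status"] yields a two-element
    # row; per-opcode fields such as "opstatus" or "opresult" contribute one
    # list with an entry per opcode in the job.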

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None