#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
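
# Example (illustrative only): TimeStampNow() splits a float POSIX time into
# integer seconds and microseconds, so a time.time() value of, say,
# 1230768000.25 would come back roughly as (1230768000, 250000); the exact
# numbers naturally depend on the clock at the moment of the call.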


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
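
  # Illustrative sketch of the state handled by Serialize() and Restore()
  # above; the exact "input" payload depends on the opcode's own
  # __getstate__() and the field values here are hypothetical:
  #
  #   {"input": {"OP_ID": "OP_CLUSTER_VERIFY"},
  #    "status": constants.OP_STATUS_QUEUED,
  #    "result": None,
  #    "log": [],
  #    "start_timestamp": None,
  #    "end_timestamp": None}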


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, opcode, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
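
  # Worked example for CalcStatus() (hypothetical opcode states): with op
  # statuses [SUCCESS, RUNNING, QUEUED] the loop leaves the job status at
  # JOB_STATUS_RUNNING; with [SUCCESS, SUCCESS, SUCCESS] the all_success
  # branch yields JOB_STATUS_SUCCESS; a single OP_STATUS_ERROR anywhere
  # short-circuits the loop and the job status becomes JOB_STATUS_ERROR.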

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
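
  # Example (hypothetical job IDs): with JOBS_PER_ARCHIVE_DIRECTORY at 10000,
  # _GetArchiveDirectory("42") yields "0" and _GetArchiveDirectory("123456")
  # yields "12", so roughly ten thousand archived jobs share one directory.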

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
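
  # Sketch (hypothetical numbers): if the last serial on disk is 5, a call
  # like _NewSerialsUnlocked(3) writes "8" to the serial file and is expected
  # to hand back the fresh identifiers ["6", "7", "8"].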

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
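
  # Shape of the SubmitManyJobs() result, with hypothetical values: one
  # (status, data) pair per submitted job, e.g.
  # [(True, "17"), (False, "Job queue is drained, refusing job")].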


  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)
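
  # Rough client-side polling sketch (names hypothetical): callers typically
  # loop on WaitForJobChanges(job_id, fields, prev_info, prev_serial, timeout)
  # and treat the constants.JOB_NOTCHANGED sentinel as "ask again", None as
  # "job vanished", and any (job_info, log_entries) tuple as fresh data.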

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)
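
  # CancelJob() return values at a glance (job IDs hypothetical):
  # (True, "Job 42 canceled") for a queued job, (True, "Job 42 will be
  # canceled") for one waiting on locks, and (False, "...") when the job is
  # missing or already past the point of cancellation.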

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum amount of time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
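
  # Example (hypothetical values): for fields ["id", "status", "summary"] the
  # returned row might look like ["42", "running", ["CLUSTER_VERIFY"]], i.e.
  # one entry per requested field, in the same order as the fields list.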

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None