#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


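# Illustrative note, not part of the original module: timestamps in this file
# are the (seconds, microseconds) pairs produced by utils.SplitTime, e.g. a
# time.time() value of 1234567890.123456 becomes roughly (1234567890, 123456).

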
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


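# Illustrative sketch, not part of the original module: a _QueuedOpCode is
# meant to survive a Serialize()/Restore() round trip, which is what the
# queue relies on when writing jobs to disk and loading them back. Assuming
# "op" is any opcodes.OpCode instance:
#
#   queued_op = _QueuedOpCode(op)
#   state = queued_op.Serialize()        # a plain, JSON-serializable dict
#   copy = _QueuedOpCode.Restore(state)  # an equivalent in-memory object
#   assert copy.status == queued_op.status
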
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

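  # Illustrative example, not part of the original module: for a job whose
  # opcode statuses are [success, running, queued], CalcStatus() clears
  # all_success, the running opcode sets the status to JOB_STATUS_RUNNING and
  # the queued opcode leaves it unchanged, so the job is reported as running.
  # A canceled or errored opcode instead breaks out of the loop and the whole
  # job takes that status.
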
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

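  # Illustrative example, not part of the original module: log entries are
  # (serial, timestamp, type, message) tuples, so a client that has already
  # seen entries up to serial 3 calls job.GetLogEntries(3) and receives only
  # the entries with serial 4 and higher; GetLogEntries(None) returns them all.
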
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


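# Illustrative sketch, not part of the original module: the callback protocol
# as seen from an executing opcode. The worker below sets the opcode to
# OP_STATUS_WAITLOCK before calling into mcpu, which then drives the
# callbacks roughly like this (the messages are made-up examples):
#
#   cbs = _OpExecCallbacks(queue, job, op)
#   cbs.ReportLocks("waiting for instance lock")  # while acquiring locks
#   cbs.NotifyStart()        # locks held: WAITLOCK -> RUNNING, or CancelJob
#   cbs.Feedback("starting instance")             # appended to the opcode log
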
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


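# Illustrative summary, not part of the original module: the queue below hands
# submitted jobs to this pool, e.g.:
#
#   pool = _JobQueueWorkerPool(queue)
#   pool.AddTask(job)   # a _QueuedJob; one of the JOBQUEUE_THREADS workers
#                       # eventually runs _JobQueueWorker.RunTask(job)
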
class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate the renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

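  # Illustrative example, not part of the original module: with the helpers
  # above, job id "12345" lives in QUEUE_DIR as "job-12345" while active, and
  # is archived to JOB_QUEUE_ARCHIVE_DIR under subdirectory "1", because
  # 12345 / JOBS_PER_ARCHIVE_DIRECTORY == 1 with integer division.
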
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

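  # Illustrative sketch, not part of the original module: typical submission
  # calls from the master daemon side, assuming "queue" is the JobQueue
  # instance and the opcode lists have been built elsewhere:
  #
  #   job_id = queue.SubmitJob([op1, op2])
  #   results = queue.SubmitManyJobs([[op1], [op2, op3]])
  #   # results is a list of (success, job_id_or_error_message) tuples
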
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

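  # Illustrative sketch, not part of the original module: a caller waiting for
  # a job typically polls in a loop (the 60.0 second timeout is an arbitrary
  # example value):
  #
  #   answer = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_log_serial, 60.0)
  #   if answer == constants.JOB_NOTCHANGED:
  #     pass  # timed out, simply ask again
  #   else:
  #     (job_info, log_entries) = answer
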
  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum amount of time (in seconds) to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None
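
# Illustrative sketch, not part of the original module: querying the queue for
# a summary of all jobs, using only field names that _GetJobInfoUnlocked above
# understands:
#
#   for job_id, status, summary in queue.QueryJobs(None, ["id", "status",
#                                                         "summary"]):
#     print job_id, status, summary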