
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
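# Editor's illustrative sketch (not part of the original module): assuming
# utils.SplitTime() splits a float timestamp into whole seconds and
# microseconds, a call would look roughly like:
#
#   >>> TimeStampNow()        # at Unix time 1234567890.123456
#   (1234567890, 123456)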
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encasulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
  of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar stop_timestamp: timestamp for the end of the execution
81

82
  """
83
  def __init__(self, op):
84
    """Constructor for the _QuededOpCode.
85

86
    @type op: L{opcodes.OpCode}
87
    @param op: the opcode we encapsulate
88

89
    """
90
    self.input = op
91
    self.status = constants.OP_STATUS_QUEUED
92
    self.result = None
93
    self.log = []
94
    self.start_timestamp = None
95
    self.end_timestamp = None
96

    
97
  @classmethod
98
  def Restore(cls, state):
99
    """Restore the _QueuedOpCode from the serialized form.
100

101
    @type state: dict
102
    @param state: the serialized state
103
    @rtype: _QueuedOpCode
104
    @return: a new _QueuedOpCode instance
105

106
    """
107
    obj = _QueuedOpCode.__new__(cls)
108
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109
    obj.status = state["status"]
110
    obj.result = state["result"]
111
    obj.log = state["log"]
112
    obj.start_timestamp = state.get("start_timestamp", None)
113
    obj.end_timestamp = state.get("end_timestamp", None)
114
    return obj
115

    
116
  def Serialize(self):
117
    """Serializes this _QueuedOpCode.
118

119
    @rtype: dict
120
    @return: the dictionary holding the serialized state
121

122
    """
123
    return {
124
      "input": self.input.__getstate__(),
125
      "status": self.status,
126
      "result": self.result,
127
      "log": self.log,
128
      "start_timestamp": self.start_timestamp,
129
      "end_timestamp": self.end_timestamp,
130
      }
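    # Editor's illustrative sketch (not part of the original module): the
    # serialized form is a plain dict; for a freshly queued opcode it would
    # look roughly like this (the "input" value depends entirely on the
    # opcode's own __getstate__ and is only sketched here):
    #
    #   {"input": <result of op.input.__getstate__()>,
    #    "status": OP_STATUS_QUEUED, "result": None, "log": [],
    #    "start_timestamp": None, "end_timestamp": None}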
131

    
132

    
133
class _QueuedJob(object):
134
  """In-memory job representation.
135

136
  This is what we use to track the user-submitted jobs. Locking must
137
  be taken care of by users of this class.
138

139
  @type queue: L{JobQueue}
140
  @ivar queue: the parent queue
141
  @ivar id: the job ID
142
  @type ops: list
143
  @ivar ops: the list of _QueuedOpCode that constitute the job
144
  @type run_op_index: int
145
  @ivar run_op_index: the currently executing opcode, or -1 if
146
      we didn't yet start executing
147
  @type log_serial: int
148
  @ivar log_serial: holds the index for the next log entry
149
  @ivar received_timestamp: the timestamp for when the job was received
150
  @ivar start_timestamp: the timestamp for start of execution
151
  @ivar end_timestamp: the timestamp for end of execution
152
  @ivar change: a Condition variable we use for waiting for job changes
153

154
  """
155
  def __init__(self, queue, job_id, ops):
156
    """Constructor for the _QueuedJob.
157

158
    @type queue: L{JobQueue}
159
    @param queue: our parent queue
160
    @type job_id: job_id
161
    @param job_id: our job id
162
    @type ops: list
163
    @param ops: the list of opcodes we hold, which will be encapsulated
164
        in _QueuedOpCodes
165

166
    """
167
    if not ops:
168
      # TODO: use a better exception
169
      raise Exception("No opcodes")
170

    
171
    self.queue = queue
172
    self.id = job_id
173
    self.ops = [_QueuedOpCode(op) for op in ops]
174
    self.run_op_index = -1
175
    self.log_serial = 0
176
    self.received_timestamp = TimeStampNow()
177
    self.start_timestamp = None
178
    self.end_timestamp = None
179

    
180
    # Condition to wait for changes
181
    self.change = threading.Condition(self.queue._lock)
182

    
183
  @classmethod
184
  def Restore(cls, queue, state):
185
    """Restore a _QueuedJob from serialized state:
186

187
    @type queue: L{JobQueue}
188
    @param queue: to which queue the restored job belongs
189
    @type state: dict
190
    @param state: the serialized state
191
    @rtype: _QueuedJob
192
    @return: the restored _QueuedJob instance
193

194
    """
195
    obj = _QueuedJob.__new__(cls)
196
    obj.queue = queue
197
    obj.id = state["id"]
198
    obj.run_op_index = state["run_op_index"]
199
    obj.received_timestamp = state.get("received_timestamp", None)
200
    obj.start_timestamp = state.get("start_timestamp", None)
201
    obj.end_timestamp = state.get("end_timestamp", None)
202

    
203
    obj.ops = []
204
    obj.log_serial = 0
205
    for op_state in state["ops"]:
206
      op = _QueuedOpCode.Restore(op_state)
207
      for log_entry in op.log:
208
        obj.log_serial = max(obj.log_serial, log_entry[0])
209
      obj.ops.append(op)
210

    
211
    # Condition to wait for changes
212
    obj.change = threading.Condition(obj.queue._lock)
213

    
214
    return obj
215

    
216
  def Serialize(self):
217
    """Serialize the _JobQueue instance.
218

219
    @rtype: dict
220
    @return: the serialized state
221

222
    """
223
    return {
224
      "id": self.id,
225
      "ops": [op.Serialize() for op in self.ops],
226
      "run_op_index": self.run_op_index,
227
      "start_timestamp": self.start_timestamp,
228
      "end_timestamp": self.end_timestamp,
229
      "received_timestamp": self.received_timestamp,
230
      }
231

    
232
  def CalcStatus(self):
233
    """Compute the status of this job.
234

235
    This function iterates over all the _QueuedOpCodes in the job and
236
    based on their status, computes the job status.
237

238
    The algorithm is:
239
      - if we find a cancelled opcode, or one that finished with error, the job
240
        status will be the same
241
      - otherwise, the last opcode with the status one of:
242
          - waitlock
243
          - canceling
244
          - running
245

246
        will determine the job status
247

248
      - otherwise, it means either all opcodes are queued or all succeeded,
249
        and the job status will be the same
250

251
    @return: the job status
252

253
    """
254
    status = constants.JOB_STATUS_QUEUED
255

    
256
    all_success = True
257
    for op in self.ops:
258
      if op.status == constants.OP_STATUS_SUCCESS:
259
        continue
260

    
261
      all_success = False
262

    
263
      if op.status == constants.OP_STATUS_QUEUED:
264
        pass
265
      elif op.status == constants.OP_STATUS_WAITLOCK:
266
        status = constants.JOB_STATUS_WAITLOCK
267
      elif op.status == constants.OP_STATUS_RUNNING:
268
        status = constants.JOB_STATUS_RUNNING
269
      elif op.status == constants.OP_STATUS_CANCELING:
270
        status = constants.JOB_STATUS_CANCELING
271
        break
272
      elif op.status == constants.OP_STATUS_ERROR:
273
        status = constants.JOB_STATUS_ERROR
274
        # The whole job fails if one opcode failed
275
        break
276
      elif op.status == constants.OP_STATUS_CANCELED:
277
        status = constants.JOB_STATUS_CANCELED
278
        break
279

    
280
    if all_success:
281
      status = constants.JOB_STATUS_SUCCESS
282

    
283
    return status
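    # Editor's illustrative sketch (not part of the original module): worked
    # examples of the precedence rules implemented above, in terms of the
    # per-opcode status constants:
    #
    #   [SUCCESS, SUCCESS, SUCCESS]  -> JOB_STATUS_SUCCESS
    #   [SUCCESS, RUNNING,  QUEUED]  -> JOB_STATUS_RUNNING
    #   [SUCCESS, WAITLOCK, QUEUED]  -> JOB_STATUS_WAITLOCK
    #   [SUCCESS, ERROR,    QUEUED]  -> JOB_STATUS_ERROR (error wins, loop stops)
    #   [SUCCESS, QUEUED,   QUEUED]  -> JOB_STATUS_QUEUED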
284

    
285
  def GetLogEntries(self, newer_than):
286
    """Selectively returns the log entries.
287

288
    @type newer_than: None or int
289
    @param newer_than: if this is None, return all log entries,
290
        otherwise return only the log entries with serial higher
291
        than this value
292
    @rtype: list
293
    @return: the list of the log entries selected
294

295
    """
296
    if newer_than is None:
297
      serial = -1
298
    else:
299
      serial = newer_than
300

    
301
    entries = []
302
    for op in self.ops:
303
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304

    
305
    return entries
306

    
307

    
308
class _JobQueueWorker(workerpool.BaseWorker):
309
  """The actual job workers.
310

311
  """
312
  def _NotifyStart(self):
313
    """Mark the opcode as running, not lock-waiting.
314

315
    This is called from the mcpu code as a notifier function, when the
316
    LU is finally about to start the Exec() method. Of course, to have
317
    end-user visible results, the opcode must be initially (before
318
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319

320
    """
321
    assert self.queue, "Queue attribute is missing"
322
    assert self.opcode, "Opcode attribute is missing"
323

    
324
    self.queue.acquire()
325
    try:
326
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327
                                    constants.OP_STATUS_CANCELING)
328

    
329
      # Cancel here if we were asked to
330
      if self.opcode.status == constants.OP_STATUS_CANCELING:
331
        raise CancelJob()
332

    
333
      self.opcode.status = constants.OP_STATUS_RUNNING
334
    finally:
335
      self.queue.release()
336

    
337
  def RunTask(self, job):
338
    """Job executor.
339

340
    This function processes a job. It is closely tied to the _QueuedJob and
341
    _QueuedOpCode classes.
342

343
    @type job: L{_QueuedJob}
344
    @param job: the job to be processed
345

346
    """
347
    logging.info("Worker %s processing job %s",
348
                  self.worker_id, job.id)
349
    proc = mcpu.Processor(self.pool.queue.context)
350
    self.queue = queue = job.queue
351
    try:
352
      try:
353
        count = len(job.ops)
354
        for idx, op in enumerate(job.ops):
355
          op_summary = op.input.Summary()
356
          try:
357
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
358
                         op_summary)
359

    
360
            queue.acquire()
361
            try:
362
              if op.status == constants.OP_STATUS_CANCELED:
363
                raise CancelJob()
364
              assert op.status == constants.OP_STATUS_QUEUED
365
              job.run_op_index = idx
366
              op.status = constants.OP_STATUS_WAITLOCK
367
              op.result = None
368
              op.start_timestamp = TimeStampNow()
369
              if idx == 0: # first opcode
370
                job.start_timestamp = op.start_timestamp
371
              queue.UpdateJobUnlocked(job)
372

    
373
              input_opcode = op.input
374
            finally:
375
              queue.release()
376

    
377
            def _Log(*args):
378
              """Append a log entry.
379

380
              """
381
              assert len(args) < 3
382

    
383
              if len(args) == 1:
384
                log_type = constants.ELOG_MESSAGE
385
                log_msg = args[0]
386
              else:
387
                log_type, log_msg = args
388

    
389
              # The time is split to make serialization easier and not lose
390
              # precision.
391
              timestamp = utils.SplitTime(time.time())
392

    
393
              queue.acquire()
394
              try:
395
                job.log_serial += 1
396
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
397

    
398
                job.change.notifyAll()
399
              finally:
400
                queue.release()
401

    
402
            # Make sure not to hold lock while _Log is called
403
            self.opcode = op
404
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
405

    
406
            queue.acquire()
407
            try:
408
              op.status = constants.OP_STATUS_SUCCESS
409
              op.result = result
410
              op.end_timestamp = TimeStampNow()
411
              queue.UpdateJobUnlocked(job)
412
            finally:
413
              queue.release()
414

    
415
            logging.info("Op %s/%s: Successfully finished opcode %s",
416
                         idx + 1, count, op_summary)
417
          except CancelJob:
418
            # Will be handled further up
419
            raise
420
          except Exception, err:
421
            queue.acquire()
422
            try:
423
              try:
424
                op.status = constants.OP_STATUS_ERROR
425
                op.result = str(err)
426
                op.end_timestamp = TimeStampNow()
427
                logging.info("Op %s/%s: Error in opcode %s: %s",
428
                             idx + 1, count, op_summary, err)
429
              finally:
430
                queue.UpdateJobUnlocked(job)
431
            finally:
432
              queue.release()
433
            raise
434

    
435
      except CancelJob:
436
        queue.acquire()
437
        try:
438
          queue.CancelJobUnlocked(job)
439
        finally:
440
          queue.release()
441
      except errors.GenericError, err:
442
        logging.exception("Ganeti exception")
443
      except:
444
        logging.exception("Unhandled exception")
445
    finally:
446
      queue.acquire()
447
      try:
448
        try:
449
          job.run_op_index = -1
450
          job.end_timestamp = TimeStampNow()
451
          queue.UpdateJobUnlocked(job)
452
        finally:
453
          job_id = job.id
454
          status = job.CalcStatus()
455
      finally:
456
        queue.release()
457
      logging.info("Worker %s finished job %s, status = %s",
458
                   self.worker_id, job_id, status)
459

    
460

    
461
class _JobQueueWorkerPool(workerpool.WorkerPool):
462
  """Simple class implementing a job-processing workerpool.
463

464
  """
465
  def __init__(self, queue):
466
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
467
                                              _JobQueueWorker)
468
    self.queue = queue
469

    
470

    
471
class JobQueue(object):
472
  """Quue used to manaage the jobs.
473

474
  @cvar _RE_JOB_FILE: regex matching the valid job file names
475

476
  """
477
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
478

    
479
  def _RequireOpenQueue(fn):
480
    """Decorator for "public" functions.
481

482
    This function should be used for all 'public' functions. That is,
483
    functions usually called from other classes.
484

485
    @warning: Use this decorator only after utils.LockedMethod!
486

487
    Example::
488
      @utils.LockedMethod
489
      @_RequireOpenQueue
490
      def Example(self):
491
        pass
492

493
    """
494
    def wrapper(self, *args, **kwargs):
495
      assert self._queue_lock is not None, "Queue should be open"
496
      return fn(self, *args, **kwargs)
497
    return wrapper
498

    
499
  def __init__(self, context):
500
    """Constructor for JobQueue.
501

502
    The constructor will initialize the job queue object and then
503
    start loading the current jobs from disk, either for starting them
504
    (if they were queued) or for aborting them (if they were already
505
    running).
506

507
    @type context: GanetiContext
508
    @param context: the context object for access to the configuration
509
        data and other ganeti objects
510

511
    """
512
    self.context = context
513
    self._memcache = weakref.WeakValueDictionary()
514
    self._my_hostname = utils.HostInfo().name
515

    
516
    # Locking
517
    self._lock = threading.Lock()
518
    self.acquire = self._lock.acquire
519
    self.release = self._lock.release
520

    
521
    # Initialize
522
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
523

    
524
    # Read serial file
525
    self._last_serial = jstore.ReadSerial()
526
    assert self._last_serial is not None, ("Serial file was modified between"
527
                                           " check in jstore and here")
528

    
529
    # Get initial list of nodes
530
    self._nodes = dict((n.name, n.primary_ip)
531
                       for n in self.context.cfg.GetAllNodesInfo().values()
532
                       if n.master_candidate)
533

    
534
    # Remove master node
535
    try:
536
      del self._nodes[self._my_hostname]
537
    except KeyError:
538
      pass
539

    
540
    # TODO: Check consistency across nodes
541

    
542
    # Setup worker pool
543
    self._wpool = _JobQueueWorkerPool(self)
544
    try:
545
      # We need to lock here because WorkerPool.AddTask() may start a job while
546
      # we're still doing our work.
547
      self.acquire()
548
      try:
549
        logging.info("Inspecting job queue")
550

    
551
        all_job_ids = self._GetJobIDsUnlocked()
552
        jobs_count = len(all_job_ids)
553
        lastinfo = time.time()
554
        for idx, job_id in enumerate(all_job_ids):
555
          # Give an update every 1000 jobs or 10 seconds
556
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
557
              idx == (jobs_count - 1)):
558
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
559
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
560
            lastinfo = time.time()
561

    
562
          job = self._LoadJobUnlocked(job_id)
563

    
564
          # a failure in loading the job can cause 'None' to be returned
565
          if job is None:
566
            continue
567

    
568
          status = job.CalcStatus()
569

    
570
          if status in (constants.JOB_STATUS_QUEUED, ):
571
            self._wpool.AddTask(job)
572

    
573
          elif status in (constants.JOB_STATUS_RUNNING,
574
                          constants.JOB_STATUS_WAITLOCK,
575
                          constants.JOB_STATUS_CANCELING):
576
            logging.warning("Unfinished job %s found: %s", job.id, job)
577
            try:
578
              for op in job.ops:
579
                op.status = constants.OP_STATUS_ERROR
580
                op.result = "Unclean master daemon shutdown"
581
            finally:
582
              self.UpdateJobUnlocked(job)
583

    
584
        logging.info("Job queue inspection finished")
585
      finally:
586
        self.release()
587
    except:
588
      self._wpool.TerminateWorkers()
589
      raise
590

    
591
  @utils.LockedMethod
592
  @_RequireOpenQueue
593
  def AddNode(self, node):
594
    """Register a new node with the queue.
595

596
    @type node: L{objects.Node}
597
    @param node: the node object to be added
598

599
    """
600
    node_name = node.name
601
    assert node_name != self._my_hostname
602

    
603
    # Clean queue directory on added node
604
    rpc.RpcRunner.call_jobqueue_purge(node_name)
605

    
606
    if not node.master_candidate:
607
      # remove if existing, ignoring errors
608
      self._nodes.pop(node_name, None)
609
      # and skip the replication of the job ids
610
      return
611

    
612
    # Upload the whole queue excluding archived jobs
613
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
614

    
615
    # Upload current serial file
616
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
617

    
618
    for file_name in files:
619
      # Read file content
620
      fd = open(file_name, "r")
621
      try:
622
        content = fd.read()
623
      finally:
624
        fd.close()
625

    
626
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
627
                                                  [node.primary_ip],
628
                                                  file_name, content)
629
      if not result[node_name]:
630
        logging.error("Failed to upload %s to %s", file_name, node_name)
631

    
632
    self._nodes[node_name] = node.primary_ip
633

    
634
  @utils.LockedMethod
635
  @_RequireOpenQueue
636
  def RemoveNode(self, node_name):
637
    """Callback called when removing nodes from the cluster.
638

639
    @type node_name: str
640
    @param node_name: the name of the node to remove
641

642
    """
643
    try:
644
      # The queue is removed by the "leave node" RPC call.
645
      del self._nodes[node_name]
646
    except KeyError:
647
      pass
648

    
649
  def _CheckRpcResult(self, result, nodes, failmsg):
650
    """Verifies the status of an RPC call.
651

652
    Since we aim to keep consistency should this node (the current
653
    master) fail, we will log errors if our RPC calls fail, and especially
654
    log the case when more than half of the nodes fail.
655

656
    @param result: the data as returned from the rpc call
657
    @type nodes: list
658
    @param nodes: the list of nodes we made the call to
659
    @type failmsg: str
660
    @param failmsg: the identifier to be used for logging
661

662
    """
663
    failed = []
664
    success = []
665

    
666
    for node in nodes:
667
      if result[node]:
668
        success.append(node)
669
      else:
670
        failed.append(node)
671

    
672
    if failed:
673
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
674

    
675
    # +1 for the master node
676
    if (len(success) + 1) < len(failed):
677
      # TODO: Handle failing nodes
678
      logging.error("More than half of the nodes failed")
679

    
680
  def _GetNodeIp(self):
681
    """Helper for returning the node name/ip list.
682

683
    @rtype: (list, list)
684
    @return: a tuple of two lists, the first one with the node
685
        names and the second one with the node addresses
686

687
    """
688
    name_list = self._nodes.keys()
689
    addr_list = [self._nodes[name] for name in name_list]
690
    return name_list, addr_list
691

    
692
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
693
    """Writes a file locally and then replicates it to all nodes.
694

695
    This function will replace the contents of a file on the local
696
    node and then replicate it to all the other nodes we have.
697

698
    @type file_name: str
699
    @param file_name: the path of the file to be replicated
700
    @type data: str
701
    @param data: the new contents of the file
702

703
    """
704
    utils.WriteFile(file_name, data=data)
705

    
706
    names, addrs = self._GetNodeIp()
707
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
708
    self._CheckRpcResult(result, self._nodes,
709
                         "Updating %s" % file_name)
710

    
711
  def _RenameFilesUnlocked(self, rename):
712
    """Renames a file locally and then replicate the change.
713

714
    This function will rename a file in the local queue directory
715
    and then replicate this rename to all the other nodes we have.
716

717
    @type rename: list of (old, new)
718
    @param rename: List containing tuples mapping old to new names
719

720
    """
721
    # Rename them locally
722
    for old, new in rename:
723
      utils.RenameFile(old, new, mkdir=True)
724

    
725
    # ... and on all nodes
726
    names, addrs = self._GetNodeIp()
727
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
728
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
729

    
730
  def _FormatJobID(self, job_id):
731
    """Convert a job ID to string format.
732

733
    Currently this just does C{str(job_id)} after performing some
734
    checks, but if we want to change the job id format this will
735
    abstract this change.
736

737
    @type job_id: int or long
738
    @param job_id: the numeric job id
739
    @rtype: str
740
    @return: the formatted job id
741

742
    """
743
    if not isinstance(job_id, (int, long)):
744
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
745
    if job_id < 0:
746
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
747

    
748
    return str(job_id)
749

    
750
  @classmethod
751
  def _GetArchiveDirectory(cls, job_id):
752
    """Returns the archive directory for a job.
753

754
    @type job_id: str
755
    @param job_id: Job identifier
756
    @rtype: str
757
    @return: Directory name
758

759
    """
760
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
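    # Editor's illustrative sketch (not part of the original module): with
    # JOBS_PER_ARCHIVE_DIRECTORY = 10000 as defined above, job ids map to
    # archive buckets like:
    #
    #   "42"    -> "0"
    #   "12345" -> "1"
    #   "40000" -> "4"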
761

    
762
  def _NewSerialUnlocked(self):
763
    """Generates a new job identifier.
764

765
    Job identifiers are unique during the lifetime of a cluster.
766

767
    @rtype: str
768
    @return: a string representing the job identifier.
769

770
    """
771
    # New number
772
    serial = self._last_serial + 1
773

    
774
    # Write to file
775
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
776
                                        "%s\n" % serial)
777

    
778
    # Keep it only if we were able to write the file
779
    self._last_serial = serial
780

    
781
    return self._FormatJobID(serial)
782

    
783
  @staticmethod
784
  def _GetJobPath(job_id):
785
    """Returns the job file for a given job id.
786

787
    @type job_id: str
788
    @param job_id: the job identifier
789
    @rtype: str
790
    @return: the path to the job file
791

792
    """
793
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
794

    
795
  @classmethod
796
  def _GetArchivedJobPath(cls, job_id):
797
    """Returns the archived job file for a give job id.
798

799
    @type job_id: str
800
    @param job_id: the job identifier
801
    @rtype: str
802
    @return: the path to the archived job file
803

804
    """
805
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
806
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
807

    
808
  @classmethod
809
  def _ExtractJobID(cls, name):
810
    """Extract the job id from a filename.
811

812
    @type name: str
813
    @param name: the job filename
814
    @rtype: job id or None
815
    @return: the job id corresponding to the given filename,
816
        or None if the filename does not represent a valid
817
        job file
818

819
    """
820
    m = cls._RE_JOB_FILE.match(name)
821
    if m:
822
      return m.group(1)
823
    else:
824
      return None
825

    
826
  def _GetJobIDsUnlocked(self, archived=False):
827
    """Return all known job IDs.
828

829
    If the parameter archived is True, archived job IDs will be
830
    included. Currently this argument is unused.
831

832
    The method only looks at disk because it's a requirement that all
833
    jobs are present on disk (so in the _memcache we don't have any
834
    extra IDs).
835

836
    @rtype: list
837
    @return: the list of job IDs
838

839
    """
840
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
841
    jlist = utils.NiceSort(jlist)
842
    return jlist
843

    
844
  def _ListJobFiles(self):
845
    """Returns the list of current job files.
846

847
    @rtype: list
848
    @return: the list of job file names
849

850
    """
851
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
852
            if self._RE_JOB_FILE.match(name)]
853

    
854
  def _LoadJobUnlocked(self, job_id):
855
    """Loads a job from the disk or memory.
856

857
    Given a job id, this will return the cached job object if
858
    existing, or try to load the job from the disk. If loading from
859
    disk, it will also add the job to the cache.
860

861
    @param job_id: the job id
862
    @rtype: L{_QueuedJob} or None
863
    @return: either None or the job object
864

865
    """
866
    job = self._memcache.get(job_id, None)
867
    if job:
868
      logging.debug("Found job %s in memcache", job_id)
869
      return job
870

    
871
    filepath = self._GetJobPath(job_id)
872
    logging.debug("Loading job from %s", filepath)
873
    try:
874
      fd = open(filepath, "r")
875
    except IOError, err:
876
      if err.errno in (errno.ENOENT, ):
877
        return None
878
      raise
879
    try:
880
      data = serializer.LoadJson(fd.read())
881
    finally:
882
      fd.close()
883

    
884
    try:
885
      job = _QueuedJob.Restore(self, data)
886
    except Exception, err:
887
      new_path = self._GetArchivedJobPath(job_id)
888
      if filepath == new_path:
889
        # job already archived (future case)
890
        logging.exception("Can't parse job %s", job_id)
891
      else:
892
        # non-archived case
893
        logging.exception("Can't parse job %s, will archive.", job_id)
894
        self._RenameFilesUnlocked([(filepath, new_path)])
895
      return None
896

    
897
    self._memcache[job_id] = job
898
    logging.debug("Added job %s to the cache", job_id)
899
    return job
900

    
901
  def _GetJobsUnlocked(self, job_ids):
902
    """Return a list of jobs based on their IDs.
903

904
    @type job_ids: list
905
    @param job_ids: either an empty list (meaning all jobs),
906
        or a list of job IDs
907
    @rtype: list
908
    @return: the list of job objects
909

910
    """
911
    if not job_ids:
912
      job_ids = self._GetJobIDsUnlocked()
913

    
914
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
915

    
916
  @staticmethod
917
  def _IsQueueMarkedDrain():
918
    """Check if the queue is marked from drain.
919

920
    This currently uses the queue drain file, which makes it a
921
    per-node flag. In the future this can be moved to the config file.
922

923
    @rtype: boolean
924
    @return: True if the job queue is marked for draining
925

926
    """
927
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
928

    
929
  @staticmethod
930
  def SetDrainFlag(drain_flag):
931
    """Sets the drain flag for the queue.
932

933
    This is similar to the function L{backend.JobQueueSetDrainFlag},
934
    and in the future we might merge them.
935

936
    @type drain_flag: boolean
937
    @param drain_flag: whether to set or unset the drain flag
938

939
    """
940
    if drain_flag:
941
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
942
    else:
943
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
944
    return True
945

    
946
  @_RequireOpenQueue
947
  def _SubmitJobUnlocked(self, ops):
948
    """Create and store a new job.
949

950
    This enters the job into our job queue and also puts it on the new
951
    queue, in order for it to be picked up by the queue processors.
952

953
    @type ops: list
954
    @param ops: The list of OpCodes that will become the new job.
955
    @rtype: job ID
956
    @return: the job ID of the newly created job
957
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
958

959
    """
960
    if self._IsQueueMarkedDrain():
961
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
962

    
963
    # Check job queue size
964
    size = len(self._ListJobFiles())
965
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
966
      # TODO: Autoarchive jobs. Make sure it's not done on every job
967
      # submission, though.
968
      #size = ...
969
      pass
970

    
971
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
972
      raise errors.JobQueueFull()
973

    
974
    # Get job identifier
975
    job_id = self._NewSerialUnlocked()
976
    job = _QueuedJob(self, job_id, ops)
977

    
978
    # Write to disk
979
    self.UpdateJobUnlocked(job)
980

    
981
    logging.debug("Adding new job %s to the cache", job_id)
982
    self._memcache[job_id] = job
983

    
984
    # Add to worker pool
985
    self._wpool.AddTask(job)
986

    
987
    return job.id
988

    
989
  @utils.LockedMethod
990
  @_RequireOpenQueue
991
  def SubmitJob(self, ops):
992
    """Create and store a new job.
993

994
    @see: L{_SubmitJobUnlocked}
995

996
    """
997
    return self._SubmitJobUnlocked(ops)
998

    
999
  @utils.LockedMethod
1000
  @_RequireOpenQueue
1001
  def SubmitManyJobs(self, jobs):
1002
    """Create and store multiple jobs.
1003

1004
    @see: L{_SubmitJobUnlocked}
1005

1006
    """
1007
    results = []
1008
    for ops in jobs:
1009
      try:
1010
        data = self._SubmitJobUnlocked(ops)
1011
        status = True
1012
      except errors.GenericError, err:
1013
        data = str(err)
1014
        status = False
1015
      results.append((status, data))
1016

    
1017
    return results
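    # Editor's illustrative sketch (not part of the original module): the
    # returned list holds one (status, data) tuple per submitted job, where
    # data is the new job id on success or the error text on failure, e.g.
    # roughly:
    #
    #   [(True, "1234"), (False, "Job queue is drained, refusing job")]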
1018

    
1019

    
1020
  @_RequireOpenQueue
1021
  def UpdateJobUnlocked(self, job):
1022
    """Update a job's on disk storage.
1023

1024
    After a job has been modified, this function needs to be called in
1025
    order to write the changes to disk and replicate them to the other
1026
    nodes.
1027

1028
    @type job: L{_QueuedJob}
1029
    @param job: the changed job
1030

1031
    """
1032
    filename = self._GetJobPath(job.id)
1033
    data = serializer.DumpJson(job.Serialize(), indent=False)
1034
    logging.debug("Writing job %s to %s", job.id, filename)
1035
    self._WriteAndReplicateFileUnlocked(filename, data)
1036

    
1037
    # Notify waiters about potential changes
1038
    job.change.notifyAll()
1039

    
1040
  @utils.LockedMethod
1041
  @_RequireOpenQueue
1042
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1043
                        timeout):
1044
    """Waits for changes in a job.
1045

1046
    @type job_id: string
1047
    @param job_id: Job identifier
1048
    @type fields: list of strings
1049
    @param fields: Which fields to check for changes
1050
    @type prev_job_info: list or None
1051
    @param prev_job_info: Last job information returned
1052
    @type prev_log_serial: int
1053
    @param prev_log_serial: Last job message serial number
1054
    @type timeout: float
1055
    @param timeout: maximum time to wait
1056
    @rtype: tuple (job info, log entries)
1057
    @return: a tuple of the job information as required via
1058
        the fields parameter, and the log entries as a list
1059

1060
        if the job has not changed and the timeout has expired,
1061
        we instead return a special value,
1062
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1063
        as such by the clients
1064

1065
    """
1066
    logging.debug("Waiting for changes in job %s", job_id)
1067
    end_time = time.time() + timeout
1068
    while True:
1069
      delta_time = end_time - time.time()
1070
      if delta_time < 0:
1071
        return constants.JOB_NOTCHANGED
1072

    
1073
      job = self._LoadJobUnlocked(job_id)
1074
      if not job:
1075
        logging.debug("Job %s not found", job_id)
1076
        break
1077

    
1078
      status = job.CalcStatus()
1079
      job_info = self._GetJobInfoUnlocked(job, fields)
1080
      log_entries = job.GetLogEntries(prev_log_serial)
1081

    
1082
      # Serializing and deserializing data can cause type changes (e.g. from
1083
      # tuple to list) or precision loss. We're doing it here so that we get
1084
      # the same modifications as the data received from the client. Without
1085
      # this, the comparison afterwards might fail without the data being
1086
      # significantly different.
1087
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1088
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1089

    
1090
      if status not in (constants.JOB_STATUS_QUEUED,
1091
                        constants.JOB_STATUS_RUNNING,
1092
                        constants.JOB_STATUS_WAITLOCK):
1093
        # Don't even try to wait if the job is no longer running, there will be
1094
        # no changes.
1095
        break
1096

    
1097
      if (prev_job_info != job_info or
1098
          (log_entries and prev_log_serial != log_entries[0][0])):
1099
        break
1100

    
1101
      logging.debug("Waiting again")
1102

    
1103
      # Release the queue lock while waiting
1104
      job.change.wait(delta_time)
1105

    
1106
    logging.debug("Job %s changed", job_id)
1107

    
1108
    return (job_info, log_entries)
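    # Editor's illustrative sketch (not part of the original module): a caller
    # would typically poll in a loop, feeding back the values returned by the
    # previous call (the names below are hypothetical):
    #
    #   prev_info, prev_serial = None, -1
    #   while True:
    #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
    #                                      prev_serial, 10.0)
    #     if result == constants.JOB_NOTCHANGED:
    #       continue
    #     prev_info, log_entries = result
    #     if log_entries:
    #       prev_serial = log_entries[-1][0]
    #     # ... a real client would also stop once the job status is final ...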
1109

    
1110
  @utils.LockedMethod
1111
  @_RequireOpenQueue
1112
  def CancelJob(self, job_id):
1113
    """Cancels a job.
1114

1115
    This will only succeed if the job has not started yet.
1116

1117
    @type job_id: string
1118
    @param job_id: job ID of job to be cancelled.
1119

1120
    """
1121
    logging.info("Cancelling job %s", job_id)
1122

    
1123
    job = self._LoadJobUnlocked(job_id)
1124
    if not job:
1125
      logging.debug("Job %s not found", job_id)
1126
      return (False, "Job %s not found" % job_id)
1127

    
1128
    job_status = job.CalcStatus()
1129

    
1130
    if job_status not in (constants.JOB_STATUS_QUEUED,
1131
                          constants.JOB_STATUS_WAITLOCK):
1132
      logging.debug("Job %s is no longer in the queue", job.id)
1133
      return (False, "Job %s is no longer in the queue" % job.id)
1134

    
1135
    if job_status == constants.JOB_STATUS_QUEUED:
1136
      self.CancelJobUnlocked(job)
1137
      return (True, "Job %s canceled" % job.id)
1138

    
1139
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1140
      # The worker will notice the new status and cancel the job
1141
      try:
1142
        for op in job.ops:
1143
          op.status = constants.OP_STATUS_CANCELING
1144
      finally:
1145
        self.UpdateJobUnlocked(job)
1146
      return (True, "Job %s will be canceled" % job.id)
1147

    
1148
  @_RequireOpenQueue
1149
  def CancelJobUnlocked(self, job):
1150
    """Marks a job as canceled.
1151

1152
    """
1153
    try:
1154
      for op in job.ops:
1155
        op.status = constants.OP_STATUS_CANCELED
1156
        op.result = "Job canceled by request"
1157
    finally:
1158
      self.UpdateJobUnlocked(job)
1159

    
1160
  @_RequireOpenQueue
1161
  def _ArchiveJobsUnlocked(self, jobs):
1162
    """Archives jobs.
1163

1164
    @type jobs: list of L{_QueuedJob}
1165
    @param jobs: Job objects
1166
    @rtype: int
1167
    @return: Number of archived jobs
1168

1169
    """
1170
    archive_jobs = []
1171
    rename_files = []
1172
    for job in jobs:
1173
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1174
                                  constants.JOB_STATUS_SUCCESS,
1175
                                  constants.JOB_STATUS_ERROR):
1176
        logging.debug("Job %s is not yet done", job.id)
1177
        continue
1178

    
1179
      archive_jobs.append(job)
1180

    
1181
      old = self._GetJobPath(job.id)
1182
      new = self._GetArchivedJobPath(job.id)
1183
      rename_files.append((old, new))
1184

    
1185
    # TODO: What if 1..n files fail to rename?
1186
    self._RenameFilesUnlocked(rename_files)
1187

    
1188
    logging.debug("Successfully archived job(s) %s",
1189
                  ", ".join(job.id for job in archive_jobs))
1190

    
1191
    return len(archive_jobs)
1192

    
1193
  @utils.LockedMethod
1194
  @_RequireOpenQueue
1195
  def ArchiveJob(self, job_id):
1196
    """Archives a job.
1197

1198
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1199

1200
    @type job_id: string
1201
    @param job_id: Job ID of job to be archived.
1202
    @rtype: bool
1203
    @return: Whether job was archived
1204

1205
    """
1206
    logging.info("Archiving job %s", job_id)
1207

    
1208
    job = self._LoadJobUnlocked(job_id)
1209
    if not job:
1210
      logging.debug("Job %s not found", job_id)
1211
      return False
1212

    
1213
    return self._ArchiveJobsUnlocked([job]) == 1
1214

    
1215
  @utils.LockedMethod
1216
  @_RequireOpenQueue
1217
  def AutoArchiveJobs(self, age, timeout):
1218
    """Archives all jobs based on age.
1219

1220
    The method will archive all jobs which are older than the age
1221
    parameter. For jobs that don't have an end timestamp, the start
1222
    timestamp will be considered. The special '-1' age will cause
1223
    archival of all jobs (that are not running or queued).
1224

1225
    @type age: int
1226
    @param age: the minimum age in seconds
1227

1228
    """
1229
    logging.info("Archiving jobs with age more than %s seconds", age)
1230

    
1231
    now = time.time()
1232
    end_time = now + timeout
1233
    archived_count = 0
1234
    last_touched = 0
1235

    
1236
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1237
    pending = []
1238
    for idx, job_id in enumerate(all_job_ids):
1239
      last_touched = idx
1240

    
1241
      # Not optimal because jobs could be pending
1242
      # TODO: Measure average duration for job archival and take number of
1243
      # pending jobs into account.
1244
      if time.time() > end_time:
1245
        break
1246

    
1247
      # Returns None if the job failed to load
1248
      job = self._LoadJobUnlocked(job_id)
1249
      if job:
1250
        if job.end_timestamp is None:
1251
          if job.start_timestamp is None:
1252
            job_age = job.received_timestamp
1253
          else:
1254
            job_age = job.start_timestamp
1255
        else:
1256
          job_age = job.end_timestamp
1257

    
1258
        if age == -1 or now - job_age[0] > age:
1259
          pending.append(job)
1260

    
1261
          # Archive 10 jobs at a time
1262
          if len(pending) >= 10:
1263
            archived_count += self._ArchiveJobsUnlocked(pending)
1264
            pending = []
1265

    
1266
    if pending:
1267
      archived_count += self._ArchiveJobsUnlocked(pending)
1268

    
1269
    return (archived_count, len(all_job_ids) - last_touched - 1)
1270

    
1271
  def _GetJobInfoUnlocked(self, job, fields):
1272
    """Returns information about a job.
1273

1274
    @type job: L{_QueuedJob}
1275
    @param job: the job which we query
1276
    @type fields: list
1277
    @param fields: names of fields to return
1278
    @rtype: list
1279
    @return: list with one element for each field
1280
    @raise errors.OpExecError: when an invalid field
1281
        has been passed
1282

1283
    """
1284
    row = []
1285
    for fname in fields:
1286
      if fname == "id":
1287
        row.append(job.id)
1288
      elif fname == "status":
1289
        row.append(job.CalcStatus())
1290
      elif fname == "ops":
1291
        row.append([op.input.__getstate__() for op in job.ops])
1292
      elif fname == "opresult":
1293
        row.append([op.result for op in job.ops])
1294
      elif fname == "opstatus":
1295
        row.append([op.status for op in job.ops])
1296
      elif fname == "oplog":
1297
        row.append([op.log for op in job.ops])
1298
      elif fname == "opstart":
1299
        row.append([op.start_timestamp for op in job.ops])
1300
      elif fname == "opend":
1301
        row.append([op.end_timestamp for op in job.ops])
1302
      elif fname == "received_ts":
1303
        row.append(job.received_timestamp)
1304
      elif fname == "start_ts":
1305
        row.append(job.start_timestamp)
1306
      elif fname == "end_ts":
1307
        row.append(job.end_timestamp)
1308
      elif fname == "summary":
1309
        row.append([op.input.Summary() for op in job.ops])
1310
      else:
1311
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1312
    return row
1313

    
1314
  @utils.LockedMethod
1315
  @_RequireOpenQueue
1316
  def QueryJobs(self, job_ids, fields):
1317
    """Returns a list of jobs in queue.
1318

1319
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1320
    processing for each job.
1321

1322
    @type job_ids: list
1323
    @param job_ids: sequence of job identifiers or None for all
1324
    @type fields: list
1325
    @param fields: names of fields to return
1326
    @rtype: list
1327
    @return: list one element per job, each element being list with
1328
        the requested fields
1329

1330
    """
1331
    jobs = []
1332

    
1333
    for job in self._GetJobsUnlocked(job_ids):
1334
      if job is None:
1335
        jobs.append(None)
1336
      else:
1337
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1338

    
1339
    return jobs
1340

    
1341
  @utils.LockedMethod
1342
  @_RequireOpenQueue
1343
  def Shutdown(self):
1344
    """Stops the job queue.
1345

1346
    This shuts down all the worker threads and closes the queue.
1347

1348
    """
1349
    self._wpool.TerminateWorkers()
1350

    
1351
    self._queue_lock.Close()
1352
    self._queue_lock = None