lib/jqueue.py @ d04aaa2f

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
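
# Illustrative note (added for clarity, not part of the original module):
# TimeStampNow() returns a (seconds, microseconds) pair, so a moment such
# as 1230768000.125 is represented roughly as (1230768000, 125000); the
# exact split is delegated to utils.SplitTime.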


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }

    
132

    
133
class _QueuedJob(object):
134
  """In-memory job representation.
135

136
  This is what we use to track the user-submitted jobs. Locking must
137
  be taken care of by users of this class.
138

139
  @type queue: L{JobQueue}
140
  @ivar queue: the parent queue
141
  @ivar id: the job ID
142
  @type ops: list
143
  @ivar ops: the list of _QueuedOpCode that constitute the job
144
  @type run_op_index: int
145
  @ivar run_op_index: the currently executing opcode, or -1 if
146
      we didn't yet start executing
147
  @type log_serial: int
148
  @ivar log_serial: holds the index for the next log entry
149
  @ivar received_timestamp: the timestamp for when the job was received
150
  @ivar start_timestmap: the timestamp for start of execution
151
  @ivar end_timestamp: the timestamp for end of execution
152
  @ivar change: a Condition variable we use for waiting for job changes
153

154
  """
155
  def __init__(self, queue, job_id, ops):
156
    """Constructor for the _QueuedJob.
157

158
    @type queue: L{JobQueue}
159
    @param queue: our parent queue
160
    @type job_id: job_id
161
    @param job_id: our job id
162
    @type ops: list
163
    @param ops: the list of opcodes we hold, which will be encapsulated
164
        in _QueuedOpCodes
165

166
    """
167
    if not ops:
168
      # TODO: use a better exception
169
      raise Exception("No opcodes")
170

    
171
    self.queue = queue
172
    self.id = job_id
173
    self.ops = [_QueuedOpCode(op) for op in ops]
174
    self.run_op_index = -1
175
    self.log_serial = 0
176
    self.received_timestamp = TimeStampNow()
177
    self.start_timestamp = None
178
    self.end_timestamp = None
179

    
180
    # Condition to wait for changes
181
    self.change = threading.Condition(self.queue._lock)
182

    
183
  @classmethod
184
  def Restore(cls, queue, state):
185
    """Restore a _QueuedJob from serialized state:
186

187
    @type queue: L{JobQueue}
188
    @param queue: to which queue the restored job belongs
189
    @type state: dict
190
    @param state: the serialized state
191
    @rtype: _JobQueue
192
    @return: the restored _JobQueue instance
193

194
    """
195
    obj = _QueuedJob.__new__(cls)
196
    obj.queue = queue
197
    obj.id = state["id"]
198
    obj.run_op_index = state["run_op_index"]
199
    obj.received_timestamp = state.get("received_timestamp", None)
200
    obj.start_timestamp = state.get("start_timestamp", None)
201
    obj.end_timestamp = state.get("end_timestamp", None)
202

    
203
    obj.ops = []
204
    obj.log_serial = 0
205
    for op_state in state["ops"]:
206
      op = _QueuedOpCode.Restore(op_state)
207
      for log_entry in op.log:
208
        obj.log_serial = max(obj.log_serial, log_entry[0])
209
      obj.ops.append(op)
210

    
211
    # Condition to wait for changes
212
    obj.change = threading.Condition(obj.queue._lock)
213

    
214
    return obj
215

    
216
  def Serialize(self):
217
    """Serialize the _JobQueue instance.
218

219
    @rtype: dict
220
    @return: the serialized state
221

222
    """
223
    return {
224
      "id": self.id,
225
      "ops": [op.Serialize() for op in self.ops],
226
      "run_op_index": self.run_op_index,
227
      "start_timestamp": self.start_timestamp,
228
      "end_timestamp": self.end_timestamp,
229
      "received_timestamp": self.received_timestamp,
230
      }
231

    
232
  def CalcStatus(self):
233
    """Compute the status of this job.
234

235
    This function iterates over all the _QueuedOpCodes in the job and
236
    based on their status, computes the job status.
237

238
    The algorithm is:
239
      - if we find a cancelled, or finished with error, the job
240
        status will be the same
241
      - otherwise, the last opcode with the status one of:
242
          - waitlock
243
          - canceling
244
          - running
245

246
        will determine the job status
247

248
      - otherwise, it means either all opcodes are queued, or success,
249
        and the job status will be the same
250

251
    @return: the job status
252

253
    """
254
    status = constants.JOB_STATUS_QUEUED
255

    
256
    all_success = True
257
    for op in self.ops:
258
      if op.status == constants.OP_STATUS_SUCCESS:
259
        continue
260

    
261
      all_success = False
262

    
263
      if op.status == constants.OP_STATUS_QUEUED:
264
        pass
265
      elif op.status == constants.OP_STATUS_WAITLOCK:
266
        status = constants.JOB_STATUS_WAITLOCK
267
      elif op.status == constants.OP_STATUS_RUNNING:
268
        status = constants.JOB_STATUS_RUNNING
269
      elif op.status == constants.OP_STATUS_CANCELING:
270
        status = constants.JOB_STATUS_CANCELING
271
        break
272
      elif op.status == constants.OP_STATUS_ERROR:
273
        status = constants.JOB_STATUS_ERROR
274
        # The whole job fails if one opcode failed
275
        break
276
      elif op.status == constants.OP_STATUS_CANCELED:
277
        status = constants.OP_STATUS_CANCELED
278
        break
279

    
280
    if all_success:
281
      status = constants.JOB_STATUS_SUCCESS
282

    
283
    return status
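
  # Illustrative example (added for clarity, not in the original source):
  # a job whose opcodes currently have the statuses
  #   [OP_STATUS_SUCCESS, OP_STATUS_RUNNING, OP_STATUS_QUEUED]
  # yields JOB_STATUS_RUNNING, while a single OP_STATUS_ERROR anywhere
  # turns the whole job into JOB_STATUS_ERROR.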

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
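
  # Example (added for illustration, not part of the original source): if
  # log serials 1..5 have been recorded across the opcodes, GetLogEntries(3)
  # returns only the tuples with serial 4 and 5, while GetLogEntries(None)
  # returns all of them.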


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    This function will rename files in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
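
  # Worked example (added for illustration, not in the original source):
  # with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id "12345" maps to
  # directory "1", so its archived path ends in ".../1/job-12345"
  # (see _GetArchivedJobPath below).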

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
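
  # Hypothetical usage sketch (not part of the original file): a caller
  # holding a JobQueue instance would typically submit a job built from
  # opcode objects, e.g.
  #
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #
  # and can then follow it via WaitForJobChanges() or QueryJobs([job_id], ...).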

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
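
  # Illustrative client loop (an assumption, not part of the original code):
  #
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                       # timeout expired, simply retry
  #     prev_info, log_entries = result  # new job info plus new log entries
  #     ...stop once the reported status is final...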

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
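
  # Worked example (added for illustration, not from the original source):
  # with 25 jobs on disk and the timeout expiring after the job at index 9,
  # the method returns (n_archived, 15), i.e. how many jobs were archived
  # and how many were not even considered.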

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None