lib/jqueue.py @ f6424741

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format
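
  Example (illustrative values only; the split is done by
  L{utils.SplitTime})::
    # with time.time() == 1230768000.5 this would return
    TimeStampNow() ==> (1230768000, 500000)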

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution
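
  Example of a log entry (illustrative values)::
    (1, (1230768000, 500000), constants.ELOG_MESSAGE, "waiting for locks")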

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: str
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one finished with error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same
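
    Example (illustrative): opcode statuses C{[success, running, queued]}
    give a job status of C{running}, while C{[success, error, queued]}
    give C{error}.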

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

    
294
  def GetLogEntries(self, newer_than):
295
    """Selectively returns the log entries.
296

297
    @type newer_than: None or int
298
    @param newer_than: if this is None, return all log entries,
299
        otherwise return only the log entries with serial higher
300
        than this value
301
    @rtype: list
302
    @return: the list of the log entries selected
303

304
    """
305
    if newer_than is None:
306
      serial = -1
307
    else:
308
      serial = newer_than
309

    
310
    entries = []
311
    for op in self.ops:
312
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313

    
314
    return entries
315

    
316

    
317
class _JobQueueWorker(workerpool.BaseWorker):
318
  """The actual job workers.
319

320
  """
321
  def _NotifyStart(self):
322
    """Mark the opcode as running, not lock-waiting.
323

324
    This is called from the mcpu code as a notifier function, when the
325
    LU is finally about to start the Exec() method. Of course, to have
326
    end-user visible results, the opcode must be initially (before
327
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
328

329
    """
330
    assert self.queue, "Queue attribute is missing"
331
    assert self.opcode, "Opcode attribute is missing"
332

    
333
    self.queue.acquire()
334
    try:
335
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
336
                                    constants.OP_STATUS_CANCELING)
337

    
338
      # Cancel here if we were asked to
339
      if self.opcode.status == constants.OP_STATUS_CANCELING:
340
        raise CancelJob()
341

    
342
      self.opcode.status = constants.OP_STATUS_RUNNING
343
    finally:
344
      self.queue.release()
345

    
346
  def RunTask(self, job):
347
    """Job executor.
348

349
    This functions processes a job. It is closely tied to the _QueuedJob and
350
    _QueuedOpCode classes.
351

352
    @type job: L{_QueuedJob}
353
    @param job: the job to be processed
354

355
    """
356
    logging.info("Worker %s processing job %s",
357
                  self.worker_id, job.id)
358
    proc = mcpu.Processor(self.pool.queue.context)
359
    self.queue = queue = job.queue
360
    try:
361
      try:
362
        count = len(job.ops)
363
        for idx, op in enumerate(job.ops):
364
          op_summary = op.input.Summary()
365
          if op.status == constants.OP_STATUS_SUCCESS:
366
            # this is a job that was partially completed before master
367
            # daemon shutdown, so it can be expected that some opcodes
368
            # are already completed successfully (if any did error
369
            # out, then the whole job should have been aborted and not
370
            # resubmitted for processing)
371
            logging.info("Op %s/%s: opcode %s already processed, skipping",
372
                         idx + 1, count, op_summary)
373
            continue
374
          try:
375
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
376
                         op_summary)
377

    
378
            queue.acquire()
379
            try:
380
              if op.status == constants.OP_STATUS_CANCELED:
381
                raise CancelJob()
382
              assert op.status == constants.OP_STATUS_QUEUED
383
              job.run_op_index = idx
384
              op.status = constants.OP_STATUS_WAITLOCK
385
              op.result = None
386
              op.start_timestamp = TimeStampNow()
387
              if idx == 0: # first opcode
388
                job.start_timestamp = op.start_timestamp
389
              queue.UpdateJobUnlocked(job)
390

    
391
              input_opcode = op.input
392
            finally:
393
              queue.release()
394

    
395
            def _Log(*args):
396
              """Append a log entry.
397

398
              """
399
              assert len(args) < 3
400

    
401
              if len(args) == 1:
402
                log_type = constants.ELOG_MESSAGE
403
                log_msg = args[0]
404
              else:
405
                log_type, log_msg = args
406

    
407
              # The time is split to make serialization easier and not lose
408
              # precision.
409
              timestamp = utils.SplitTime(time.time())
410

    
411
              queue.acquire()
412
              try:
413
                job.log_serial += 1
414
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
415

    
416
                job.change.notifyAll()
417
              finally:
418
                queue.release()
419

    
420
            # Make sure not to hold lock while _Log is called
421
            self.opcode = op
422
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
423

    
424
            queue.acquire()
425
            try:
426
              op.status = constants.OP_STATUS_SUCCESS
427
              op.result = result
428
              op.end_timestamp = TimeStampNow()
429
              queue.UpdateJobUnlocked(job)
430
            finally:
431
              queue.release()
432

    
433
            logging.info("Op %s/%s: Successfully finished opcode %s",
434
                         idx + 1, count, op_summary)
435
          except CancelJob:
436
            # Will be handled further up
437
            raise
438
          except Exception, err:
439
            queue.acquire()
440
            try:
441
              try:
442
                op.status = constants.OP_STATUS_ERROR
443
                op.result = str(err)
444
                op.end_timestamp = TimeStampNow()
445
                logging.info("Op %s/%s: Error in opcode %s: %s",
446
                             idx + 1, count, op_summary, err)
447
              finally:
448
                queue.UpdateJobUnlocked(job)
449
            finally:
450
              queue.release()
451
            raise
452

    
453
      except CancelJob:
454
        queue.acquire()
455
        try:
456
          queue.CancelJobUnlocked(job)
457
        finally:
458
          queue.release()
459
      except errors.GenericError, err:
460
        logging.exception("Ganeti exception")
461
      except:
462
        logging.exception("Unhandled exception")
463
    finally:
464
      queue.acquire()
465
      try:
466
        try:
467
          job.run_op_index = -1
468
          job.end_timestamp = TimeStampNow()
469
          queue.UpdateJobUnlocked(job)
470
        finally:
471
          job_id = job.id
472
          status = job.CalcStatus()
473
      finally:
474
        queue.release()
475
      logging.info("Worker %s finished job %s, status = %s",
476
                   self.worker_id, job_id, status)
477

    
478

    
479
class _JobQueueWorkerPool(workerpool.WorkerPool):
480
  """Simple class implementing a job-processing workerpool.
481

482
  """
483
  def __init__(self, queue):
484
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
485
                                              _JobQueueWorker)
486
    self.queue = queue
487

    
488

    
489
class JobQueue(object):
490
  """Queue used to manage the jobs.
491

492
  @cvar _RE_JOB_FILE: regex matching the valid job file names
493

494
  """
495
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
496

    
497
  def _RequireOpenQueue(fn):
498
    """Decorator for "public" functions.
499

500
    This function should be used for all 'public' functions. That is,
501
    functions usually called from other classes.
502

503
    @warning: Use this decorator only after utils.LockedMethod!
504

505
    Example::
506
      @utils.LockedMethod
507
      @_RequireOpenQueue
508
      def Example(self):
509
        pass
510

511
    """
512
    def wrapper(self, *args, **kwargs):
513
      assert self._queue_lock is not None, "Queue should be open"
514
      return fn(self, *args, **kwargs)
515
    return wrapper
516

    
517
  def __init__(self, context):
518
    """Constructor for JobQueue.
519

520
    The constructor will initialize the job queue object and then
521
    start loading the current jobs from disk, either for starting them
522
    (if they were queue) or for aborting them (if they were already
523
    running).
524

525
    @type context: GanetiContext
526
    @param context: the context object for access to the configuration
527
        data and other ganeti objects
528

529
    """
530
    self.context = context
531
    self._memcache = weakref.WeakValueDictionary()
532
    self._my_hostname = utils.HostInfo().name
533

    
534
    # Locking
535
    self._lock = threading.Lock()
536
    self.acquire = self._lock.acquire
537
    self.release = self._lock.release
538

    
539
    # Initialize
540
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
541

    
542
    # Read serial file
543
    self._last_serial = jstore.ReadSerial()
544
    assert self._last_serial is not None, ("Serial file was modified between"
545
                                           " check in jstore and here")
546

    
547
    # Get initial list of nodes
548
    self._nodes = dict((n.name, n.primary_ip)
549
                       for n in self.context.cfg.GetAllNodesInfo().values()
550
                       if n.master_candidate)
551

    
552
    # Remove master node
553
    try:
554
      del self._nodes[self._my_hostname]
555
    except KeyError:
556
      pass
557

    
558
    # TODO: Check consistency across nodes
559

    
560
    # Setup worker pool
561
    self._wpool = _JobQueueWorkerPool(self)
562
    try:
563
      # We need to lock here because WorkerPool.AddTask() may start a job while
564
      # we're still doing our work.
565
      self.acquire()
566
      try:
567
        logging.info("Inspecting job queue")
568

    
569
        all_job_ids = self._GetJobIDsUnlocked()
570
        jobs_count = len(all_job_ids)
571
        lastinfo = time.time()
572
        for idx, job_id in enumerate(all_job_ids):
573
          # Give an update every 1000 jobs or 10 seconds
574
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
575
              idx == (jobs_count - 1)):
576
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
577
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
578
            lastinfo = time.time()
579

    
580
          job = self._LoadJobUnlocked(job_id)
581

    
582
          # a failure in loading the job can cause 'None' to be returned
583
          if job is None:
584
            continue
585

    
586
          status = job.CalcStatus()
587

    
588
          if status in (constants.JOB_STATUS_QUEUED, ):
589
            self._wpool.AddTask(job)
590

    
591
          elif status in (constants.JOB_STATUS_RUNNING,
592
                          constants.JOB_STATUS_WAITLOCK,
593
                          constants.JOB_STATUS_CANCELING):
594
            logging.warning("Unfinished job %s found: %s", job.id, job)
595
            try:
596
              for op in job.ops:
597
                op.status = constants.OP_STATUS_ERROR
598
                op.result = "Unclean master daemon shutdown"
599
            finally:
600
              self.UpdateJobUnlocked(job)
601

    
602
        logging.info("Job queue inspection finished")
603
      finally:
604
        self.release()
605
    except:
606
      self._wpool.TerminateWorkers()
607
      raise
608

    
609
  @utils.LockedMethod
610
  @_RequireOpenQueue
611
  def AddNode(self, node):
612
    """Register a new node with the queue.
613

614
    @type node: L{objects.Node}
615
    @param node: the node object to be added
616

617
    """
618
    node_name = node.name
619
    assert node_name != self._my_hostname
620

    
621
    # Clean queue directory on added node
622
    rpc.RpcRunner.call_jobqueue_purge(node_name)
623

    
624
    if not node.master_candidate:
625
      # remove if existing, ignoring errors
626
      self._nodes.pop(node_name, None)
627
      # and skip the replication of the job ids
628
      return
629

    
630
    # Upload the whole queue excluding archived jobs
631
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
632

    
633
    # Upload current serial file
634
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
635

    
636
    for file_name in files:
637
      # Read file content
638
      fd = open(file_name, "r")
639
      try:
640
        content = fd.read()
641
      finally:
642
        fd.close()
643

    
644
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
645
                                                  [node.primary_ip],
646
                                                  file_name, content)
647
      if not result[node_name]:
648
        logging.error("Failed to upload %s to %s", file_name, node_name)
649

    
650
    self._nodes[node_name] = node.primary_ip
651

    
652
  @utils.LockedMethod
653
  @_RequireOpenQueue
654
  def RemoveNode(self, node_name):
655
    """Callback called when removing nodes from the cluster.
656

657
    @type node_name: str
658
    @param node_name: the name of the node to remove
659

660
    """
661
    try:
662
      # The queue is removed by the "leave node" RPC call.
663
      del self._nodes[node_name]
664
    except KeyError:
665
      pass
666

    
667
  def _CheckRpcResult(self, result, nodes, failmsg):
668
    """Verifies the status of an RPC call.
669

670
    Since we aim to keep consistency should this node (the current
671
    master) fail, we will log errors if our rpc fail, and especially
672
    log the case when more than half of the nodes fails.
673

674
    @param result: the data as returned from the rpc call
675
    @type nodes: list
676
    @param nodes: the list of nodes we made the call to
677
    @type failmsg: str
678
    @param failmsg: the identifier to be used for logging
679

680
    """
681
    failed = []
682
    success = []
683

    
684
    for node in nodes:
685
      if result[node]:
686
        success.append(node)
687
      else:
688
        failed.append(node)
689

    
690
    if failed:
691
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
692

    
693
    # +1 for the master node
694
    if (len(success) + 1) < len(failed):
695
      # TODO: Handle failing nodes
696
      logging.error("More than half of the nodes failed")
697

    
698
  def _GetNodeIp(self):
699
    """Helper for returning the node name/ip list.
700

701
    @rtype: (list, list)
702
    @return: a tuple of two lists, the first one with the node
703
        names and the second one with the node addresses
704

705
    """
706
    name_list = self._nodes.keys()
707
    addr_list = [self._nodes[name] for name in name_list]
708
    return name_list, addr_list
709

    
710
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
711
    """Writes a file locally and then replicates it to all nodes.
712

713
    This function will replace the contents of a file on the local
714
    node and then replicate it to all the other nodes we have.
715

716
    @type file_name: str
717
    @param file_name: the path of the file to be replicated
718
    @type data: str
719
    @param data: the new contents of the file
720

721
    """
722
    utils.WriteFile(file_name, data=data)
723

    
724
    names, addrs = self._GetNodeIp()
725
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
726
    self._CheckRpcResult(result, self._nodes,
727
                         "Updating %s" % file_name)
728

    
729
  def _RenameFilesUnlocked(self, rename):
730
    """Renames a file locally and then replicate the change.
731

732
    This function will rename a file in the local queue directory
733
    and then replicate this rename to all the other nodes we have.
734

735
    @type rename: list of (old, new)
736
    @param rename: List containing tuples mapping old to new names
737

738
    """
739
    # Rename them locally
740
    for old, new in rename:
741
      utils.RenameFile(old, new, mkdir=True)
742

    
743
    # ... and on all nodes
744
    names, addrs = self._GetNodeIp()
745
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
746
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
747

    
748
  def _FormatJobID(self, job_id):
749
    """Convert a job ID to string format.
750

751
    Currently this just does C{str(job_id)} after performing some
752
    checks, but if we want to change the job id format this will
753
    abstract this change.
754

755
    @type job_id: int or long
756
    @param job_id: the numeric job id
757
    @rtype: str
758
    @return: the formatted job id
759

760
    """
761
    if not isinstance(job_id, (int, long)):
762
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
763
    if job_id < 0:
764
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
765

    
766
    return str(job_id)
767

    
768
  @classmethod
769
  def _GetArchiveDirectory(cls, job_id):
770
    """Returns the archive directory for a job.
771

772
    @type job_id: str
773
    @param job_id: Job identifier
774
    @rtype: str
775
    @return: Directory name
776

777
    """
778
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
779

    
780
  def _NewSerialUnlocked(self):
781
    """Generates a new job identifier.
782

783
    Job identifiers are unique during the lifetime of a cluster.
784

785
    @rtype: str
786
    @return: a string representing the job identifier.
787

788
    """
789
    # New number
790
    serial = self._last_serial + 1
791

    
792
    # Write to file
793
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
794
                                        "%s\n" % serial)
795

    
796
    # Keep it only if we were able to write the file
797
    self._last_serial = serial
798

    
799
    return self._FormatJobID(serial)
800

    
801
  @staticmethod
802
  def _GetJobPath(job_id):
803
    """Returns the job file for a given job id.
804

805
    @type job_id: str
806
    @param job_id: the job identifier
807
    @rtype: str
808
    @return: the path to the job file
809

810
    """
811
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
812

    
813
  @classmethod
814
  def _GetArchivedJobPath(cls, job_id):
815
    """Returns the archived job file for a give job id.
816

817
    @type job_id: str
818
    @param job_id: the job identifier
819
    @rtype: str
820
    @return: the path to the archived job file
821

822
    """
823
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
824
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
825

    
826
  @classmethod
827
  def _ExtractJobID(cls, name):
828
    """Extract the job id from a filename.
829

830
    @type name: str
831
    @param name: the job filename
832
    @rtype: job id or None
833
    @return: the job id corresponding to the given filename,
834
        or None if the filename does not represent a valid
835
        job file
836

837
    """
838
    m = cls._RE_JOB_FILE.match(name)
839
    if m:
840
      return m.group(1)
841
    else:
842
      return None
843

    
844
  def _GetJobIDsUnlocked(self, archived=False):
845
    """Return all known job IDs.
846

847
    If the parameter archived is True, archived jobs IDs will be
848
    included. Currently this argument is unused.
849

850
    The method only looks at disk because it's a requirement that all
851
    jobs are present on disk (so in the _memcache we don't have any
852
    extra IDs).
853

854
    @rtype: list
855
    @return: the list of job IDs
856

857
    """
858
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
859
    jlist = utils.NiceSort(jlist)
860
    return jlist
861

    
862
  def _ListJobFiles(self):
863
    """Returns the list of current job files.
864

865
    @rtype: list
866
    @return: the list of job file names
867

868
    """
869
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
870
            if self._RE_JOB_FILE.match(name)]
871

    
872
  def _LoadJobUnlocked(self, job_id):
873
    """Loads a job from the disk or memory.
874

875
    Given a job id, this will return the cached job object if
876
    existing, or try to load the job from the disk. If loading from
877
    disk, it will also add the job to the cache.
878

879
    @param job_id: the job id
880
    @rtype: L{_QueuedJob} or None
881
    @return: either None or the job object
882

883
    """
884
    job = self._memcache.get(job_id, None)
885
    if job:
886
      logging.debug("Found job %s in memcache", job_id)
887
      return job
888

    
889
    filepath = self._GetJobPath(job_id)
890
    logging.debug("Loading job from %s", filepath)
891
    try:
892
      fd = open(filepath, "r")
893
    except IOError, err:
894
      if err.errno in (errno.ENOENT, ):
895
        return None
896
      raise
897
    try:
898
      data = serializer.LoadJson(fd.read())
899
    finally:
900
      fd.close()
901

    
902
    try:
903
      job = _QueuedJob.Restore(self, data)
904
    except Exception, err:
905
      new_path = self._GetArchivedJobPath(job_id)
906
      if filepath == new_path:
907
        # job already archived (future case)
908
        logging.exception("Can't parse job %s", job_id)
909
      else:
910
        # non-archived case
911
        logging.exception("Can't parse job %s, will archive.", job_id)
912
        self._RenameFilesUnlocked([(filepath, new_path)])
913
      return None
914

    
915
    self._memcache[job_id] = job
916
    logging.debug("Added job %s to the cache", job_id)
917
    return job
918

    
919
  def _GetJobsUnlocked(self, job_ids):
920
    """Return a list of jobs based on their IDs.
921

922
    @type job_ids: list
923
    @param job_ids: either an empty list (meaning all jobs),
924
        or a list of job IDs
925
    @rtype: list
926
    @return: the list of job objects
927

928
    """
929
    if not job_ids:
930
      job_ids = self._GetJobIDsUnlocked()
931

    
932
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
933

    
934
  @staticmethod
935
  def _IsQueueMarkedDrain():
936
    """Check if the queue is marked from drain.
937

938
    This currently uses the queue drain file, which makes it a
939
    per-node flag. In the future this can be moved to the config file.
940

941
    @rtype: boolean
942
    @return: True of the job queue is marked for draining
943

944
    """
945
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
946

    
947
  @staticmethod
948
  def SetDrainFlag(drain_flag):
949
    """Sets the drain flag for the queue.
950

951
    This is similar to the function L{backend.JobQueueSetDrainFlag},
952
    and in the future we might merge them.
953

954
    @type drain_flag: boolean
955
    @param drain_flag: Whether to set or unset the drain flag
956

957
    """
958
    if drain_flag:
959
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
960
    else:
961
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
962
    return True
963

    
964
  @utils.LockedMethod
965
  @_RequireOpenQueue
966
  def SubmitJob(self, ops):
967
    """Create and store a new job.
968

969
    This enters the job into our job queue and also puts it on the new
970
    queue, in order for it to be picked up by the queue processors.
971

972
    @type ops: list
973
    @param ops: The list of OpCodes that will become the new job.
974
    @rtype: job ID
975
    @return: the job ID of the newly created job
976
    @raise errors.JobQueueDrainError: if the job is marked for draining
977

978
    """
979
    if self._IsQueueMarkedDrain():
980
      raise errors.JobQueueDrainError()
981

    
982
    # Check job queue size
983
    size = len(self._ListJobFiles())
984
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
985
      # TODO: Autoarchive jobs. Make sure it's not done on every job
986
      # submission, though.
987
      #size = ...
988
      pass
989

    
990
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
991
      raise errors.JobQueueFull()
992

    
993
    # Get job identifier
994
    job_id = self._NewSerialUnlocked()
995
    job = _QueuedJob(self, job_id, ops)
996

    
997
    # Write to disk
998
    self.UpdateJobUnlocked(job)
999

    
1000
    logging.debug("Adding new job %s to the cache", job_id)
1001
    self._memcache[job_id] = job
1002

    
1003
    # Add to worker pool
1004
    self._wpool.AddTask(job)
1005

    
1006
    return job.id
1007

    
1008
  @_RequireOpenQueue
1009
  def UpdateJobUnlocked(self, job):
1010
    """Update a job's on disk storage.
1011

1012
    After a job has been modified, this function needs to be called in
1013
    order to write the changes to disk and replicate them to the other
1014
    nodes.
1015

1016
    @type job: L{_QueuedJob}
1017
    @param job: the changed job
1018

1019
    """
1020
    filename = self._GetJobPath(job.id)
1021
    data = serializer.DumpJson(job.Serialize(), indent=False)
1022
    logging.debug("Writing job %s to %s", job.id, filename)
1023
    self._WriteAndReplicateFileUnlocked(filename, data)
1024

    
1025
    # Notify waiters about potential changes
1026
    job.change.notifyAll()
1027

    
1028
  @utils.LockedMethod
1029
  @_RequireOpenQueue
1030
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1031
                        timeout):
1032
    """Waits for changes in a job.
1033

1034
    @type job_id: string
1035
    @param job_id: Job identifier
1036
    @type fields: list of strings
1037
    @param fields: Which fields to check for changes
1038
    @type prev_job_info: list or None
1039
    @param prev_job_info: Last job information returned
1040
    @type prev_log_serial: int
1041
    @param prev_log_serial: Last job message serial number
1042
    @type timeout: float
1043
    @param timeout: maximum time to wait
1044
    @rtype: tuple (job info, log entries)
1045
    @return: a tuple of the job information as required via
1046
        the fields parameter, and the log entries as a list
1047

1048
        if the job has not changed and the timeout has expired,
1049
        we instead return a special value,
1050
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1051
        as such by the clients
1052

1053
    """
1054
    logging.debug("Waiting for changes in job %s", job_id)
1055
    end_time = time.time() + timeout
1056
    while True:
1057
      delta_time = end_time - time.time()
1058
      if delta_time < 0:
1059
        return constants.JOB_NOTCHANGED
1060

    
1061
      job = self._LoadJobUnlocked(job_id)
1062
      if not job:
1063
        logging.debug("Job %s not found", job_id)
1064
        break
1065

    
1066
      status = job.CalcStatus()
1067
      job_info = self._GetJobInfoUnlocked(job, fields)
1068
      log_entries = job.GetLogEntries(prev_log_serial)
1069

    
1070
      # Serializing and deserializing data can cause type changes (e.g. from
1071
      # tuple to list) or precision loss. We're doing it here so that we get
1072
      # the same modifications as the data received from the client. Without
1073
      # this, the comparison afterwards might fail without the data being
1074
      # significantly different.
1075
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1076
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1077

    
1078
      if status not in (constants.JOB_STATUS_QUEUED,
1079
                        constants.JOB_STATUS_RUNNING,
1080
                        constants.JOB_STATUS_WAITLOCK):
1081
        # Don't even try to wait if the job is no longer running, there will be
1082
        # no changes.
1083
        break
1084

    
1085
      if (prev_job_info != job_info or
1086
          (log_entries and prev_log_serial != log_entries[0][0])):
1087
        break
1088

    
1089
      logging.debug("Waiting again")
1090

    
1091
      # Release the queue lock while waiting
1092
      job.change.wait(delta_time)
1093

    
1094
    logging.debug("Job %s changed", job_id)
1095

    
1096
    return (job_info, log_entries)
1097

    
1098
  @utils.LockedMethod
1099
  @_RequireOpenQueue
1100
  def CancelJob(self, job_id):
1101
    """Cancels a job.
1102

1103
    This will only succeed if the job has not started yet.
1104

1105
    @type job_id: string
1106
    @param job_id: job ID of job to be cancelled.
1107

1108
    """
1109
    logging.info("Cancelling job %s", job_id)
1110

    
1111
    job = self._LoadJobUnlocked(job_id)
1112
    if not job:
1113
      logging.debug("Job %s not found", job_id)
1114
      return (False, "Job %s not found" % job_id)
1115

    
1116
    job_status = job.CalcStatus()
1117

    
1118
    if job_status not in (constants.JOB_STATUS_QUEUED,
1119
                          constants.JOB_STATUS_WAITLOCK):
1120
      logging.debug("Job %s is no longer in the queue", job.id)
1121
      return (False, "Job %s is no longer in the queue" % job.id)
1122

    
1123
    if job_status == constants.JOB_STATUS_QUEUED:
1124
      self.CancelJobUnlocked(job)
1125
      return (True, "Job %s canceled" % job.id)
1126

    
1127
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1128
      # The worker will notice the new status and cancel the job
1129
      try:
1130
        for op in job.ops:
1131
          op.status = constants.OP_STATUS_CANCELING
1132
      finally:
1133
        self.UpdateJobUnlocked(job)
1134
      return (True, "Job %s will be canceled" % job.id)
1135

    
1136
  @_RequireOpenQueue
1137
  def CancelJobUnlocked(self, job):
1138
    """Marks a job as canceled.
1139

1140
    """
1141
    try:
1142
      for op in job.ops:
1143
        op.status = constants.OP_STATUS_CANCELED
1144
        op.result = "Job canceled by request"
1145
    finally:
1146
      self.UpdateJobUnlocked(job)
1147

    
1148
  @_RequireOpenQueue
1149
  def _ArchiveJobsUnlocked(self, jobs):
1150
    """Archives jobs.
1151

1152
    @type jobs: list of L{_QueuedJob}
1153
    @param jobs: Job objects
1154
    @rtype: int
1155
    @return: Number of archived jobs
1156

1157
    """
1158
    archive_jobs = []
1159
    rename_files = []
1160
    for job in jobs:
1161
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1162
                                  constants.JOB_STATUS_SUCCESS,
1163
                                  constants.JOB_STATUS_ERROR):
1164
        logging.debug("Job %s is not yet done", job.id)
1165
        continue
1166

    
1167
      archive_jobs.append(job)
1168

    
1169
      old = self._GetJobPath(job.id)
1170
      new = self._GetArchivedJobPath(job.id)
1171
      rename_files.append((old, new))
1172

    
1173
    # TODO: What if 1..n files fail to rename?
1174
    self._RenameFilesUnlocked(rename_files)
1175

    
1176
    logging.debug("Successfully archived job(s) %s",
1177
                  ", ".join(job.id for job in archive_jobs))
1178

    
1179
    return len(archive_jobs)
1180

    
1181
  @utils.LockedMethod
1182
  @_RequireOpenQueue
1183
  def ArchiveJob(self, job_id):
1184
    """Archives a job.
1185

1186
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1187

1188
    @type job_id: string
1189
    @param job_id: Job ID of job to be archived.
1190
    @rtype: bool
1191
    @return: Whether job was archived
1192

1193
    """
1194
    logging.info("Archiving job %s", job_id)
1195

    
1196
    job = self._LoadJobUnlocked(job_id)
1197
    if not job:
1198
      logging.debug("Job %s not found", job_id)
1199
      return False
1200

    
1201
    return self._ArchiveJobsUnlocked([job]) == 1
1202

    
1203
  @utils.LockedMethod
1204
  @_RequireOpenQueue
1205
  def AutoArchiveJobs(self, age, timeout):
1206
    """Archives all jobs based on age.
1207

1208
    The method will archive all jobs which are older than the age
1209
    parameter. For jobs that don't have an end timestamp, the start
1210
    timestamp will be considered. The special '-1' age will cause
1211
    archival of all jobs (that are not running or queued).
1212

1213
    @type age: int
1214
    @param age: the minimum age in seconds
1215

1216
    """
1217
    logging.info("Archiving jobs with age more than %s seconds", age)
1218

    
1219
    now = time.time()
1220
    end_time = now + timeout
1221
    archived_count = 0
1222
    last_touched = 0
1223

    
1224
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1225
    pending = []
1226
    for idx, job_id in enumerate(all_job_ids):
1227
      last_touched = idx
1228

    
1229
      # Not optimal because jobs could be pending
1230
      # TODO: Measure average duration for job archival and take number of
1231
      # pending jobs into account.
1232
      if time.time() > end_time:
1233
        break
1234

    
1235
      # Returns None if the job failed to load
1236
      job = self._LoadJobUnlocked(job_id)
1237
      if job:
1238
        if job.end_timestamp is None:
1239
          if job.start_timestamp is None:
1240
            job_age = job.received_timestamp
1241
          else:
1242
            job_age = job.start_timestamp
1243
        else:
1244
          job_age = job.end_timestamp
1245

    
1246
        if age == -1 or now - job_age[0] > age:
1247
          pending.append(job)
1248

    
1249
          # Archive 10 jobs at a time
1250
          if len(pending) >= 10:
1251
            archived_count += self._ArchiveJobsUnlocked(pending)
1252
            pending = []
1253

    
1254
    if pending:
1255
      archived_count += self._ArchiveJobsUnlocked(pending)
1256

    
1257
    return (archived_count, len(all_job_ids) - last_touched - 1)
1258

    
1259
  def _GetJobInfoUnlocked(self, job, fields):
1260
    """Returns information about a job.
1261

1262
    @type job: L{_QueuedJob}
1263
    @param job: the job which we query
1264
    @type fields: list
1265
    @param fields: names of fields to return
1266
    @rtype: list
1267
    @return: list with one element for each field
1268
    @raise errors.OpExecError: when an invalid field
1269
        has been passed
1270

1271
    """
1272
    row = []
1273
    for fname in fields:
1274
      if fname == "id":
1275
        row.append(job.id)
1276
      elif fname == "status":
1277
        row.append(job.CalcStatus())
1278
      elif fname == "ops":
1279
        row.append([op.input.__getstate__() for op in job.ops])
1280
      elif fname == "opresult":
1281
        row.append([op.result for op in job.ops])
1282
      elif fname == "opstatus":
1283
        row.append([op.status for op in job.ops])
1284
      elif fname == "oplog":
1285
        row.append([op.log for op in job.ops])
1286
      elif fname == "opstart":
1287
        row.append([op.start_timestamp for op in job.ops])
1288
      elif fname == "opend":
1289
        row.append([op.end_timestamp for op in job.ops])
1290
      elif fname == "received_ts":
1291
        row.append(job.received_timestamp)
1292
      elif fname == "start_ts":
1293
        row.append(job.start_timestamp)
1294
      elif fname == "end_ts":
1295
        row.append(job.end_timestamp)
1296
      elif fname == "summary":
1297
        row.append([op.input.Summary() for op in job.ops])
1298
      else:
1299
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1300
    return row
1301

    
1302
  @utils.LockedMethod
1303
  @_RequireOpenQueue
1304
  def QueryJobs(self, job_ids, fields):
1305
    """Returns a list of jobs in queue.
1306

1307
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1308
    processing for each job.
1309

1310
    @type job_ids: list
1311
    @param job_ids: sequence of job identifiers or None for all
1312
    @type fields: list
1313
    @param fields: names of fields to return
1314
    @rtype: list
1315
    @return: list one element per job, each element being list with
1316
        the requested fields
1317

1318
    """
1319
    jobs = []
1320

    
1321
    for job in self._GetJobsUnlocked(job_ids):
1322
      if job is None:
1323
        jobs.append(None)
1324
      else:
1325
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1326

    
1327
    return jobs
1328

    
1329
  @utils.LockedMethod
1330
  @_RequireOpenQueue
1331
  def Shutdown(self):
1332
    """Stops the job queue.
1333

1334
    This shutdowns all the worker threads an closes the queue.
1335

1336
    """
1337
    self._wpool.TerminateWorkers()
1338

    
1339
    self._queue_lock.Close()
1340
    self._queue_lock = None