lib/jqueue.py @ 56d8ff91
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from the serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished-with-error, opcode the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    return self._SubmitJobUnlocked(ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    for ops in jobs:
      try:
        data = self._SubmitJobUnlocked(ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results


  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend on archival, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None
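
As a rough usage sketch (not part of jqueue.py itself): the master daemon owns the single JobQueue instance and drives it through the public methods above. The context object and the specific opcode below are stand-ins, not taken from this file.

# Illustrative only: assumes a masterd-style GanetiContext is available as
# "context" and that the chosen opcode class exists in this Ganeti version.
from ganeti import jqueue, opcodes

queue = jqueue.JobQueue(context)   # loads, inspects and resumes on-disk jobs
job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
# QueryJobs returns one row per job, each row listing the requested fields.
(status,) = queue.QueryJobs([job_id], ["status"])[0]
queue.Shutdown()                   # stops the workers and releases the queue lock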