Revision ea03467c

b/lib/jqueue.py
21 21

  
22 22
"""Module implementing the job queue handling.
23 23

  
24
Locking:
25
There's a single, large lock in the JobQueue class. It's used by all other
26
classes in this module.
24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

  
27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
27 29

  
28 30
"""
29 31

  
......
51 53

  
52 54

  
53 55
def TimeStampNow():
56
  """Returns the current timestamp.
57

  
58
  @rtype: tuple
59
  @return: the current time in the (seconds, microseconds) format
60

  
61
  """
54 62
  return utils.SplitTime(time.time())
55 63

  
56 64

  
57 65
class _QueuedOpCode(object):
58 66
  """Encasulates an opcode object.
59 67

  
60
  The 'log' attribute holds the execution log and consists of tuples
61
  of the form (log_serial, timestamp, level, message).
68
  @ivar log: holds the execution log and consists of tuples
69
  of the form C{(log_serial, timestamp, level, message)}
70
  @ivar input: the OpCode we encapsulate
71
  @ivar status: the current status
72
  @ivar result: the result of the LU execution
73
  @ivar start_timestamp: timestamp for the start of the execution
74
  @ivar stop_timestamp: timestamp for the end of the execution
62 75

  
63 76
  """
64 77
  def __init__(self, op):
78
    """Constructor for the _QuededOpCode.
79

  
80
    @type op: L{opcodes.OpCode}
81
    @param op: the opcode we encapsulate
82

  
83
    """
65 84
    self.input = op
66 85
    self.status = constants.OP_STATUS_QUEUED
67 86
    self.result = None
......
71 90

  
72 91
  @classmethod
73 92
  def Restore(cls, state):
93
    """Restore the _QueuedOpCode from the serialized form.
94

  
95
    @type state: dict
96
    @param state: the serialized state
97
    @rtype: _QueuedOpCode
98
    @return: a new _QueuedOpCode instance
99

  
100
    """
74 101
    obj = _QueuedOpCode.__new__(cls)
75 102
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
76 103
    obj.status = state["status"]
......
81 108
    return obj
82 109

  
83 110
  def Serialize(self):
111
    """Serializes this _QueuedOpCode.
112

  
113
    @rtype: dict
114
    @return: the dictionary holding the serialized state
115

  
116
    """
84 117
    return {
85 118
      "input": self.input.__getstate__(),
86 119
      "status": self.status,
......
94 127
class _QueuedJob(object):
95 128
  """In-memory job representation.
96 129

  
97
  This is what we use to track the user-submitted jobs. Locking must be taken
98
  care of by users of this class.
130
  This is what we use to track the user-submitted jobs. Locking must
131
  be taken care of by users of this class.
132

  
133
  @type queue: L{JobQueue}
134
  @ivar queue: the parent queue
135
  @ivar id: the job ID
136
  @type ops: list
137
  @ivar ops: the list of _QueuedOpCode that constitute the job
138
  @type run_op_index: int
139
  @ivar run_op_index: the currently executing opcode, or -1 if
140
      we didn't yet start executing
141
  @type log_serial: int
142
  @ivar log_serial: holds the index for the next log entry
143
  @ivar received_timestamp: the timestamp for when the job was received
144
  @ivar start_timestamp: the timestamp for start of execution
145
  @ivar end_timestamp: the timestamp for end of execution
146
  @ivar change: a Condition variable we use for waiting for job changes
99 147

  
100 148
  """
101 149
  def __init__(self, queue, job_id, ops):
150
    """Constructor for the _QueuedJob.
151

  
152
    @type queue: L{JobQueue}
153
    @param queue: our parent queue
154
    @type job_id: job_id
155
    @param job_id: our job id
156
    @type ops: list
157
    @param ops: the list of opcodes we hold, which will be encapsulated
158
        in _QueuedOpCodes
159

  
160
    """
102 161
    if not ops:
103
      # TODO
162
      # TODO: use a better exception
104 163
      raise Exception("No opcodes")
105 164

  
106 165
    self.queue = queue
......
117 176

  
118 177
  @classmethod
119 178
  def Restore(cls, queue, state):
179
    """Restore a _QueuedJob from serialized state:
180

  
181
    @type queue: L{JobQueue}
182
    @param queue: to which queue the restored job belongs
183
    @type state: dict
184
    @param state: the serialized state
185
    @rtype: _QueuedJob
186
    @return: the restored _QueuedJob instance
187

  
188
    """
120 189
    obj = _QueuedJob.__new__(cls)
121 190
    obj.queue = queue
122 191
    obj.id = state["id"]
......
139 208
    return obj
140 209

  
141 210
  def Serialize(self):
211
    """Serialize the _JobQueue instance.
212

  
213
    @rtype: dict
214
    @return: the serialized state
215

  
216
    """
142 217
    return {
143 218
      "id": self.id,
144 219
      "ops": [op.Serialize() for op in self.ops],
......
149 224
      }
150 225

  
151 226
  def CalcStatus(self):
227
    """Compute the status of this job.
228

  
229
    This function iterates over all the _QueuedOpCodes in the job and
230
    based on their status, computes the job status.
231

  
232
    The algorithm is:
233
      - if we find a cancelled, or finished with error, the job
234
        status will be the same
235
      - otherwise, the last opcode with the status one of:
236
          - waitlock
237
          - running
238

  
239
        will determine the job status
240

  
241
      - otherwise, it means either all opcodes are queued, or success,
242
        and the job status will be the same
243

  
244
    @return: the job status
245

  
246
    """
152 247
    status = constants.JOB_STATUS_QUEUED
153 248

  
154 249
    all_success = True
......
178 273
    return status
179 274

  
180 275
  def GetLogEntries(self, newer_than):
276
    """Selectively returns the log entries.
277

  
278
    @type newer_than: None or int
279
    @param newer_than: if this is None, return all log entries,
280
        otherwise return only the log entries with serial higher
281
        than this value
282
    @rtype: list
283
    @return: the list of the log entries selected
284

  
285
    """
181 286
    if newer_than is None:
182 287
      serial = -1
183 288
    else:
......
191 296

  
192 297

  
193 298
class _JobQueueWorker(workerpool.BaseWorker):
299
  """The actual job workers.
300

  
301
  """
194 302
  def _NotifyStart(self):
195 303
    """Mark the opcode as running, not lock-waiting.
196 304

  
......
215 323
    This functions processes a job. It is closely tied to the _QueuedJob and
216 324
    _QueuedOpCode classes.
217 325

  
326
    @type job: L{_QueuedJob}
327
    @param job: the job to be processed
328

  
218 329
    """
219 330
    logging.debug("Worker %s processing job %s",
220 331
                  self.worker_id, job.id)
......
316 427

  
317 428

  
318 429
class _JobQueueWorkerPool(workerpool.WorkerPool):
430
  """Simple class implementing a job-processing workerpool.
431

  
432
  """
319 433
  def __init__(self, queue):
320 434
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
321 435
                                              _JobQueueWorker)
......
323 437

  
324 438

  
325 439
class JobQueue(object):
440
  """Quue used to manaage the jobs.
441

  
442
  @cvar _RE_JOB_FILE: regex matching the valid job file names
443

  
444
  """
326 445
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
327 446

  
328 447
  def _RequireOpenQueue(fn):
329 448
    """Decorator for "public" functions.
330 449

  
331
    This function should be used for all "public" functions. That is, functions
332
    usually called from other classes.
450
    This function should be used for all 'public' functions. That is,
451
    functions usually called from other classes.
333 452

  
334
    Important: Use this decorator only after utils.LockedMethod!
453
    @warning: Use this decorator only after utils.LockedMethod!
335 454

  
336
    Example:
455
    Example::
337 456
      @utils.LockedMethod
338 457
      @_RequireOpenQueue
339 458
      def Example(self):
......
346 465
    return wrapper
347 466

  
348 467
  def __init__(self, context):
468
    """Constructor for JobQueue.
469

  
470
    The constructor will initialize the job queue object and then
471
    start loading the current jobs from disk, either for starting them
472
    (if they were queued) or for aborting them (if they were already
473
    running).
474

  
475
    @type context: GanetiContext
476
    @param context: the context object for access to the configuration
477
        data and other ganeti objects
478

  
479
    """
349 480
    self.context = context
350 481
    self._memcache = weakref.WeakValueDictionary()
351 482
    self._my_hostname = utils.HostInfo().name
......
443 574
  @utils.LockedMethod
444 575
  @_RequireOpenQueue
445 576
  def RemoveNode(self, node_name):
577
    """Callback called when removing nodes from the cluster.
578

  
579
    @type node_name: str
580
    @param node_name: the name of the node to remove
581

  
582
    """
446 583
    try:
447 584
      # The queue is removed by the "leave node" RPC call.
448 585
      del self._nodes[node_name]
......
450 587
      pass
451 588

  
452 589
  def _CheckRpcResult(self, result, nodes, failmsg):
590
    """Verifies the status of an RPC call.
591

  
592
    Since we aim to keep consistency should this node (the current
593
    master) fail, we will log errors if our rpc calls fail, and especially
594
    log the case when more than half of the nodes fail.
595

  
596
    @param result: the data as returned from the rpc call
597
    @type nodes: list
598
    @param nodes: the list of nodes we made the call to
599
    @type failmsg: str
600
    @param failmsg: the identifier to be used for logging
601

  
602
    """
453 603
    failed = []
454 604
    success = []
455 605

  
......
470 620
  def _GetNodeIp(self):
471 621
    """Helper for returning the node name/ip list.
472 622

  
623
    @rtype: (list, list)
624
    @return: a tuple of two lists, the first one with the node
625
        names and the second one with the node addresses
626

  
473 627
    """
474 628
    name_list = self._nodes.keys()
475 629
    addr_list = [self._nodes[name] for name in name_list]
......
478 632
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
479 633
    """Writes a file locally and then replicates it to all nodes.
480 634

  
635
    This function will replace the contents of a file on the local
636
    node and then replicate it to all the other nodes we have.
637

  
638
    @type file_name: str
639
    @param file_name: the path of the file to be replicated
640
    @type data: str
641
    @param data: the new contents of the file
642

  
481 643
    """
482 644
    utils.WriteFile(file_name, data=data)
483 645

  
......
487 649
                         "Updating %s" % file_name)
488 650

  
489 651
  def _RenameFileUnlocked(self, old, new):
652
    """Renames a file locally and then replicate the change.
653

  
654
    This function will rename a file in the local queue directory
655
    and then replicate this rename to all the other nodes we have.
656

  
657
    @type old: str
658
    @param old: the current name of the file
659
    @type new: str
660
    @param new: the new name of the file
661

  
662
    """
490 663
    os.rename(old, new)
491 664

  
492 665
    names, addrs = self._GetNodeIp()
......
495 668
                         "Moving %s to %s" % (old, new))
496 669

  
497 670
  def _FormatJobID(self, job_id):
671
    """Convert a job ID to string format.
672

  
673
    Currently this just does C{str(job_id)} after performing some
674
    checks, but if we want to change the job id format this will
675
    abstract this change.
676

  
677
    @type job_id: int or long
678
    @param job_id: the numeric job id
679
    @rtype: str
680
    @return: the formatted job id
681

  
682
    """
498 683
    if not isinstance(job_id, (int, long)):
499 684
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
500 685
    if job_id < 0:
......
507 692

  
508 693
    Job identifiers are unique during the lifetime of a cluster.
509 694

  
510
    Returns: A string representing the job identifier.
695
    @rtype: str
696
    @return: a string representing the job identifier.
511 697

  
512 698
    """
513 699
    # New number
......
524 710

  
525 711
  @staticmethod
526 712
  def _GetJobPath(job_id):
713
    """Returns the job file for a given job id.
714

  
715
    @type job_id: str
716
    @param job_id: the job identifier
717
    @rtype: str
718
    @return: the path to the job file
719

  
720
    """
527 721
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
528 722

  
529 723
  @staticmethod
530 724
  def _GetArchivedJobPath(job_id):
725
    """Returns the archived job file for a give job id.
726

  
727
    @type job_id: str
728
    @param job_id: the job identifier
729
    @rtype: str
730
    @return: the path to the archived job file
731

  
732
    """
531 733
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
532 734

  
533 735
  @classmethod
534 736
  def _ExtractJobID(cls, name):
737
    """Extract the job id from a filename.
738

  
739
    @type name: str
740
    @param name: the job filename
741
    @rtype: job id or None
742
    @return: the job id corresponding to the given filename,
743
        or None if the filename does not represent a valid
744
        job file
745

  
746
    """
535 747
    m = cls._RE_JOB_FILE.match(name)
536 748
    if m:
537 749
      return m.group(1)
......
548 760
    jobs are present on disk (so in the _memcache we don't have any
549 761
    extra IDs).
550 762

  
763
    @rtype: list
764
    @return: the list of job IDs
765

  
551 766
    """
552 767
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
553 768
    jlist = utils.NiceSort(jlist)
554 769
    return jlist
555 770

  
556 771
  def _ListJobFiles(self):
772
    """Returns the list of current job files.
773

  
774
    @rtype: list
775
    @return: the list of job file names
776

  
777
    """
557 778
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
558 779
            if self._RE_JOB_FILE.match(name)]
559 780

  
560 781
  def _LoadJobUnlocked(self, job_id):
782
    """Loads a job from the disk or memory.
783

  
784
    Given a job id, this will return the cached job object if
785
    existing, or try to load the job from the disk. If loading from
786
    disk, it will also add the job to the cache.
787

  
788
    @param job_id: the job id
789
    @rtype: L{_QueuedJob} or None
790
    @return: either None or the job object
791

  
792
    """
561 793
    job = self._memcache.get(job_id, None)
562 794
    if job:
563 795
      logging.debug("Found job %s in memcache", job_id)
......
594 826
    return job
595 827

  
596 828
  def _GetJobsUnlocked(self, job_ids):
829
    """Return a list of jobs based on their IDs.
830

  
831
    @type job_ids: list
832
    @param job_ids: either an empty list (meaning all jobs),
833
        or a list of job IDs
834
    @rtype: list
835
    @return: the list of job objects
836

  
837
    """
597 838
    if not job_ids:
598 839
      job_ids = self._GetJobIDsUnlocked()
599 840

  
......
606 847
    This currently uses the queue drain file, which makes it a
607 848
    per-node flag. In the future this can be moved to the config file.
608 849

  
850
    @rtype: boolean
851
    @return: True if the job queue is marked for draining
852

  
609 853
    """
610 854
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
611 855

  
......
616 860
    This is similar to the function L{backend.JobQueueSetDrainFlag},
617 861
    and in the future we might merge them.
618 862

  
863
    @type drain_flag: boolean
864
    @param drain_flag: whether to set or unset the drain flag
865

  
619 866
    """
620 867
    if drain_flag:
621 868
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
......
633 880

  
634 881
    @type ops: list
635 882
    @param ops: The list of OpCodes that will become the new job.
883
    @rtype: job ID
884
    @return: the job ID of the newly created job
885
    @raise errors.JobQueueDrainError: if the job is marked for draining
636 886

  
637 887
    """
638 888
    if self._IsQueueMarkedDrain():
......
654 904

  
655 905
  @_RequireOpenQueue
656 906
  def UpdateJobUnlocked(self, job):
907
    """Update a job's on disk storage.
908

  
909
    After a job has been modified, this function needs to be called in
910
    order to write the changes to disk and replicate them to the other
911
    nodes.
912

  
913
    @type job: L{_QueuedJob}
914
    @param job: the changed job
915

  
916
    """
657 917
    filename = self._GetJobPath(job.id)
658 918
    data = serializer.DumpJson(job.Serialize(), indent=False)
659 919
    logging.debug("Writing job %s to %s", job.id, filename)
......
678 938
    @param prev_log_serial: Last job message serial number
679 939
    @type timeout: float
680 940
    @param timeout: maximum time to wait
941
    @rtype: tuple (job info, log entries)
942
    @return: a tuple of the job information as required via
943
        the fields parameter, and the log entries as a list
944

  
945
        if the job has not changed and the timeout has expired,
946
        we instead return a special value,
947
        L{constants.JOB_NOTCHANGED}, which should be interpreted
948
        as such by the clients
681 949

  
682 950
    """
683 951
    logging.debug("Waiting for changes in job %s", job_id)
......
729 997
  def CancelJob(self, job_id):
730 998
    """Cancels a job.
731 999

  
1000
    This will only succeed if the job has not started yet.
1001

  
732 1002
    @type job_id: string
733
    @param job_id: Job ID of job to be cancelled.
1003
    @param job_id: job ID of job to be cancelled.
734 1004

  
735 1005
    """
736 1006
    logging.debug("Cancelling job %s", job_id)
......
784 1054
  def ArchiveJob(self, job_id):
785 1055
    """Archives a job.
786 1056

  
1057
    This is just a wrapper over L{_ArchiveJobUnlocked}.
1058

  
787 1059
    @type job_id: string
788 1060
    @param job_id: Job ID of job to be archived.
789 1061

  
......
825 1097
        self._ArchiveJobUnlocked(jid)
826 1098

  
827 1099
  def _GetJobInfoUnlocked(self, job, fields):
1100
    """Returns information about a job.
1101

  
1102
    @type job: L{_QueuedJob}
1103
    @param job: the job which we query
1104
    @type fields: list
1105
    @param fields: names of fields to return
1106
    @rtype: list
1107
    @return: list with one element for each field
1108
    @raise errors.OpExecError: when an invalid field
1109
        has been passed
1110

  
1111
    """
828 1112
    row = []
829 1113
    for fname in fields:
830 1114
      if fname == "id":
......
860 1144
  def QueryJobs(self, job_ids, fields):
861 1145
    """Returns a list of jobs in queue.
862 1146

  
863
    Args:
864
    - job_ids: Sequence of job identifiers or None for all
865
    - fields: Names of fields to return
1147
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1148
    processing for each job.
1149

  
1150
    @type job_ids: list
1151
    @param job_ids: sequence of job identifiers or None for all
1152
    @type fields: list
1153
    @param fields: names of fields to return
1154
    @rtype: list
1155
    @return: list one element per job, each element being list with
1156
        the requested fields
866 1157

  
867 1158
    """
868 1159
    jobs = []
......
880 1171
  def Shutdown(self):
881 1172
    """Stops the job queue.
882 1173

  
1174
    This shuts down all the worker threads and closes the queue.
1175

  
883 1176
    """
884 1177
    self._wpool.TerminateWorkers()
885 1178

  

Also available in: Unified diff