Revision ea03467c lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
21 | 21 |
|
22 | 22 |
"""Module implementing the job queue handling. |
23 | 23 |
|
24 |
Locking: |
|
25 |
There's a single, large lock in the JobQueue class. It's used by all other |
|
26 |
classes in this module. |
|
24 |
Locking: there's a single, large lock in the L{JobQueue} class. It's |
|
25 |
used by all other classes in this module. |
|
26 |
|
|
27 |
@var JOBQUEUE_THREADS: the number of worker threads we start for |
|
28 |
processing jobs |
|
27 | 29 |
|
28 | 30 |
""" |
29 | 31 |
|
... | ... | |
51 | 53 |
|
52 | 54 |
|
53 | 55 |
def TimeStampNow(): |
56 |
"""Returns the current timestamp. |
|
57 |
|
|
58 |
@rtype: tuple |
|
59 |
@return: the current time in the (seconds, microseconds) format |
|
60 |
|
|
61 |
""" |
|
54 | 62 |
return utils.SplitTime(time.time()) |
55 | 63 |
|
56 | 64 |
|
57 | 65 |
class _QueuedOpCode(object): |
58 | 66 |
"""Encapsulates an opcode object. |
59 | 67 |
|
60 |
The 'log' attribute holds the execution log and consists of tuples |
|
61 |
of the form (log_serial, timestamp, level, message). |
|
68 |
@ivar log: holds the execution log and consists of tuples |
|
69 |
of the form C{(log_serial, timestamp, level, message)} |
|
70 |
@ivar input: the OpCode we encapsulate |
|
71 |
@ivar status: the current status |
|
72 |
@ivar result: the result of the LU execution |
|
73 |
@ivar start_timestamp: timestamp for the start of the execution |
|
74 |
@ivar stop_timestamp: timestamp for the end of the execution |
|
62 | 75 |
|
63 | 76 |
""" |
64 | 77 |
def __init__(self, op): |
78 |
"""Constructor for the _QueuedOpCode. |
|
79 |
|
|
80 |
@type op: L{opcodes.OpCode} |
|
81 |
@param op: the opcode we encapsulate |
|
82 |
|
|
83 |
""" |
|
65 | 84 |
self.input = op |
66 | 85 |
self.status = constants.OP_STATUS_QUEUED |
67 | 86 |
self.result = None |
... | ... | |
71 | 90 |
|
72 | 91 |
@classmethod |
73 | 92 |
def Restore(cls, state): |
93 |
"""Restore the _QueuedOpCode from the serialized form. |
|
94 |
|
|
95 |
@type state: dict |
|
96 |
@param state: the serialized state |
|
97 |
@rtype: _QueuedOpCode |
|
98 |
@return: a new _QueuedOpCode instance |
|
99 |
|
|
100 |
""" |
|
74 | 101 |
obj = _QueuedOpCode.__new__(cls) |
75 | 102 |
obj.input = opcodes.OpCode.LoadOpCode(state["input"]) |
76 | 103 |
obj.status = state["status"] |
... | ... | |
81 | 108 |
return obj |
82 | 109 |
|
83 | 110 |
def Serialize(self): |
111 |
"""Serializes this _QueuedOpCode. |
|
112 |
|
|
113 |
@rtype: dict |
|
114 |
@return: the dictionary holding the serialized state |
|
115 |
|
|
116 |
""" |
|
84 | 117 |
return { |
85 | 118 |
"input": self.input.__getstate__(), |
86 | 119 |
"status": self.status, |
... | ... | |
94 | 127 |
class _QueuedJob(object): |
95 | 128 |
"""In-memory job representation. |
96 | 129 |
|
97 |
This is what we use to track the user-submitted jobs. Locking must be taken |
|
98 |
care of by users of this class. |
|
130 |
This is what we use to track the user-submitted jobs. Locking must |
|
131 |
be taken care of by users of this class. |
|
132 |
|
|
133 |
@type queue: L{JobQueue} |
|
134 |
@ivar queue: the parent queue |
|
135 |
@ivar id: the job ID |
|
136 |
@type ops: list |
|
137 |
@ivar ops: the list of _QueuedOpCode that constitute the job |
|
138 |
@type run_op_index: int |
|
139 |
@ivar run_op_index: the currently executing opcode, or -1 if |
|
140 |
we didn't yet start executing |
|
141 |
@type log_serial: int |
|
142 |
@ivar log_serial: holds the index for the next log entry |
|
143 |
@ivar received_timestamp: the timestamp for when the job was received |
|
144 |
@ivar start_timestamp: the timestamp for start of execution |
|
145 |
@ivar end_timestamp: the timestamp for end of execution |
|
146 |
@ivar change: a Condition variable we use for waiting for job changes |
|
99 | 147 |
|
100 | 148 |
""" |
101 | 149 |
def __init__(self, queue, job_id, ops): |
150 |
"""Constructor for the _QueuedJob. |
|
151 |
|
|
152 |
@type queue: L{JobQueue} |
|
153 |
@param queue: our parent queue |
|
154 |
@type job_id: job_id |
|
155 |
@param job_id: our job id |
|
156 |
@type ops: list |
|
157 |
@param ops: the list of opcodes we hold, which will be encapsulated |
|
158 |
in _QueuedOpCodes |
|
159 |
|
|
160 |
""" |
|
102 | 161 |
if not ops: |
103 |
# TODO |
|
162 |
# TODO: use a better exception
|
|
104 | 163 |
raise Exception("No opcodes") |
105 | 164 |
|
106 | 165 |
self.queue = queue |
... | ... | |
117 | 176 |
|
118 | 177 |
@classmethod |
119 | 178 |
def Restore(cls, queue, state): |
179 |
"""Restore a _QueuedJob from serialized state. |
|
180 |
|
|
181 |
@type queue: L{JobQueue} |
|
182 |
@param queue: to which queue the restored job belongs |
|
183 |
@type state: dict |
|
184 |
@param state: the serialized state |
|
185 |
@rtype: _QueuedJob |
|
186 |
@return: the restored _QueuedJob instance |
|
187 |
|
|
188 |
""" |
|
120 | 189 |
obj = _QueuedJob.__new__(cls) |
121 | 190 |
obj.queue = queue |
122 | 191 |
obj.id = state["id"] |
... | ... | |
139 | 208 |
return obj |
140 | 209 |
|
141 | 210 |
def Serialize(self): |
211 |
"""Serialize the _QueuedJob instance. |
|
212 |
|
|
213 |
@rtype: dict |
|
214 |
@return: the serialized state |
|
215 |
|
|
216 |
""" |
|
142 | 217 |
return { |
143 | 218 |
"id": self.id, |
144 | 219 |
"ops": [op.Serialize() for op in self.ops], |
... | ... | |
149 | 224 |
} |
150 | 225 |
|
151 | 226 |
def CalcStatus(self): |
227 |
"""Compute the status of this job. |
|
228 |
|
|
229 |
This function iterates over all the _QueuedOpCodes in the job and |
|
230 |
based on their status, computes the job status. |
|
231 |
|
|
232 |
The algorithm is: |
|
233 |
- if we find a cancelled, or finished with error, the job |
|
234 |
status will be the same |
|
235 |
- otherwise, the last opcode with the status one of: |
|
236 |
- waitlock |
|
237 |
- running |
|
238 |
|
|
239 |
will determine the job status |
|
240 |
|
|
241 |
- otherwise, it means either all opcodes are queued, or success, |
|
242 |
and the job status will be the same |
|
243 |
|
|
244 |
@return: the job status |
|
245 |
|
|
246 |
""" |
|
152 | 247 |
status = constants.JOB_STATUS_QUEUED |
153 | 248 |
|
154 | 249 |
all_success = True |
... | ... | |
178 | 273 |
return status |
179 | 274 |
|
180 | 275 |
def GetLogEntries(self, newer_than): |
276 |
"""Selectively returns the log entries. |
|
277 |
|
|
278 |
@type newer_than: None or int |
|
279 |
@param newer_than: if this is None, return all log entries, |
|
280 |
otherwise return only the log entries with serial higher |
|
281 |
than this value |
|
282 |
@rtype: list |
|
283 |
@return: the list of the log entries selected |
|
284 |
|
|
285 |
""" |
|
181 | 286 |
if newer_than is None: |
182 | 287 |
serial = -1 |
183 | 288 |
else: |
... | ... | |
191 | 296 |
|
192 | 297 |
|
193 | 298 |
class _JobQueueWorker(workerpool.BaseWorker): |
299 |
"""The actual job workers. |
|
300 |
|
|
301 |
""" |
|
194 | 302 |
def _NotifyStart(self): |
195 | 303 |
"""Mark the opcode as running, not lock-waiting. |
196 | 304 |
|
... | ... | |
215 | 323 |
This functions processes a job. It is closely tied to the _QueuedJob and |
216 | 324 |
_QueuedOpCode classes. |
217 | 325 |
|
326 |
@type job: L{_QueuedJob} |
|
327 |
@param job: the job to be processed |
|
328 |
|
|
218 | 329 |
""" |
219 | 330 |
logging.debug("Worker %s processing job %s", |
220 | 331 |
self.worker_id, job.id) |
... | ... | |
316 | 427 |
|
317 | 428 |
|
318 | 429 |
class _JobQueueWorkerPool(workerpool.WorkerPool): |
430 |
"""Simple class implementing a job-processing workerpool. |
|
431 |
|
|
432 |
""" |
|
319 | 433 |
def __init__(self, queue): |
320 | 434 |
super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS, |
321 | 435 |
_JobQueueWorker) |
... | ... | |
323 | 437 |
|
324 | 438 |
|
325 | 439 |
class JobQueue(object): |
440 |
"""Queue used to manage the jobs. |
|
441 |
|
|
442 |
@cvar _RE_JOB_FILE: regex matching the valid job file names |
|
443 |
|
|
444 |
""" |
|
326 | 445 |
_RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE) |
327 | 446 |
|
328 | 447 |
def _RequireOpenQueue(fn): |
329 | 448 |
"""Decorator for "public" functions. |
330 | 449 |
|
331 |
This function should be used for all "public" functions. That is, functions
|
|
332 |
usually called from other classes. |
|
450 |
This function should be used for all 'public' functions. That is,
|
|
451 |
functions usually called from other classes.
|
|
333 | 452 |
|
334 |
Important: Use this decorator only after utils.LockedMethod!
|
|
453 |
@warning: Use this decorator only after utils.LockedMethod!
|
|
335 | 454 |
|
336 |
Example: |
|
455 |
Example::
|
|
337 | 456 |
@utils.LockedMethod |
338 | 457 |
@_RequireOpenQueue |
339 | 458 |
def Example(self): |
... | ... | |
346 | 465 |
return wrapper |
347 | 466 |
|
348 | 467 |
def __init__(self, context): |
468 |
"""Constructor for JobQueue. |
|
469 |
|
|
470 |
The constructor will initialize the job queue object and then |
|
471 |
start loading the current jobs from disk, either for starting them |
|
472 |
(if they were queued) or for aborting them (if they were already |
|
473 |
running). |
|
474 |
|
|
475 |
@type context: GanetiContext |
|
476 |
@param context: the context object for access to the configuration |
|
477 |
data and other ganeti objects |
|
478 |
|
|
479 |
""" |
|
349 | 480 |
self.context = context |
350 | 481 |
self._memcache = weakref.WeakValueDictionary() |
351 | 482 |
self._my_hostname = utils.HostInfo().name |
... | ... | |
443 | 574 |
@utils.LockedMethod |
444 | 575 |
@_RequireOpenQueue |
445 | 576 |
def RemoveNode(self, node_name): |
577 |
"""Callback called when removing nodes from the cluster. |
|
578 |
|
|
579 |
@type node_name: str |
|
580 |
@param node_name: the name of the node to remove |
|
581 |
|
|
582 |
""" |
|
446 | 583 |
try: |
447 | 584 |
# The queue is removed by the "leave node" RPC call. |
448 | 585 |
del self._nodes[node_name] |
... | ... | |
450 | 587 |
pass |
451 | 588 |
|
452 | 589 |
def _CheckRpcResult(self, result, nodes, failmsg): |
590 |
"""Verifies the status of an RPC call. |
|
591 |
|
|
592 |
Since we aim to keep consistency should this node (the current |
|
593 |
master) fail, we will log errors if our rpc calls fail, and especially |
|
594 |
log the case when more than half of the nodes fail. |
|
595 |
|
|
596 |
@param result: the data as returned from the rpc call |
|
597 |
@type nodes: list |
|
598 |
@param nodes: the list of nodes we made the call to |
|
599 |
@type failmsg: str |
|
600 |
@param failmsg: the identifier to be used for logging |
|
601 |
|
|
602 |
""" |
|
453 | 603 |
failed = [] |
454 | 604 |
success = [] |
455 | 605 |
|
... | ... | |
470 | 620 |
def _GetNodeIp(self): |
471 | 621 |
"""Helper for returning the node name/ip list. |
472 | 622 |
|
623 |
@rtype: (list, list) |
|
624 |
@return: a tuple of two lists, the first one with the node |
|
625 |
names and the second one with the node addresses |
|
626 |
|
|
473 | 627 |
""" |
474 | 628 |
name_list = self._nodes.keys() |
475 | 629 |
addr_list = [self._nodes[name] for name in name_list] |
... | ... | |
478 | 632 |
def _WriteAndReplicateFileUnlocked(self, file_name, data): |
479 | 633 |
"""Writes a file locally and then replicates it to all nodes. |
480 | 634 |
|
635 |
This function will replace the contents of a file on the local |
|
636 |
node and then replicate it to all the other nodes we have. |
|
637 |
|
|
638 |
@type file_name: str |
|
639 |
@param file_name: the path of the file to be replicated |
|
640 |
@type data: str |
|
641 |
@param data: the new contents of the file |
|
642 |
|
|
481 | 643 |
""" |
482 | 644 |
utils.WriteFile(file_name, data=data) |
483 | 645 |
|
... | ... | |
487 | 649 |
"Updating %s" % file_name) |
488 | 650 |
|
489 | 651 |
def _RenameFileUnlocked(self, old, new): |
652 |
"""Renames a file locally and then replicate the change. |
|
653 |
|
|
654 |
This function will rename a file in the local queue directory |
|
655 |
and then replicate this rename to all the other nodes we have. |
|
656 |
|
|
657 |
@type old: str |
|
658 |
@param old: the current name of the file |
|
659 |
@type new: str |
|
660 |
@param new: the new name of the file |
|
661 |
|
|
662 |
""" |
|
490 | 663 |
os.rename(old, new) |
491 | 664 |
|
492 | 665 |
names, addrs = self._GetNodeIp() |
... | ... | |
495 | 668 |
"Moving %s to %s" % (old, new)) |
496 | 669 |
|
497 | 670 |
def _FormatJobID(self, job_id): |
671 |
"""Convert a job ID to string format. |
|
672 |
|
|
673 |
Currently this just does C{str(job_id)} after performing some |
|
674 |
checks, but if we want to change the job id format this will |
|
675 |
abstract this change. |
|
676 |
|
|
677 |
@type job_id: int or long |
|
678 |
@param job_id: the numeric job id |
|
679 |
@rtype: str |
|
680 |
@return: the formatted job id |
|
681 |
|
|
682 |
""" |
|
498 | 683 |
if not isinstance(job_id, (int, long)): |
499 | 684 |
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) |
500 | 685 |
if job_id < 0: |
... | ... | |
507 | 692 |
|
508 | 693 |
Job identifiers are unique during the lifetime of a cluster. |
509 | 694 |
|
510 |
Returns: A string representing the job identifier. |
|
695 |
@rtype: str |
|
696 |
@return: a string representing the job identifier. |
|
511 | 697 |
|
512 | 698 |
""" |
513 | 699 |
# New number |
... | ... | |
524 | 710 |
|
525 | 711 |
@staticmethod |
526 | 712 |
def _GetJobPath(job_id): |
713 |
"""Returns the job file for a given job id. |
|
714 |
|
|
715 |
@type job_id: str |
|
716 |
@param job_id: the job identifier |
|
717 |
@rtype: str |
|
718 |
@return: the path to the job file |
|
719 |
|
|
720 |
""" |
|
527 | 721 |
return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id) |
528 | 722 |
|
529 | 723 |
@staticmethod |
530 | 724 |
def _GetArchivedJobPath(job_id): |
725 |
"""Returns the archived job file for a given job id. |
|
726 |
|
|
727 |
@type job_id: str |
|
728 |
@param job_id: the job identifier |
|
729 |
@rtype: str |
|
730 |
@return: the path to the archived job file |
|
731 |
|
|
732 |
""" |
|
531 | 733 |
return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id) |
532 | 734 |
|
533 | 735 |
@classmethod |
534 | 736 |
def _ExtractJobID(cls, name): |
737 |
"""Extract the job id from a filename. |
|
738 |
|
|
739 |
@type name: str |
|
740 |
@param name: the job filename |
|
741 |
@rtype: job id or None |
|
742 |
@return: the job id corresponding to the given filename, |
|
743 |
or None if the filename does not represent a valid |
|
744 |
job file |
|
745 |
|
|
746 |
""" |
|
535 | 747 |
m = cls._RE_JOB_FILE.match(name) |
536 | 748 |
if m: |
537 | 749 |
return m.group(1) |
... | ... | |
548 | 760 |
jobs are present on disk (so in the _memcache we don't have any |
549 | 761 |
extra IDs). |
550 | 762 |
|
763 |
@rtype: list |
|
764 |
@return: the list of job IDs |
|
765 |
|
|
551 | 766 |
""" |
552 | 767 |
jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()] |
553 | 768 |
jlist = utils.NiceSort(jlist) |
554 | 769 |
return jlist |
555 | 770 |
|
556 | 771 |
def _ListJobFiles(self): |
772 |
"""Returns the list of current job files. |
|
773 |
|
|
774 |
@rtype: list |
|
775 |
@return: the list of job file names |
|
776 |
|
|
777 |
""" |
|
557 | 778 |
return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR) |
558 | 779 |
if self._RE_JOB_FILE.match(name)] |
559 | 780 |
|
560 | 781 |
def _LoadJobUnlocked(self, job_id): |
782 |
"""Loads a job from the disk or memory. |
|
783 |
|
|
784 |
Given a job id, this will return the cached job object if |
|
785 |
existing, or try to load the job from the disk. If loading from |
|
786 |
disk, it will also add the job to the cache. |
|
787 |
|
|
788 |
@param job_id: the job id |
|
789 |
@rtype: L{_QueuedJob} or None |
|
790 |
@return: either None or the job object |
|
791 |
|
|
792 |
""" |
|
561 | 793 |
job = self._memcache.get(job_id, None) |
562 | 794 |
if job: |
563 | 795 |
logging.debug("Found job %s in memcache", job_id) |
... | ... | |
594 | 826 |
return job |
595 | 827 |
|
596 | 828 |
def _GetJobsUnlocked(self, job_ids): |
829 |
"""Return a list of jobs based on their IDs. |
|
830 |
|
|
831 |
@type job_ids: list |
|
832 |
@param job_ids: either an empty list (meaning all jobs), |
|
833 |
or a list of job IDs |
|
834 |
@rtype: list |
|
835 |
@return: the list of job objects |
|
836 |
|
|
837 |
""" |
|
597 | 838 |
if not job_ids: |
598 | 839 |
job_ids = self._GetJobIDsUnlocked() |
599 | 840 |
|
... | ... | |
606 | 847 |
This currently uses the queue drain file, which makes it a |
607 | 848 |
per-node flag. In the future this can be moved to the config file. |
608 | 849 |
|
850 |
@rtype: boolean |
|
851 |
@return: True if the job queue is marked for draining |
|
852 |
|
|
609 | 853 |
""" |
610 | 854 |
return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) |
611 | 855 |
|
... | ... | |
616 | 860 |
This is similar to the function L{backend.JobQueueSetDrainFlag}, |
617 | 861 |
and in the future we might merge them. |
618 | 862 |
|
863 |
@type drain_flag: boolean |
|
864 |
@param drain_flag: whether to set or unset the drain flag |
|
865 |
|
|
619 | 866 |
""" |
620 | 867 |
if drain_flag: |
621 | 868 |
utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) |
... | ... | |
633 | 880 |
|
634 | 881 |
@type ops: list |
635 | 882 |
@param ops: The list of OpCodes that will become the new job. |
883 |
@rtype: job ID |
|
884 |
@return: the job ID of the newly created job |
|
885 |
@raise errors.JobQueueDrainError: if the job is marked for draining |
|
636 | 886 |
|
637 | 887 |
""" |
638 | 888 |
if self._IsQueueMarkedDrain(): |
... | ... | |
654 | 904 |
|
655 | 905 |
@_RequireOpenQueue |
656 | 906 |
def UpdateJobUnlocked(self, job): |
907 |
"""Update a job's on disk storage. |
|
908 |
|
|
909 |
After a job has been modified, this function needs to be called in |
|
910 |
order to write the changes to disk and replicate them to the other |
|
911 |
nodes. |
|
912 |
|
|
913 |
@type job: L{_QueuedJob} |
|
914 |
@param job: the changed job |
|
915 |
|
|
916 |
""" |
|
657 | 917 |
filename = self._GetJobPath(job.id) |
658 | 918 |
data = serializer.DumpJson(job.Serialize(), indent=False) |
659 | 919 |
logging.debug("Writing job %s to %s", job.id, filename) |
... | ... | |
678 | 938 |
@param prev_log_serial: Last job message serial number |
679 | 939 |
@type timeout: float |
680 | 940 |
@param timeout: maximum time to wait |
941 |
@rtype: tuple (job info, log entries) |
|
942 |
@return: a tuple of the job information as required via |
|
943 |
the fields parameter, and the log entries as a list |
|
944 |
|
|
945 |
if the job has not changed and the timeout has expired, |
|
946 |
we instead return a special value, |
|
947 |
L{constants.JOB_NOTCHANGED}, which should be interpreted |
|
948 |
as such by the clients |
|
681 | 949 |
|
682 | 950 |
""" |
683 | 951 |
logging.debug("Waiting for changes in job %s", job_id) |
... | ... | |
729 | 997 |
def CancelJob(self, job_id): |
730 | 998 |
"""Cancels a job. |
731 | 999 |
|
1000 |
This will only succeed if the job has not started yet. |
|
1001 |
|
|
732 | 1002 |
@type job_id: string |
733 |
@param job_id: Job ID of job to be cancelled.
|
|
1003 |
@param job_id: job ID of job to be cancelled.
|
|
734 | 1004 |
|
735 | 1005 |
""" |
736 | 1006 |
logging.debug("Cancelling job %s", job_id) |
... | ... | |
784 | 1054 |
def ArchiveJob(self, job_id): |
785 | 1055 |
"""Archives a job. |
786 | 1056 |
|
1057 |
This is just a wrapper over L{_ArchiveJobUnlocked}. |
|
1058 |
|
|
787 | 1059 |
@type job_id: string |
788 | 1060 |
@param job_id: Job ID of job to be archived. |
789 | 1061 |
|
... | ... | |
825 | 1097 |
self._ArchiveJobUnlocked(jid) |
826 | 1098 |
|
827 | 1099 |
def _GetJobInfoUnlocked(self, job, fields): |
1100 |
"""Returns information about a job. |
|
1101 |
|
|
1102 |
@type job: L{_QueuedJob} |
|
1103 |
@param job: the job which we query |
|
1104 |
@type fields: list |
|
1105 |
@param fields: names of fields to return |
|
1106 |
@rtype: list |
|
1107 |
@return: list with one element for each field |
|
1108 |
@raise errors.OpExecError: when an invalid field |
|
1109 |
has been passed |
|
1110 |
|
|
1111 |
""" |
|
828 | 1112 |
row = [] |
829 | 1113 |
for fname in fields: |
830 | 1114 |
if fname == "id": |
... | ... | |
860 | 1144 |
def QueryJobs(self, job_ids, fields): |
861 | 1145 |
"""Returns a list of jobs in queue. |
862 | 1146 |
|
863 |
Args: |
|
864 |
- job_ids: Sequence of job identifiers or None for all |
|
865 |
- fields: Names of fields to return |
|
1147 |
This is a wrapper of L{_GetJobsUnlocked}, which actually does the |
|
1148 |
processing for each job. |
|
1149 |
|
|
1150 |
@type job_ids: list |
|
1151 |
@param job_ids: sequence of job identifiers or None for all |
|
1152 |
@type fields: list |
|
1153 |
@param fields: names of fields to return |
|
1154 |
@rtype: list |
|
1155 |
@return: list one element per job, each element being list with |
|
1156 |
the requested fields |
|
866 | 1157 |
|
867 | 1158 |
""" |
868 | 1159 |
jobs = [] |
... | ... | |
880 | 1171 |
def Shutdown(self): |
881 | 1172 |
"""Stops the job queue. |
882 | 1173 |
|
1174 |
This shuts down all the worker threads and closes the queue. |
|
1175 |
|
|
883 | 1176 |
""" |
884 | 1177 |
self._wpool.TerminateWorkers() |
885 | 1178 |
|
Also available in: Unified diff