Rename OpRemoveInstance and LURemoveInstance
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59
60
61 JOBQUEUE_THREADS = 25
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
63
64 # member lock names to be passed to @ssynchronized decorator
65 _LOCK = "_lock"
66 _QUEUE = "_queue"
67
68
69 class CancelJob(Exception):
70   """Special exception to cancel a job.
71
72   """
73
74
75 def TimeStampNow():
76   """Returns the current timestamp.
77
78   @rtype: tuple
79   @return: the current time in the (seconds, microseconds) format
80
81   """
82   return utils.SplitTime(time.time())
83
84
85 class _QueuedOpCode(object):
86   """Encapsulates an opcode object.
87
88   @ivar log: holds the execution log and consists of tuples
89   of the form C{(log_serial, timestamp, level, message)}
90   @ivar input: the OpCode we encapsulate
91   @ivar status: the current status
92   @ivar result: the result of the LU execution
93   @ivar start_timestamp: timestamp for the start of the execution
94   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95   @ivar stop_timestamp: timestamp for the end of the execution
96
97   """
98   __slots__ = ["input", "status", "result", "log", "priority",
99                "start_timestamp", "exec_timestamp", "end_timestamp",
100                "__weakref__"]
101
102   def __init__(self, op):
103     """Constructor for the _QuededOpCode.
104
105     @type op: L{opcodes.OpCode}
106     @param op: the opcode we encapsulate
107
108     """
109     self.input = op
110     self.status = constants.OP_STATUS_QUEUED
111     self.result = None
112     self.log = []
113     self.start_timestamp = None
114     self.exec_timestamp = None
115     self.end_timestamp = None
116
117     # Get initial priority (it might change during the lifetime of this opcode)
118     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119
120   @classmethod
121   def Restore(cls, state):
122     """Restore the _QueuedOpCode from the serialized form.
123
124     @type state: dict
125     @param state: the serialized state
126     @rtype: _QueuedOpCode
127     @return: a new _QueuedOpCode instance
128
129     """
130     obj = _QueuedOpCode.__new__(cls)
131     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132     obj.status = state["status"]
133     obj.result = state["result"]
134     obj.log = state["log"]
135     obj.start_timestamp = state.get("start_timestamp", None)
136     obj.exec_timestamp = state.get("exec_timestamp", None)
137     obj.end_timestamp = state.get("end_timestamp", None)
138     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139     return obj
140
141   def Serialize(self):
142     """Serializes this _QueuedOpCode.
143
144     @rtype: dict
145     @return: the dictionary holding the serialized state
146
147     """
148     return {
149       "input": self.input.__getstate__(),
150       "status": self.status,
151       "result": self.result,
152       "log": self.log,
153       "start_timestamp": self.start_timestamp,
154       "exec_timestamp": self.exec_timestamp,
155       "end_timestamp": self.end_timestamp,
156       "priority": self.priority,
157       }
158
159
160 class _QueuedJob(object):
161   """In-memory job representation.
162
163   This is what we use to track the user-submitted jobs. Locking must
164   be taken care of by users of this class.
165
166   @type queue: L{JobQueue}
167   @ivar queue: the parent queue
168   @ivar id: the job ID
169   @type ops: list
170   @ivar ops: the list of _QueuedOpCode that constitute the job
171   @type log_serial: int
172   @ivar log_serial: holds the index for the next log entry
173   @ivar received_timestamp: the timestamp for when the job was received
174   @ivar start_timestmap: the timestamp for start of execution
175   @ivar end_timestamp: the timestamp for end of execution
176
177   """
178   # pylint: disable-msg=W0212
179   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180                "received_timestamp", "start_timestamp", "end_timestamp",
181                "__weakref__"]
182
183   def __init__(self, queue, job_id, ops):
184     """Constructor for the _QueuedJob.
185
186     @type queue: L{JobQueue}
187     @param queue: our parent queue
188     @type job_id: job_id
189     @param job_id: our job id
190     @type ops: list
191     @param ops: the list of opcodes we hold, which will be encapsulated
192         in _QueuedOpCodes
193
194     """
195     if not ops:
196       raise errors.GenericError("A job needs at least one opcode")
197
198     self.queue = queue
199     self.id = job_id
200     self.ops = [_QueuedOpCode(op) for op in ops]
201     self.log_serial = 0
202     self.received_timestamp = TimeStampNow()
203     self.start_timestamp = None
204     self.end_timestamp = None
205
206     self._InitInMemory(self)
207
208   @staticmethod
209   def _InitInMemory(obj):
210     """Initializes in-memory variables.
211
212     """
213     obj.ops_iter = None
214     obj.cur_opctx = None
215
216   def __repr__(self):
217     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218               "id=%s" % self.id,
219               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220
221     return "<%s at %#x>" % (" ".join(status), id(self))
222
223   @classmethod
224   def Restore(cls, queue, state):
225     """Restore a _QueuedJob from serialized state:
226
227     @type queue: L{JobQueue}
228     @param queue: to which queue the restored job belongs
229     @type state: dict
230     @param state: the serialized state
231     @rtype: _JobQueue
232     @return: the restored _JobQueue instance
233
234     """
235     obj = _QueuedJob.__new__(cls)
236     obj.queue = queue
237     obj.id = state["id"]
238     obj.received_timestamp = state.get("received_timestamp", None)
239     obj.start_timestamp = state.get("start_timestamp", None)
240     obj.end_timestamp = state.get("end_timestamp", None)
241
242     obj.ops = []
243     obj.log_serial = 0
244     for op_state in state["ops"]:
245       op = _QueuedOpCode.Restore(op_state)
246       for log_entry in op.log:
247         obj.log_serial = max(obj.log_serial, log_entry[0])
248       obj.ops.append(op)
249
250     cls._InitInMemory(obj)
251
252     return obj
253
254   def Serialize(self):
255     """Serialize the _JobQueue instance.
256
257     @rtype: dict
258     @return: the serialized state
259
260     """
261     return {
262       "id": self.id,
263       "ops": [op.Serialize() for op in self.ops],
264       "start_timestamp": self.start_timestamp,
265       "end_timestamp": self.end_timestamp,
266       "received_timestamp": self.received_timestamp,
267       }
268
269   def CalcStatus(self):
270     """Compute the status of this job.
271
272     This function iterates over all the _QueuedOpCodes in the job and
273     based on their status, computes the job status.
274
275     The algorithm is:
276       - if we find a cancelled, or finished with error, the job
277         status will be the same
278       - otherwise, the last opcode with the status one of:
279           - waitlock
280           - canceling
281           - running
282
283         will determine the job status
284
285       - otherwise, it means either all opcodes are queued, or success,
286         and the job status will be the same
287
288     @return: the job status
289
290     """
291     status = constants.JOB_STATUS_QUEUED
292
293     all_success = True
294     for op in self.ops:
295       if op.status == constants.OP_STATUS_SUCCESS:
296         continue
297
298       all_success = False
299
300       if op.status == constants.OP_STATUS_QUEUED:
301         pass
302       elif op.status == constants.OP_STATUS_WAITLOCK:
303         status = constants.JOB_STATUS_WAITLOCK
304       elif op.status == constants.OP_STATUS_RUNNING:
305         status = constants.JOB_STATUS_RUNNING
306       elif op.status == constants.OP_STATUS_CANCELING:
307         status = constants.JOB_STATUS_CANCELING
308         break
309       elif op.status == constants.OP_STATUS_ERROR:
310         status = constants.JOB_STATUS_ERROR
311         # The whole job fails if one opcode failed
312         break
313       elif op.status == constants.OP_STATUS_CANCELED:
314         status = constants.OP_STATUS_CANCELED
315         break
316
317     if all_success:
318       status = constants.JOB_STATUS_SUCCESS
319
320     return status
321
322   def CalcPriority(self):
323     """Gets the current priority for this job.
324
325     Only unfinished opcodes are considered. When all are done, the default
326     priority is used.
327
328     @rtype: int
329
330     """
331     priorities = [op.priority for op in self.ops
332                   if op.status not in constants.OPS_FINALIZED]
333
334     if not priorities:
335       # All opcodes are done, assume default priority
336       return constants.OP_PRIO_DEFAULT
337
338     return min(priorities)
339
340   def GetLogEntries(self, newer_than):
341     """Selectively returns the log entries.
342
343     @type newer_than: None or int
344     @param newer_than: if this is None, return all log entries,
345         otherwise return only the log entries with serial higher
346         than this value
347     @rtype: list
348     @return: the list of the log entries selected
349
350     """
351     if newer_than is None:
352       serial = -1
353     else:
354       serial = newer_than
355
356     entries = []
357     for op in self.ops:
358       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359
360     return entries
361
362   def GetInfo(self, fields):
363     """Returns information about a job.
364
365     @type fields: list
366     @param fields: names of fields to return
367     @rtype: list
368     @return: list with one element for each field
369     @raise errors.OpExecError: when an invalid field
370         has been passed
371
372     """
373     row = []
374     for fname in fields:
375       if fname == "id":
376         row.append(self.id)
377       elif fname == "status":
378         row.append(self.CalcStatus())
379       elif fname == "priority":
380         row.append(self.CalcPriority())
381       elif fname == "ops":
382         row.append([op.input.__getstate__() for op in self.ops])
383       elif fname == "opresult":
384         row.append([op.result for op in self.ops])
385       elif fname == "opstatus":
386         row.append([op.status for op in self.ops])
387       elif fname == "oplog":
388         row.append([op.log for op in self.ops])
389       elif fname == "opstart":
390         row.append([op.start_timestamp for op in self.ops])
391       elif fname == "opexec":
392         row.append([op.exec_timestamp for op in self.ops])
393       elif fname == "opend":
394         row.append([op.end_timestamp for op in self.ops])
395       elif fname == "oppriority":
396         row.append([op.priority for op in self.ops])
397       elif fname == "received_ts":
398         row.append(self.received_timestamp)
399       elif fname == "start_ts":
400         row.append(self.start_timestamp)
401       elif fname == "end_ts":
402         row.append(self.end_timestamp)
403       elif fname == "summary":
404         row.append([op.input.Summary() for op in self.ops])
405       else:
406         raise errors.OpExecError("Invalid self query field '%s'" % fname)
407     return row
408
409   def MarkUnfinishedOps(self, status, result):
410     """Mark unfinished opcodes with a given status and result.
411
412     This is an utility function for marking all running or waiting to
413     be run opcodes with a given status. Opcodes which are already
414     finalised are not changed.
415
416     @param status: a given opcode status
417     @param result: the opcode result
418
419     """
420     not_marked = True
421     for op in self.ops:
422       if op.status in constants.OPS_FINALIZED:
423         assert not_marked, "Finalized opcodes found after non-finalized ones"
424         continue
425       op.status = status
426       op.result = result
427       not_marked = False
428
429   def Cancel(self):
430     """Marks job as canceled/-ing if possible.
431
432     @rtype: tuple; (bool, string)
433     @return: Boolean describing whether job was successfully canceled or marked
434       as canceling and a text message
435
436     """
437     status = self.CalcStatus()
438
439     if status == constants.JOB_STATUS_QUEUED:
440       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
441                              "Job canceled by request")
442       return (True, "Job %s canceled" % self.id)
443
444     elif status == constants.JOB_STATUS_WAITLOCK:
445       # The worker will notice the new status and cancel the job
446       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
447       return (True, "Job %s will be canceled" % self.id)
448
449     else:
450       logging.debug("Job %s is no longer waiting in the queue", self.id)
451       return (False, "Job %s is no longer waiting in the queue" % self.id)
452
453
454 class _OpExecCallbacks(mcpu.OpExecCbBase):
455   def __init__(self, queue, job, op):
456     """Initializes this class.
457
458     @type queue: L{JobQueue}
459     @param queue: Job queue
460     @type job: L{_QueuedJob}
461     @param job: Job object
462     @type op: L{_QueuedOpCode}
463     @param op: OpCode
464
465     """
466     assert queue, "Queue is missing"
467     assert job, "Job is missing"
468     assert op, "Opcode is missing"
469
470     self._queue = queue
471     self._job = job
472     self._op = op
473
474   def _CheckCancel(self):
475     """Raises an exception to cancel the job if asked to.
476
477     """
478     # Cancel here if we were asked to
479     if self._op.status == constants.OP_STATUS_CANCELING:
480       logging.debug("Canceling opcode")
481       raise CancelJob()
482
483   @locking.ssynchronized(_QUEUE, shared=1)
484   def NotifyStart(self):
485     """Mark the opcode as running, not lock-waiting.
486
487     This is called from the mcpu code as a notifier function, when the LU is
488     finally about to start the Exec() method. Of course, to have end-user
489     visible results, the opcode must be initially (before calling into
490     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
491
492     """
493     assert self._op in self._job.ops
494     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
495                                constants.OP_STATUS_CANCELING)
496
497     # Cancel here if we were asked to
498     self._CheckCancel()
499
500     logging.debug("Opcode is now running")
501
502     self._op.status = constants.OP_STATUS_RUNNING
503     self._op.exec_timestamp = TimeStampNow()
504
505     # And finally replicate the job status
506     self._queue.UpdateJobUnlocked(self._job)
507
508   @locking.ssynchronized(_QUEUE, shared=1)
509   def _AppendFeedback(self, timestamp, log_type, log_msg):
510     """Internal feedback append function, with locks
511
512     """
513     self._job.log_serial += 1
514     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
515     self._queue.UpdateJobUnlocked(self._job, replicate=False)
516
517   def Feedback(self, *args):
518     """Append a log entry.
519
520     """
521     assert len(args) < 3
522
523     if len(args) == 1:
524       log_type = constants.ELOG_MESSAGE
525       log_msg = args[0]
526     else:
527       (log_type, log_msg) = args
528
529     # The time is split to make serialization easier and not lose
530     # precision.
531     timestamp = utils.SplitTime(time.time())
532     self._AppendFeedback(timestamp, log_type, log_msg)
533
534   def CheckCancel(self):
535     """Check whether job has been cancelled.
536
537     """
538     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
539                                constants.OP_STATUS_CANCELING)
540
541     # Cancel here if we were asked to
542     self._CheckCancel()
543
544
545 class _JobChangesChecker(object):
546   def __init__(self, fields, prev_job_info, prev_log_serial):
547     """Initializes this class.
548
549     @type fields: list of strings
550     @param fields: Fields requested by LUXI client
551     @type prev_job_info: string
552     @param prev_job_info: previous job info, as passed by the LUXI client
553     @type prev_log_serial: string
554     @param prev_log_serial: previous job serial, as passed by the LUXI client
555
556     """
557     self._fields = fields
558     self._prev_job_info = prev_job_info
559     self._prev_log_serial = prev_log_serial
560
561   def __call__(self, job):
562     """Checks whether job has changed.
563
564     @type job: L{_QueuedJob}
565     @param job: Job object
566
567     """
568     status = job.CalcStatus()
569     job_info = job.GetInfo(self._fields)
570     log_entries = job.GetLogEntries(self._prev_log_serial)
571
572     # Serializing and deserializing data can cause type changes (e.g. from
573     # tuple to list) or precision loss. We're doing it here so that we get
574     # the same modifications as the data received from the client. Without
575     # this, the comparison afterwards might fail without the data being
576     # significantly different.
577     # TODO: we just deserialized from disk, investigate how to make sure that
578     # the job info and log entries are compatible to avoid this further step.
579     # TODO: Doing something like in testutils.py:UnifyValueType might be more
580     # efficient, though floats will be tricky
581     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
582     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
583
584     # Don't even try to wait if the job is no longer running, there will be
585     # no changes.
586     if (status not in (constants.JOB_STATUS_QUEUED,
587                        constants.JOB_STATUS_RUNNING,
588                        constants.JOB_STATUS_WAITLOCK) or
589         job_info != self._prev_job_info or
590         (log_entries and self._prev_log_serial != log_entries[0][0])):
591       logging.debug("Job %s changed", job.id)
592       return (job_info, log_entries)
593
594     return None
595
596
597 class _JobFileChangesWaiter(object):
598   def __init__(self, filename):
599     """Initializes this class.
600
601     @type filename: string
602     @param filename: Path to job file
603     @raises errors.InotifyError: if the notifier cannot be setup
604
605     """
606     self._wm = pyinotify.WatchManager()
607     self._inotify_handler = \
608       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
609     self._notifier = \
610       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
611     try:
612       self._inotify_handler.enable()
613     except Exception:
614       # pyinotify doesn't close file descriptors automatically
615       self._notifier.stop()
616       raise
617
618   def _OnInotify(self, notifier_enabled):
619     """Callback for inotify.
620
621     """
622     if not notifier_enabled:
623       self._inotify_handler.enable()
624
625   def Wait(self, timeout):
626     """Waits for the job file to change.
627
628     @type timeout: float
629     @param timeout: Timeout in seconds
630     @return: Whether there have been events
631
632     """
633     assert timeout >= 0
634     have_events = self._notifier.check_events(timeout * 1000)
635     if have_events:
636       self._notifier.read_events()
637     self._notifier.process_events()
638     return have_events
639
640   def Close(self):
641     """Closes underlying notifier and its file descriptor.
642
643     """
644     self._notifier.stop()
645
646
647 class _JobChangesWaiter(object):
648   def __init__(self, filename):
649     """Initializes this class.
650
651     @type filename: string
652     @param filename: Path to job file
653
654     """
655     self._filewaiter = None
656     self._filename = filename
657
658   def Wait(self, timeout):
659     """Waits for a job to change.
660
661     @type timeout: float
662     @param timeout: Timeout in seconds
663     @return: Whether there have been events
664
665     """
666     if self._filewaiter:
667       return self._filewaiter.Wait(timeout)
668
669     # Lazy setup: Avoid inotify setup cost when job file has already changed.
670     # If this point is reached, return immediately and let caller check the job
671     # file again in case there were changes since the last check. This avoids a
672     # race condition.
673     self._filewaiter = _JobFileChangesWaiter(self._filename)
674
675     return True
676
677   def Close(self):
678     """Closes underlying waiter.
679
680     """
681     if self._filewaiter:
682       self._filewaiter.Close()
683
684
685 class _WaitForJobChangesHelper(object):
686   """Helper class using inotify to wait for changes in a job file.
687
688   This class takes a previous job status and serial, and alerts the client when
689   the current job status has changed.
690
691   """
692   @staticmethod
693   def _CheckForChanges(job_load_fn, check_fn):
694     job = job_load_fn()
695     if not job:
696       raise errors.JobLost()
697
698     result = check_fn(job)
699     if result is None:
700       raise utils.RetryAgain()
701
702     return result
703
704   def __call__(self, filename, job_load_fn,
705                fields, prev_job_info, prev_log_serial, timeout):
706     """Waits for changes on a job.
707
708     @type filename: string
709     @param filename: File on which to wait for changes
710     @type job_load_fn: callable
711     @param job_load_fn: Function to load job
712     @type fields: list of strings
713     @param fields: Which fields to check for changes
714     @type prev_job_info: list or None
715     @param prev_job_info: Last job information returned
716     @type prev_log_serial: int
717     @param prev_log_serial: Last job message serial number
718     @type timeout: float
719     @param timeout: maximum time to wait in seconds
720
721     """
722     try:
723       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
724       waiter = _JobChangesWaiter(filename)
725       try:
726         return utils.Retry(compat.partial(self._CheckForChanges,
727                                           job_load_fn, check_fn),
728                            utils.RETRY_REMAINING_TIME, timeout,
729                            wait_fn=waiter.Wait)
730       finally:
731         waiter.Close()
732     except (errors.InotifyError, errors.JobLost):
733       return None
734     except utils.RetryTimeout:
735       return constants.JOB_NOTCHANGED
736
737
738 def _EncodeOpError(err):
739   """Encodes an error which occurred while processing an opcode.
740
741   """
742   if isinstance(err, errors.GenericError):
743     to_encode = err
744   else:
745     to_encode = errors.OpExecError(str(err))
746
747   return errors.EncodeException(to_encode)
748
749
750 class _TimeoutStrategyWrapper:
751   def __init__(self, fn):
752     """Initializes this class.
753
754     """
755     self._fn = fn
756     self._next = None
757
758   def _Advance(self):
759     """Gets the next timeout if necessary.
760
761     """
762     if self._next is None:
763       self._next = self._fn()
764
765   def Peek(self):
766     """Returns the next timeout.
767
768     """
769     self._Advance()
770     return self._next
771
772   def Next(self):
773     """Returns the current timeout and advances the internal state.
774
775     """
776     self._Advance()
777     result = self._next
778     self._next = None
779     return result
780
781
782 class _OpExecContext:
783   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
784     """Initializes this class.
785
786     """
787     self.op = op
788     self.index = index
789     self.log_prefix = log_prefix
790     self.summary = op.input.Summary()
791
792     self._timeout_strategy_factory = timeout_strategy_factory
793     self._ResetTimeoutStrategy()
794
795   def _ResetTimeoutStrategy(self):
796     """Creates a new timeout strategy.
797
798     """
799     self._timeout_strategy = \
800       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
801
802   def CheckPriorityIncrease(self):
803     """Checks whether priority can and should be increased.
804
805     Called when locks couldn't be acquired.
806
807     """
808     op = self.op
809
810     # Exhausted all retries and next round should not use blocking acquire
811     # for locks?
812     if (self._timeout_strategy.Peek() is None and
813         op.priority > constants.OP_PRIO_HIGHEST):
814       logging.debug("Increasing priority")
815       op.priority -= 1
816       self._ResetTimeoutStrategy()
817       return True
818
819     return False
820
821   def GetNextLockTimeout(self):
822     """Returns the next lock acquire timeout.
823
824     """
825     return self._timeout_strategy.Next()
826
827
828 class _JobProcessor(object):
829   def __init__(self, queue, opexec_fn, job,
830                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
831     """Initializes this class.
832
833     """
834     self.queue = queue
835     self.opexec_fn = opexec_fn
836     self.job = job
837     self._timeout_strategy_factory = _timeout_strategy_factory
838
839   @staticmethod
840   def _FindNextOpcode(job, timeout_strategy_factory):
841     """Locates the next opcode to run.
842
843     @type job: L{_QueuedJob}
844     @param job: Job object
845     @param timeout_strategy_factory: Callable to create new timeout strategy
846
847     """
848     # Create some sort of a cache to speed up locating next opcode for future
849     # lookups
850     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
851     # pending and one for processed ops.
852     if job.ops_iter is None:
853       job.ops_iter = enumerate(job.ops)
854
855     # Find next opcode to run
856     while True:
857       try:
858         (idx, op) = job.ops_iter.next()
859       except StopIteration:
860         raise errors.ProgrammerError("Called for a finished job")
861
862       if op.status == constants.OP_STATUS_RUNNING:
863         # Found an opcode already marked as running
864         raise errors.ProgrammerError("Called for job marked as running")
865
866       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
867                              timeout_strategy_factory)
868
869       if op.status == constants.OP_STATUS_CANCELED:
870         # Cancelled jobs are handled by the caller
871         assert not compat.any(i.status != constants.OP_STATUS_CANCELED
872                               for i in job.ops[idx:])
873
874       elif op.status in constants.OPS_FINALIZED:
875         # This is a job that was partially completed before master daemon
876         # shutdown, so it can be expected that some opcodes are already
877         # completed successfully (if any did error out, then the whole job
878         # should have been aborted and not resubmitted for processing).
879         logging.info("%s: opcode %s already processed, skipping",
880                      opctx.log_prefix, opctx.summary)
881         continue
882
883       return opctx
884
885   @staticmethod
886   def _MarkWaitlock(job, op):
887     """Marks an opcode as waiting for locks.
888
889     The job's start timestamp is also set if necessary.
890
891     @type job: L{_QueuedJob}
892     @param job: Job object
893     @type op: L{_QueuedOpCode}
894     @param op: Opcode object
895
896     """
897     assert op in job.ops
898     assert op.status in (constants.OP_STATUS_QUEUED,
899                          constants.OP_STATUS_WAITLOCK)
900
901     update = False
902
903     op.result = None
904
905     if op.status == constants.OP_STATUS_QUEUED:
906       op.status = constants.OP_STATUS_WAITLOCK
907       update = True
908
909     if op.start_timestamp is None:
910       op.start_timestamp = TimeStampNow()
911       update = True
912
913     if job.start_timestamp is None:
914       job.start_timestamp = op.start_timestamp
915       update = True
916
917     assert op.status == constants.OP_STATUS_WAITLOCK
918
919     return update
920
921   def _ExecOpCodeUnlocked(self, opctx):
922     """Processes one opcode and returns the result.
923
924     """
925     op = opctx.op
926
927     assert op.status == constants.OP_STATUS_WAITLOCK
928
929     timeout = opctx.GetNextLockTimeout()
930
931     try:
932       # Make sure not to hold queue lock while calling ExecOpCode
933       result = self.opexec_fn(op.input,
934                               _OpExecCallbacks(self.queue, self.job, op),
935                               timeout=timeout, priority=op.priority)
936     except mcpu.LockAcquireTimeout:
937       assert timeout is not None, "Received timeout for blocking acquire"
938       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
939
940       assert op.status in (constants.OP_STATUS_WAITLOCK,
941                            constants.OP_STATUS_CANCELING)
942
943       # Was job cancelled while we were waiting for the lock?
944       if op.status == constants.OP_STATUS_CANCELING:
945         return (constants.OP_STATUS_CANCELING, None)
946
947       # Stay in waitlock while trying to re-acquire lock
948       return (constants.OP_STATUS_WAITLOCK, None)
949     except CancelJob:
950       logging.exception("%s: Canceling job", opctx.log_prefix)
951       assert op.status == constants.OP_STATUS_CANCELING
952       return (constants.OP_STATUS_CANCELING, None)
953     except Exception, err: # pylint: disable-msg=W0703
954       logging.exception("%s: Caught exception in %s",
955                         opctx.log_prefix, opctx.summary)
956       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
957     else:
958       logging.debug("%s: %s successful",
959                     opctx.log_prefix, opctx.summary)
960       return (constants.OP_STATUS_SUCCESS, result)
961
962   def __call__(self, _nextop_fn=None):
963     """Continues execution of a job.
964
965     @param _nextop_fn: Callback function for tests
966     @rtype: bool
967     @return: True if job is finished, False if processor needs to be called
968              again
969
970     """
971     queue = self.queue
972     job = self.job
973
974     logging.debug("Processing job %s", job.id)
975
976     queue.acquire(shared=1)
977     try:
978       opcount = len(job.ops)
979
980       # Is a previous opcode still pending?
981       if job.cur_opctx:
982         opctx = job.cur_opctx
983         job.cur_opctx = None
984       else:
985         if __debug__ and _nextop_fn:
986           _nextop_fn()
987         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
988
989       op = opctx.op
990
991       # Consistency check
992       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
993                                      constants.OP_STATUS_CANCELING,
994                                      constants.OP_STATUS_CANCELED)
995                         for i in job.ops[opctx.index + 1:])
996
997       assert op.status in (constants.OP_STATUS_QUEUED,
998                            constants.OP_STATUS_WAITLOCK,
999                            constants.OP_STATUS_CANCELING,
1000                            constants.OP_STATUS_CANCELED)
1001
1002       assert (op.priority <= constants.OP_PRIO_LOWEST and
1003               op.priority >= constants.OP_PRIO_HIGHEST)
1004
1005       if op.status not in (constants.OP_STATUS_CANCELING,
1006                            constants.OP_STATUS_CANCELED):
1007         assert op.status in (constants.OP_STATUS_QUEUED,
1008                              constants.OP_STATUS_WAITLOCK)
1009
1010         # Prepare to start opcode
1011         if self._MarkWaitlock(job, op):
1012           # Write to disk
1013           queue.UpdateJobUnlocked(job)
1014
1015         assert op.status == constants.OP_STATUS_WAITLOCK
1016         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1017         assert job.start_timestamp and op.start_timestamp
1018
1019         logging.info("%s: opcode %s waiting for locks",
1020                      opctx.log_prefix, opctx.summary)
1021
1022         queue.release()
1023         try:
1024           (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1025         finally:
1026           queue.acquire(shared=1)
1027
1028         op.status = op_status
1029         op.result = op_result
1030
1031         if op.status == constants.OP_STATUS_WAITLOCK:
1032           # Couldn't get locks in time
1033           assert not op.end_timestamp
1034         else:
1035           # Finalize opcode
1036           op.end_timestamp = TimeStampNow()
1037
1038           if op.status == constants.OP_STATUS_CANCELING:
1039             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1040                                   for i in job.ops[opctx.index:])
1041           else:
1042             assert op.status in constants.OPS_FINALIZED
1043
1044       if op.status == constants.OP_STATUS_WAITLOCK:
1045         finalize = False
1046
1047         if opctx.CheckPriorityIncrease():
1048           # Priority was changed, need to update on-disk file
1049           queue.UpdateJobUnlocked(job)
1050
1051         # Keep around for another round
1052         job.cur_opctx = opctx
1053
1054         assert (op.priority <= constants.OP_PRIO_LOWEST and
1055                 op.priority >= constants.OP_PRIO_HIGHEST)
1056
1057         # In no case must the status be finalized here
1058         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1059
1060       else:
1061         # Ensure all opcodes so far have been successful
1062         assert (opctx.index == 0 or
1063                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1064                            for i in job.ops[:opctx.index]))
1065
1066         # Reset context
1067         job.cur_opctx = None
1068
1069         if op.status == constants.OP_STATUS_SUCCESS:
1070           finalize = False
1071
1072         elif op.status == constants.OP_STATUS_ERROR:
1073           # Ensure failed opcode has an exception as its result
1074           assert errors.GetEncodedError(job.ops[opctx.index].result)
1075
1076           to_encode = errors.OpExecError("Preceding opcode failed")
1077           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1078                                 _EncodeOpError(to_encode))
1079           finalize = True
1080
1081           # Consistency check
1082           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1083                             errors.GetEncodedError(i.result)
1084                             for i in job.ops[opctx.index:])
1085
1086         elif op.status == constants.OP_STATUS_CANCELING:
1087           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1088                                 "Job canceled by request")
1089           finalize = True
1090
1091         elif op.status == constants.OP_STATUS_CANCELED:
1092           finalize = True
1093
1094         else:
1095           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1096
1097         # Finalizing or last opcode?
1098         if finalize or opctx.index == (opcount - 1):
1099           # All opcodes have been run, finalize job
1100           job.end_timestamp = TimeStampNow()
1101
1102         # Write to disk. If the job status is final, this is the final write
1103         # allowed. Once the file has been written, it can be archived anytime.
1104         queue.UpdateJobUnlocked(job)
1105
1106         if finalize or opctx.index == (opcount - 1):
1107           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1108           return True
1109
1110       return False
1111     finally:
1112       queue.release()
1113
1114
1115 class _JobQueueWorker(workerpool.BaseWorker):
1116   """The actual job workers.
1117
1118   """
1119   def RunTask(self, job): # pylint: disable-msg=W0221
1120     """Job executor.
1121
1122     This functions processes a job. It is closely tied to the L{_QueuedJob} and
1123     L{_QueuedOpCode} classes.
1124
1125     @type job: L{_QueuedJob}
1126     @param job: the job to be processed
1127
1128     """
1129     queue = job.queue
1130     assert queue == self.pool.queue
1131
1132     self.SetTaskName("Job%s" % job.id)
1133
1134     proc = mcpu.Processor(queue.context, job.id)
1135
1136     if not _JobProcessor(queue, proc.ExecOpCode, job)():
1137       # Schedule again
1138       raise workerpool.DeferTask(priority=job.CalcPriority())
1139
1140
1141 class _JobQueueWorkerPool(workerpool.WorkerPool):
1142   """Simple class implementing a job-processing workerpool.
1143
1144   """
1145   def __init__(self, queue):
1146     super(_JobQueueWorkerPool, self).__init__("JobQueue",
1147                                               JOBQUEUE_THREADS,
1148                                               _JobQueueWorker)
1149     self.queue = queue
1150
1151
1152 def _RequireOpenQueue(fn):
1153   """Decorator for "public" functions.
1154
1155   This function should be used for all 'public' functions. That is,
1156   functions usually called from other classes. Note that this should
1157   be applied only to methods (not plain functions), since it expects
1158   that the decorated function is called with a first argument that has
1159   a '_queue_filelock' argument.
1160
1161   @warning: Use this decorator only after locking.ssynchronized
1162
1163   Example::
1164     @locking.ssynchronized(_LOCK)
1165     @_RequireOpenQueue
1166     def Example(self):
1167       pass
1168
1169   """
1170   def wrapper(self, *args, **kwargs):
1171     # pylint: disable-msg=W0212
1172     assert self._queue_filelock is not None, "Queue should be open"
1173     return fn(self, *args, **kwargs)
1174   return wrapper
1175
1176
1177 class JobQueue(object):
1178   """Queue used to manage the jobs.
1179
1180   @cvar _RE_JOB_FILE: regex matching the valid job file names
1181
1182   """
1183   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1184
1185   def __init__(self, context):
1186     """Constructor for JobQueue.
1187
1188     The constructor will initialize the job queue object and then
1189     start loading the current jobs from disk, either for starting them
1190     (if they were queue) or for aborting them (if they were already
1191     running).
1192
1193     @type context: GanetiContext
1194     @param context: the context object for access to the configuration
1195         data and other ganeti objects
1196
1197     """
1198     self.context = context
1199     self._memcache = weakref.WeakValueDictionary()
1200     self._my_hostname = netutils.Hostname.GetSysName()
1201
1202     # The Big JobQueue lock. If a code block or method acquires it in shared
1203     # mode safe it must guarantee concurrency with all the code acquiring it in
1204     # shared mode, including itself. In order not to acquire it at all
1205     # concurrency must be guaranteed with all code acquiring it in shared mode
1206     # and all code acquiring it exclusively.
1207     self._lock = locking.SharedLock("JobQueue")
1208
1209     self.acquire = self._lock.acquire
1210     self.release = self._lock.release
1211
1212     # Initialize the queue, and acquire the filelock.
1213     # This ensures no other process is working on the job queue.
1214     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1215
1216     # Read serial file
1217     self._last_serial = jstore.ReadSerial()
1218     assert self._last_serial is not None, ("Serial file was modified between"
1219                                            " check in jstore and here")
1220
1221     # Get initial list of nodes
1222     self._nodes = dict((n.name, n.primary_ip)
1223                        for n in self.context.cfg.GetAllNodesInfo().values()
1224                        if n.master_candidate)
1225
1226     # Remove master node
1227     self._nodes.pop(self._my_hostname, None)
1228
1229     # TODO: Check consistency across nodes
1230
1231     self._queue_size = 0
1232     self._UpdateQueueSizeUnlocked()
1233     self._drained = self._IsQueueMarkedDrain()
1234
1235     # Setup worker pool
1236     self._wpool = _JobQueueWorkerPool(self)
1237     try:
1238       self._InspectQueue()
1239     except:
1240       self._wpool.TerminateWorkers()
1241       raise
1242
1243   @locking.ssynchronized(_LOCK)
1244   @_RequireOpenQueue
1245   def _InspectQueue(self):
1246     """Loads the whole job queue and resumes unfinished jobs.
1247
1248     This function needs the lock here because WorkerPool.AddTask() may start a
1249     job while we're still doing our work.
1250
1251     """
1252     logging.info("Inspecting job queue")
1253
1254     restartjobs = []
1255
1256     all_job_ids = self._GetJobIDsUnlocked()
1257     jobs_count = len(all_job_ids)
1258     lastinfo = time.time()
1259     for idx, job_id in enumerate(all_job_ids):
1260       # Give an update every 1000 jobs or 10 seconds
1261       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1262           idx == (jobs_count - 1)):
1263         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1264                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1265         lastinfo = time.time()
1266
1267       job = self._LoadJobUnlocked(job_id)
1268
1269       # a failure in loading the job can cause 'None' to be returned
1270       if job is None:
1271         continue
1272
1273       status = job.CalcStatus()
1274
1275       if status == constants.JOB_STATUS_QUEUED:
1276         restartjobs.append(job)
1277
1278       elif status in (constants.JOB_STATUS_RUNNING,
1279                       constants.JOB_STATUS_WAITLOCK,
1280                       constants.JOB_STATUS_CANCELING):
1281         logging.warning("Unfinished job %s found: %s", job.id, job)
1282
1283         if status == constants.JOB_STATUS_WAITLOCK:
1284           # Restart job
1285           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1286           restartjobs.append(job)
1287         else:
1288           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1289                                 "Unclean master daemon shutdown")
1290
1291         self.UpdateJobUnlocked(job)
1292
1293     if restartjobs:
1294       logging.info("Restarting %s jobs", len(restartjobs))
1295       self._EnqueueJobs(restartjobs)
1296
1297     logging.info("Job queue inspection finished")
1298
1299   @locking.ssynchronized(_LOCK)
1300   @_RequireOpenQueue
1301   def AddNode(self, node):
1302     """Register a new node with the queue.
1303
1304     @type node: L{objects.Node}
1305     @param node: the node object to be added
1306
1307     """
1308     node_name = node.name
1309     assert node_name != self._my_hostname
1310
1311     # Clean queue directory on added node
1312     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1313     msg = result.fail_msg
1314     if msg:
1315       logging.warning("Cannot cleanup queue directory on node %s: %s",
1316                       node_name, msg)
1317
1318     if not node.master_candidate:
1319       # remove if existing, ignoring errors
1320       self._nodes.pop(node_name, None)
1321       # and skip the replication of the job ids
1322       return
1323
1324     # Upload the whole queue excluding archived jobs
1325     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1326
1327     # Upload current serial file
1328     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1329
1330     for file_name in files:
1331       # Read file content
1332       content = utils.ReadFile(file_name)
1333
1334       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1335                                                   [node.primary_ip],
1336                                                   file_name, content)
1337       msg = result[node_name].fail_msg
1338       if msg:
1339         logging.error("Failed to upload file %s to node %s: %s",
1340                       file_name, node_name, msg)
1341
1342     self._nodes[node_name] = node.primary_ip
1343
1344   @locking.ssynchronized(_LOCK)
1345   @_RequireOpenQueue
1346   def RemoveNode(self, node_name):
1347     """Callback called when removing nodes from the cluster.
1348
1349     @type node_name: str
1350     @param node_name: the name of the node to remove
1351
1352     """
1353     self._nodes.pop(node_name, None)
1354
1355   @staticmethod
1356   def _CheckRpcResult(result, nodes, failmsg):
1357     """Verifies the status of an RPC call.
1358
1359     Since we aim to keep consistency should this node (the current
1360     master) fail, we will log errors if our rpc fail, and especially
1361     log the case when more than half of the nodes fails.
1362
1363     @param result: the data as returned from the rpc call
1364     @type nodes: list
1365     @param nodes: the list of nodes we made the call to
1366     @type failmsg: str
1367     @param failmsg: the identifier to be used for logging
1368
1369     """
1370     failed = []
1371     success = []
1372
1373     for node in nodes:
1374       msg = result[node].fail_msg
1375       if msg:
1376         failed.append(node)
1377         logging.error("RPC call %s (%s) failed on node %s: %s",
1378                       result[node].call, failmsg, node, msg)
1379       else:
1380         success.append(node)
1381
1382     # +1 for the master node
1383     if (len(success) + 1) < len(failed):
1384       # TODO: Handle failing nodes
1385       logging.error("More than half of the nodes failed")
1386
1387   def _GetNodeIp(self):
1388     """Helper for returning the node name/ip list.
1389
1390     @rtype: (list, list)
1391     @return: a tuple of two lists, the first one with the node
1392         names and the second one with the node addresses
1393
1394     """
1395     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1396     name_list = self._nodes.keys()
1397     addr_list = [self._nodes[name] for name in name_list]
1398     return name_list, addr_list
1399
1400   def _UpdateJobQueueFile(self, file_name, data, replicate):
1401     """Writes a file locally and then replicates it to all nodes.
1402
1403     This function will replace the contents of a file on the local
1404     node and then replicate it to all the other nodes we have.
1405
1406     @type file_name: str
1407     @param file_name: the path of the file to be replicated
1408     @type data: str
1409     @param data: the new contents of the file
1410     @type replicate: boolean
1411     @param replicate: whether to spread the changes to the remote nodes
1412
1413     """
1414     getents = runtime.GetEnts()
1415     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1416                     gid=getents.masterd_gid)
1417
1418     if replicate:
1419       names, addrs = self._GetNodeIp()
1420       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1421       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1422
1423   def _RenameFilesUnlocked(self, rename):
1424     """Renames a file locally and then replicate the change.
1425
1426     This function will rename a file in the local queue directory
1427     and then replicate this rename to all the other nodes we have.
1428
1429     @type rename: list of (old, new)
1430     @param rename: List containing tuples mapping old to new names
1431
1432     """
1433     # Rename them locally
1434     for old, new in rename:
1435       utils.RenameFile(old, new, mkdir=True)
1436
1437     # ... and on all nodes
1438     names, addrs = self._GetNodeIp()
1439     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1440     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1441
1442   @staticmethod
1443   def _FormatJobID(job_id):
1444     """Convert a job ID to string format.
1445
1446     Currently this just does C{str(job_id)} after performing some
1447     checks, but if we want to change the job id format this will
1448     abstract this change.
1449
1450     @type job_id: int or long
1451     @param job_id: the numeric job id
1452     @rtype: str
1453     @return: the formatted job id
1454
1455     """
1456     if not isinstance(job_id, (int, long)):
1457       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1458     if job_id < 0:
1459       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1460
1461     return str(job_id)
1462
1463   @classmethod
1464   def _GetArchiveDirectory(cls, job_id):
1465     """Returns the archive directory for a job.
1466
1467     @type job_id: str
1468     @param job_id: Job identifier
1469     @rtype: str
1470     @return: Directory name
1471
1472     """
1473     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1474
1475   def _NewSerialsUnlocked(self, count):
1476     """Generates a new job identifier.
1477
1478     Job identifiers are unique during the lifetime of a cluster.
1479
1480     @type count: integer
1481     @param count: how many serials to return
1482     @rtype: str
1483     @return: a string representing the job identifier.
1484
1485     """
1486     assert count > 0
1487     # New number
1488     serial = self._last_serial + count
1489
1490     # Write to file
1491     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1492                              "%s\n" % serial, True)
1493
1494     result = [self._FormatJobID(v)
1495               for v in range(self._last_serial, serial + 1)]
1496     # Keep it only if we were able to write the file
1497     self._last_serial = serial
1498
1499     return result
1500
1501   @staticmethod
1502   def _GetJobPath(job_id):
1503     """Returns the job file for a given job id.
1504
1505     @type job_id: str
1506     @param job_id: the job identifier
1507     @rtype: str
1508     @return: the path to the job file
1509
1510     """
1511     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1512
1513   @classmethod
1514   def _GetArchivedJobPath(cls, job_id):
1515     """Returns the archived job file for a give job id.
1516
1517     @type job_id: str
1518     @param job_id: the job identifier
1519     @rtype: str
1520     @return: the path to the archived job file
1521
1522     """
1523     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1524                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1525
1526   def _GetJobIDsUnlocked(self, sort=True):
1527     """Return all known job IDs.
1528
1529     The method only looks at disk because it's a requirement that all
1530     jobs are present on disk (so in the _memcache we don't have any
1531     extra IDs).
1532
1533     @type sort: boolean
1534     @param sort: perform sorting on the returned job ids
1535     @rtype: list
1536     @return: the list of job IDs
1537
1538     """
1539     jlist = []
1540     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1541       m = self._RE_JOB_FILE.match(filename)
1542       if m:
1543         jlist.append(m.group(1))
1544     if sort:
1545       jlist = utils.NiceSort(jlist)
1546     return jlist
1547
1548   def _LoadJobUnlocked(self, job_id):
1549     """Loads a job from the disk or memory.
1550
1551     Given a job id, this will return the cached job object if
1552     existing, or try to load the job from the disk. If loading from
1553     disk, it will also add the job to the cache.
1554
1555     @param job_id: the job id
1556     @rtype: L{_QueuedJob} or None
1557     @return: either None or the job object
1558
1559     """
1560     job = self._memcache.get(job_id, None)
1561     if job:
1562       logging.debug("Found job %s in memcache", job_id)
1563       return job
1564
1565     try:
1566       job = self._LoadJobFromDisk(job_id)
1567       if job is None:
1568         return job
1569     except errors.JobFileCorrupted:
1570       old_path = self._GetJobPath(job_id)
1571       new_path = self._GetArchivedJobPath(job_id)
1572       if old_path == new_path:
1573         # job already archived (future case)
1574         logging.exception("Can't parse job %s", job_id)
1575       else:
1576         # non-archived case
1577         logging.exception("Can't parse job %s, will archive.", job_id)
1578         self._RenameFilesUnlocked([(old_path, new_path)])
1579       return None
1580
1581     self._memcache[job_id] = job
1582     logging.debug("Added job %s to the cache", job_id)
1583     return job
1584
1585   def _LoadJobFromDisk(self, job_id):
1586     """Load the given job file from disk.
1587
1588     Given a job file, read, load and restore it in a _QueuedJob format.
1589
1590     @type job_id: string
1591     @param job_id: job identifier
1592     @rtype: L{_QueuedJob} or None
1593     @return: either None or the job object
1594
1595     """
1596     filepath = self._GetJobPath(job_id)
1597     logging.debug("Loading job from %s", filepath)
1598     try:
1599       raw_data = utils.ReadFile(filepath)
1600     except EnvironmentError, err:
1601       if err.errno in (errno.ENOENT, ):
1602         return None
1603       raise
1604
1605     try:
1606       data = serializer.LoadJson(raw_data)
1607       job = _QueuedJob.Restore(self, data)
1608     except Exception, err: # pylint: disable-msg=W0703
1609       raise errors.JobFileCorrupted(err)
1610
1611     return job
1612
1613   def SafeLoadJobFromDisk(self, job_id):
1614     """Load the given job file from disk.
1615
1616     Given a job file, read, load and restore it in a _QueuedJob format.
1617     In case of error reading the job, it gets returned as None, and the
1618     exception is logged.
1619
1620     @type job_id: string
1621     @param job_id: job identifier
1622     @rtype: L{_QueuedJob} or None
1623     @return: either None or the job object
1624
1625     """
1626     try:
1627       return self._LoadJobFromDisk(job_id)
1628     except (errors.JobFileCorrupted, EnvironmentError):
1629       logging.exception("Can't load/parse job %s", job_id)
1630       return None
1631
1632   @staticmethod
1633   def _IsQueueMarkedDrain():
1634     """Check if the queue is marked from drain.
1635
1636     This currently uses the queue drain file, which makes it a
1637     per-node flag. In the future this can be moved to the config file.
1638
1639     @rtype: boolean
1640     @return: True of the job queue is marked for draining
1641
1642     """
1643     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1644
1645   def _UpdateQueueSizeUnlocked(self):
1646     """Update the queue size.
1647
1648     """
1649     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1650
1651   @locking.ssynchronized(_LOCK)
1652   @_RequireOpenQueue
1653   def SetDrainFlag(self, drain_flag):
1654     """Sets the drain flag for the queue.
1655
1656     @type drain_flag: boolean
1657     @param drain_flag: Whether to set or unset the drain flag
1658
1659     """
1660     getents = runtime.GetEnts()
1661
1662     if drain_flag:
1663       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1664                       uid=getents.masterd_uid, gid=getents.masterd_gid)
1665     else:
1666       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1667
1668     self._drained = drain_flag
1669
1670     return True
1671
1672   @_RequireOpenQueue
1673   def _SubmitJobUnlocked(self, job_id, ops):
1674     """Create and store a new job.
1675
1676     This enters the job into our job queue and also puts it on the new
1677     queue, in order for it to be picked up by the queue processors.
1678
1679     @type job_id: job ID
1680     @param job_id: the job ID for the new job
1681     @type ops: list
1682     @param ops: The list of OpCodes that will become the new job.
1683     @rtype: L{_QueuedJob}
1684     @return: the job object to be queued
1685     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1686     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1687     @raise errors.GenericError: If an opcode is not valid
1688
1689     """
1690     # Ok when sharing the big job queue lock, as the drain file is created when
1691     # the lock is exclusive.
1692     if self._drained:
1693       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1694
1695     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1696       raise errors.JobQueueFull()
1697
1698     job = _QueuedJob(self, job_id, ops)
1699
1700     # Check priority
1701     for idx, op in enumerate(job.ops):
1702       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1703         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1704         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1705                                   " are %s" % (idx, op.priority, allowed))
1706
1707     # Write to disk
1708     self.UpdateJobUnlocked(job)
1709
1710     self._queue_size += 1
1711
1712     logging.debug("Adding new job %s to the cache", job_id)
1713     self._memcache[job_id] = job
1714
1715     return job
1716
1717   @locking.ssynchronized(_LOCK)
1718   @_RequireOpenQueue
1719   def SubmitJob(self, ops):
1720     """Create and store a new job.
1721
1722     @see: L{_SubmitJobUnlocked}
1723
1724     """
1725     job_id = self._NewSerialsUnlocked(1)[0]
1726     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1727     return job_id
1728
1729   @locking.ssynchronized(_LOCK)
1730   @_RequireOpenQueue
1731   def SubmitManyJobs(self, jobs):
1732     """Create and store multiple jobs.
1733
1734     @see: L{_SubmitJobUnlocked}
1735
1736     """
1737     results = []
1738     added_jobs = []
1739     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1740     for job_id, ops in zip(all_job_ids, jobs):
1741       try:
1742         added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1743         status = True
1744         data = job_id
1745       except errors.GenericError, err:
1746         data = str(err)
1747         status = False
1748       results.append((status, data))
1749
1750     self._EnqueueJobs(added_jobs)
1751
1752     return results
1753
1754   def _EnqueueJobs(self, jobs):
1755     """Helper function to add jobs to worker pool's queue.
1756
1757     @type jobs: list
1758     @param jobs: List of all jobs
1759
1760     """
1761     self._wpool.AddManyTasks([(job, ) for job in jobs],
1762                              priority=[job.CalcPriority() for job in jobs])
1763
1764   @_RequireOpenQueue
1765   def UpdateJobUnlocked(self, job, replicate=True):
1766     """Update a job's on disk storage.
1767
1768     After a job has been modified, this function needs to be called in
1769     order to write the changes to disk and replicate them to the other
1770     nodes.
1771
1772     @type job: L{_QueuedJob}
1773     @param job: the changed job
1774     @type replicate: boolean
1775     @param replicate: whether to replicate the change to remote nodes
1776
1777     """
1778     filename = self._GetJobPath(job.id)
1779     data = serializer.DumpJson(job.Serialize(), indent=False)
1780     logging.debug("Writing job %s to %s", job.id, filename)
1781     self._UpdateJobQueueFile(filename, data, replicate)
1782
1783   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1784                         timeout):
1785     """Waits for changes in a job.
1786
1787     @type job_id: string
1788     @param job_id: Job identifier
1789     @type fields: list of strings
1790     @param fields: Which fields to check for changes
1791     @type prev_job_info: list or None
1792     @param prev_job_info: Last job information returned
1793     @type prev_log_serial: int
1794     @param prev_log_serial: Last job message serial number
1795     @type timeout: float
1796     @param timeout: maximum time to wait in seconds
1797     @rtype: tuple (job info, log entries)
1798     @return: a tuple of the job information as required via
1799         the fields parameter, and the log entries as a list
1800
1801         if the job has not changed and the timeout has expired,
1802         we instead return a special value,
1803         L{constants.JOB_NOTCHANGED}, which should be interpreted
1804         as such by the clients
1805
1806     """
1807     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1808
1809     helper = _WaitForJobChangesHelper()
1810
1811     return helper(self._GetJobPath(job_id), load_fn,
1812                   fields, prev_job_info, prev_log_serial, timeout)
1813
1814   @locking.ssynchronized(_LOCK)
1815   @_RequireOpenQueue
1816   def CancelJob(self, job_id):
1817     """Cancels a job.
1818
1819     This will only succeed if the job has not started yet.
1820
1821     @type job_id: string
1822     @param job_id: job ID of job to be cancelled.
1823
1824     """
1825     logging.info("Cancelling job %s", job_id)
1826
1827     job = self._LoadJobUnlocked(job_id)
1828     if not job:
1829       logging.debug("Job %s not found", job_id)
1830       return (False, "Job %s not found" % job_id)
1831
1832     (success, msg) = job.Cancel()
1833
1834     if success:
1835       self.UpdateJobUnlocked(job)
1836
1837     return (success, msg)
1838
1839   @_RequireOpenQueue
1840   def _ArchiveJobsUnlocked(self, jobs):
1841     """Archives jobs.
1842
1843     @type jobs: list of L{_QueuedJob}
1844     @param jobs: Job objects
1845     @rtype: int
1846     @return: Number of archived jobs
1847
1848     """
1849     archive_jobs = []
1850     rename_files = []
1851     for job in jobs:
1852       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1853         logging.debug("Job %s is not yet done", job.id)
1854         continue
1855
1856       archive_jobs.append(job)
1857
1858       old = self._GetJobPath(job.id)
1859       new = self._GetArchivedJobPath(job.id)
1860       rename_files.append((old, new))
1861
1862     # TODO: What if 1..n files fail to rename?
1863     self._RenameFilesUnlocked(rename_files)
1864
1865     logging.debug("Successfully archived job(s) %s",
1866                   utils.CommaJoin(job.id for job in archive_jobs))
1867
1868     # Since we haven't quite checked, above, if we succeeded or failed renaming
1869     # the files, we update the cached queue size from the filesystem. When we
1870     # get around to fix the TODO: above, we can use the number of actually
1871     # archived jobs to fix this.
1872     self._UpdateQueueSizeUnlocked()
1873     return len(archive_jobs)
1874
1875   @locking.ssynchronized(_LOCK)
1876   @_RequireOpenQueue
1877   def ArchiveJob(self, job_id):
1878     """Archives a job.
1879
1880     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1881
1882     @type job_id: string
1883     @param job_id: Job ID of job to be archived.
1884     @rtype: bool
1885     @return: Whether job was archived
1886
1887     """
1888     logging.info("Archiving job %s", job_id)
1889
1890     job = self._LoadJobUnlocked(job_id)
1891     if not job:
1892       logging.debug("Job %s not found", job_id)
1893       return False
1894
1895     return self._ArchiveJobsUnlocked([job]) == 1
1896
1897   @locking.ssynchronized(_LOCK)
1898   @_RequireOpenQueue
1899   def AutoArchiveJobs(self, age, timeout):
1900     """Archives all jobs based on age.
1901
1902     The method will archive all jobs which are older than the age
1903     parameter. For jobs that don't have an end timestamp, the start
1904     timestamp will be considered. The special '-1' age will cause
1905     archival of all jobs (that are not running or queued).
1906
1907     @type age: int
1908     @param age: the minimum age in seconds
1909
1910     """
1911     logging.info("Archiving jobs with age more than %s seconds", age)
1912
1913     now = time.time()
1914     end_time = now + timeout
1915     archived_count = 0
1916     last_touched = 0
1917
1918     all_job_ids = self._GetJobIDsUnlocked()
1919     pending = []
1920     for idx, job_id in enumerate(all_job_ids):
1921       last_touched = idx + 1
1922
1923       # Not optimal because jobs could be pending
1924       # TODO: Measure average duration for job archival and take number of
1925       # pending jobs into account.
1926       if time.time() > end_time:
1927         break
1928
1929       # Returns None if the job failed to load
1930       job = self._LoadJobUnlocked(job_id)
1931       if job:
1932         if job.end_timestamp is None:
1933           if job.start_timestamp is None:
1934             job_age = job.received_timestamp
1935           else:
1936             job_age = job.start_timestamp
1937         else:
1938           job_age = job.end_timestamp
1939
1940         if age == -1 or now - job_age[0] > age:
1941           pending.append(job)
1942
1943           # Archive 10 jobs at a time
1944           if len(pending) >= 10:
1945             archived_count += self._ArchiveJobsUnlocked(pending)
1946             pending = []
1947
1948     if pending:
1949       archived_count += self._ArchiveJobsUnlocked(pending)
1950
1951     return (archived_count, len(all_job_ids) - last_touched)
1952
1953   def QueryJobs(self, job_ids, fields):
1954     """Returns a list of jobs in queue.
1955
1956     @type job_ids: list
1957     @param job_ids: sequence of job identifiers or None for all
1958     @type fields: list
1959     @param fields: names of fields to return
1960     @rtype: list
1961     @return: list one element per job, each element being list with
1962         the requested fields
1963
1964     """
1965     jobs = []
1966     list_all = False
1967     if not job_ids:
1968       # Since files are added to/removed from the queue atomically, there's no
1969       # risk of getting the job ids in an inconsistent state.
1970       job_ids = self._GetJobIDsUnlocked()
1971       list_all = True
1972
1973     for job_id in job_ids:
1974       job = self.SafeLoadJobFromDisk(job_id)
1975       if job is not None:
1976         jobs.append(job.GetInfo(fields))
1977       elif not list_all:
1978         jobs.append(None)
1979
1980     return jobs
1981
1982   @locking.ssynchronized(_LOCK)
1983   @_RequireOpenQueue
1984   def Shutdown(self):
1985     """Stops the job queue.
1986
1987     This shutdowns all the worker threads an closes the queue.
1988
1989     """
1990     self._wpool.TerminateWorkers()
1991
1992     self._queue_filelock.Close()
1993     self._queue_filelock = None