Fix parameter names in SimpleFillBE/NIC docstrings
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81   @ivar stop_timestamp: timestamp for the end of the execution
82
83   """
84   __slots__ = ["input", "status", "result", "log",
85                "start_timestamp", "exec_timestamp", "end_timestamp",
86                "__weakref__"]
87
88   def __init__(self, op):
89     """Constructor for the _QuededOpCode.
90
91     @type op: L{opcodes.OpCode}
92     @param op: the opcode we encapsulate
93
94     """
95     self.input = op
96     self.status = constants.OP_STATUS_QUEUED
97     self.result = None
98     self.log = []
99     self.start_timestamp = None
100     self.exec_timestamp = None
101     self.end_timestamp = None
102
103   @classmethod
104   def Restore(cls, state):
105     """Restore the _QueuedOpCode from the serialized form.
106
107     @type state: dict
108     @param state: the serialized state
109     @rtype: _QueuedOpCode
110     @return: a new _QueuedOpCode instance
111
112     """
113     obj = _QueuedOpCode.__new__(cls)
114     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115     obj.status = state["status"]
116     obj.result = state["result"]
117     obj.log = state["log"]
118     obj.start_timestamp = state.get("start_timestamp", None)
119     obj.exec_timestamp = state.get("exec_timestamp", None)
120     obj.end_timestamp = state.get("end_timestamp", None)
121     return obj
122
123   def Serialize(self):
124     """Serializes this _QueuedOpCode.
125
126     @rtype: dict
127     @return: the dictionary holding the serialized state
128
129     """
130     return {
131       "input": self.input.__getstate__(),
132       "status": self.status,
133       "result": self.result,
134       "log": self.log,
135       "start_timestamp": self.start_timestamp,
136       "exec_timestamp": self.exec_timestamp,
137       "end_timestamp": self.end_timestamp,
138       }
139
140
141 class _QueuedJob(object):
142   """In-memory job representation.
143
144   This is what we use to track the user-submitted jobs. Locking must
145   be taken care of by users of this class.
146
147   @type queue: L{JobQueue}
148   @ivar queue: the parent queue
149   @ivar id: the job ID
150   @type ops: list
151   @ivar ops: the list of _QueuedOpCode that constitute the job
152   @type log_serial: int
153   @ivar log_serial: holds the index for the next log entry
154   @ivar received_timestamp: the timestamp for when the job was received
155   @ivar start_timestmap: the timestamp for start of execution
156   @ivar end_timestamp: the timestamp for end of execution
157   @ivar lock_status: In-memory locking information for debugging
158   @ivar change: a Condition variable we use for waiting for job changes
159
160   """
161   # pylint: disable-msg=W0212
162   __slots__ = ["queue", "id", "ops", "log_serial",
163                "received_timestamp", "start_timestamp", "end_timestamp",
164                "lock_status", "change",
165                "__weakref__"]
166
167   def __init__(self, queue, job_id, ops):
168     """Constructor for the _QueuedJob.
169
170     @type queue: L{JobQueue}
171     @param queue: our parent queue
172     @type job_id: job_id
173     @param job_id: our job id
174     @type ops: list
175     @param ops: the list of opcodes we hold, which will be encapsulated
176         in _QueuedOpCodes
177
178     """
179     if not ops:
180       raise errors.GenericError("A job needs at least one opcode")
181
182     self.queue = queue
183     self.id = job_id
184     self.ops = [_QueuedOpCode(op) for op in ops]
185     self.log_serial = 0
186     self.received_timestamp = TimeStampNow()
187     self.start_timestamp = None
188     self.end_timestamp = None
189
190     # In-memory attributes
191     self.lock_status = None
192
193     # Condition to wait for changes
194     self.change = threading.Condition(self.queue._lock)
195
196   def __repr__(self):
197     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
198               "id=%s" % self.id,
199               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
200
201     return "<%s at %#x>" % (" ".join(status), id(self))
202
203   @classmethod
204   def Restore(cls, queue, state):
205     """Restore a _QueuedJob from serialized state:
206
207     @type queue: L{JobQueue}
208     @param queue: to which queue the restored job belongs
209     @type state: dict
210     @param state: the serialized state
211     @rtype: _JobQueue
212     @return: the restored _JobQueue instance
213
214     """
215     obj = _QueuedJob.__new__(cls)
216     obj.queue = queue
217     obj.id = state["id"]
218     obj.received_timestamp = state.get("received_timestamp", None)
219     obj.start_timestamp = state.get("start_timestamp", None)
220     obj.end_timestamp = state.get("end_timestamp", None)
221
222     # In-memory attributes
223     obj.lock_status = None
224
225     obj.ops = []
226     obj.log_serial = 0
227     for op_state in state["ops"]:
228       op = _QueuedOpCode.Restore(op_state)
229       for log_entry in op.log:
230         obj.log_serial = max(obj.log_serial, log_entry[0])
231       obj.ops.append(op)
232
233     # Condition to wait for changes
234     obj.change = threading.Condition(obj.queue._lock)
235
236     return obj
237
238   def Serialize(self):
239     """Serialize the _JobQueue instance.
240
241     @rtype: dict
242     @return: the serialized state
243
244     """
245     return {
246       "id": self.id,
247       "ops": [op.Serialize() for op in self.ops],
248       "start_timestamp": self.start_timestamp,
249       "end_timestamp": self.end_timestamp,
250       "received_timestamp": self.received_timestamp,
251       }
252
253   def CalcStatus(self):
254     """Compute the status of this job.
255
256     This function iterates over all the _QueuedOpCodes in the job and
257     based on their status, computes the job status.
258
259     The algorithm is:
260       - if we find a cancelled, or finished with error, the job
261         status will be the same
262       - otherwise, the last opcode with the status one of:
263           - waitlock
264           - canceling
265           - running
266
267         will determine the job status
268
269       - otherwise, it means either all opcodes are queued, or success,
270         and the job status will be the same
271
272     @return: the job status
273
274     """
275     status = constants.JOB_STATUS_QUEUED
276
277     all_success = True
278     for op in self.ops:
279       if op.status == constants.OP_STATUS_SUCCESS:
280         continue
281
282       all_success = False
283
284       if op.status == constants.OP_STATUS_QUEUED:
285         pass
286       elif op.status == constants.OP_STATUS_WAITLOCK:
287         status = constants.JOB_STATUS_WAITLOCK
288       elif op.status == constants.OP_STATUS_RUNNING:
289         status = constants.JOB_STATUS_RUNNING
290       elif op.status == constants.OP_STATUS_CANCELING:
291         status = constants.JOB_STATUS_CANCELING
292         break
293       elif op.status == constants.OP_STATUS_ERROR:
294         status = constants.JOB_STATUS_ERROR
295         # The whole job fails if one opcode failed
296         break
297       elif op.status == constants.OP_STATUS_CANCELED:
298         status = constants.OP_STATUS_CANCELED
299         break
300
301     if all_success:
302       status = constants.JOB_STATUS_SUCCESS
303
304     return status
305
306   def GetLogEntries(self, newer_than):
307     """Selectively returns the log entries.
308
309     @type newer_than: None or int
310     @param newer_than: if this is None, return all log entries,
311         otherwise return only the log entries with serial higher
312         than this value
313     @rtype: list
314     @return: the list of the log entries selected
315
316     """
317     if newer_than is None:
318       serial = -1
319     else:
320       serial = newer_than
321
322     entries = []
323     for op in self.ops:
324       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325
326     return entries
327
328   def GetInfo(self, fields):
329     """Returns information about a job.
330
331     @type fields: list
332     @param fields: names of fields to return
333     @rtype: list
334     @return: list with one element for each field
335     @raise errors.OpExecError: when an invalid field
336         has been passed
337
338     """
339     row = []
340     for fname in fields:
341       if fname == "id":
342         row.append(self.id)
343       elif fname == "status":
344         row.append(self.CalcStatus())
345       elif fname == "ops":
346         row.append([op.input.__getstate__() for op in self.ops])
347       elif fname == "opresult":
348         row.append([op.result for op in self.ops])
349       elif fname == "opstatus":
350         row.append([op.status for op in self.ops])
351       elif fname == "oplog":
352         row.append([op.log for op in self.ops])
353       elif fname == "opstart":
354         row.append([op.start_timestamp for op in self.ops])
355       elif fname == "opexec":
356         row.append([op.exec_timestamp for op in self.ops])
357       elif fname == "opend":
358         row.append([op.end_timestamp for op in self.ops])
359       elif fname == "received_ts":
360         row.append(self.received_timestamp)
361       elif fname == "start_ts":
362         row.append(self.start_timestamp)
363       elif fname == "end_ts":
364         row.append(self.end_timestamp)
365       elif fname == "lock_status":
366         row.append(self.lock_status)
367       elif fname == "summary":
368         row.append([op.input.Summary() for op in self.ops])
369       else:
370         raise errors.OpExecError("Invalid self query field '%s'" % fname)
371     return row
372
373   def MarkUnfinishedOps(self, status, result):
374     """Mark unfinished opcodes with a given status and result.
375
376     This is an utility function for marking all running or waiting to
377     be run opcodes with a given status. Opcodes which are already
378     finalised are not changed.
379
380     @param status: a given opcode status
381     @param result: the opcode result
382
383     """
384     not_marked = True
385     for op in self.ops:
386       if op.status in constants.OPS_FINALIZED:
387         assert not_marked, "Finalized opcodes found after non-finalized ones"
388         continue
389       op.status = status
390       op.result = result
391       not_marked = False
392
393
394 class _OpExecCallbacks(mcpu.OpExecCbBase):
395   def __init__(self, queue, job, op):
396     """Initializes this class.
397
398     @type queue: L{JobQueue}
399     @param queue: Job queue
400     @type job: L{_QueuedJob}
401     @param job: Job object
402     @type op: L{_QueuedOpCode}
403     @param op: OpCode
404
405     """
406     assert queue, "Queue is missing"
407     assert job, "Job is missing"
408     assert op, "Opcode is missing"
409
410     self._queue = queue
411     self._job = job
412     self._op = op
413
414   def NotifyStart(self):
415     """Mark the opcode as running, not lock-waiting.
416
417     This is called from the mcpu code as a notifier function, when the LU is
418     finally about to start the Exec() method. Of course, to have end-user
419     visible results, the opcode must be initially (before calling into
420     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
421
422     """
423     self._queue.acquire()
424     try:
425       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
426                                  constants.OP_STATUS_CANCELING)
427
428       # All locks are acquired by now
429       self._job.lock_status = None
430
431       # Cancel here if we were asked to
432       if self._op.status == constants.OP_STATUS_CANCELING:
433         raise CancelJob()
434
435       self._op.status = constants.OP_STATUS_RUNNING
436       self._op.exec_timestamp = TimeStampNow()
437     finally:
438       self._queue.release()
439
440   def Feedback(self, *args):
441     """Append a log entry.
442
443     """
444     assert len(args) < 3
445
446     if len(args) == 1:
447       log_type = constants.ELOG_MESSAGE
448       log_msg = args[0]
449     else:
450       (log_type, log_msg) = args
451
452     # The time is split to make serialization easier and not lose
453     # precision.
454     timestamp = utils.SplitTime(time.time())
455
456     self._queue.acquire()
457     try:
458       self._job.log_serial += 1
459       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
460
461       self._job.change.notifyAll()
462     finally:
463       self._queue.release()
464
465   def ReportLocks(self, msg):
466     """Write locking information to the job.
467
468     Called whenever the LU processor is waiting for a lock or has acquired one.
469
470     """
471     # Not getting the queue lock because this is a single assignment
472     self._job.lock_status = msg
473
474
475 class _JobQueueWorker(workerpool.BaseWorker):
476   """The actual job workers.
477
478   """
479   def RunTask(self, job): # pylint: disable-msg=W0221
480     """Job executor.
481
482     This functions processes a job. It is closely tied to the _QueuedJob and
483     _QueuedOpCode classes.
484
485     @type job: L{_QueuedJob}
486     @param job: the job to be processed
487
488     """
489     logging.info("Processing job %s", job.id)
490     proc = mcpu.Processor(self.pool.queue.context, job.id)
491     queue = job.queue
492     try:
493       try:
494         count = len(job.ops)
495         for idx, op in enumerate(job.ops):
496           op_summary = op.input.Summary()
497           if op.status == constants.OP_STATUS_SUCCESS:
498             # this is a job that was partially completed before master
499             # daemon shutdown, so it can be expected that some opcodes
500             # are already completed successfully (if any did error
501             # out, then the whole job should have been aborted and not
502             # resubmitted for processing)
503             logging.info("Op %s/%s: opcode %s already processed, skipping",
504                          idx + 1, count, op_summary)
505             continue
506           try:
507             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
508                          op_summary)
509
510             queue.acquire()
511             try:
512               if op.status == constants.OP_STATUS_CANCELED:
513                 raise CancelJob()
514               assert op.status == constants.OP_STATUS_QUEUED
515               op.status = constants.OP_STATUS_WAITLOCK
516               op.result = None
517               op.start_timestamp = TimeStampNow()
518               if idx == 0: # first opcode
519                 job.start_timestamp = op.start_timestamp
520               queue.UpdateJobUnlocked(job)
521
522               input_opcode = op.input
523             finally:
524               queue.release()
525
526             # Make sure not to hold queue lock while calling ExecOpCode
527             result = proc.ExecOpCode(input_opcode,
528                                      _OpExecCallbacks(queue, job, op))
529
530             queue.acquire()
531             try:
532               op.status = constants.OP_STATUS_SUCCESS
533               op.result = result
534               op.end_timestamp = TimeStampNow()
535               queue.UpdateJobUnlocked(job)
536             finally:
537               queue.release()
538
539             logging.info("Op %s/%s: Successfully finished opcode %s",
540                          idx + 1, count, op_summary)
541           except CancelJob:
542             # Will be handled further up
543             raise
544           except Exception, err:
545             queue.acquire()
546             try:
547               try:
548                 op.status = constants.OP_STATUS_ERROR
549                 if isinstance(err, errors.GenericError):
550                   op.result = errors.EncodeException(err)
551                 else:
552                   op.result = str(err)
553                 op.end_timestamp = TimeStampNow()
554                 logging.info("Op %s/%s: Error in opcode %s: %s",
555                              idx + 1, count, op_summary, err)
556               finally:
557                 queue.UpdateJobUnlocked(job)
558             finally:
559               queue.release()
560             raise
561
562       except CancelJob:
563         queue.acquire()
564         try:
565           queue.CancelJobUnlocked(job)
566         finally:
567           queue.release()
568       except errors.GenericError, err:
569         logging.exception("Ganeti exception")
570       except:
571         logging.exception("Unhandled exception")
572     finally:
573       queue.acquire()
574       try:
575         try:
576           job.lock_status = None
577           job.end_timestamp = TimeStampNow()
578           queue.UpdateJobUnlocked(job)
579         finally:
580           job_id = job.id
581           status = job.CalcStatus()
582       finally:
583         queue.release()
584
585       logging.info("Finished job %s, status = %s", job_id, status)
586
587
588 class _JobQueueWorkerPool(workerpool.WorkerPool):
589   """Simple class implementing a job-processing workerpool.
590
591   """
592   def __init__(self, queue):
593     super(_JobQueueWorkerPool, self).__init__("JobQueue",
594                                               JOBQUEUE_THREADS,
595                                               _JobQueueWorker)
596     self.queue = queue
597
598
599 def _RequireOpenQueue(fn):
600   """Decorator for "public" functions.
601
602   This function should be used for all 'public' functions. That is,
603   functions usually called from other classes. Note that this should
604   be applied only to methods (not plain functions), since it expects
605   that the decorated function is called with a first argument that has
606   a '_queue_filelock' argument.
607
608   @warning: Use this decorator only after utils.LockedMethod!
609
610   Example::
611     @utils.LockedMethod
612     @_RequireOpenQueue
613     def Example(self):
614       pass
615
616   """
617   def wrapper(self, *args, **kwargs):
618     # pylint: disable-msg=W0212
619     assert self._queue_filelock is not None, "Queue should be open"
620     return fn(self, *args, **kwargs)
621   return wrapper
622
623
624 class JobQueue(object):
625   """Queue used to manage the jobs.
626
627   @cvar _RE_JOB_FILE: regex matching the valid job file names
628
629   """
630   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
631
632   def __init__(self, context):
633     """Constructor for JobQueue.
634
635     The constructor will initialize the job queue object and then
636     start loading the current jobs from disk, either for starting them
637     (if they were queue) or for aborting them (if they were already
638     running).
639
640     @type context: GanetiContext
641     @param context: the context object for access to the configuration
642         data and other ganeti objects
643
644     """
645     self.context = context
646     self._memcache = weakref.WeakValueDictionary()
647     self._my_hostname = utils.HostInfo().name
648
649     # Locking
650     self._lock = threading.Lock()
651     self.acquire = self._lock.acquire
652     self.release = self._lock.release
653
654     # Initialize the queue, and acquire the filelock.
655     # This ensures no other process is working on the job queue.
656     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
657
658     # Read serial file
659     self._last_serial = jstore.ReadSerial()
660     assert self._last_serial is not None, ("Serial file was modified between"
661                                            " check in jstore and here")
662
663     # Get initial list of nodes
664     self._nodes = dict((n.name, n.primary_ip)
665                        for n in self.context.cfg.GetAllNodesInfo().values()
666                        if n.master_candidate)
667
668     # Remove master node
669     self._nodes.pop(self._my_hostname, None)
670
671     # TODO: Check consistency across nodes
672
673     self._queue_size = 0
674     self._UpdateQueueSizeUnlocked()
675     self._drained = self._IsQueueMarkedDrain()
676
677     # Setup worker pool
678     self._wpool = _JobQueueWorkerPool(self)
679     try:
680       # We need to lock here because WorkerPool.AddTask() may start a job while
681       # we're still doing our work.
682       self.acquire()
683       try:
684         logging.info("Inspecting job queue")
685
686         all_job_ids = self._GetJobIDsUnlocked()
687         jobs_count = len(all_job_ids)
688         lastinfo = time.time()
689         for idx, job_id in enumerate(all_job_ids):
690           # Give an update every 1000 jobs or 10 seconds
691           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
692               idx == (jobs_count - 1)):
693             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
694                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
695             lastinfo = time.time()
696
697           job = self._LoadJobUnlocked(job_id)
698
699           # a failure in loading the job can cause 'None' to be returned
700           if job is None:
701             continue
702
703           status = job.CalcStatus()
704
705           if status in (constants.JOB_STATUS_QUEUED, ):
706             self._wpool.AddTask(job)
707
708           elif status in (constants.JOB_STATUS_RUNNING,
709                           constants.JOB_STATUS_WAITLOCK,
710                           constants.JOB_STATUS_CANCELING):
711             logging.warning("Unfinished job %s found: %s", job.id, job)
712             try:
713               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
714                                     "Unclean master daemon shutdown")
715             finally:
716               self.UpdateJobUnlocked(job)
717
718         logging.info("Job queue inspection finished")
719       finally:
720         self.release()
721     except:
722       self._wpool.TerminateWorkers()
723       raise
724
725   @utils.LockedMethod
726   @_RequireOpenQueue
727   def AddNode(self, node):
728     """Register a new node with the queue.
729
730     @type node: L{objects.Node}
731     @param node: the node object to be added
732
733     """
734     node_name = node.name
735     assert node_name != self._my_hostname
736
737     # Clean queue directory on added node
738     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
739     msg = result.fail_msg
740     if msg:
741       logging.warning("Cannot cleanup queue directory on node %s: %s",
742                       node_name, msg)
743
744     if not node.master_candidate:
745       # remove if existing, ignoring errors
746       self._nodes.pop(node_name, None)
747       # and skip the replication of the job ids
748       return
749
750     # Upload the whole queue excluding archived jobs
751     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
752
753     # Upload current serial file
754     files.append(constants.JOB_QUEUE_SERIAL_FILE)
755
756     for file_name in files:
757       # Read file content
758       content = utils.ReadFile(file_name)
759
760       result = rpc.RpcRunner.call_jobqueue_update([node_name],
761                                                   [node.primary_ip],
762                                                   file_name, content)
763       msg = result[node_name].fail_msg
764       if msg:
765         logging.error("Failed to upload file %s to node %s: %s",
766                       file_name, node_name, msg)
767
768     self._nodes[node_name] = node.primary_ip
769
770   @utils.LockedMethod
771   @_RequireOpenQueue
772   def RemoveNode(self, node_name):
773     """Callback called when removing nodes from the cluster.
774
775     @type node_name: str
776     @param node_name: the name of the node to remove
777
778     """
779     self._nodes.pop(node_name, None)
780
781   @staticmethod
782   def _CheckRpcResult(result, nodes, failmsg):
783     """Verifies the status of an RPC call.
784
785     Since we aim to keep consistency should this node (the current
786     master) fail, we will log errors if our rpc fail, and especially
787     log the case when more than half of the nodes fails.
788
789     @param result: the data as returned from the rpc call
790     @type nodes: list
791     @param nodes: the list of nodes we made the call to
792     @type failmsg: str
793     @param failmsg: the identifier to be used for logging
794
795     """
796     failed = []
797     success = []
798
799     for node in nodes:
800       msg = result[node].fail_msg
801       if msg:
802         failed.append(node)
803         logging.error("RPC call %s (%s) failed on node %s: %s",
804                       result[node].call, failmsg, node, msg)
805       else:
806         success.append(node)
807
808     # +1 for the master node
809     if (len(success) + 1) < len(failed):
810       # TODO: Handle failing nodes
811       logging.error("More than half of the nodes failed")
812
813   def _GetNodeIp(self):
814     """Helper for returning the node name/ip list.
815
816     @rtype: (list, list)
817     @return: a tuple of two lists, the first one with the node
818         names and the second one with the node addresses
819
820     """
821     name_list = self._nodes.keys()
822     addr_list = [self._nodes[name] for name in name_list]
823     return name_list, addr_list
824
825   def _UpdateJobQueueFile(self, file_name, data, replicate):
826     """Writes a file locally and then replicates it to all nodes.
827
828     This function will replace the contents of a file on the local
829     node and then replicate it to all the other nodes we have.
830
831     @type file_name: str
832     @param file_name: the path of the file to be replicated
833     @type data: str
834     @param data: the new contents of the file
835     @type replicate: boolean
836     @param replicate: whether to spread the changes to the remote nodes
837
838     """
839     utils.WriteFile(file_name, data=data)
840
841     if replicate:
842       names, addrs = self._GetNodeIp()
843       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
844       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
845
846   def _RenameFilesUnlocked(self, rename):
847     """Renames a file locally and then replicate the change.
848
849     This function will rename a file in the local queue directory
850     and then replicate this rename to all the other nodes we have.
851
852     @type rename: list of (old, new)
853     @param rename: List containing tuples mapping old to new names
854
855     """
856     # Rename them locally
857     for old, new in rename:
858       utils.RenameFile(old, new, mkdir=True)
859
860     # ... and on all nodes
861     names, addrs = self._GetNodeIp()
862     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
863     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
864
865   @staticmethod
866   def _FormatJobID(job_id):
867     """Convert a job ID to string format.
868
869     Currently this just does C{str(job_id)} after performing some
870     checks, but if we want to change the job id format this will
871     abstract this change.
872
873     @type job_id: int or long
874     @param job_id: the numeric job id
875     @rtype: str
876     @return: the formatted job id
877
878     """
879     if not isinstance(job_id, (int, long)):
880       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
881     if job_id < 0:
882       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
883
884     return str(job_id)
885
886   @classmethod
887   def _GetArchiveDirectory(cls, job_id):
888     """Returns the archive directory for a job.
889
890     @type job_id: str
891     @param job_id: Job identifier
892     @rtype: str
893     @return: Directory name
894
895     """
896     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
897
898   def _NewSerialsUnlocked(self, count):
899     """Generates a new job identifier.
900
901     Job identifiers are unique during the lifetime of a cluster.
902
903     @type count: integer
904     @param count: how many serials to return
905     @rtype: str
906     @return: a string representing the job identifier.
907
908     """
909     assert count > 0
910     # New number
911     serial = self._last_serial + count
912
913     # Write to file
914     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
915                              "%s\n" % serial, True)
916
917     result = [self._FormatJobID(v)
918               for v in range(self._last_serial, serial + 1)]
919     # Keep it only if we were able to write the file
920     self._last_serial = serial
921
922     return result
923
924   @staticmethod
925   def _GetJobPath(job_id):
926     """Returns the job file for a given job id.
927
928     @type job_id: str
929     @param job_id: the job identifier
930     @rtype: str
931     @return: the path to the job file
932
933     """
934     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
935
936   @classmethod
937   def _GetArchivedJobPath(cls, job_id):
938     """Returns the archived job file for a give job id.
939
940     @type job_id: str
941     @param job_id: the job identifier
942     @rtype: str
943     @return: the path to the archived job file
944
945     """
946     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
947                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
948
949   def _GetJobIDsUnlocked(self, sort=True):
950     """Return all known job IDs.
951
952     The method only looks at disk because it's a requirement that all
953     jobs are present on disk (so in the _memcache we don't have any
954     extra IDs).
955
956     @type sort: boolean
957     @param sort: perform sorting on the returned job ids
958     @rtype: list
959     @return: the list of job IDs
960
961     """
962     jlist = []
963     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
964       m = self._RE_JOB_FILE.match(filename)
965       if m:
966         jlist.append(m.group(1))
967     if sort:
968       jlist = utils.NiceSort(jlist)
969     return jlist
970
971   def _LoadJobUnlocked(self, job_id):
972     """Loads a job from the disk or memory.
973
974     Given a job id, this will return the cached job object if
975     existing, or try to load the job from the disk. If loading from
976     disk, it will also add the job to the cache.
977
978     @param job_id: the job id
979     @rtype: L{_QueuedJob} or None
980     @return: either None or the job object
981
982     """
983     job = self._memcache.get(job_id, None)
984     if job:
985       logging.debug("Found job %s in memcache", job_id)
986       return job
987
988     job = self._LoadJobFromDisk(job_id)
989
990     self._memcache[job_id] = job
991     logging.debug("Added job %s to the cache", job_id)
992     return job
993
994   def _LoadJobFromDisk(self, job_id):
995     """Load the given job file from disk.
996
997     Given a job file, read, load and restore it in a _QueuedJob format.
998
999     @type job_id: string
1000     @param job_id: job identifier
1001     @rtype: L{_QueuedJob} or None
1002     @return: either None or the job object
1003
1004     """
1005     filepath = self._GetJobPath(job_id)
1006     logging.debug("Loading job from %s", filepath)
1007     try:
1008       raw_data = utils.ReadFile(filepath)
1009     except EnvironmentError, err:
1010       if err.errno in (errno.ENOENT, ):
1011         return None
1012       raise
1013
1014     try:
1015       data = serializer.LoadJson(raw_data)
1016       job = _QueuedJob.Restore(self, data)
1017     except Exception, err: # pylint: disable-msg=W0703
1018       new_path = self._GetArchivedJobPath(job_id)
1019       if filepath == new_path:
1020         # job already archived (future case)
1021         logging.exception("Can't parse job %s", job_id)
1022       else:
1023         # non-archived case
1024         logging.exception("Can't parse job %s, will archive.", job_id)
1025         self._RenameFilesUnlocked([(filepath, new_path)])
1026       return None
1027
1028     return job
1029
1030   def _GetJobsUnlocked(self, job_ids):
1031     """Return a list of jobs based on their IDs.
1032
1033     @type job_ids: list
1034     @param job_ids: either an empty list (meaning all jobs),
1035         or a list of job IDs
1036     @rtype: list
1037     @return: the list of job objects
1038
1039     """
1040     if not job_ids:
1041       job_ids = self._GetJobIDsUnlocked()
1042
1043     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1044
1045   @staticmethod
1046   def _IsQueueMarkedDrain():
1047     """Check if the queue is marked from drain.
1048
1049     This currently uses the queue drain file, which makes it a
1050     per-node flag. In the future this can be moved to the config file.
1051
1052     @rtype: boolean
1053     @return: True of the job queue is marked for draining
1054
1055     """
1056     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1057
1058   def _UpdateQueueSizeUnlocked(self):
1059     """Update the queue size.
1060
1061     """
1062     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1063
1064   @utils.LockedMethod
1065   @_RequireOpenQueue
1066   def SetDrainFlag(self, drain_flag):
1067     """Sets the drain flag for the queue.
1068
1069     @type drain_flag: boolean
1070     @param drain_flag: Whether to set or unset the drain flag
1071
1072     """
1073     if drain_flag:
1074       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1075     else:
1076       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1077
1078     self._drained = drain_flag
1079
1080     return True
1081
1082   @_RequireOpenQueue
1083   def _SubmitJobUnlocked(self, job_id, ops):
1084     """Create and store a new job.
1085
1086     This enters the job into our job queue and also puts it on the new
1087     queue, in order for it to be picked up by the queue processors.
1088
1089     @type job_id: job ID
1090     @param job_id: the job ID for the new job
1091     @type ops: list
1092     @param ops: The list of OpCodes that will become the new job.
1093     @rtype: job ID
1094     @return: the job ID of the newly created job
1095     @raise errors.JobQueueDrainError: if the job is marked for draining
1096
1097     """
1098     # Ok when sharing the big job queue lock, as the drain file is created when
1099     # the lock is exclusive.
1100     if self._drained:
1101       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1102
1103     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1104       raise errors.JobQueueFull()
1105
1106     job = _QueuedJob(self, job_id, ops)
1107
1108     # Write to disk
1109     self.UpdateJobUnlocked(job)
1110
1111     self._queue_size += 1
1112
1113     logging.debug("Adding new job %s to the cache", job_id)
1114     self._memcache[job_id] = job
1115
1116     # Add to worker pool
1117     self._wpool.AddTask(job)
1118
1119     return job.id
1120
1121   @utils.LockedMethod
1122   @_RequireOpenQueue
1123   def SubmitJob(self, ops):
1124     """Create and store a new job.
1125
1126     @see: L{_SubmitJobUnlocked}
1127
1128     """
1129     job_id = self._NewSerialsUnlocked(1)[0]
1130     return self._SubmitJobUnlocked(job_id, ops)
1131
1132   @utils.LockedMethod
1133   @_RequireOpenQueue
1134   def SubmitManyJobs(self, jobs):
1135     """Create and store multiple jobs.
1136
1137     @see: L{_SubmitJobUnlocked}
1138
1139     """
1140     results = []
1141     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1142     for job_id, ops in zip(all_job_ids, jobs):
1143       try:
1144         data = self._SubmitJobUnlocked(job_id, ops)
1145         status = True
1146       except errors.GenericError, err:
1147         data = str(err)
1148         status = False
1149       results.append((status, data))
1150
1151     return results
1152
1153   @_RequireOpenQueue
1154   def UpdateJobUnlocked(self, job, replicate=True):
1155     """Update a job's on disk storage.
1156
1157     After a job has been modified, this function needs to be called in
1158     order to write the changes to disk and replicate them to the other
1159     nodes.
1160
1161     @type job: L{_QueuedJob}
1162     @param job: the changed job
1163     @type replicate: boolean
1164     @param replicate: whether to replicate the change to remote nodes
1165
1166     """
1167     filename = self._GetJobPath(job.id)
1168     data = serializer.DumpJson(job.Serialize(), indent=False)
1169     logging.debug("Writing job %s to %s", job.id, filename)
1170     self._UpdateJobQueueFile(filename, data, replicate)
1171
1172     # Notify waiters about potential changes
1173     job.change.notifyAll()
1174
1175   @utils.LockedMethod
1176   @_RequireOpenQueue
1177   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1178                         timeout):
1179     """Waits for changes in a job.
1180
1181     @type job_id: string
1182     @param job_id: Job identifier
1183     @type fields: list of strings
1184     @param fields: Which fields to check for changes
1185     @type prev_job_info: list or None
1186     @param prev_job_info: Last job information returned
1187     @type prev_log_serial: int
1188     @param prev_log_serial: Last job message serial number
1189     @type timeout: float
1190     @param timeout: maximum time to wait
1191     @rtype: tuple (job info, log entries)
1192     @return: a tuple of the job information as required via
1193         the fields parameter, and the log entries as a list
1194
1195         if the job has not changed and the timeout has expired,
1196         we instead return a special value,
1197         L{constants.JOB_NOTCHANGED}, which should be interpreted
1198         as such by the clients
1199
1200     """
1201     job = self._LoadJobUnlocked(job_id)
1202     if not job:
1203       logging.debug("Job %s not found", job_id)
1204       return None
1205
1206     def _CheckForChanges():
1207       logging.debug("Waiting for changes in job %s", job_id)
1208
1209       status = job.CalcStatus()
1210       job_info = job.GetInfo(fields)
1211       log_entries = job.GetLogEntries(prev_log_serial)
1212
1213       # Serializing and deserializing data can cause type changes (e.g. from
1214       # tuple to list) or precision loss. We're doing it here so that we get
1215       # the same modifications as the data received from the client. Without
1216       # this, the comparison afterwards might fail without the data being
1217       # significantly different.
1218       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1219       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1220
1221       # Don't even try to wait if the job is no longer running, there will be
1222       # no changes.
1223       if (status not in (constants.JOB_STATUS_QUEUED,
1224                          constants.JOB_STATUS_RUNNING,
1225                          constants.JOB_STATUS_WAITLOCK) or
1226           prev_job_info != job_info or
1227           (log_entries and prev_log_serial != log_entries[0][0])):
1228         logging.debug("Job %s changed", job_id)
1229         return (job_info, log_entries)
1230
1231       raise utils.RetryAgain()
1232
1233     try:
1234       # Setting wait function to release the queue lock while waiting
1235       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1236                          wait_fn=job.change.wait)
1237     except utils.RetryTimeout:
1238       return constants.JOB_NOTCHANGED
1239
1240   @utils.LockedMethod
1241   @_RequireOpenQueue
1242   def CancelJob(self, job_id):
1243     """Cancels a job.
1244
1245     This will only succeed if the job has not started yet.
1246
1247     @type job_id: string
1248     @param job_id: job ID of job to be cancelled.
1249
1250     """
1251     logging.info("Cancelling job %s", job_id)
1252
1253     job = self._LoadJobUnlocked(job_id)
1254     if not job:
1255       logging.debug("Job %s not found", job_id)
1256       return (False, "Job %s not found" % job_id)
1257
1258     job_status = job.CalcStatus()
1259
1260     if job_status not in (constants.JOB_STATUS_QUEUED,
1261                           constants.JOB_STATUS_WAITLOCK):
1262       logging.debug("Job %s is no longer waiting in the queue", job.id)
1263       return (False, "Job %s is no longer waiting in the queue" % job.id)
1264
1265     if job_status == constants.JOB_STATUS_QUEUED:
1266       self.CancelJobUnlocked(job)
1267       return (True, "Job %s canceled" % job.id)
1268
1269     elif job_status == constants.JOB_STATUS_WAITLOCK:
1270       # The worker will notice the new status and cancel the job
1271       try:
1272         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1273       finally:
1274         self.UpdateJobUnlocked(job)
1275       return (True, "Job %s will be canceled" % job.id)
1276
1277   @_RequireOpenQueue
1278   def CancelJobUnlocked(self, job):
1279     """Marks a job as canceled.
1280
1281     """
1282     try:
1283       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1284                             "Job canceled by request")
1285     finally:
1286       self.UpdateJobUnlocked(job)
1287
1288   @_RequireOpenQueue
1289   def _ArchiveJobsUnlocked(self, jobs):
1290     """Archives jobs.
1291
1292     @type jobs: list of L{_QueuedJob}
1293     @param jobs: Job objects
1294     @rtype: int
1295     @return: Number of archived jobs
1296
1297     """
1298     archive_jobs = []
1299     rename_files = []
1300     for job in jobs:
1301       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1302                                   constants.JOB_STATUS_SUCCESS,
1303                                   constants.JOB_STATUS_ERROR):
1304         logging.debug("Job %s is not yet done", job.id)
1305         continue
1306
1307       archive_jobs.append(job)
1308
1309       old = self._GetJobPath(job.id)
1310       new = self._GetArchivedJobPath(job.id)
1311       rename_files.append((old, new))
1312
1313     # TODO: What if 1..n files fail to rename?
1314     self._RenameFilesUnlocked(rename_files)
1315
1316     logging.debug("Successfully archived job(s) %s",
1317                   utils.CommaJoin(job.id for job in archive_jobs))
1318
1319     # Since we haven't quite checked, above, if we succeeded or failed renaming
1320     # the files, we update the cached queue size from the filesystem. When we
1321     # get around to fix the TODO: above, we can use the number of actually
1322     # archived jobs to fix this.
1323     self._UpdateQueueSizeUnlocked()
1324     return len(archive_jobs)
1325
1326   @utils.LockedMethod
1327   @_RequireOpenQueue
1328   def ArchiveJob(self, job_id):
1329     """Archives a job.
1330
1331     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1332
1333     @type job_id: string
1334     @param job_id: Job ID of job to be archived.
1335     @rtype: bool
1336     @return: Whether job was archived
1337
1338     """
1339     logging.info("Archiving job %s", job_id)
1340
1341     job = self._LoadJobUnlocked(job_id)
1342     if not job:
1343       logging.debug("Job %s not found", job_id)
1344       return False
1345
1346     return self._ArchiveJobsUnlocked([job]) == 1
1347
1348   @utils.LockedMethod
1349   @_RequireOpenQueue
1350   def AutoArchiveJobs(self, age, timeout):
1351     """Archives all jobs based on age.
1352
1353     The method will archive all jobs which are older than the age
1354     parameter. For jobs that don't have an end timestamp, the start
1355     timestamp will be considered. The special '-1' age will cause
1356     archival of all jobs (that are not running or queued).
1357
1358     @type age: int
1359     @param age: the minimum age in seconds
1360
1361     """
1362     logging.info("Archiving jobs with age more than %s seconds", age)
1363
1364     now = time.time()
1365     end_time = now + timeout
1366     archived_count = 0
1367     last_touched = 0
1368
1369     all_job_ids = self._GetJobIDsUnlocked()
1370     pending = []
1371     for idx, job_id in enumerate(all_job_ids):
1372       last_touched = idx + 1
1373
1374       # Not optimal because jobs could be pending
1375       # TODO: Measure average duration for job archival and take number of
1376       # pending jobs into account.
1377       if time.time() > end_time:
1378         break
1379
1380       # Returns None if the job failed to load
1381       job = self._LoadJobUnlocked(job_id)
1382       if job:
1383         if job.end_timestamp is None:
1384           if job.start_timestamp is None:
1385             job_age = job.received_timestamp
1386           else:
1387             job_age = job.start_timestamp
1388         else:
1389           job_age = job.end_timestamp
1390
1391         if age == -1 or now - job_age[0] > age:
1392           pending.append(job)
1393
1394           # Archive 10 jobs at a time
1395           if len(pending) >= 10:
1396             archived_count += self._ArchiveJobsUnlocked(pending)
1397             pending = []
1398
1399     if pending:
1400       archived_count += self._ArchiveJobsUnlocked(pending)
1401
1402     return (archived_count, len(all_job_ids) - last_touched)
1403
1404   @utils.LockedMethod
1405   @_RequireOpenQueue
1406   def QueryJobs(self, job_ids, fields):
1407     """Returns a list of jobs in queue.
1408
1409     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1410     processing for each job.
1411
1412     @type job_ids: list
1413     @param job_ids: sequence of job identifiers or None for all
1414     @type fields: list
1415     @param fields: names of fields to return
1416     @rtype: list
1417     @return: list one element per job, each element being list with
1418         the requested fields
1419
1420     """
1421     jobs = []
1422
1423     for job in self._GetJobsUnlocked(job_ids):
1424       if job is None:
1425         jobs.append(None)
1426       else:
1427         jobs.append(job.GetInfo(fields))
1428
1429     return jobs
1430
1431   @utils.LockedMethod
1432   @_RequireOpenQueue
1433   def Shutdown(self):
1434     """Stops the job queue.
1435
1436     This shutdowns all the worker threads an closes the queue.
1437
1438     """
1439     self._wpool.TerminateWorkers()
1440
1441     self._queue_filelock.Close()
1442     self._queue_filelock = None