Merge branch 'devel-2.1'
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81   @ivar stop_timestamp: timestamp for the end of the execution
82
83   """
84   __slots__ = ["input", "status", "result", "log",
85                "start_timestamp", "exec_timestamp", "end_timestamp",
86                "__weakref__"]
87
88   def __init__(self, op):
89     """Constructor for the _QuededOpCode.
90
91     @type op: L{opcodes.OpCode}
92     @param op: the opcode we encapsulate
93
94     """
95     self.input = op
96     self.status = constants.OP_STATUS_QUEUED
97     self.result = None
98     self.log = []
99     self.start_timestamp = None
100     self.exec_timestamp = None
101     self.end_timestamp = None
102
103   @classmethod
104   def Restore(cls, state):
105     """Restore the _QueuedOpCode from the serialized form.
106
107     @type state: dict
108     @param state: the serialized state
109     @rtype: _QueuedOpCode
110     @return: a new _QueuedOpCode instance
111
112     """
113     obj = _QueuedOpCode.__new__(cls)
114     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115     obj.status = state["status"]
116     obj.result = state["result"]
117     obj.log = state["log"]
118     obj.start_timestamp = state.get("start_timestamp", None)
119     obj.exec_timestamp = state.get("exec_timestamp", None)
120     obj.end_timestamp = state.get("end_timestamp", None)
121     return obj
122
123   def Serialize(self):
124     """Serializes this _QueuedOpCode.
125
126     @rtype: dict
127     @return: the dictionary holding the serialized state
128
129     """
130     return {
131       "input": self.input.__getstate__(),
132       "status": self.status,
133       "result": self.result,
134       "log": self.log,
135       "start_timestamp": self.start_timestamp,
136       "exec_timestamp": self.exec_timestamp,
137       "end_timestamp": self.end_timestamp,
138       }
139
140
141 class _QueuedJob(object):
142   """In-memory job representation.
143
144   This is what we use to track the user-submitted jobs. Locking must
145   be taken care of by users of this class.
146
147   @type queue: L{JobQueue}
148   @ivar queue: the parent queue
149   @ivar id: the job ID
150   @type ops: list
151   @ivar ops: the list of _QueuedOpCode that constitute the job
152   @type log_serial: int
153   @ivar log_serial: holds the index for the next log entry
154   @ivar received_timestamp: the timestamp for when the job was received
155   @ivar start_timestmap: the timestamp for start of execution
156   @ivar end_timestamp: the timestamp for end of execution
157   @ivar lock_status: In-memory locking information for debugging
158   @ivar change: a Condition variable we use for waiting for job changes
159
160   """
161   # pylint: disable-msg=W0212
162   __slots__ = ["queue", "id", "ops", "log_serial",
163                "received_timestamp", "start_timestamp", "end_timestamp",
164                "lock_status", "change",
165                "__weakref__"]
166
167   def __init__(self, queue, job_id, ops):
168     """Constructor for the _QueuedJob.
169
170     @type queue: L{JobQueue}
171     @param queue: our parent queue
172     @type job_id: job_id
173     @param job_id: our job id
174     @type ops: list
175     @param ops: the list of opcodes we hold, which will be encapsulated
176         in _QueuedOpCodes
177
178     """
179     if not ops:
180       # TODO: use a better exception
181       raise Exception("No opcodes")
182
183     self.queue = queue
184     self.id = job_id
185     self.ops = [_QueuedOpCode(op) for op in ops]
186     self.log_serial = 0
187     self.received_timestamp = TimeStampNow()
188     self.start_timestamp = None
189     self.end_timestamp = None
190
191     # In-memory attributes
192     self.lock_status = None
193
194     # Condition to wait for changes
195     self.change = threading.Condition(self.queue._lock)
196
197   def __repr__(self):
198     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
199               "id=%s" % self.id,
200               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
201
202     return "<%s at %#x>" % (" ".join(status), id(self))
203
204   @classmethod
205   def Restore(cls, queue, state):
206     """Restore a _QueuedJob from serialized state:
207
208     @type queue: L{JobQueue}
209     @param queue: to which queue the restored job belongs
210     @type state: dict
211     @param state: the serialized state
212     @rtype: _JobQueue
213     @return: the restored _JobQueue instance
214
215     """
216     obj = _QueuedJob.__new__(cls)
217     obj.queue = queue
218     obj.id = state["id"]
219     obj.received_timestamp = state.get("received_timestamp", None)
220     obj.start_timestamp = state.get("start_timestamp", None)
221     obj.end_timestamp = state.get("end_timestamp", None)
222
223     # In-memory attributes
224     obj.lock_status = None
225
226     obj.ops = []
227     obj.log_serial = 0
228     for op_state in state["ops"]:
229       op = _QueuedOpCode.Restore(op_state)
230       for log_entry in op.log:
231         obj.log_serial = max(obj.log_serial, log_entry[0])
232       obj.ops.append(op)
233
234     # Condition to wait for changes
235     obj.change = threading.Condition(obj.queue._lock)
236
237     return obj
238
239   def Serialize(self):
240     """Serialize the _JobQueue instance.
241
242     @rtype: dict
243     @return: the serialized state
244
245     """
246     return {
247       "id": self.id,
248       "ops": [op.Serialize() for op in self.ops],
249       "start_timestamp": self.start_timestamp,
250       "end_timestamp": self.end_timestamp,
251       "received_timestamp": self.received_timestamp,
252       }
253
254   def CalcStatus(self):
255     """Compute the status of this job.
256
257     This function iterates over all the _QueuedOpCodes in the job and
258     based on their status, computes the job status.
259
260     The algorithm is:
261       - if we find a cancelled, or finished with error, the job
262         status will be the same
263       - otherwise, the last opcode with the status one of:
264           - waitlock
265           - canceling
266           - running
267
268         will determine the job status
269
270       - otherwise, it means either all opcodes are queued, or success,
271         and the job status will be the same
272
273     @return: the job status
274
275     """
276     status = constants.JOB_STATUS_QUEUED
277
278     all_success = True
279     for op in self.ops:
280       if op.status == constants.OP_STATUS_SUCCESS:
281         continue
282
283       all_success = False
284
285       if op.status == constants.OP_STATUS_QUEUED:
286         pass
287       elif op.status == constants.OP_STATUS_WAITLOCK:
288         status = constants.JOB_STATUS_WAITLOCK
289       elif op.status == constants.OP_STATUS_RUNNING:
290         status = constants.JOB_STATUS_RUNNING
291       elif op.status == constants.OP_STATUS_CANCELING:
292         status = constants.JOB_STATUS_CANCELING
293         break
294       elif op.status == constants.OP_STATUS_ERROR:
295         status = constants.JOB_STATUS_ERROR
296         # The whole job fails if one opcode failed
297         break
298       elif op.status == constants.OP_STATUS_CANCELED:
299         status = constants.OP_STATUS_CANCELED
300         break
301
302     if all_success:
303       status = constants.JOB_STATUS_SUCCESS
304
305     return status
306
307   def GetLogEntries(self, newer_than):
308     """Selectively returns the log entries.
309
310     @type newer_than: None or int
311     @param newer_than: if this is None, return all log entries,
312         otherwise return only the log entries with serial higher
313         than this value
314     @rtype: list
315     @return: the list of the log entries selected
316
317     """
318     if newer_than is None:
319       serial = -1
320     else:
321       serial = newer_than
322
323     entries = []
324     for op in self.ops:
325       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
326
327     return entries
328
329   def MarkUnfinishedOps(self, status, result):
330     """Mark unfinished opcodes with a given status and result.
331
332     This is an utility function for marking all running or waiting to
333     be run opcodes with a given status. Opcodes which are already
334     finalised are not changed.
335
336     @param status: a given opcode status
337     @param result: the opcode result
338
339     """
340     not_marked = True
341     for op in self.ops:
342       if op.status in constants.OPS_FINALIZED:
343         assert not_marked, "Finalized opcodes found after non-finalized ones"
344         continue
345       op.status = status
346       op.result = result
347       not_marked = False
348
349
350 class _OpExecCallbacks(mcpu.OpExecCbBase):
351   def __init__(self, queue, job, op):
352     """Initializes this class.
353
354     @type queue: L{JobQueue}
355     @param queue: Job queue
356     @type job: L{_QueuedJob}
357     @param job: Job object
358     @type op: L{_QueuedOpCode}
359     @param op: OpCode
360
361     """
362     assert queue, "Queue is missing"
363     assert job, "Job is missing"
364     assert op, "Opcode is missing"
365
366     self._queue = queue
367     self._job = job
368     self._op = op
369
370   def NotifyStart(self):
371     """Mark the opcode as running, not lock-waiting.
372
373     This is called from the mcpu code as a notifier function, when the LU is
374     finally about to start the Exec() method. Of course, to have end-user
375     visible results, the opcode must be initially (before calling into
376     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
377
378     """
379     self._queue.acquire()
380     try:
381       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
382                                  constants.OP_STATUS_CANCELING)
383
384       # All locks are acquired by now
385       self._job.lock_status = None
386
387       # Cancel here if we were asked to
388       if self._op.status == constants.OP_STATUS_CANCELING:
389         raise CancelJob()
390
391       self._op.status = constants.OP_STATUS_RUNNING
392       self._op.exec_timestamp = TimeStampNow()
393     finally:
394       self._queue.release()
395
396   def Feedback(self, *args):
397     """Append a log entry.
398
399     """
400     assert len(args) < 3
401
402     if len(args) == 1:
403       log_type = constants.ELOG_MESSAGE
404       log_msg = args[0]
405     else:
406       (log_type, log_msg) = args
407
408     # The time is split to make serialization easier and not lose
409     # precision.
410     timestamp = utils.SplitTime(time.time())
411
412     self._queue.acquire()
413     try:
414       self._job.log_serial += 1
415       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
416
417       self._job.change.notifyAll()
418     finally:
419       self._queue.release()
420
421   def ReportLocks(self, msg):
422     """Write locking information to the job.
423
424     Called whenever the LU processor is waiting for a lock or has acquired one.
425
426     """
427     # Not getting the queue lock because this is a single assignment
428     self._job.lock_status = msg
429
430
431 class _JobQueueWorker(workerpool.BaseWorker):
432   """The actual job workers.
433
434   """
435   def RunTask(self, job): # pylint: disable-msg=W0221
436     """Job executor.
437
438     This functions processes a job. It is closely tied to the _QueuedJob and
439     _QueuedOpCode classes.
440
441     @type job: L{_QueuedJob}
442     @param job: the job to be processed
443
444     """
445     logging.info("Processing job %s", job.id)
446     proc = mcpu.Processor(self.pool.queue.context, job.id)
447     queue = job.queue
448     try:
449       try:
450         count = len(job.ops)
451         for idx, op in enumerate(job.ops):
452           op_summary = op.input.Summary()
453           if op.status == constants.OP_STATUS_SUCCESS:
454             # this is a job that was partially completed before master
455             # daemon shutdown, so it can be expected that some opcodes
456             # are already completed successfully (if any did error
457             # out, then the whole job should have been aborted and not
458             # resubmitted for processing)
459             logging.info("Op %s/%s: opcode %s already processed, skipping",
460                          idx + 1, count, op_summary)
461             continue
462           try:
463             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
464                          op_summary)
465
466             queue.acquire()
467             try:
468               if op.status == constants.OP_STATUS_CANCELED:
469                 raise CancelJob()
470               assert op.status == constants.OP_STATUS_QUEUED
471               op.status = constants.OP_STATUS_WAITLOCK
472               op.result = None
473               op.start_timestamp = TimeStampNow()
474               if idx == 0: # first opcode
475                 job.start_timestamp = op.start_timestamp
476               queue.UpdateJobUnlocked(job)
477
478               input_opcode = op.input
479             finally:
480               queue.release()
481
482             # Make sure not to hold queue lock while calling ExecOpCode
483             result = proc.ExecOpCode(input_opcode,
484                                      _OpExecCallbacks(queue, job, op))
485
486             queue.acquire()
487             try:
488               op.status = constants.OP_STATUS_SUCCESS
489               op.result = result
490               op.end_timestamp = TimeStampNow()
491               queue.UpdateJobUnlocked(job)
492             finally:
493               queue.release()
494
495             logging.info("Op %s/%s: Successfully finished opcode %s",
496                          idx + 1, count, op_summary)
497           except CancelJob:
498             # Will be handled further up
499             raise
500           except Exception, err:
501             queue.acquire()
502             try:
503               try:
504                 op.status = constants.OP_STATUS_ERROR
505                 if isinstance(err, errors.GenericError):
506                   op.result = errors.EncodeException(err)
507                 else:
508                   op.result = str(err)
509                 op.end_timestamp = TimeStampNow()
510                 logging.info("Op %s/%s: Error in opcode %s: %s",
511                              idx + 1, count, op_summary, err)
512               finally:
513                 queue.UpdateJobUnlocked(job)
514             finally:
515               queue.release()
516             raise
517
518       except CancelJob:
519         queue.acquire()
520         try:
521           queue.CancelJobUnlocked(job)
522         finally:
523           queue.release()
524       except errors.GenericError, err:
525         logging.exception("Ganeti exception")
526       except:
527         logging.exception("Unhandled exception")
528     finally:
529       queue.acquire()
530       try:
531         try:
532           job.lock_status = None
533           job.end_timestamp = TimeStampNow()
534           queue.UpdateJobUnlocked(job)
535         finally:
536           job_id = job.id
537           status = job.CalcStatus()
538       finally:
539         queue.release()
540
541       logging.info("Finished job %s, status = %s", job_id, status)
542
543
544 class _JobQueueWorkerPool(workerpool.WorkerPool):
545   """Simple class implementing a job-processing workerpool.
546
547   """
548   def __init__(self, queue):
549     super(_JobQueueWorkerPool, self).__init__("JobQueue",
550                                               JOBQUEUE_THREADS,
551                                               _JobQueueWorker)
552     self.queue = queue
553
554
555 def _RequireOpenQueue(fn):
556   """Decorator for "public" functions.
557
558   This function should be used for all 'public' functions. That is,
559   functions usually called from other classes. Note that this should
560   be applied only to methods (not plain functions), since it expects
561   that the decorated function is called with a first argument that has
562   a '_queue_lock' argument.
563
564   @warning: Use this decorator only after utils.LockedMethod!
565
566   Example::
567     @utils.LockedMethod
568     @_RequireOpenQueue
569     def Example(self):
570       pass
571
572   """
573   def wrapper(self, *args, **kwargs):
574     # pylint: disable-msg=W0212
575     assert self._queue_lock is not None, "Queue should be open"
576     return fn(self, *args, **kwargs)
577   return wrapper
578
579
580 class JobQueue(object):
581   """Queue used to manage the jobs.
582
583   @cvar _RE_JOB_FILE: regex matching the valid job file names
584
585   """
586   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
587
588   def __init__(self, context):
589     """Constructor for JobQueue.
590
591     The constructor will initialize the job queue object and then
592     start loading the current jobs from disk, either for starting them
593     (if they were queue) or for aborting them (if they were already
594     running).
595
596     @type context: GanetiContext
597     @param context: the context object for access to the configuration
598         data and other ganeti objects
599
600     """
601     self.context = context
602     self._memcache = weakref.WeakValueDictionary()
603     self._my_hostname = utils.HostInfo().name
604
605     # Locking
606     self._lock = threading.Lock()
607     self.acquire = self._lock.acquire
608     self.release = self._lock.release
609
610     # Initialize
611     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
612
613     # Read serial file
614     self._last_serial = jstore.ReadSerial()
615     assert self._last_serial is not None, ("Serial file was modified between"
616                                            " check in jstore and here")
617
618     # Get initial list of nodes
619     self._nodes = dict((n.name, n.primary_ip)
620                        for n in self.context.cfg.GetAllNodesInfo().values()
621                        if n.master_candidate)
622
623     # Remove master node
624     try:
625       del self._nodes[self._my_hostname]
626     except KeyError:
627       pass
628
629     # TODO: Check consistency across nodes
630
631     # Setup worker pool
632     self._wpool = _JobQueueWorkerPool(self)
633     try:
634       # We need to lock here because WorkerPool.AddTask() may start a job while
635       # we're still doing our work.
636       self.acquire()
637       try:
638         logging.info("Inspecting job queue")
639
640         all_job_ids = self._GetJobIDsUnlocked()
641         jobs_count = len(all_job_ids)
642         lastinfo = time.time()
643         for idx, job_id in enumerate(all_job_ids):
644           # Give an update every 1000 jobs or 10 seconds
645           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
646               idx == (jobs_count - 1)):
647             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
648                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
649             lastinfo = time.time()
650
651           job = self._LoadJobUnlocked(job_id)
652
653           # a failure in loading the job can cause 'None' to be returned
654           if job is None:
655             continue
656
657           status = job.CalcStatus()
658
659           if status in (constants.JOB_STATUS_QUEUED, ):
660             self._wpool.AddTask(job)
661
662           elif status in (constants.JOB_STATUS_RUNNING,
663                           constants.JOB_STATUS_WAITLOCK,
664                           constants.JOB_STATUS_CANCELING):
665             logging.warning("Unfinished job %s found: %s", job.id, job)
666             try:
667               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
668                                     "Unclean master daemon shutdown")
669             finally:
670               self.UpdateJobUnlocked(job)
671
672         logging.info("Job queue inspection finished")
673       finally:
674         self.release()
675     except:
676       self._wpool.TerminateWorkers()
677       raise
678
679   @utils.LockedMethod
680   @_RequireOpenQueue
681   def AddNode(self, node):
682     """Register a new node with the queue.
683
684     @type node: L{objects.Node}
685     @param node: the node object to be added
686
687     """
688     node_name = node.name
689     assert node_name != self._my_hostname
690
691     # Clean queue directory on added node
692     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
693     msg = result.fail_msg
694     if msg:
695       logging.warning("Cannot cleanup queue directory on node %s: %s",
696                       node_name, msg)
697
698     if not node.master_candidate:
699       # remove if existing, ignoring errors
700       self._nodes.pop(node_name, None)
701       # and skip the replication of the job ids
702       return
703
704     # Upload the whole queue excluding archived jobs
705     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
706
707     # Upload current serial file
708     files.append(constants.JOB_QUEUE_SERIAL_FILE)
709
710     for file_name in files:
711       # Read file content
712       content = utils.ReadFile(file_name)
713
714       result = rpc.RpcRunner.call_jobqueue_update([node_name],
715                                                   [node.primary_ip],
716                                                   file_name, content)
717       msg = result[node_name].fail_msg
718       if msg:
719         logging.error("Failed to upload file %s to node %s: %s",
720                       file_name, node_name, msg)
721
722     self._nodes[node_name] = node.primary_ip
723
724   @utils.LockedMethod
725   @_RequireOpenQueue
726   def RemoveNode(self, node_name):
727     """Callback called when removing nodes from the cluster.
728
729     @type node_name: str
730     @param node_name: the name of the node to remove
731
732     """
733     try:
734       # The queue is removed by the "leave node" RPC call.
735       del self._nodes[node_name]
736     except KeyError:
737       pass
738
739   @staticmethod
740   def _CheckRpcResult(result, nodes, failmsg):
741     """Verifies the status of an RPC call.
742
743     Since we aim to keep consistency should this node (the current
744     master) fail, we will log errors if our rpc fail, and especially
745     log the case when more than half of the nodes fails.
746
747     @param result: the data as returned from the rpc call
748     @type nodes: list
749     @param nodes: the list of nodes we made the call to
750     @type failmsg: str
751     @param failmsg: the identifier to be used for logging
752
753     """
754     failed = []
755     success = []
756
757     for node in nodes:
758       msg = result[node].fail_msg
759       if msg:
760         failed.append(node)
761         logging.error("RPC call %s (%s) failed on node %s: %s",
762                       result[node].call, failmsg, node, msg)
763       else:
764         success.append(node)
765
766     # +1 for the master node
767     if (len(success) + 1) < len(failed):
768       # TODO: Handle failing nodes
769       logging.error("More than half of the nodes failed")
770
771   def _GetNodeIp(self):
772     """Helper for returning the node name/ip list.
773
774     @rtype: (list, list)
775     @return: a tuple of two lists, the first one with the node
776         names and the second one with the node addresses
777
778     """
779     name_list = self._nodes.keys()
780     addr_list = [self._nodes[name] for name in name_list]
781     return name_list, addr_list
782
783   def _WriteAndReplicateFileUnlocked(self, file_name, data):
784     """Writes a file locally and then replicates it to all nodes.
785
786     This function will replace the contents of a file on the local
787     node and then replicate it to all the other nodes we have.
788
789     @type file_name: str
790     @param file_name: the path of the file to be replicated
791     @type data: str
792     @param data: the new contents of the file
793
794     """
795     utils.WriteFile(file_name, data=data)
796
797     names, addrs = self._GetNodeIp()
798     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
799     self._CheckRpcResult(result, self._nodes,
800                          "Updating %s" % file_name)
801
802   def _RenameFilesUnlocked(self, rename):
803     """Renames a file locally and then replicate the change.
804
805     This function will rename a file in the local queue directory
806     and then replicate this rename to all the other nodes we have.
807
808     @type rename: list of (old, new)
809     @param rename: List containing tuples mapping old to new names
810
811     """
812     # Rename them locally
813     for old, new in rename:
814       utils.RenameFile(old, new, mkdir=True)
815
816     # ... and on all nodes
817     names, addrs = self._GetNodeIp()
818     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
819     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
820
821   @staticmethod
822   def _FormatJobID(job_id):
823     """Convert a job ID to string format.
824
825     Currently this just does C{str(job_id)} after performing some
826     checks, but if we want to change the job id format this will
827     abstract this change.
828
829     @type job_id: int or long
830     @param job_id: the numeric job id
831     @rtype: str
832     @return: the formatted job id
833
834     """
835     if not isinstance(job_id, (int, long)):
836       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
837     if job_id < 0:
838       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
839
840     return str(job_id)
841
842   @classmethod
843   def _GetArchiveDirectory(cls, job_id):
844     """Returns the archive directory for a job.
845
846     @type job_id: str
847     @param job_id: Job identifier
848     @rtype: str
849     @return: Directory name
850
851     """
852     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
853
854   def _NewSerialsUnlocked(self, count):
855     """Generates a new job identifier.
856
857     Job identifiers are unique during the lifetime of a cluster.
858
859     @type count: integer
860     @param count: how many serials to return
861     @rtype: str
862     @return: a string representing the job identifier.
863
864     """
865     assert count > 0
866     # New number
867     serial = self._last_serial + count
868
869     # Write to file
870     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
871                                         "%s\n" % serial)
872
873     result = [self._FormatJobID(v)
874               for v in range(self._last_serial, serial + 1)]
875     # Keep it only if we were able to write the file
876     self._last_serial = serial
877
878     return result
879
880   @staticmethod
881   def _GetJobPath(job_id):
882     """Returns the job file for a given job id.
883
884     @type job_id: str
885     @param job_id: the job identifier
886     @rtype: str
887     @return: the path to the job file
888
889     """
890     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
891
892   @classmethod
893   def _GetArchivedJobPath(cls, job_id):
894     """Returns the archived job file for a give job id.
895
896     @type job_id: str
897     @param job_id: the job identifier
898     @rtype: str
899     @return: the path to the archived job file
900
901     """
902     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
903                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
904
905   @classmethod
906   def _ExtractJobID(cls, name):
907     """Extract the job id from a filename.
908
909     @type name: str
910     @param name: the job filename
911     @rtype: job id or None
912     @return: the job id corresponding to the given filename,
913         or None if the filename does not represent a valid
914         job file
915
916     """
917     m = cls._RE_JOB_FILE.match(name)
918     if m:
919       return m.group(1)
920     else:
921       return None
922
923   def _GetJobIDsUnlocked(self, archived=False):
924     """Return all known job IDs.
925
926     If the parameter archived is True, archived jobs IDs will be
927     included. Currently this argument is unused.
928
929     The method only looks at disk because it's a requirement that all
930     jobs are present on disk (so in the _memcache we don't have any
931     extra IDs).
932
933     @rtype: list
934     @return: the list of job IDs
935
936     """
937     # pylint: disable-msg=W0613
938     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
939     jlist = utils.NiceSort(jlist)
940     return jlist
941
942   def _ListJobFiles(self):
943     """Returns the list of current job files.
944
945     @rtype: list
946     @return: the list of job file names
947
948     """
949     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
950             if self._RE_JOB_FILE.match(name)]
951
952   def _LoadJobUnlocked(self, job_id):
953     """Loads a job from the disk or memory.
954
955     Given a job id, this will return the cached job object if
956     existing, or try to load the job from the disk. If loading from
957     disk, it will also add the job to the cache.
958
959     @param job_id: the job id
960     @rtype: L{_QueuedJob} or None
961     @return: either None or the job object
962
963     """
964     job = self._memcache.get(job_id, None)
965     if job:
966       logging.debug("Found job %s in memcache", job_id)
967       return job
968
969     filepath = self._GetJobPath(job_id)
970     logging.debug("Loading job from %s", filepath)
971     try:
972       raw_data = utils.ReadFile(filepath)
973     except IOError, err:
974       if err.errno in (errno.ENOENT, ):
975         return None
976       raise
977
978     data = serializer.LoadJson(raw_data)
979
980     try:
981       job = _QueuedJob.Restore(self, data)
982     except Exception, err: # pylint: disable-msg=W0703
983       new_path = self._GetArchivedJobPath(job_id)
984       if filepath == new_path:
985         # job already archived (future case)
986         logging.exception("Can't parse job %s", job_id)
987       else:
988         # non-archived case
989         logging.exception("Can't parse job %s, will archive.", job_id)
990         self._RenameFilesUnlocked([(filepath, new_path)])
991       return None
992
993     self._memcache[job_id] = job
994     logging.debug("Added job %s to the cache", job_id)
995     return job
996
997   def _GetJobsUnlocked(self, job_ids):
998     """Return a list of jobs based on their IDs.
999
1000     @type job_ids: list
1001     @param job_ids: either an empty list (meaning all jobs),
1002         or a list of job IDs
1003     @rtype: list
1004     @return: the list of job objects
1005
1006     """
1007     if not job_ids:
1008       job_ids = self._GetJobIDsUnlocked()
1009
1010     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1011
1012   @staticmethod
1013   def _IsQueueMarkedDrain():
1014     """Check if the queue is marked from drain.
1015
1016     This currently uses the queue drain file, which makes it a
1017     per-node flag. In the future this can be moved to the config file.
1018
1019     @rtype: boolean
1020     @return: True of the job queue is marked for draining
1021
1022     """
1023     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1024
1025   @staticmethod
1026   def SetDrainFlag(drain_flag):
1027     """Sets the drain flag for the queue.
1028
1029     This is similar to the function L{backend.JobQueueSetDrainFlag},
1030     and in the future we might merge them.
1031
1032     @type drain_flag: boolean
1033     @param drain_flag: Whether to set or unset the drain flag
1034
1035     """
1036     if drain_flag:
1037       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1038     else:
1039       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1040     return True
1041
1042   @_RequireOpenQueue
1043   def _SubmitJobUnlocked(self, job_id, ops):
1044     """Create and store a new job.
1045
1046     This enters the job into our job queue and also puts it on the new
1047     queue, in order for it to be picked up by the queue processors.
1048
1049     @type job_id: job ID
1050     @param job_id: the job ID for the new job
1051     @type ops: list
1052     @param ops: The list of OpCodes that will become the new job.
1053     @rtype: job ID
1054     @return: the job ID of the newly created job
1055     @raise errors.JobQueueDrainError: if the job is marked for draining
1056
1057     """
1058     if self._IsQueueMarkedDrain():
1059       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1060
1061     # Check job queue size
1062     size = len(self._ListJobFiles())
1063     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1064       # TODO: Autoarchive jobs. Make sure it's not done on every job
1065       # submission, though.
1066       #size = ...
1067       pass
1068
1069     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1070       raise errors.JobQueueFull()
1071
1072     job = _QueuedJob(self, job_id, ops)
1073
1074     # Write to disk
1075     self.UpdateJobUnlocked(job)
1076
1077     logging.debug("Adding new job %s to the cache", job_id)
1078     self._memcache[job_id] = job
1079
1080     # Add to worker pool
1081     self._wpool.AddTask(job)
1082
1083     return job.id
1084
1085   @utils.LockedMethod
1086   @_RequireOpenQueue
1087   def SubmitJob(self, ops):
1088     """Create and store a new job.
1089
1090     @see: L{_SubmitJobUnlocked}
1091
1092     """
1093     job_id = self._NewSerialsUnlocked(1)[0]
1094     return self._SubmitJobUnlocked(job_id, ops)
1095
1096   @utils.LockedMethod
1097   @_RequireOpenQueue
1098   def SubmitManyJobs(self, jobs):
1099     """Create and store multiple jobs.
1100
1101     @see: L{_SubmitJobUnlocked}
1102
1103     """
1104     results = []
1105     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1106     for job_id, ops in zip(all_job_ids, jobs):
1107       try:
1108         data = self._SubmitJobUnlocked(job_id, ops)
1109         status = True
1110       except errors.GenericError, err:
1111         data = str(err)
1112         status = False
1113       results.append((status, data))
1114
1115     return results
1116
1117   @_RequireOpenQueue
1118   def UpdateJobUnlocked(self, job):
1119     """Update a job's on disk storage.
1120
1121     After a job has been modified, this function needs to be called in
1122     order to write the changes to disk and replicate them to the other
1123     nodes.
1124
1125     @type job: L{_QueuedJob}
1126     @param job: the changed job
1127
1128     """
1129     filename = self._GetJobPath(job.id)
1130     data = serializer.DumpJson(job.Serialize(), indent=False)
1131     logging.debug("Writing job %s to %s", job.id, filename)
1132     self._WriteAndReplicateFileUnlocked(filename, data)
1133
1134     # Notify waiters about potential changes
1135     job.change.notifyAll()
1136
1137   @utils.LockedMethod
1138   @_RequireOpenQueue
1139   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1140                         timeout):
1141     """Waits for changes in a job.
1142
1143     @type job_id: string
1144     @param job_id: Job identifier
1145     @type fields: list of strings
1146     @param fields: Which fields to check for changes
1147     @type prev_job_info: list or None
1148     @param prev_job_info: Last job information returned
1149     @type prev_log_serial: int
1150     @param prev_log_serial: Last job message serial number
1151     @type timeout: float
1152     @param timeout: maximum time to wait
1153     @rtype: tuple (job info, log entries)
1154     @return: a tuple of the job information as required via
1155         the fields parameter, and the log entries as a list
1156
1157         if the job has not changed and the timeout has expired,
1158         we instead return a special value,
1159         L{constants.JOB_NOTCHANGED}, which should be interpreted
1160         as such by the clients
1161
1162     """
1163     job = self._LoadJobUnlocked(job_id)
1164     if not job:
1165       logging.debug("Job %s not found", job_id)
1166       return None
1167
1168     def _CheckForChanges():
1169       logging.debug("Waiting for changes in job %s", job_id)
1170
1171       status = job.CalcStatus()
1172       job_info = self._GetJobInfoUnlocked(job, fields)
1173       log_entries = job.GetLogEntries(prev_log_serial)
1174
1175       # Serializing and deserializing data can cause type changes (e.g. from
1176       # tuple to list) or precision loss. We're doing it here so that we get
1177       # the same modifications as the data received from the client. Without
1178       # this, the comparison afterwards might fail without the data being
1179       # significantly different.
1180       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1181       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1182
1183       # Don't even try to wait if the job is no longer running, there will be
1184       # no changes.
1185       if (status not in (constants.JOB_STATUS_QUEUED,
1186                          constants.JOB_STATUS_RUNNING,
1187                          constants.JOB_STATUS_WAITLOCK) or
1188           prev_job_info != job_info or
1189           (log_entries and prev_log_serial != log_entries[0][0])):
1190         logging.debug("Job %s changed", job_id)
1191         return (job_info, log_entries)
1192
1193       raise utils.RetryAgain()
1194
1195     try:
1196       # Setting wait function to release the queue lock while waiting
1197       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1198                          wait_fn=job.change.wait)
1199     except utils.RetryTimeout:
1200       return constants.JOB_NOTCHANGED
1201
1202   @utils.LockedMethod
1203   @_RequireOpenQueue
1204   def CancelJob(self, job_id):
1205     """Cancels a job.
1206
1207     This will only succeed if the job has not started yet.
1208
1209     @type job_id: string
1210     @param job_id: job ID of job to be cancelled.
1211
1212     """
1213     logging.info("Cancelling job %s", job_id)
1214
1215     job = self._LoadJobUnlocked(job_id)
1216     if not job:
1217       logging.debug("Job %s not found", job_id)
1218       return (False, "Job %s not found" % job_id)
1219
1220     job_status = job.CalcStatus()
1221
1222     if job_status not in (constants.JOB_STATUS_QUEUED,
1223                           constants.JOB_STATUS_WAITLOCK):
1224       logging.debug("Job %s is no longer waiting in the queue", job.id)
1225       return (False, "Job %s is no longer waiting in the queue" % job.id)
1226
1227     if job_status == constants.JOB_STATUS_QUEUED:
1228       self.CancelJobUnlocked(job)
1229       return (True, "Job %s canceled" % job.id)
1230
1231     elif job_status == constants.JOB_STATUS_WAITLOCK:
1232       # The worker will notice the new status and cancel the job
1233       try:
1234         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1235       finally:
1236         self.UpdateJobUnlocked(job)
1237       return (True, "Job %s will be canceled" % job.id)
1238
1239   @_RequireOpenQueue
1240   def CancelJobUnlocked(self, job):
1241     """Marks a job as canceled.
1242
1243     """
1244     try:
1245       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1246                             "Job canceled by request")
1247     finally:
1248       self.UpdateJobUnlocked(job)
1249
1250   @_RequireOpenQueue
1251   def _ArchiveJobsUnlocked(self, jobs):
1252     """Archives jobs.
1253
1254     @type jobs: list of L{_QueuedJob}
1255     @param jobs: Job objects
1256     @rtype: int
1257     @return: Number of archived jobs
1258
1259     """
1260     archive_jobs = []
1261     rename_files = []
1262     for job in jobs:
1263       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1264                                   constants.JOB_STATUS_SUCCESS,
1265                                   constants.JOB_STATUS_ERROR):
1266         logging.debug("Job %s is not yet done", job.id)
1267         continue
1268
1269       archive_jobs.append(job)
1270
1271       old = self._GetJobPath(job.id)
1272       new = self._GetArchivedJobPath(job.id)
1273       rename_files.append((old, new))
1274
1275     # TODO: What if 1..n files fail to rename?
1276     self._RenameFilesUnlocked(rename_files)
1277
1278     logging.debug("Successfully archived job(s) %s",
1279                   utils.CommaJoin(job.id for job in archive_jobs))
1280
1281     return len(archive_jobs)
1282
1283   @utils.LockedMethod
1284   @_RequireOpenQueue
1285   def ArchiveJob(self, job_id):
1286     """Archives a job.
1287
1288     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1289
1290     @type job_id: string
1291     @param job_id: Job ID of job to be archived.
1292     @rtype: bool
1293     @return: Whether job was archived
1294
1295     """
1296     logging.info("Archiving job %s", job_id)
1297
1298     job = self._LoadJobUnlocked(job_id)
1299     if not job:
1300       logging.debug("Job %s not found", job_id)
1301       return False
1302
1303     return self._ArchiveJobsUnlocked([job]) == 1
1304
1305   @utils.LockedMethod
1306   @_RequireOpenQueue
1307   def AutoArchiveJobs(self, age, timeout):
1308     """Archives all jobs based on age.
1309
1310     The method will archive all jobs which are older than the age
1311     parameter. For jobs that don't have an end timestamp, the start
1312     timestamp will be considered. The special '-1' age will cause
1313     archival of all jobs (that are not running or queued).
1314
1315     @type age: int
1316     @param age: the minimum age in seconds
1317
1318     """
1319     logging.info("Archiving jobs with age more than %s seconds", age)
1320
1321     now = time.time()
1322     end_time = now + timeout
1323     archived_count = 0
1324     last_touched = 0
1325
1326     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1327     pending = []
1328     for idx, job_id in enumerate(all_job_ids):
1329       last_touched = idx + 1
1330
1331       # Not optimal because jobs could be pending
1332       # TODO: Measure average duration for job archival and take number of
1333       # pending jobs into account.
1334       if time.time() > end_time:
1335         break
1336
1337       # Returns None if the job failed to load
1338       job = self._LoadJobUnlocked(job_id)
1339       if job:
1340         if job.end_timestamp is None:
1341           if job.start_timestamp is None:
1342             job_age = job.received_timestamp
1343           else:
1344             job_age = job.start_timestamp
1345         else:
1346           job_age = job.end_timestamp
1347
1348         if age == -1 or now - job_age[0] > age:
1349           pending.append(job)
1350
1351           # Archive 10 jobs at a time
1352           if len(pending) >= 10:
1353             archived_count += self._ArchiveJobsUnlocked(pending)
1354             pending = []
1355
1356     if pending:
1357       archived_count += self._ArchiveJobsUnlocked(pending)
1358
1359     return (archived_count, len(all_job_ids) - last_touched)
1360
1361   @staticmethod
1362   def _GetJobInfoUnlocked(job, fields):
1363     """Returns information about a job.
1364
1365     @type job: L{_QueuedJob}
1366     @param job: the job which we query
1367     @type fields: list
1368     @param fields: names of fields to return
1369     @rtype: list
1370     @return: list with one element for each field
1371     @raise errors.OpExecError: when an invalid field
1372         has been passed
1373
1374     """
1375     row = []
1376     for fname in fields:
1377       if fname == "id":
1378         row.append(job.id)
1379       elif fname == "status":
1380         row.append(job.CalcStatus())
1381       elif fname == "ops":
1382         row.append([op.input.__getstate__() for op in job.ops])
1383       elif fname == "opresult":
1384         row.append([op.result for op in job.ops])
1385       elif fname == "opstatus":
1386         row.append([op.status for op in job.ops])
1387       elif fname == "oplog":
1388         row.append([op.log for op in job.ops])
1389       elif fname == "opstart":
1390         row.append([op.start_timestamp for op in job.ops])
1391       elif fname == "opexec":
1392         row.append([op.exec_timestamp for op in job.ops])
1393       elif fname == "opend":
1394         row.append([op.end_timestamp for op in job.ops])
1395       elif fname == "received_ts":
1396         row.append(job.received_timestamp)
1397       elif fname == "start_ts":
1398         row.append(job.start_timestamp)
1399       elif fname == "end_ts":
1400         row.append(job.end_timestamp)
1401       elif fname == "lock_status":
1402         row.append(job.lock_status)
1403       elif fname == "summary":
1404         row.append([op.input.Summary() for op in job.ops])
1405       else:
1406         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1407     return row
1408
1409   @utils.LockedMethod
1410   @_RequireOpenQueue
1411   def QueryJobs(self, job_ids, fields):
1412     """Returns a list of jobs in queue.
1413
1414     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1415     processing for each job.
1416
1417     @type job_ids: list
1418     @param job_ids: sequence of job identifiers or None for all
1419     @type fields: list
1420     @param fields: names of fields to return
1421     @rtype: list
1422     @return: list one element per job, each element being list with
1423         the requested fields
1424
1425     """
1426     jobs = []
1427
1428     for job in self._GetJobsUnlocked(job_ids):
1429       if job is None:
1430         jobs.append(None)
1431       else:
1432         jobs.append(self._GetJobInfoUnlocked(job, fields))
1433
1434     return jobs
1435
1436   @utils.LockedMethod
1437   @_RequireOpenQueue
1438   def Shutdown(self):
1439     """Stops the job queue.
1440
1441     This shutdowns all the worker threads an closes the queue.
1442
1443     """
1444     self._wpool.TerminateWorkers()
1445
1446     self._queue_lock.Close()
1447     self._queue_lock = None