Factorize LUXI parsing and handling code
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type log_serial: int
149   @ivar log_serial: holds the index for the next log entry
150   @ivar received_timestamp: the timestamp for when the job was received
151   @ivar start_timestmap: the timestamp for start of execution
152   @ivar end_timestamp: the timestamp for end of execution
153   @ivar lock_status: In-memory locking information for debugging
154   @ivar change: a Condition variable we use for waiting for job changes
155
156   """
157   # pylint: disable-msg=W0212
158   __slots__ = ["queue", "id", "ops", "log_serial",
159                "received_timestamp", "start_timestamp", "end_timestamp",
160                "lock_status", "change",
161                "__weakref__"]
162
163   def __init__(self, queue, job_id, ops):
164     """Constructor for the _QueuedJob.
165
166     @type queue: L{JobQueue}
167     @param queue: our parent queue
168     @type job_id: job_id
169     @param job_id: our job id
170     @type ops: list
171     @param ops: the list of opcodes we hold, which will be encapsulated
172         in _QueuedOpCodes
173
174     """
175     if not ops:
176       # TODO: use a better exception
177       raise Exception("No opcodes")
178
179     self.queue = queue
180     self.id = job_id
181     self.ops = [_QueuedOpCode(op) for op in ops]
182     self.log_serial = 0
183     self.received_timestamp = TimeStampNow()
184     self.start_timestamp = None
185     self.end_timestamp = None
186
187     # In-memory attributes
188     self.lock_status = None
189
190     # Condition to wait for changes
191     self.change = threading.Condition(self.queue._lock)
192
193   @classmethod
194   def Restore(cls, queue, state):
195     """Restore a _QueuedJob from serialized state:
196
197     @type queue: L{JobQueue}
198     @param queue: to which queue the restored job belongs
199     @type state: dict
200     @param state: the serialized state
201     @rtype: _JobQueue
202     @return: the restored _JobQueue instance
203
204     """
205     obj = _QueuedJob.__new__(cls)
206     obj.queue = queue
207     obj.id = state["id"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     # In-memory attributes
213     obj.lock_status = None
214
215     obj.ops = []
216     obj.log_serial = 0
217     for op_state in state["ops"]:
218       op = _QueuedOpCode.Restore(op_state)
219       for log_entry in op.log:
220         obj.log_serial = max(obj.log_serial, log_entry[0])
221       obj.ops.append(op)
222
223     # Condition to wait for changes
224     obj.change = threading.Condition(obj.queue._lock)
225
226     return obj
227
228   def Serialize(self):
229     """Serialize the _JobQueue instance.
230
231     @rtype: dict
232     @return: the serialized state
233
234     """
235     return {
236       "id": self.id,
237       "ops": [op.Serialize() for op in self.ops],
238       "start_timestamp": self.start_timestamp,
239       "end_timestamp": self.end_timestamp,
240       "received_timestamp": self.received_timestamp,
241       }
242
243   def CalcStatus(self):
244     """Compute the status of this job.
245
246     This function iterates over all the _QueuedOpCodes in the job and
247     based on their status, computes the job status.
248
249     The algorithm is:
250       - if we find a cancelled, or finished with error, the job
251         status will be the same
252       - otherwise, the last opcode with the status one of:
253           - waitlock
254           - canceling
255           - running
256
257         will determine the job status
258
259       - otherwise, it means either all opcodes are queued, or success,
260         and the job status will be the same
261
262     @return: the job status
263
264     """
265     status = constants.JOB_STATUS_QUEUED
266
267     all_success = True
268     for op in self.ops:
269       if op.status == constants.OP_STATUS_SUCCESS:
270         continue
271
272       all_success = False
273
274       if op.status == constants.OP_STATUS_QUEUED:
275         pass
276       elif op.status == constants.OP_STATUS_WAITLOCK:
277         status = constants.JOB_STATUS_WAITLOCK
278       elif op.status == constants.OP_STATUS_RUNNING:
279         status = constants.JOB_STATUS_RUNNING
280       elif op.status == constants.OP_STATUS_CANCELING:
281         status = constants.JOB_STATUS_CANCELING
282         break
283       elif op.status == constants.OP_STATUS_ERROR:
284         status = constants.JOB_STATUS_ERROR
285         # The whole job fails if one opcode failed
286         break
287       elif op.status == constants.OP_STATUS_CANCELED:
288         status = constants.OP_STATUS_CANCELED
289         break
290
291     if all_success:
292       status = constants.JOB_STATUS_SUCCESS
293
294     return status
295
296   def GetLogEntries(self, newer_than):
297     """Selectively returns the log entries.
298
299     @type newer_than: None or int
300     @param newer_than: if this is None, return all log entries,
301         otherwise return only the log entries with serial higher
302         than this value
303     @rtype: list
304     @return: the list of the log entries selected
305
306     """
307     if newer_than is None:
308       serial = -1
309     else:
310       serial = newer_than
311
312     entries = []
313     for op in self.ops:
314       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
315
316     return entries
317
318   def MarkUnfinishedOps(self, status, result):
319     """Mark unfinished opcodes with a given status and result.
320
321     This is an utility function for marking all running or waiting to
322     be run opcodes with a given status. Opcodes which are already
323     finalised are not changed.
324
325     @param status: a given opcode status
326     @param result: the opcode result
327
328     """
329     not_marked = True
330     for op in self.ops:
331       if op.status in constants.OPS_FINALIZED:
332         assert not_marked, "Finalized opcodes found after non-finalized ones"
333         continue
334       op.status = status
335       op.result = result
336       not_marked = False
337
338
339 class _OpExecCallbacks(mcpu.OpExecCbBase):
340   def __init__(self, queue, job, op):
341     """Initializes this class.
342
343     @type queue: L{JobQueue}
344     @param queue: Job queue
345     @type job: L{_QueuedJob}
346     @param job: Job object
347     @type op: L{_QueuedOpCode}
348     @param op: OpCode
349
350     """
351     assert queue, "Queue is missing"
352     assert job, "Job is missing"
353     assert op, "Opcode is missing"
354
355     self._queue = queue
356     self._job = job
357     self._op = op
358
359   def NotifyStart(self):
360     """Mark the opcode as running, not lock-waiting.
361
362     This is called from the mcpu code as a notifier function, when the LU is
363     finally about to start the Exec() method. Of course, to have end-user
364     visible results, the opcode must be initially (before calling into
365     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
366
367     """
368     self._queue.acquire()
369     try:
370       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
371                                  constants.OP_STATUS_CANCELING)
372
373       # All locks are acquired by now
374       self._job.lock_status = None
375
376       # Cancel here if we were asked to
377       if self._op.status == constants.OP_STATUS_CANCELING:
378         raise CancelJob()
379
380       self._op.status = constants.OP_STATUS_RUNNING
381     finally:
382       self._queue.release()
383
384   def Feedback(self, *args):
385     """Append a log entry.
386
387     """
388     assert len(args) < 3
389
390     if len(args) == 1:
391       log_type = constants.ELOG_MESSAGE
392       log_msg = args[0]
393     else:
394       (log_type, log_msg) = args
395
396     # The time is split to make serialization easier and not lose
397     # precision.
398     timestamp = utils.SplitTime(time.time())
399
400     self._queue.acquire()
401     try:
402       self._job.log_serial += 1
403       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
404
405       self._job.change.notifyAll()
406     finally:
407       self._queue.release()
408
409   def ReportLocks(self, msg):
410     """Write locking information to the job.
411
412     Called whenever the LU processor is waiting for a lock or has acquired one.
413
414     """
415     # Not getting the queue lock because this is a single assignment
416     self._job.lock_status = msg
417
418
419 class _JobQueueWorker(workerpool.BaseWorker):
420   """The actual job workers.
421
422   """
423   def RunTask(self, job): # pylint: disable-msg=W0221
424     """Job executor.
425
426     This functions processes a job. It is closely tied to the _QueuedJob and
427     _QueuedOpCode classes.
428
429     @type job: L{_QueuedJob}
430     @param job: the job to be processed
431
432     """
433     logging.info("Worker %s processing job %s",
434                   self.worker_id, job.id)
435     proc = mcpu.Processor(self.pool.queue.context, job.id)
436     queue = job.queue
437     try:
438       try:
439         count = len(job.ops)
440         for idx, op in enumerate(job.ops):
441           op_summary = op.input.Summary()
442           if op.status == constants.OP_STATUS_SUCCESS:
443             # this is a job that was partially completed before master
444             # daemon shutdown, so it can be expected that some opcodes
445             # are already completed successfully (if any did error
446             # out, then the whole job should have been aborted and not
447             # resubmitted for processing)
448             logging.info("Op %s/%s: opcode %s already processed, skipping",
449                          idx + 1, count, op_summary)
450             continue
451           try:
452             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
453                          op_summary)
454
455             queue.acquire()
456             try:
457               if op.status == constants.OP_STATUS_CANCELED:
458                 raise CancelJob()
459               assert op.status == constants.OP_STATUS_QUEUED
460               op.status = constants.OP_STATUS_WAITLOCK
461               op.result = None
462               op.start_timestamp = TimeStampNow()
463               if idx == 0: # first opcode
464                 job.start_timestamp = op.start_timestamp
465               queue.UpdateJobUnlocked(job)
466
467               input_opcode = op.input
468             finally:
469               queue.release()
470
471             # Make sure not to hold queue lock while calling ExecOpCode
472             result = proc.ExecOpCode(input_opcode,
473                                      _OpExecCallbacks(queue, job, op))
474
475             queue.acquire()
476             try:
477               op.status = constants.OP_STATUS_SUCCESS
478               op.result = result
479               op.end_timestamp = TimeStampNow()
480               queue.UpdateJobUnlocked(job)
481             finally:
482               queue.release()
483
484             logging.info("Op %s/%s: Successfully finished opcode %s",
485                          idx + 1, count, op_summary)
486           except CancelJob:
487             # Will be handled further up
488             raise
489           except Exception, err:
490             queue.acquire()
491             try:
492               try:
493                 op.status = constants.OP_STATUS_ERROR
494                 if isinstance(err, errors.GenericError):
495                   op.result = errors.EncodeException(err)
496                 else:
497                   op.result = str(err)
498                 op.end_timestamp = TimeStampNow()
499                 logging.info("Op %s/%s: Error in opcode %s: %s",
500                              idx + 1, count, op_summary, err)
501               finally:
502                 queue.UpdateJobUnlocked(job)
503             finally:
504               queue.release()
505             raise
506
507       except CancelJob:
508         queue.acquire()
509         try:
510           queue.CancelJobUnlocked(job)
511         finally:
512           queue.release()
513       except errors.GenericError, err:
514         logging.exception("Ganeti exception")
515       except:
516         logging.exception("Unhandled exception")
517     finally:
518       queue.acquire()
519       try:
520         try:
521           job.lock_status = None
522           job.end_timestamp = TimeStampNow()
523           queue.UpdateJobUnlocked(job)
524         finally:
525           job_id = job.id
526           status = job.CalcStatus()
527       finally:
528         queue.release()
529
530       logging.info("Worker %s finished job %s, status = %s",
531                    self.worker_id, job_id, status)
532
533
534 class _JobQueueWorkerPool(workerpool.WorkerPool):
535   """Simple class implementing a job-processing workerpool.
536
537   """
538   def __init__(self, queue):
539     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
540                                               _JobQueueWorker)
541     self.queue = queue
542
543
544 def _RequireOpenQueue(fn):
545   """Decorator for "public" functions.
546
547   This function should be used for all 'public' functions. That is,
548   functions usually called from other classes. Note that this should
549   be applied only to methods (not plain functions), since it expects
550   that the decorated function is called with a first argument that has
551   a '_queue_lock' argument.
552
553   @warning: Use this decorator only after utils.LockedMethod!
554
555   Example::
556     @utils.LockedMethod
557     @_RequireOpenQueue
558     def Example(self):
559       pass
560
561   """
562   def wrapper(self, *args, **kwargs):
563     # pylint: disable-msg=W0212
564     assert self._queue_lock is not None, "Queue should be open"
565     return fn(self, *args, **kwargs)
566   return wrapper
567
568
569 class JobQueue(object):
570   """Queue used to manage the jobs.
571
572   @cvar _RE_JOB_FILE: regex matching the valid job file names
573
574   """
575   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
576
577   def __init__(self, context):
578     """Constructor for JobQueue.
579
580     The constructor will initialize the job queue object and then
581     start loading the current jobs from disk, either for starting them
582     (if they were queue) or for aborting them (if they were already
583     running).
584
585     @type context: GanetiContext
586     @param context: the context object for access to the configuration
587         data and other ganeti objects
588
589     """
590     self.context = context
591     self._memcache = weakref.WeakValueDictionary()
592     self._my_hostname = utils.HostInfo().name
593
594     # Locking
595     self._lock = threading.Lock()
596     self.acquire = self._lock.acquire
597     self.release = self._lock.release
598
599     # Initialize
600     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
601
602     # Read serial file
603     self._last_serial = jstore.ReadSerial()
604     assert self._last_serial is not None, ("Serial file was modified between"
605                                            " check in jstore and here")
606
607     # Get initial list of nodes
608     self._nodes = dict((n.name, n.primary_ip)
609                        for n in self.context.cfg.GetAllNodesInfo().values()
610                        if n.master_candidate)
611
612     # Remove master node
613     try:
614       del self._nodes[self._my_hostname]
615     except KeyError:
616       pass
617
618     # TODO: Check consistency across nodes
619
620     # Setup worker pool
621     self._wpool = _JobQueueWorkerPool(self)
622     try:
623       # We need to lock here because WorkerPool.AddTask() may start a job while
624       # we're still doing our work.
625       self.acquire()
626       try:
627         logging.info("Inspecting job queue")
628
629         all_job_ids = self._GetJobIDsUnlocked()
630         jobs_count = len(all_job_ids)
631         lastinfo = time.time()
632         for idx, job_id in enumerate(all_job_ids):
633           # Give an update every 1000 jobs or 10 seconds
634           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
635               idx == (jobs_count - 1)):
636             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
637                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
638             lastinfo = time.time()
639
640           job = self._LoadJobUnlocked(job_id)
641
642           # a failure in loading the job can cause 'None' to be returned
643           if job is None:
644             continue
645
646           status = job.CalcStatus()
647
648           if status in (constants.JOB_STATUS_QUEUED, ):
649             self._wpool.AddTask(job)
650
651           elif status in (constants.JOB_STATUS_RUNNING,
652                           constants.JOB_STATUS_WAITLOCK,
653                           constants.JOB_STATUS_CANCELING):
654             logging.warning("Unfinished job %s found: %s", job.id, job)
655             try:
656               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
657                                     "Unclean master daemon shutdown")
658             finally:
659               self.UpdateJobUnlocked(job)
660
661         logging.info("Job queue inspection finished")
662       finally:
663         self.release()
664     except:
665       self._wpool.TerminateWorkers()
666       raise
667
668   @utils.LockedMethod
669   @_RequireOpenQueue
670   def AddNode(self, node):
671     """Register a new node with the queue.
672
673     @type node: L{objects.Node}
674     @param node: the node object to be added
675
676     """
677     node_name = node.name
678     assert node_name != self._my_hostname
679
680     # Clean queue directory on added node
681     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
682     msg = result.fail_msg
683     if msg:
684       logging.warning("Cannot cleanup queue directory on node %s: %s",
685                       node_name, msg)
686
687     if not node.master_candidate:
688       # remove if existing, ignoring errors
689       self._nodes.pop(node_name, None)
690       # and skip the replication of the job ids
691       return
692
693     # Upload the whole queue excluding archived jobs
694     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
695
696     # Upload current serial file
697     files.append(constants.JOB_QUEUE_SERIAL_FILE)
698
699     for file_name in files:
700       # Read file content
701       content = utils.ReadFile(file_name)
702
703       result = rpc.RpcRunner.call_jobqueue_update([node_name],
704                                                   [node.primary_ip],
705                                                   file_name, content)
706       msg = result[node_name].fail_msg
707       if msg:
708         logging.error("Failed to upload file %s to node %s: %s",
709                       file_name, node_name, msg)
710
711     self._nodes[node_name] = node.primary_ip
712
713   @utils.LockedMethod
714   @_RequireOpenQueue
715   def RemoveNode(self, node_name):
716     """Callback called when removing nodes from the cluster.
717
718     @type node_name: str
719     @param node_name: the name of the node to remove
720
721     """
722     try:
723       # The queue is removed by the "leave node" RPC call.
724       del self._nodes[node_name]
725     except KeyError:
726       pass
727
728   @staticmethod
729   def _CheckRpcResult(result, nodes, failmsg):
730     """Verifies the status of an RPC call.
731
732     Since we aim to keep consistency should this node (the current
733     master) fail, we will log errors if our rpc fail, and especially
734     log the case when more than half of the nodes fails.
735
736     @param result: the data as returned from the rpc call
737     @type nodes: list
738     @param nodes: the list of nodes we made the call to
739     @type failmsg: str
740     @param failmsg: the identifier to be used for logging
741
742     """
743     failed = []
744     success = []
745
746     for node in nodes:
747       msg = result[node].fail_msg
748       if msg:
749         failed.append(node)
750         logging.error("RPC call %s (%s) failed on node %s: %s",
751                       result[node].call, failmsg, node, msg)
752       else:
753         success.append(node)
754
755     # +1 for the master node
756     if (len(success) + 1) < len(failed):
757       # TODO: Handle failing nodes
758       logging.error("More than half of the nodes failed")
759
760   def _GetNodeIp(self):
761     """Helper for returning the node name/ip list.
762
763     @rtype: (list, list)
764     @return: a tuple of two lists, the first one with the node
765         names and the second one with the node addresses
766
767     """
768     name_list = self._nodes.keys()
769     addr_list = [self._nodes[name] for name in name_list]
770     return name_list, addr_list
771
772   def _WriteAndReplicateFileUnlocked(self, file_name, data):
773     """Writes a file locally and then replicates it to all nodes.
774
775     This function will replace the contents of a file on the local
776     node and then replicate it to all the other nodes we have.
777
778     @type file_name: str
779     @param file_name: the path of the file to be replicated
780     @type data: str
781     @param data: the new contents of the file
782
783     """
784     utils.WriteFile(file_name, data=data)
785
786     names, addrs = self._GetNodeIp()
787     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
788     self._CheckRpcResult(result, self._nodes,
789                          "Updating %s" % file_name)
790
791   def _RenameFilesUnlocked(self, rename):
792     """Renames a file locally and then replicate the change.
793
794     This function will rename a file in the local queue directory
795     and then replicate this rename to all the other nodes we have.
796
797     @type rename: list of (old, new)
798     @param rename: List containing tuples mapping old to new names
799
800     """
801     # Rename them locally
802     for old, new in rename:
803       utils.RenameFile(old, new, mkdir=True)
804
805     # ... and on all nodes
806     names, addrs = self._GetNodeIp()
807     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
808     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
809
810   @staticmethod
811   def _FormatJobID(job_id):
812     """Convert a job ID to string format.
813
814     Currently this just does C{str(job_id)} after performing some
815     checks, but if we want to change the job id format this will
816     abstract this change.
817
818     @type job_id: int or long
819     @param job_id: the numeric job id
820     @rtype: str
821     @return: the formatted job id
822
823     """
824     if not isinstance(job_id, (int, long)):
825       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
826     if job_id < 0:
827       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
828
829     return str(job_id)
830
831   @classmethod
832   def _GetArchiveDirectory(cls, job_id):
833     """Returns the archive directory for a job.
834
835     @type job_id: str
836     @param job_id: Job identifier
837     @rtype: str
838     @return: Directory name
839
840     """
841     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
842
843   def _NewSerialsUnlocked(self, count):
844     """Generates a new job identifier.
845
846     Job identifiers are unique during the lifetime of a cluster.
847
848     @type count: integer
849     @param count: how many serials to return
850     @rtype: str
851     @return: a string representing the job identifier.
852
853     """
854     assert count > 0
855     # New number
856     serial = self._last_serial + count
857
858     # Write to file
859     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
860                                         "%s\n" % serial)
861
862     result = [self._FormatJobID(v)
863               for v in range(self._last_serial, serial + 1)]
864     # Keep it only if we were able to write the file
865     self._last_serial = serial
866
867     return result
868
869   @staticmethod
870   def _GetJobPath(job_id):
871     """Returns the job file for a given job id.
872
873     @type job_id: str
874     @param job_id: the job identifier
875     @rtype: str
876     @return: the path to the job file
877
878     """
879     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
880
881   @classmethod
882   def _GetArchivedJobPath(cls, job_id):
883     """Returns the archived job file for a give job id.
884
885     @type job_id: str
886     @param job_id: the job identifier
887     @rtype: str
888     @return: the path to the archived job file
889
890     """
891     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
892     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
893
894   @classmethod
895   def _ExtractJobID(cls, name):
896     """Extract the job id from a filename.
897
898     @type name: str
899     @param name: the job filename
900     @rtype: job id or None
901     @return: the job id corresponding to the given filename,
902         or None if the filename does not represent a valid
903         job file
904
905     """
906     m = cls._RE_JOB_FILE.match(name)
907     if m:
908       return m.group(1)
909     else:
910       return None
911
912   def _GetJobIDsUnlocked(self, archived=False):
913     """Return all known job IDs.
914
915     If the parameter archived is True, archived jobs IDs will be
916     included. Currently this argument is unused.
917
918     The method only looks at disk because it's a requirement that all
919     jobs are present on disk (so in the _memcache we don't have any
920     extra IDs).
921
922     @rtype: list
923     @return: the list of job IDs
924
925     """
926     # pylint: disable-msg=W0613
927     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
928     jlist = utils.NiceSort(jlist)
929     return jlist
930
931   def _ListJobFiles(self):
932     """Returns the list of current job files.
933
934     @rtype: list
935     @return: the list of job file names
936
937     """
938     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
939             if self._RE_JOB_FILE.match(name)]
940
941   def _LoadJobUnlocked(self, job_id):
942     """Loads a job from the disk or memory.
943
944     Given a job id, this will return the cached job object if
945     existing, or try to load the job from the disk. If loading from
946     disk, it will also add the job to the cache.
947
948     @param job_id: the job id
949     @rtype: L{_QueuedJob} or None
950     @return: either None or the job object
951
952     """
953     job = self._memcache.get(job_id, None)
954     if job:
955       logging.debug("Found job %s in memcache", job_id)
956       return job
957
958     filepath = self._GetJobPath(job_id)
959     logging.debug("Loading job from %s", filepath)
960     try:
961       raw_data = utils.ReadFile(filepath)
962     except IOError, err:
963       if err.errno in (errno.ENOENT, ):
964         return None
965       raise
966
967     data = serializer.LoadJson(raw_data)
968
969     try:
970       job = _QueuedJob.Restore(self, data)
971     except Exception, err: # pylint: disable-msg=W0703
972       new_path = self._GetArchivedJobPath(job_id)
973       if filepath == new_path:
974         # job already archived (future case)
975         logging.exception("Can't parse job %s", job_id)
976       else:
977         # non-archived case
978         logging.exception("Can't parse job %s, will archive.", job_id)
979         self._RenameFilesUnlocked([(filepath, new_path)])
980       return None
981
982     self._memcache[job_id] = job
983     logging.debug("Added job %s to the cache", job_id)
984     return job
985
986   def _GetJobsUnlocked(self, job_ids):
987     """Return a list of jobs based on their IDs.
988
989     @type job_ids: list
990     @param job_ids: either an empty list (meaning all jobs),
991         or a list of job IDs
992     @rtype: list
993     @return: the list of job objects
994
995     """
996     if not job_ids:
997       job_ids = self._GetJobIDsUnlocked()
998
999     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1000
1001   @staticmethod
1002   def _IsQueueMarkedDrain():
1003     """Check if the queue is marked from drain.
1004
1005     This currently uses the queue drain file, which makes it a
1006     per-node flag. In the future this can be moved to the config file.
1007
1008     @rtype: boolean
1009     @return: True of the job queue is marked for draining
1010
1011     """
1012     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1013
1014   @staticmethod
1015   def SetDrainFlag(drain_flag):
1016     """Sets the drain flag for the queue.
1017
1018     This is similar to the function L{backend.JobQueueSetDrainFlag},
1019     and in the future we might merge them.
1020
1021     @type drain_flag: boolean
1022     @param drain_flag: Whether to set or unset the drain flag
1023
1024     """
1025     if drain_flag:
1026       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1027     else:
1028       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1029     return True
1030
1031   @_RequireOpenQueue
1032   def _SubmitJobUnlocked(self, job_id, ops):
1033     """Create and store a new job.
1034
1035     This enters the job into our job queue and also puts it on the new
1036     queue, in order for it to be picked up by the queue processors.
1037
1038     @type job_id: job ID
1039     @param job_id: the job ID for the new job
1040     @type ops: list
1041     @param ops: The list of OpCodes that will become the new job.
1042     @rtype: job ID
1043     @return: the job ID of the newly created job
1044     @raise errors.JobQueueDrainError: if the job is marked for draining
1045
1046     """
1047     if self._IsQueueMarkedDrain():
1048       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1049
1050     # Check job queue size
1051     size = len(self._ListJobFiles())
1052     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1053       # TODO: Autoarchive jobs. Make sure it's not done on every job
1054       # submission, though.
1055       #size = ...
1056       pass
1057
1058     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1059       raise errors.JobQueueFull()
1060
1061     job = _QueuedJob(self, job_id, ops)
1062
1063     # Write to disk
1064     self.UpdateJobUnlocked(job)
1065
1066     logging.debug("Adding new job %s to the cache", job_id)
1067     self._memcache[job_id] = job
1068
1069     # Add to worker pool
1070     self._wpool.AddTask(job)
1071
1072     return job.id
1073
1074   @utils.LockedMethod
1075   @_RequireOpenQueue
1076   def SubmitJob(self, ops):
1077     """Create and store a new job.
1078
1079     @see: L{_SubmitJobUnlocked}
1080
1081     """
1082     job_id = self._NewSerialsUnlocked(1)[0]
1083     return self._SubmitJobUnlocked(job_id, ops)
1084
1085   @utils.LockedMethod
1086   @_RequireOpenQueue
1087   def SubmitManyJobs(self, jobs):
1088     """Create and store multiple jobs.
1089
1090     @see: L{_SubmitJobUnlocked}
1091
1092     """
1093     results = []
1094     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1095     for job_id, ops in zip(all_job_ids, jobs):
1096       try:
1097         data = self._SubmitJobUnlocked(job_id, ops)
1098         status = True
1099       except errors.GenericError, err:
1100         data = str(err)
1101         status = False
1102       results.append((status, data))
1103
1104     return results
1105
1106   @_RequireOpenQueue
1107   def UpdateJobUnlocked(self, job):
1108     """Update a job's on disk storage.
1109
1110     After a job has been modified, this function needs to be called in
1111     order to write the changes to disk and replicate them to the other
1112     nodes.
1113
1114     @type job: L{_QueuedJob}
1115     @param job: the changed job
1116
1117     """
1118     filename = self._GetJobPath(job.id)
1119     data = serializer.DumpJson(job.Serialize(), indent=False)
1120     logging.debug("Writing job %s to %s", job.id, filename)
1121     self._WriteAndReplicateFileUnlocked(filename, data)
1122
1123     # Notify waiters about potential changes
1124     job.change.notifyAll()
1125
1126   @utils.LockedMethod
1127   @_RequireOpenQueue
1128   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1129                         timeout):
1130     """Waits for changes in a job.
1131
1132     @type job_id: string
1133     @param job_id: Job identifier
1134     @type fields: list of strings
1135     @param fields: Which fields to check for changes
1136     @type prev_job_info: list or None
1137     @param prev_job_info: Last job information returned
1138     @type prev_log_serial: int
1139     @param prev_log_serial: Last job message serial number
1140     @type timeout: float
1141     @param timeout: maximum time to wait
1142     @rtype: tuple (job info, log entries)
1143     @return: a tuple of the job information as required via
1144         the fields parameter, and the log entries as a list
1145
1146         if the job has not changed and the timeout has expired,
1147         we instead return a special value,
1148         L{constants.JOB_NOTCHANGED}, which should be interpreted
1149         as such by the clients
1150
1151     """
1152     job = self._LoadJobUnlocked(job_id)
1153     if not job:
1154       logging.debug("Job %s not found", job_id)
1155       return None
1156
1157     def _CheckForChanges():
1158       logging.debug("Waiting for changes in job %s", job_id)
1159
1160       status = job.CalcStatus()
1161       job_info = self._GetJobInfoUnlocked(job, fields)
1162       log_entries = job.GetLogEntries(prev_log_serial)
1163
1164       # Serializing and deserializing data can cause type changes (e.g. from
1165       # tuple to list) or precision loss. We're doing it here so that we get
1166       # the same modifications as the data received from the client. Without
1167       # this, the comparison afterwards might fail without the data being
1168       # significantly different.
1169       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1170       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1171
1172       # Don't even try to wait if the job is no longer running, there will be
1173       # no changes.
1174       if (status not in (constants.JOB_STATUS_QUEUED,
1175                          constants.JOB_STATUS_RUNNING,
1176                          constants.JOB_STATUS_WAITLOCK) or
1177           prev_job_info != job_info or
1178           (log_entries and prev_log_serial != log_entries[0][0])):
1179         logging.debug("Job %s changed", job_id)
1180         return (job_info, log_entries)
1181
1182       raise utils.RetryAgain()
1183
1184     try:
1185       # Setting wait function to release the queue lock while waiting
1186       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1187                          wait_fn=job.change.wait)
1188     except utils.RetryTimeout:
1189       return constants.JOB_NOTCHANGED
1190
1191   @utils.LockedMethod
1192   @_RequireOpenQueue
1193   def CancelJob(self, job_id):
1194     """Cancels a job.
1195
1196     This will only succeed if the job has not started yet.
1197
1198     @type job_id: string
1199     @param job_id: job ID of job to be cancelled.
1200
1201     """
1202     logging.info("Cancelling job %s", job_id)
1203
1204     job = self._LoadJobUnlocked(job_id)
1205     if not job:
1206       logging.debug("Job %s not found", job_id)
1207       return (False, "Job %s not found" % job_id)
1208
1209     job_status = job.CalcStatus()
1210
1211     if job_status not in (constants.JOB_STATUS_QUEUED,
1212                           constants.JOB_STATUS_WAITLOCK):
1213       logging.debug("Job %s is no longer waiting in the queue", job.id)
1214       return (False, "Job %s is no longer waiting in the queue" % job.id)
1215
1216     if job_status == constants.JOB_STATUS_QUEUED:
1217       self.CancelJobUnlocked(job)
1218       return (True, "Job %s canceled" % job.id)
1219
1220     elif job_status == constants.JOB_STATUS_WAITLOCK:
1221       # The worker will notice the new status and cancel the job
1222       try:
1223         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1224       finally:
1225         self.UpdateJobUnlocked(job)
1226       return (True, "Job %s will be canceled" % job.id)
1227
1228   @_RequireOpenQueue
1229   def CancelJobUnlocked(self, job):
1230     """Marks a job as canceled.
1231
1232     """
1233     try:
1234       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1235                             "Job canceled by request")
1236     finally:
1237       self.UpdateJobUnlocked(job)
1238
1239   @_RequireOpenQueue
1240   def _ArchiveJobsUnlocked(self, jobs):
1241     """Archives jobs.
1242
1243     @type jobs: list of L{_QueuedJob}
1244     @param jobs: Job objects
1245     @rtype: int
1246     @return: Number of archived jobs
1247
1248     """
1249     archive_jobs = []
1250     rename_files = []
1251     for job in jobs:
1252       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1253                                   constants.JOB_STATUS_SUCCESS,
1254                                   constants.JOB_STATUS_ERROR):
1255         logging.debug("Job %s is not yet done", job.id)
1256         continue
1257
1258       archive_jobs.append(job)
1259
1260       old = self._GetJobPath(job.id)
1261       new = self._GetArchivedJobPath(job.id)
1262       rename_files.append((old, new))
1263
1264     # TODO: What if 1..n files fail to rename?
1265     self._RenameFilesUnlocked(rename_files)
1266
1267     logging.debug("Successfully archived job(s) %s",
1268                   utils.CommaJoin(job.id for job in archive_jobs))
1269
1270     return len(archive_jobs)
1271
1272   @utils.LockedMethod
1273   @_RequireOpenQueue
1274   def ArchiveJob(self, job_id):
1275     """Archives a job.
1276
1277     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1278
1279     @type job_id: string
1280     @param job_id: Job ID of job to be archived.
1281     @rtype: bool
1282     @return: Whether job was archived
1283
1284     """
1285     logging.info("Archiving job %s", job_id)
1286
1287     job = self._LoadJobUnlocked(job_id)
1288     if not job:
1289       logging.debug("Job %s not found", job_id)
1290       return False
1291
1292     return self._ArchiveJobsUnlocked([job]) == 1
1293
1294   @utils.LockedMethod
1295   @_RequireOpenQueue
1296   def AutoArchiveJobs(self, age, timeout):
1297     """Archives all jobs based on age.
1298
1299     The method will archive all jobs which are older than the age
1300     parameter. For jobs that don't have an end timestamp, the start
1301     timestamp will be considered. The special '-1' age will cause
1302     archival of all jobs (that are not running or queued).
1303
1304     @type age: int
1305     @param age: the minimum age in seconds
1306
1307     """
1308     logging.info("Archiving jobs with age more than %s seconds", age)
1309
1310     now = time.time()
1311     end_time = now + timeout
1312     archived_count = 0
1313     last_touched = 0
1314
1315     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1316     pending = []
1317     for idx, job_id in enumerate(all_job_ids):
1318       last_touched = idx
1319
1320       # Not optimal because jobs could be pending
1321       # TODO: Measure average duration for job archival and take number of
1322       # pending jobs into account.
1323       if time.time() > end_time:
1324         break
1325
1326       # Returns None if the job failed to load
1327       job = self._LoadJobUnlocked(job_id)
1328       if job:
1329         if job.end_timestamp is None:
1330           if job.start_timestamp is None:
1331             job_age = job.received_timestamp
1332           else:
1333             job_age = job.start_timestamp
1334         else:
1335           job_age = job.end_timestamp
1336
1337         if age == -1 or now - job_age[0] > age:
1338           pending.append(job)
1339
1340           # Archive 10 jobs at a time
1341           if len(pending) >= 10:
1342             archived_count += self._ArchiveJobsUnlocked(pending)
1343             pending = []
1344
1345     if pending:
1346       archived_count += self._ArchiveJobsUnlocked(pending)
1347
1348     return (archived_count, len(all_job_ids) - last_touched - 1)
1349
1350   @staticmethod
1351   def _GetJobInfoUnlocked(job, fields):
1352     """Returns information about a job.
1353
1354     @type job: L{_QueuedJob}
1355     @param job: the job which we query
1356     @type fields: list
1357     @param fields: names of fields to return
1358     @rtype: list
1359     @return: list with one element for each field
1360     @raise errors.OpExecError: when an invalid field
1361         has been passed
1362
1363     """
1364     row = []
1365     for fname in fields:
1366       if fname == "id":
1367         row.append(job.id)
1368       elif fname == "status":
1369         row.append(job.CalcStatus())
1370       elif fname == "ops":
1371         row.append([op.input.__getstate__() for op in job.ops])
1372       elif fname == "opresult":
1373         row.append([op.result for op in job.ops])
1374       elif fname == "opstatus":
1375         row.append([op.status for op in job.ops])
1376       elif fname == "oplog":
1377         row.append([op.log for op in job.ops])
1378       elif fname == "opstart":
1379         row.append([op.start_timestamp for op in job.ops])
1380       elif fname == "opend":
1381         row.append([op.end_timestamp for op in job.ops])
1382       elif fname == "received_ts":
1383         row.append(job.received_timestamp)
1384       elif fname == "start_ts":
1385         row.append(job.start_timestamp)
1386       elif fname == "end_ts":
1387         row.append(job.end_timestamp)
1388       elif fname == "lock_status":
1389         row.append(job.lock_status)
1390       elif fname == "summary":
1391         row.append([op.input.Summary() for op in job.ops])
1392       else:
1393         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1394     return row
1395
1396   @utils.LockedMethod
1397   @_RequireOpenQueue
1398   def QueryJobs(self, job_ids, fields):
1399     """Returns a list of jobs in queue.
1400
1401     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1402     processing for each job.
1403
1404     @type job_ids: list
1405     @param job_ids: sequence of job identifiers or None for all
1406     @type fields: list
1407     @param fields: names of fields to return
1408     @rtype: list
1409     @return: list one element per job, each element being list with
1410         the requested fields
1411
1412     """
1413     jobs = []
1414
1415     for job in self._GetJobsUnlocked(job_ids):
1416       if job is None:
1417         jobs.append(None)
1418       else:
1419         jobs.append(self._GetJobInfoUnlocked(job, fields))
1420
1421     return jobs
1422
1423   @utils.LockedMethod
1424   @_RequireOpenQueue
1425   def Shutdown(self):
1426     """Stops the job queue.
1427
1428     This shutdowns all the worker threads an closes the queue.
1429
1430     """
1431     self._wpool.TerminateWorkers()
1432
1433     self._queue_lock.Close()
1434     self._queue_lock = None