Add strict name validation for the LVM backend
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type log_serial: int
149   @ivar log_serial: holds the index for the next log entry
150   @ivar received_timestamp: the timestamp for when the job was received
151   @ivar start_timestmap: the timestamp for start of execution
152   @ivar end_timestamp: the timestamp for end of execution
153   @ivar lock_status: In-memory locking information for debugging
154   @ivar change: a Condition variable we use for waiting for job changes
155
156   """
157   # pylint: disable-msg=W0212
158   __slots__ = ["queue", "id", "ops", "log_serial",
159                "received_timestamp", "start_timestamp", "end_timestamp",
160                "lock_status", "change",
161                "__weakref__"]
162
163   def __init__(self, queue, job_id, ops):
164     """Constructor for the _QueuedJob.
165
166     @type queue: L{JobQueue}
167     @param queue: our parent queue
168     @type job_id: job_id
169     @param job_id: our job id
170     @type ops: list
171     @param ops: the list of opcodes we hold, which will be encapsulated
172         in _QueuedOpCodes
173
174     """
175     if not ops:
176       # TODO: use a better exception
177       raise Exception("No opcodes")
178
179     self.queue = queue
180     self.id = job_id
181     self.ops = [_QueuedOpCode(op) for op in ops]
182     self.log_serial = 0
183     self.received_timestamp = TimeStampNow()
184     self.start_timestamp = None
185     self.end_timestamp = None
186
187     # In-memory attributes
188     self.lock_status = None
189
190     # Condition to wait for changes
191     self.change = threading.Condition(self.queue._lock)
192
193   def __repr__(self):
194     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
195               "id=%s" % self.id,
196               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
197
198     return "<%s at %#x>" % (" ".join(status), id(self))
199
200   @classmethod
201   def Restore(cls, queue, state):
202     """Restore a _QueuedJob from serialized state:
203
204     @type queue: L{JobQueue}
205     @param queue: to which queue the restored job belongs
206     @type state: dict
207     @param state: the serialized state
208     @rtype: _JobQueue
209     @return: the restored _JobQueue instance
210
211     """
212     obj = _QueuedJob.__new__(cls)
213     obj.queue = queue
214     obj.id = state["id"]
215     obj.received_timestamp = state.get("received_timestamp", None)
216     obj.start_timestamp = state.get("start_timestamp", None)
217     obj.end_timestamp = state.get("end_timestamp", None)
218
219     # In-memory attributes
220     obj.lock_status = None
221
222     obj.ops = []
223     obj.log_serial = 0
224     for op_state in state["ops"]:
225       op = _QueuedOpCode.Restore(op_state)
226       for log_entry in op.log:
227         obj.log_serial = max(obj.log_serial, log_entry[0])
228       obj.ops.append(op)
229
230     # Condition to wait for changes
231     obj.change = threading.Condition(obj.queue._lock)
232
233     return obj
234
235   def Serialize(self):
236     """Serialize the _JobQueue instance.
237
238     @rtype: dict
239     @return: the serialized state
240
241     """
242     return {
243       "id": self.id,
244       "ops": [op.Serialize() for op in self.ops],
245       "start_timestamp": self.start_timestamp,
246       "end_timestamp": self.end_timestamp,
247       "received_timestamp": self.received_timestamp,
248       }
249
250   def CalcStatus(self):
251     """Compute the status of this job.
252
253     This function iterates over all the _QueuedOpCodes in the job and
254     based on their status, computes the job status.
255
256     The algorithm is:
257       - if we find a cancelled, or finished with error, the job
258         status will be the same
259       - otherwise, the last opcode with the status one of:
260           - waitlock
261           - canceling
262           - running
263
264         will determine the job status
265
266       - otherwise, it means either all opcodes are queued, or success,
267         and the job status will be the same
268
269     @return: the job status
270
271     """
272     status = constants.JOB_STATUS_QUEUED
273
274     all_success = True
275     for op in self.ops:
276       if op.status == constants.OP_STATUS_SUCCESS:
277         continue
278
279       all_success = False
280
281       if op.status == constants.OP_STATUS_QUEUED:
282         pass
283       elif op.status == constants.OP_STATUS_WAITLOCK:
284         status = constants.JOB_STATUS_WAITLOCK
285       elif op.status == constants.OP_STATUS_RUNNING:
286         status = constants.JOB_STATUS_RUNNING
287       elif op.status == constants.OP_STATUS_CANCELING:
288         status = constants.JOB_STATUS_CANCELING
289         break
290       elif op.status == constants.OP_STATUS_ERROR:
291         status = constants.JOB_STATUS_ERROR
292         # The whole job fails if one opcode failed
293         break
294       elif op.status == constants.OP_STATUS_CANCELED:
295         status = constants.OP_STATUS_CANCELED
296         break
297
298     if all_success:
299       status = constants.JOB_STATUS_SUCCESS
300
301     return status
302
303   def GetLogEntries(self, newer_than):
304     """Selectively returns the log entries.
305
306     @type newer_than: None or int
307     @param newer_than: if this is None, return all log entries,
308         otherwise return only the log entries with serial higher
309         than this value
310     @rtype: list
311     @return: the list of the log entries selected
312
313     """
314     if newer_than is None:
315       serial = -1
316     else:
317       serial = newer_than
318
319     entries = []
320     for op in self.ops:
321       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
322
323     return entries
324
325   def MarkUnfinishedOps(self, status, result):
326     """Mark unfinished opcodes with a given status and result.
327
328     This is an utility function for marking all running or waiting to
329     be run opcodes with a given status. Opcodes which are already
330     finalised are not changed.
331
332     @param status: a given opcode status
333     @param result: the opcode result
334
335     """
336     not_marked = True
337     for op in self.ops:
338       if op.status in constants.OPS_FINALIZED:
339         assert not_marked, "Finalized opcodes found after non-finalized ones"
340         continue
341       op.status = status
342       op.result = result
343       not_marked = False
344
345
346 class _OpExecCallbacks(mcpu.OpExecCbBase):
347   def __init__(self, queue, job, op):
348     """Initializes this class.
349
350     @type queue: L{JobQueue}
351     @param queue: Job queue
352     @type job: L{_QueuedJob}
353     @param job: Job object
354     @type op: L{_QueuedOpCode}
355     @param op: OpCode
356
357     """
358     assert queue, "Queue is missing"
359     assert job, "Job is missing"
360     assert op, "Opcode is missing"
361
362     self._queue = queue
363     self._job = job
364     self._op = op
365
366   def NotifyStart(self):
367     """Mark the opcode as running, not lock-waiting.
368
369     This is called from the mcpu code as a notifier function, when the LU is
370     finally about to start the Exec() method. Of course, to have end-user
371     visible results, the opcode must be initially (before calling into
372     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
373
374     """
375     self._queue.acquire()
376     try:
377       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
378                                  constants.OP_STATUS_CANCELING)
379
380       # All locks are acquired by now
381       self._job.lock_status = None
382
383       # Cancel here if we were asked to
384       if self._op.status == constants.OP_STATUS_CANCELING:
385         raise CancelJob()
386
387       self._op.status = constants.OP_STATUS_RUNNING
388     finally:
389       self._queue.release()
390
391   def Feedback(self, *args):
392     """Append a log entry.
393
394     """
395     assert len(args) < 3
396
397     if len(args) == 1:
398       log_type = constants.ELOG_MESSAGE
399       log_msg = args[0]
400     else:
401       (log_type, log_msg) = args
402
403     # The time is split to make serialization easier and not lose
404     # precision.
405     timestamp = utils.SplitTime(time.time())
406
407     self._queue.acquire()
408     try:
409       self._job.log_serial += 1
410       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
411
412       self._job.change.notifyAll()
413     finally:
414       self._queue.release()
415
416   def ReportLocks(self, msg):
417     """Write locking information to the job.
418
419     Called whenever the LU processor is waiting for a lock or has acquired one.
420
421     """
422     # Not getting the queue lock because this is a single assignment
423     self._job.lock_status = msg
424
425
426 class _JobQueueWorker(workerpool.BaseWorker):
427   """The actual job workers.
428
429   """
430   def RunTask(self, job): # pylint: disable-msg=W0221
431     """Job executor.
432
433     This functions processes a job. It is closely tied to the _QueuedJob and
434     _QueuedOpCode classes.
435
436     @type job: L{_QueuedJob}
437     @param job: the job to be processed
438
439     """
440     logging.info("Processing job %s", job.id)
441     proc = mcpu.Processor(self.pool.queue.context, job.id)
442     queue = job.queue
443     try:
444       try:
445         count = len(job.ops)
446         for idx, op in enumerate(job.ops):
447           op_summary = op.input.Summary()
448           if op.status == constants.OP_STATUS_SUCCESS:
449             # this is a job that was partially completed before master
450             # daemon shutdown, so it can be expected that some opcodes
451             # are already completed successfully (if any did error
452             # out, then the whole job should have been aborted and not
453             # resubmitted for processing)
454             logging.info("Op %s/%s: opcode %s already processed, skipping",
455                          idx + 1, count, op_summary)
456             continue
457           try:
458             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
459                          op_summary)
460
461             queue.acquire()
462             try:
463               if op.status == constants.OP_STATUS_CANCELED:
464                 raise CancelJob()
465               assert op.status == constants.OP_STATUS_QUEUED
466               op.status = constants.OP_STATUS_WAITLOCK
467               op.result = None
468               op.start_timestamp = TimeStampNow()
469               if idx == 0: # first opcode
470                 job.start_timestamp = op.start_timestamp
471               queue.UpdateJobUnlocked(job)
472
473               input_opcode = op.input
474             finally:
475               queue.release()
476
477             # Make sure not to hold queue lock while calling ExecOpCode
478             result = proc.ExecOpCode(input_opcode,
479                                      _OpExecCallbacks(queue, job, op))
480
481             queue.acquire()
482             try:
483               op.status = constants.OP_STATUS_SUCCESS
484               op.result = result
485               op.end_timestamp = TimeStampNow()
486               queue.UpdateJobUnlocked(job)
487             finally:
488               queue.release()
489
490             logging.info("Op %s/%s: Successfully finished opcode %s",
491                          idx + 1, count, op_summary)
492           except CancelJob:
493             # Will be handled further up
494             raise
495           except Exception, err:
496             queue.acquire()
497             try:
498               try:
499                 op.status = constants.OP_STATUS_ERROR
500                 if isinstance(err, errors.GenericError):
501                   op.result = errors.EncodeException(err)
502                 else:
503                   op.result = str(err)
504                 op.end_timestamp = TimeStampNow()
505                 logging.info("Op %s/%s: Error in opcode %s: %s",
506                              idx + 1, count, op_summary, err)
507               finally:
508                 queue.UpdateJobUnlocked(job)
509             finally:
510               queue.release()
511             raise
512
513       except CancelJob:
514         queue.acquire()
515         try:
516           queue.CancelJobUnlocked(job)
517         finally:
518           queue.release()
519       except errors.GenericError, err:
520         logging.exception("Ganeti exception")
521       except:
522         logging.exception("Unhandled exception")
523     finally:
524       queue.acquire()
525       try:
526         try:
527           job.lock_status = None
528           job.end_timestamp = TimeStampNow()
529           queue.UpdateJobUnlocked(job)
530         finally:
531           job_id = job.id
532           status = job.CalcStatus()
533       finally:
534         queue.release()
535
536       logging.info("Finished job %s, status = %s", job_id, status)
537
538
539 class _JobQueueWorkerPool(workerpool.WorkerPool):
540   """Simple class implementing a job-processing workerpool.
541
542   """
543   def __init__(self, queue):
544     super(_JobQueueWorkerPool, self).__init__("JobQueue",
545                                               JOBQUEUE_THREADS,
546                                               _JobQueueWorker)
547     self.queue = queue
548
549
550 def _RequireOpenQueue(fn):
551   """Decorator for "public" functions.
552
553   This function should be used for all 'public' functions. That is,
554   functions usually called from other classes. Note that this should
555   be applied only to methods (not plain functions), since it expects
556   that the decorated function is called with a first argument that has
557   a '_queue_lock' argument.
558
559   @warning: Use this decorator only after utils.LockedMethod!
560
561   Example::
562     @utils.LockedMethod
563     @_RequireOpenQueue
564     def Example(self):
565       pass
566
567   """
568   def wrapper(self, *args, **kwargs):
569     # pylint: disable-msg=W0212
570     assert self._queue_lock is not None, "Queue should be open"
571     return fn(self, *args, **kwargs)
572   return wrapper
573
574
575 class JobQueue(object):
576   """Queue used to manage the jobs.
577
578   @cvar _RE_JOB_FILE: regex matching the valid job file names
579
580   """
581   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
582
583   def __init__(self, context):
584     """Constructor for JobQueue.
585
586     The constructor will initialize the job queue object and then
587     start loading the current jobs from disk, either for starting them
588     (if they were queue) or for aborting them (if they were already
589     running).
590
591     @type context: GanetiContext
592     @param context: the context object for access to the configuration
593         data and other ganeti objects
594
595     """
596     self.context = context
597     self._memcache = weakref.WeakValueDictionary()
598     self._my_hostname = utils.HostInfo().name
599
600     # Locking
601     self._lock = threading.Lock()
602     self.acquire = self._lock.acquire
603     self.release = self._lock.release
604
605     # Initialize
606     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
607
608     # Read serial file
609     self._last_serial = jstore.ReadSerial()
610     assert self._last_serial is not None, ("Serial file was modified between"
611                                            " check in jstore and here")
612
613     # Get initial list of nodes
614     self._nodes = dict((n.name, n.primary_ip)
615                        for n in self.context.cfg.GetAllNodesInfo().values()
616                        if n.master_candidate)
617
618     # Remove master node
619     try:
620       del self._nodes[self._my_hostname]
621     except KeyError:
622       pass
623
624     # TODO: Check consistency across nodes
625
626     # Setup worker pool
627     self._wpool = _JobQueueWorkerPool(self)
628     try:
629       # We need to lock here because WorkerPool.AddTask() may start a job while
630       # we're still doing our work.
631       self.acquire()
632       try:
633         logging.info("Inspecting job queue")
634
635         all_job_ids = self._GetJobIDsUnlocked()
636         jobs_count = len(all_job_ids)
637         lastinfo = time.time()
638         for idx, job_id in enumerate(all_job_ids):
639           # Give an update every 1000 jobs or 10 seconds
640           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
641               idx == (jobs_count - 1)):
642             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
643                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
644             lastinfo = time.time()
645
646           job = self._LoadJobUnlocked(job_id)
647
648           # a failure in loading the job can cause 'None' to be returned
649           if job is None:
650             continue
651
652           status = job.CalcStatus()
653
654           if status in (constants.JOB_STATUS_QUEUED, ):
655             self._wpool.AddTask(job)
656
657           elif status in (constants.JOB_STATUS_RUNNING,
658                           constants.JOB_STATUS_WAITLOCK,
659                           constants.JOB_STATUS_CANCELING):
660             logging.warning("Unfinished job %s found: %s", job.id, job)
661             try:
662               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
663                                     "Unclean master daemon shutdown")
664             finally:
665               self.UpdateJobUnlocked(job)
666
667         logging.info("Job queue inspection finished")
668       finally:
669         self.release()
670     except:
671       self._wpool.TerminateWorkers()
672       raise
673
674   @utils.LockedMethod
675   @_RequireOpenQueue
676   def AddNode(self, node):
677     """Register a new node with the queue.
678
679     @type node: L{objects.Node}
680     @param node: the node object to be added
681
682     """
683     node_name = node.name
684     assert node_name != self._my_hostname
685
686     # Clean queue directory on added node
687     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
688     msg = result.fail_msg
689     if msg:
690       logging.warning("Cannot cleanup queue directory on node %s: %s",
691                       node_name, msg)
692
693     if not node.master_candidate:
694       # remove if existing, ignoring errors
695       self._nodes.pop(node_name, None)
696       # and skip the replication of the job ids
697       return
698
699     # Upload the whole queue excluding archived jobs
700     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
701
702     # Upload current serial file
703     files.append(constants.JOB_QUEUE_SERIAL_FILE)
704
705     for file_name in files:
706       # Read file content
707       content = utils.ReadFile(file_name)
708
709       result = rpc.RpcRunner.call_jobqueue_update([node_name],
710                                                   [node.primary_ip],
711                                                   file_name, content)
712       msg = result[node_name].fail_msg
713       if msg:
714         logging.error("Failed to upload file %s to node %s: %s",
715                       file_name, node_name, msg)
716
717     self._nodes[node_name] = node.primary_ip
718
719   @utils.LockedMethod
720   @_RequireOpenQueue
721   def RemoveNode(self, node_name):
722     """Callback called when removing nodes from the cluster.
723
724     @type node_name: str
725     @param node_name: the name of the node to remove
726
727     """
728     try:
729       # The queue is removed by the "leave node" RPC call.
730       del self._nodes[node_name]
731     except KeyError:
732       pass
733
734   @staticmethod
735   def _CheckRpcResult(result, nodes, failmsg):
736     """Verifies the status of an RPC call.
737
738     Since we aim to keep consistency should this node (the current
739     master) fail, we will log errors if our rpc fail, and especially
740     log the case when more than half of the nodes fails.
741
742     @param result: the data as returned from the rpc call
743     @type nodes: list
744     @param nodes: the list of nodes we made the call to
745     @type failmsg: str
746     @param failmsg: the identifier to be used for logging
747
748     """
749     failed = []
750     success = []
751
752     for node in nodes:
753       msg = result[node].fail_msg
754       if msg:
755         failed.append(node)
756         logging.error("RPC call %s (%s) failed on node %s: %s",
757                       result[node].call, failmsg, node, msg)
758       else:
759         success.append(node)
760
761     # +1 for the master node
762     if (len(success) + 1) < len(failed):
763       # TODO: Handle failing nodes
764       logging.error("More than half of the nodes failed")
765
766   def _GetNodeIp(self):
767     """Helper for returning the node name/ip list.
768
769     @rtype: (list, list)
770     @return: a tuple of two lists, the first one with the node
771         names and the second one with the node addresses
772
773     """
774     name_list = self._nodes.keys()
775     addr_list = [self._nodes[name] for name in name_list]
776     return name_list, addr_list
777
778   def _WriteAndReplicateFileUnlocked(self, file_name, data):
779     """Writes a file locally and then replicates it to all nodes.
780
781     This function will replace the contents of a file on the local
782     node and then replicate it to all the other nodes we have.
783
784     @type file_name: str
785     @param file_name: the path of the file to be replicated
786     @type data: str
787     @param data: the new contents of the file
788
789     """
790     utils.WriteFile(file_name, data=data)
791
792     names, addrs = self._GetNodeIp()
793     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
794     self._CheckRpcResult(result, self._nodes,
795                          "Updating %s" % file_name)
796
797   def _RenameFilesUnlocked(self, rename):
798     """Renames a file locally and then replicate the change.
799
800     This function will rename a file in the local queue directory
801     and then replicate this rename to all the other nodes we have.
802
803     @type rename: list of (old, new)
804     @param rename: List containing tuples mapping old to new names
805
806     """
807     # Rename them locally
808     for old, new in rename:
809       utils.RenameFile(old, new, mkdir=True)
810
811     # ... and on all nodes
812     names, addrs = self._GetNodeIp()
813     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
814     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
815
816   @staticmethod
817   def _FormatJobID(job_id):
818     """Convert a job ID to string format.
819
820     Currently this just does C{str(job_id)} after performing some
821     checks, but if we want to change the job id format this will
822     abstract this change.
823
824     @type job_id: int or long
825     @param job_id: the numeric job id
826     @rtype: str
827     @return: the formatted job id
828
829     """
830     if not isinstance(job_id, (int, long)):
831       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
832     if job_id < 0:
833       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
834
835     return str(job_id)
836
837   @classmethod
838   def _GetArchiveDirectory(cls, job_id):
839     """Returns the archive directory for a job.
840
841     @type job_id: str
842     @param job_id: Job identifier
843     @rtype: str
844     @return: Directory name
845
846     """
847     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
848
849   def _NewSerialsUnlocked(self, count):
850     """Generates a new job identifier.
851
852     Job identifiers are unique during the lifetime of a cluster.
853
854     @type count: integer
855     @param count: how many serials to return
856     @rtype: str
857     @return: a string representing the job identifier.
858
859     """
860     assert count > 0
861     # New number
862     serial = self._last_serial + count
863
864     # Write to file
865     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
866                                         "%s\n" % serial)
867
868     result = [self._FormatJobID(v)
869               for v in range(self._last_serial, serial + 1)]
870     # Keep it only if we were able to write the file
871     self._last_serial = serial
872
873     return result
874
875   @staticmethod
876   def _GetJobPath(job_id):
877     """Returns the job file for a given job id.
878
879     @type job_id: str
880     @param job_id: the job identifier
881     @rtype: str
882     @return: the path to the job file
883
884     """
885     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
886
887   @classmethod
888   def _GetArchivedJobPath(cls, job_id):
889     """Returns the archived job file for a give job id.
890
891     @type job_id: str
892     @param job_id: the job identifier
893     @rtype: str
894     @return: the path to the archived job file
895
896     """
897     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
898     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR, path)
899
900   @classmethod
901   def _ExtractJobID(cls, name):
902     """Extract the job id from a filename.
903
904     @type name: str
905     @param name: the job filename
906     @rtype: job id or None
907     @return: the job id corresponding to the given filename,
908         or None if the filename does not represent a valid
909         job file
910
911     """
912     m = cls._RE_JOB_FILE.match(name)
913     if m:
914       return m.group(1)
915     else:
916       return None
917
918   def _GetJobIDsUnlocked(self, archived=False):
919     """Return all known job IDs.
920
921     If the parameter archived is True, archived jobs IDs will be
922     included. Currently this argument is unused.
923
924     The method only looks at disk because it's a requirement that all
925     jobs are present on disk (so in the _memcache we don't have any
926     extra IDs).
927
928     @rtype: list
929     @return: the list of job IDs
930
931     """
932     # pylint: disable-msg=W0613
933     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
934     jlist = utils.NiceSort(jlist)
935     return jlist
936
937   def _ListJobFiles(self):
938     """Returns the list of current job files.
939
940     @rtype: list
941     @return: the list of job file names
942
943     """
944     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
945             if self._RE_JOB_FILE.match(name)]
946
947   def _LoadJobUnlocked(self, job_id):
948     """Loads a job from the disk or memory.
949
950     Given a job id, this will return the cached job object if
951     existing, or try to load the job from the disk. If loading from
952     disk, it will also add the job to the cache.
953
954     @param job_id: the job id
955     @rtype: L{_QueuedJob} or None
956     @return: either None or the job object
957
958     """
959     job = self._memcache.get(job_id, None)
960     if job:
961       logging.debug("Found job %s in memcache", job_id)
962       return job
963
964     filepath = self._GetJobPath(job_id)
965     logging.debug("Loading job from %s", filepath)
966     try:
967       raw_data = utils.ReadFile(filepath)
968     except IOError, err:
969       if err.errno in (errno.ENOENT, ):
970         return None
971       raise
972
973     data = serializer.LoadJson(raw_data)
974
975     try:
976       job = _QueuedJob.Restore(self, data)
977     except Exception, err: # pylint: disable-msg=W0703
978       new_path = self._GetArchivedJobPath(job_id)
979       if filepath == new_path:
980         # job already archived (future case)
981         logging.exception("Can't parse job %s", job_id)
982       else:
983         # non-archived case
984         logging.exception("Can't parse job %s, will archive.", job_id)
985         self._RenameFilesUnlocked([(filepath, new_path)])
986       return None
987
988     self._memcache[job_id] = job
989     logging.debug("Added job %s to the cache", job_id)
990     return job
991
992   def _GetJobsUnlocked(self, job_ids):
993     """Return a list of jobs based on their IDs.
994
995     @type job_ids: list
996     @param job_ids: either an empty list (meaning all jobs),
997         or a list of job IDs
998     @rtype: list
999     @return: the list of job objects
1000
1001     """
1002     if not job_ids:
1003       job_ids = self._GetJobIDsUnlocked()
1004
1005     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1006
1007   @staticmethod
1008   def _IsQueueMarkedDrain():
1009     """Check if the queue is marked from drain.
1010
1011     This currently uses the queue drain file, which makes it a
1012     per-node flag. In the future this can be moved to the config file.
1013
1014     @rtype: boolean
1015     @return: True of the job queue is marked for draining
1016
1017     """
1018     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1019
1020   @staticmethod
1021   def SetDrainFlag(drain_flag):
1022     """Sets the drain flag for the queue.
1023
1024     This is similar to the function L{backend.JobQueueSetDrainFlag},
1025     and in the future we might merge them.
1026
1027     @type drain_flag: boolean
1028     @param drain_flag: Whether to set or unset the drain flag
1029
1030     """
1031     if drain_flag:
1032       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1033     else:
1034       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1035     return True
1036
1037   @_RequireOpenQueue
1038   def _SubmitJobUnlocked(self, job_id, ops):
1039     """Create and store a new job.
1040
1041     This enters the job into our job queue and also puts it on the new
1042     queue, in order for it to be picked up by the queue processors.
1043
1044     @type job_id: job ID
1045     @param job_id: the job ID for the new job
1046     @type ops: list
1047     @param ops: The list of OpCodes that will become the new job.
1048     @rtype: job ID
1049     @return: the job ID of the newly created job
1050     @raise errors.JobQueueDrainError: if the job is marked for draining
1051
1052     """
1053     if self._IsQueueMarkedDrain():
1054       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1055
1056     # Check job queue size
1057     size = len(self._ListJobFiles())
1058     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1059       # TODO: Autoarchive jobs. Make sure it's not done on every job
1060       # submission, though.
1061       #size = ...
1062       pass
1063
1064     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1065       raise errors.JobQueueFull()
1066
1067     job = _QueuedJob(self, job_id, ops)
1068
1069     # Write to disk
1070     self.UpdateJobUnlocked(job)
1071
1072     logging.debug("Adding new job %s to the cache", job_id)
1073     self._memcache[job_id] = job
1074
1075     # Add to worker pool
1076     self._wpool.AddTask(job)
1077
1078     return job.id
1079
1080   @utils.LockedMethod
1081   @_RequireOpenQueue
1082   def SubmitJob(self, ops):
1083     """Create and store a new job.
1084
1085     @see: L{_SubmitJobUnlocked}
1086
1087     """
1088     job_id = self._NewSerialsUnlocked(1)[0]
1089     return self._SubmitJobUnlocked(job_id, ops)
1090
1091   @utils.LockedMethod
1092   @_RequireOpenQueue
1093   def SubmitManyJobs(self, jobs):
1094     """Create and store multiple jobs.
1095
1096     @see: L{_SubmitJobUnlocked}
1097
1098     """
1099     results = []
1100     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1101     for job_id, ops in zip(all_job_ids, jobs):
1102       try:
1103         data = self._SubmitJobUnlocked(job_id, ops)
1104         status = True
1105       except errors.GenericError, err:
1106         data = str(err)
1107         status = False
1108       results.append((status, data))
1109
1110     return results
1111
1112   @_RequireOpenQueue
1113   def UpdateJobUnlocked(self, job):
1114     """Update a job's on disk storage.
1115
1116     After a job has been modified, this function needs to be called in
1117     order to write the changes to disk and replicate them to the other
1118     nodes.
1119
1120     @type job: L{_QueuedJob}
1121     @param job: the changed job
1122
1123     """
1124     filename = self._GetJobPath(job.id)
1125     data = serializer.DumpJson(job.Serialize(), indent=False)
1126     logging.debug("Writing job %s to %s", job.id, filename)
1127     self._WriteAndReplicateFileUnlocked(filename, data)
1128
1129     # Notify waiters about potential changes
1130     job.change.notifyAll()
1131
1132   @utils.LockedMethod
1133   @_RequireOpenQueue
1134   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1135                         timeout):
1136     """Waits for changes in a job.
1137
1138     @type job_id: string
1139     @param job_id: Job identifier
1140     @type fields: list of strings
1141     @param fields: Which fields to check for changes
1142     @type prev_job_info: list or None
1143     @param prev_job_info: Last job information returned
1144     @type prev_log_serial: int
1145     @param prev_log_serial: Last job message serial number
1146     @type timeout: float
1147     @param timeout: maximum time to wait
1148     @rtype: tuple (job info, log entries)
1149     @return: a tuple of the job information as required via
1150         the fields parameter, and the log entries as a list
1151
1152         if the job has not changed and the timeout has expired,
1153         we instead return a special value,
1154         L{constants.JOB_NOTCHANGED}, which should be interpreted
1155         as such by the clients
1156
1157     """
1158     job = self._LoadJobUnlocked(job_id)
1159     if not job:
1160       logging.debug("Job %s not found", job_id)
1161       return None
1162
1163     def _CheckForChanges():
1164       logging.debug("Waiting for changes in job %s", job_id)
1165
1166       status = job.CalcStatus()
1167       job_info = self._GetJobInfoUnlocked(job, fields)
1168       log_entries = job.GetLogEntries(prev_log_serial)
1169
1170       # Serializing and deserializing data can cause type changes (e.g. from
1171       # tuple to list) or precision loss. We're doing it here so that we get
1172       # the same modifications as the data received from the client. Without
1173       # this, the comparison afterwards might fail without the data being
1174       # significantly different.
1175       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1176       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1177
1178       # Don't even try to wait if the job is no longer running, there will be
1179       # no changes.
1180       if (status not in (constants.JOB_STATUS_QUEUED,
1181                          constants.JOB_STATUS_RUNNING,
1182                          constants.JOB_STATUS_WAITLOCK) or
1183           prev_job_info != job_info or
1184           (log_entries and prev_log_serial != log_entries[0][0])):
1185         logging.debug("Job %s changed", job_id)
1186         return (job_info, log_entries)
1187
1188       raise utils.RetryAgain()
1189
1190     try:
1191       # Setting wait function to release the queue lock while waiting
1192       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1193                          wait_fn=job.change.wait)
1194     except utils.RetryTimeout:
1195       return constants.JOB_NOTCHANGED
1196
1197   @utils.LockedMethod
1198   @_RequireOpenQueue
1199   def CancelJob(self, job_id):
1200     """Cancels a job.
1201
1202     This will only succeed if the job has not started yet.
1203
1204     @type job_id: string
1205     @param job_id: job ID of job to be cancelled.
1206
1207     """
1208     logging.info("Cancelling job %s", job_id)
1209
1210     job = self._LoadJobUnlocked(job_id)
1211     if not job:
1212       logging.debug("Job %s not found", job_id)
1213       return (False, "Job %s not found" % job_id)
1214
1215     job_status = job.CalcStatus()
1216
1217     if job_status not in (constants.JOB_STATUS_QUEUED,
1218                           constants.JOB_STATUS_WAITLOCK):
1219       logging.debug("Job %s is no longer waiting in the queue", job.id)
1220       return (False, "Job %s is no longer waiting in the queue" % job.id)
1221
1222     if job_status == constants.JOB_STATUS_QUEUED:
1223       self.CancelJobUnlocked(job)
1224       return (True, "Job %s canceled" % job.id)
1225
1226     elif job_status == constants.JOB_STATUS_WAITLOCK:
1227       # The worker will notice the new status and cancel the job
1228       try:
1229         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1230       finally:
1231         self.UpdateJobUnlocked(job)
1232       return (True, "Job %s will be canceled" % job.id)
1233
1234   @_RequireOpenQueue
1235   def CancelJobUnlocked(self, job):
1236     """Marks a job as canceled.
1237
1238     """
1239     try:
1240       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1241                             "Job canceled by request")
1242     finally:
1243       self.UpdateJobUnlocked(job)
1244
1245   @_RequireOpenQueue
1246   def _ArchiveJobsUnlocked(self, jobs):
1247     """Archives jobs.
1248
1249     @type jobs: list of L{_QueuedJob}
1250     @param jobs: Job objects
1251     @rtype: int
1252     @return: Number of archived jobs
1253
1254     """
1255     archive_jobs = []
1256     rename_files = []
1257     for job in jobs:
1258       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1259                                   constants.JOB_STATUS_SUCCESS,
1260                                   constants.JOB_STATUS_ERROR):
1261         logging.debug("Job %s is not yet done", job.id)
1262         continue
1263
1264       archive_jobs.append(job)
1265
1266       old = self._GetJobPath(job.id)
1267       new = self._GetArchivedJobPath(job.id)
1268       rename_files.append((old, new))
1269
1270     # TODO: What if 1..n files fail to rename?
1271     self._RenameFilesUnlocked(rename_files)
1272
1273     logging.debug("Successfully archived job(s) %s",
1274                   utils.CommaJoin(job.id for job in archive_jobs))
1275
1276     return len(archive_jobs)
1277
1278   @utils.LockedMethod
1279   @_RequireOpenQueue
1280   def ArchiveJob(self, job_id):
1281     """Archives a job.
1282
1283     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1284
1285     @type job_id: string
1286     @param job_id: Job ID of job to be archived.
1287     @rtype: bool
1288     @return: Whether job was archived
1289
1290     """
1291     logging.info("Archiving job %s", job_id)
1292
1293     job = self._LoadJobUnlocked(job_id)
1294     if not job:
1295       logging.debug("Job %s not found", job_id)
1296       return False
1297
1298     return self._ArchiveJobsUnlocked([job]) == 1
1299
1300   @utils.LockedMethod
1301   @_RequireOpenQueue
1302   def AutoArchiveJobs(self, age, timeout):
1303     """Archives all jobs based on age.
1304
1305     The method will archive all jobs which are older than the age
1306     parameter. For jobs that don't have an end timestamp, the start
1307     timestamp will be considered. The special '-1' age will cause
1308     archival of all jobs (that are not running or queued).
1309
1310     @type age: int
1311     @param age: the minimum age in seconds
1312
1313     """
1314     logging.info("Archiving jobs with age more than %s seconds", age)
1315
1316     now = time.time()
1317     end_time = now + timeout
1318     archived_count = 0
1319     last_touched = 0
1320
1321     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1322     pending = []
1323     for idx, job_id in enumerate(all_job_ids):
1324       last_touched = idx + 1
1325
1326       # Not optimal because jobs could be pending
1327       # TODO: Measure average duration for job archival and take number of
1328       # pending jobs into account.
1329       if time.time() > end_time:
1330         break
1331
1332       # Returns None if the job failed to load
1333       job = self._LoadJobUnlocked(job_id)
1334       if job:
1335         if job.end_timestamp is None:
1336           if job.start_timestamp is None:
1337             job_age = job.received_timestamp
1338           else:
1339             job_age = job.start_timestamp
1340         else:
1341           job_age = job.end_timestamp
1342
1343         if age == -1 or now - job_age[0] > age:
1344           pending.append(job)
1345
1346           # Archive 10 jobs at a time
1347           if len(pending) >= 10:
1348             archived_count += self._ArchiveJobsUnlocked(pending)
1349             pending = []
1350
1351     if pending:
1352       archived_count += self._ArchiveJobsUnlocked(pending)
1353
1354     return (archived_count, len(all_job_ids) - last_touched)
1355
1356   @staticmethod
1357   def _GetJobInfoUnlocked(job, fields):
1358     """Returns information about a job.
1359
1360     @type job: L{_QueuedJob}
1361     @param job: the job which we query
1362     @type fields: list
1363     @param fields: names of fields to return
1364     @rtype: list
1365     @return: list with one element for each field
1366     @raise errors.OpExecError: when an invalid field
1367         has been passed
1368
1369     """
1370     row = []
1371     for fname in fields:
1372       if fname == "id":
1373         row.append(job.id)
1374       elif fname == "status":
1375         row.append(job.CalcStatus())
1376       elif fname == "ops":
1377         row.append([op.input.__getstate__() for op in job.ops])
1378       elif fname == "opresult":
1379         row.append([op.result for op in job.ops])
1380       elif fname == "opstatus":
1381         row.append([op.status for op in job.ops])
1382       elif fname == "oplog":
1383         row.append([op.log for op in job.ops])
1384       elif fname == "opstart":
1385         row.append([op.start_timestamp for op in job.ops])
1386       elif fname == "opend":
1387         row.append([op.end_timestamp for op in job.ops])
1388       elif fname == "received_ts":
1389         row.append(job.received_timestamp)
1390       elif fname == "start_ts":
1391         row.append(job.start_timestamp)
1392       elif fname == "end_ts":
1393         row.append(job.end_timestamp)
1394       elif fname == "lock_status":
1395         row.append(job.lock_status)
1396       elif fname == "summary":
1397         row.append([op.input.Summary() for op in job.ops])
1398       else:
1399         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1400     return row
1401
1402   @utils.LockedMethod
1403   @_RequireOpenQueue
1404   def QueryJobs(self, job_ids, fields):
1405     """Returns a list of jobs in queue.
1406
1407     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1408     processing for each job.
1409
1410     @type job_ids: list
1411     @param job_ids: sequence of job identifiers or None for all
1412     @type fields: list
1413     @param fields: names of fields to return
1414     @rtype: list
1415     @return: list one element per job, each element being list with
1416         the requested fields
1417
1418     """
1419     jobs = []
1420
1421     for job in self._GetJobsUnlocked(job_ids):
1422       if job is None:
1423         jobs.append(None)
1424       else:
1425         jobs.append(self._GetJobInfoUnlocked(job, fields))
1426
1427     return jobs
1428
1429   @utils.LockedMethod
1430   @_RequireOpenQueue
1431   def Shutdown(self):
1432     """Stops the job queue.
1433
1434     This shutdowns all the worker threads an closes the queue.
1435
1436     """
1437     self._wpool.TerminateWorkers()
1438
1439     self._queue_lock.Close()
1440     self._queue_lock = None