1fdd1ecb07717bcab7a63414b967b90a7cfa6141
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking:
25 There's a single, large lock in the JobQueue class. It's used by all other
26 classes in this module.
27
28 """
29
30 import os
31 import logging
32 import threading
33 import errno
34 import re
35 import time
36 import weakref
37
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import opcodes
42 from ganeti import errors
43 from ganeti import mcpu
44 from ganeti import utils
45 from ganeti import jstore
46 from ganeti import rpc
47
48
49 JOBQUEUE_THREADS = 5
50
51
52 def TimeStampNow():
53   return utils.SplitTime(time.time())
54
55
56 class _QueuedOpCode(object):
57   """Encasulates an opcode object.
58
59   The 'log' attribute holds the execution log and consists of tuples
60   of the form (log_serial, timestamp, level, message).
61
62   """
63   def __init__(self, op):
64     self.input = op
65     self.status = constants.OP_STATUS_QUEUED
66     self.result = None
67     self.log = []
68     self.start_timestamp = None
69     self.end_timestamp = None
70
71   @classmethod
72   def Restore(cls, state):
73     obj = _QueuedOpCode.__new__(cls)
74     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75     obj.status = state["status"]
76     obj.result = state["result"]
77     obj.log = state["log"]
78     obj.start_timestamp = state.get("start_timestamp", None)
79     obj.end_timestamp = state.get("end_timestamp", None)
80     return obj
81
82   def Serialize(self):
83     return {
84       "input": self.input.__getstate__(),
85       "status": self.status,
86       "result": self.result,
87       "log": self.log,
88       "start_timestamp": self.start_timestamp,
89       "end_timestamp": self.end_timestamp,
90       }
91
92
93 class _QueuedJob(object):
94   """In-memory job representation.
95
96   This is what we use to track the user-submitted jobs. Locking must be taken
97   care of by users of this class.
98
99   """
100   def __init__(self, queue, job_id, ops):
101     if not ops:
102       # TODO
103       raise Exception("No opcodes")
104
105     self.queue = queue
106     self.id = job_id
107     self.ops = [_QueuedOpCode(op) for op in ops]
108     self.run_op_index = -1
109     self.log_serial = 0
110     self.received_timestamp = TimeStampNow()
111     self.start_timestamp = None
112     self.end_timestamp = None
113
114     # Condition to wait for changes
115     self.change = threading.Condition(self.queue._lock)
116
117   @classmethod
118   def Restore(cls, queue, state):
119     obj = _QueuedJob.__new__(cls)
120     obj.queue = queue
121     obj.id = state["id"]
122     obj.run_op_index = state["run_op_index"]
123     obj.received_timestamp = state.get("received_timestamp", None)
124     obj.start_timestamp = state.get("start_timestamp", None)
125     obj.end_timestamp = state.get("end_timestamp", None)
126
127     obj.ops = []
128     obj.log_serial = 0
129     for op_state in state["ops"]:
130       op = _QueuedOpCode.Restore(op_state)
131       for log_entry in op.log:
132         obj.log_serial = max(obj.log_serial, log_entry[0])
133       obj.ops.append(op)
134
135     # Condition to wait for changes
136     obj.change = threading.Condition(obj.queue._lock)
137
138     return obj
139
140   def Serialize(self):
141     return {
142       "id": self.id,
143       "ops": [op.Serialize() for op in self.ops],
144       "run_op_index": self.run_op_index,
145       "start_timestamp": self.start_timestamp,
146       "end_timestamp": self.end_timestamp,
147       "received_timestamp": self.received_timestamp,
148       }
149
150   def CalcStatus(self):
151     status = constants.JOB_STATUS_QUEUED
152
153     all_success = True
154     for op in self.ops:
155       if op.status == constants.OP_STATUS_SUCCESS:
156         continue
157
158       all_success = False
159
160       if op.status == constants.OP_STATUS_QUEUED:
161         pass
162       elif op.status == constants.OP_STATUS_RUNNING:
163         status = constants.JOB_STATUS_RUNNING
164       elif op.status == constants.OP_STATUS_ERROR:
165         status = constants.JOB_STATUS_ERROR
166         # The whole job fails if one opcode failed
167         break
168       elif op.status == constants.OP_STATUS_CANCELED:
169         status = constants.OP_STATUS_CANCELED
170         break
171
172     if all_success:
173       status = constants.JOB_STATUS_SUCCESS
174
175     return status
176
177   def GetLogEntries(self, newer_than):
178     if newer_than is None:
179       serial = -1
180     else:
181       serial = newer_than
182
183     entries = []
184     for op in self.ops:
185       entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
186
187     return entries
188
189
190 class _JobQueueWorker(workerpool.BaseWorker):
191   def RunTask(self, job):
192     """Job executor.
193
194     This functions processes a job. It is closely tied to the _QueuedJob and
195     _QueuedOpCode classes.
196
197     """
198     logging.debug("Worker %s processing job %s",
199                   self.worker_id, job.id)
200     proc = mcpu.Processor(self.pool.queue.context)
201     queue = job.queue
202     try:
203       try:
204         count = len(job.ops)
205         for idx, op in enumerate(job.ops):
206           try:
207             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
208
209             queue.acquire()
210             try:
211               job.run_op_index = idx
212               op.status = constants.OP_STATUS_RUNNING
213               op.result = None
214               op.start_timestamp = TimeStampNow()
215               if idx == 0: # first opcode
216                 job.start_timestamp = op.start_timestamp
217               queue.UpdateJobUnlocked(job)
218
219               input_opcode = op.input
220             finally:
221               queue.release()
222
223             def _Log(*args):
224               """Append a log entry.
225
226               """
227               assert len(args) < 3
228
229               if len(args) == 1:
230                 log_type = constants.ELOG_MESSAGE
231                 log_msg = args[0]
232               else:
233                 log_type, log_msg = args
234
235               # The time is split to make serialization easier and not lose
236               # precision.
237               timestamp = utils.SplitTime(time.time())
238
239               queue.acquire()
240               try:
241                 job.log_serial += 1
242                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
243
244                 job.change.notifyAll()
245               finally:
246                 queue.release()
247
248             # Make sure not to hold lock while _Log is called
249             result = proc.ExecOpCode(input_opcode, _Log)
250
251             queue.acquire()
252             try:
253               op.status = constants.OP_STATUS_SUCCESS
254               op.result = result
255               op.end_timestamp = TimeStampNow()
256               queue.UpdateJobUnlocked(job)
257             finally:
258               queue.release()
259
260             logging.debug("Op %s/%s: Successfully finished %s",
261                           idx + 1, count, op)
262           except Exception, err:
263             queue.acquire()
264             try:
265               try:
266                 op.status = constants.OP_STATUS_ERROR
267                 op.result = str(err)
268                 op.end_timestamp = TimeStampNow()
269                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
270               finally:
271                 queue.UpdateJobUnlocked(job)
272             finally:
273               queue.release()
274             raise
275
276       except errors.GenericError, err:
277         logging.exception("Ganeti exception")
278       except:
279         logging.exception("Unhandled exception")
280     finally:
281       queue.acquire()
282       try:
283         try:
284           job.run_op_idx = -1
285           job.end_timestamp = TimeStampNow()
286           queue.UpdateJobUnlocked(job)
287         finally:
288           job_id = job.id
289           status = job.CalcStatus()
290       finally:
291         queue.release()
292       logging.debug("Worker %s finished job %s, status = %s",
293                     self.worker_id, job_id, status)
294
295
296 class _JobQueueWorkerPool(workerpool.WorkerPool):
297   def __init__(self, queue):
298     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
299                                               _JobQueueWorker)
300     self.queue = queue
301
302
303 class JobQueue(object):
304   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
305
306   def _RequireOpenQueue(fn):
307     """Decorator for "public" functions.
308
309     This function should be used for all "public" functions. That is, functions
310     usually called from other classes.
311
312     Important: Use this decorator only after utils.LockedMethod!
313
314     Example:
315       @utils.LockedMethod
316       @_RequireOpenQueue
317       def Example(self):
318         pass
319
320     """
321     def wrapper(self, *args, **kwargs):
322       assert self._queue_lock is not None, "Queue should be open"
323       return fn(self, *args, **kwargs)
324     return wrapper
325
326   def __init__(self, context):
327     self.context = context
328     self._memcache = weakref.WeakValueDictionary()
329     self._my_hostname = utils.HostInfo().name
330
331     # Locking
332     self._lock = threading.Lock()
333     self.acquire = self._lock.acquire
334     self.release = self._lock.release
335
336     # Initialize
337     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
338
339     # Read serial file
340     self._last_serial = jstore.ReadSerial()
341     assert self._last_serial is not None, ("Serial file was modified between"
342                                            " check in jstore and here")
343
344     # Get initial list of nodes
345     self._nodes = set(self.context.cfg.GetNodeList())
346
347     # Remove master node
348     try:
349       self._nodes.remove(self._my_hostname)
350     except ValueError:
351       pass
352
353     # TODO: Check consistency across nodes
354
355     # Setup worker pool
356     self._wpool = _JobQueueWorkerPool(self)
357
358     # We need to lock here because WorkerPool.AddTask() may start a job while
359     # we're still doing our work.
360     self.acquire()
361     try:
362       for job in self._GetJobsUnlocked(None):
363         status = job.CalcStatus()
364
365         if status in (constants.JOB_STATUS_QUEUED, ):
366           self._wpool.AddTask(job)
367
368         elif status in (constants.JOB_STATUS_RUNNING, ):
369           logging.warning("Unfinished job %s found: %s", job.id, job)
370           try:
371             for op in job.ops:
372               op.status = constants.OP_STATUS_ERROR
373               op.result = "Unclean master daemon shutdown"
374           finally:
375             self.UpdateJobUnlocked(job)
376     finally:
377       self.release()
378
379   @utils.LockedMethod
380   @_RequireOpenQueue
381   def AddNode(self, node_name):
382     assert node_name != self._my_hostname
383
384     # Clean queue directory on added node
385     rpc.call_jobqueue_purge(node_name)
386
387     # Upload the whole queue excluding archived jobs
388     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
389
390     # Upload current serial file
391     files.append(constants.JOB_QUEUE_SERIAL_FILE)
392
393     for file_name in files:
394       # Read file content
395       fd = open(file_name, "r")
396       try:
397         content = fd.read()
398       finally:
399         fd.close()
400
401       result = rpc.call_jobqueue_update([node_name], file_name, content)
402       if not result[node_name]:
403         logging.error("Failed to upload %s to %s", file_name, node_name)
404
405     self._nodes.add(node_name)
406
407   @utils.LockedMethod
408   @_RequireOpenQueue
409   def RemoveNode(self, node_name):
410     try:
411       # The queue is removed by the "leave node" RPC call.
412       self._nodes.remove(node_name)
413     except KeyError:
414       pass
415
416   def _CheckRpcResult(self, result, nodes, failmsg):
417     failed = []
418     success = []
419
420     for node in nodes:
421       if result[node]:
422         success.append(node)
423       else:
424         failed.append(node)
425
426     if failed:
427       logging.error("%s failed on %s", failmsg, ", ".join(failed))
428
429     # +1 for the master node
430     if (len(success) + 1) < len(failed):
431       # TODO: Handle failing nodes
432       logging.error("More than half of the nodes failed")
433
434   def _WriteAndReplicateFileUnlocked(self, file_name, data):
435     """Writes a file locally and then replicates it to all nodes.
436
437     """
438     utils.WriteFile(file_name, data=data)
439
440     result = rpc.call_jobqueue_update(self._nodes, file_name, data)
441     self._CheckRpcResult(result, self._nodes,
442                          "Updating %s" % file_name)
443
444   def _RenameFileUnlocked(self, old, new):
445     os.rename(old, new)
446
447     result = rpc.call_jobqueue_rename(self._nodes, old, new)
448     self._CheckRpcResult(result, self._nodes,
449                          "Moving %s to %s" % (old, new))
450
451   def _FormatJobID(self, job_id):
452     if not isinstance(job_id, (int, long)):
453       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
454     if job_id < 0:
455       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
456
457     return str(job_id)
458
459   def _NewSerialUnlocked(self):
460     """Generates a new job identifier.
461
462     Job identifiers are unique during the lifetime of a cluster.
463
464     Returns: A string representing the job identifier.
465
466     """
467     # New number
468     serial = self._last_serial + 1
469
470     # Write to file
471     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
472                                         "%s\n" % serial)
473
474     # Keep it only if we were able to write the file
475     self._last_serial = serial
476
477     return self._FormatJobID(serial)
478
479   @staticmethod
480   def _GetJobPath(job_id):
481     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
482
483   @staticmethod
484   def _GetArchivedJobPath(job_id):
485     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
486
487   @classmethod
488   def _ExtractJobID(cls, name):
489     m = cls._RE_JOB_FILE.match(name)
490     if m:
491       return m.group(1)
492     else:
493       return None
494
495   def _GetJobIDsUnlocked(self, archived=False):
496     """Return all known job IDs.
497
498     If the parameter archived is True, archived jobs IDs will be
499     included. Currently this argument is unused.
500
501     The method only looks at disk because it's a requirement that all
502     jobs are present on disk (so in the _memcache we don't have any
503     extra IDs).
504
505     """
506     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
507     jlist = utils.NiceSort(jlist)
508     return jlist
509
510   def _ListJobFiles(self):
511     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
512             if self._RE_JOB_FILE.match(name)]
513
514   def _LoadJobUnlocked(self, job_id):
515     job = self._memcache.get(job_id, None)
516     if job:
517       logging.debug("Found job %s in memcache", job_id)
518       return job
519
520     filepath = self._GetJobPath(job_id)
521     logging.debug("Loading job from %s", filepath)
522     try:
523       fd = open(filepath, "r")
524     except IOError, err:
525       if err.errno in (errno.ENOENT, ):
526         return None
527       raise
528     try:
529       data = serializer.LoadJson(fd.read())
530     finally:
531       fd.close()
532
533     job = _QueuedJob.Restore(self, data)
534     self._memcache[job_id] = job
535     logging.debug("Added job %s to the cache", job_id)
536     return job
537
538   def _GetJobsUnlocked(self, job_ids):
539     if not job_ids:
540       job_ids = self._GetJobIDsUnlocked()
541
542     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
543
544   @utils.LockedMethod
545   @_RequireOpenQueue
546   def SubmitJob(self, ops):
547     """Create and store a new job.
548
549     This enters the job into our job queue and also puts it on the new
550     queue, in order for it to be picked up by the queue processors.
551
552     @type ops: list
553     @param ops: The list of OpCodes that will become the new job.
554
555     """
556     # Get job identifier
557     job_id = self._NewSerialUnlocked()
558     job = _QueuedJob(self, job_id, ops)
559
560     # Write to disk
561     self.UpdateJobUnlocked(job)
562
563     logging.debug("Adding new job %s to the cache", job_id)
564     self._memcache[job_id] = job
565
566     # Add to worker pool
567     self._wpool.AddTask(job)
568
569     return job.id
570
571   @_RequireOpenQueue
572   def UpdateJobUnlocked(self, job):
573     filename = self._GetJobPath(job.id)
574     data = serializer.DumpJson(job.Serialize(), indent=False)
575     logging.debug("Writing job %s to %s", job.id, filename)
576     self._WriteAndReplicateFileUnlocked(filename, data)
577
578     # Notify waiters about potential changes
579     job.change.notifyAll()
580
581   @utils.LockedMethod
582   @_RequireOpenQueue
583   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
584                         timeout):
585     """Waits for changes in a job.
586
587     @type job_id: string
588     @param job_id: Job identifier
589     @type fields: list of strings
590     @param fields: Which fields to check for changes
591     @type prev_job_info: list or None
592     @param prev_job_info: Last job information returned
593     @type prev_log_serial: int
594     @param prev_log_serial: Last job message serial number
595     @type timeout: float
596     @param timeout: maximum time to wait
597
598     """
599     logging.debug("Waiting for changes in job %s", job_id)
600     end_time = time.time() + timeout
601     while True:
602       delta_time = end_time - time.time()
603       if delta_time < 0:
604         return constants.JOB_NOTCHANGED
605
606       job = self._LoadJobUnlocked(job_id)
607       if not job:
608         logging.debug("Job %s not found", job_id)
609         break
610
611       status = job.CalcStatus()
612       job_info = self._GetJobInfoUnlocked(job, fields)
613       log_entries = job.GetLogEntries(prev_log_serial)
614
615       # Serializing and deserializing data can cause type changes (e.g. from
616       # tuple to list) or precision loss. We're doing it here so that we get
617       # the same modifications as the data received from the client. Without
618       # this, the comparison afterwards might fail without the data being
619       # significantly different.
620       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
621       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
622
623       if status not in (constants.JOB_STATUS_QUEUED,
624                         constants.JOB_STATUS_RUNNING):
625         # Don't even try to wait if the job is no longer running, there will be
626         # no changes.
627         break
628
629       if (prev_job_info != job_info or
630           (log_entries and prev_log_serial != log_entries[0][0])):
631         break
632
633       logging.debug("Waiting again")
634
635       # Release the queue lock while waiting
636       job.change.wait(delta_time)
637
638     logging.debug("Job %s changed", job_id)
639
640     return (job_info, log_entries)
641
642   @utils.LockedMethod
643   @_RequireOpenQueue
644   def CancelJob(self, job_id):
645     """Cancels a job.
646
647     @type job_id: string
648     @param job_id: Job ID of job to be cancelled.
649
650     """
651     logging.debug("Cancelling job %s", job_id)
652
653     job = self._LoadJobUnlocked(job_id)
654     if not job:
655       logging.debug("Job %s not found", job_id)
656       return
657
658     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
659       logging.debug("Job %s is no longer in the queue", job.id)
660       return
661
662     try:
663       for op in job.ops:
664         op.status = constants.OP_STATUS_ERROR
665         op.result = "Job cancelled by request"
666     finally:
667       self.UpdateJobUnlocked(job)
668
669   @utils.LockedMethod
670   @_RequireOpenQueue
671   def ArchiveJob(self, job_id):
672     """Archives a job.
673
674     @type job_id: string
675     @param job_id: Job ID of job to be archived.
676
677     """
678     logging.debug("Archiving job %s", job_id)
679
680     job = self._LoadJobUnlocked(job_id)
681     if not job:
682       logging.debug("Job %s not found", job_id)
683       return
684
685     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
686                                 constants.JOB_STATUS_SUCCESS,
687                                 constants.JOB_STATUS_ERROR):
688       logging.debug("Job %s is not yet done", job.id)
689       return
690
691     old = self._GetJobPath(job.id)
692     new = self._GetArchivedJobPath(job.id)
693
694     self._RenameFileUnlocked(old, new)
695
696     logging.debug("Successfully archived job %s", job.id)
697
698   def _GetJobInfoUnlocked(self, job, fields):
699     row = []
700     for fname in fields:
701       if fname == "id":
702         row.append(job.id)
703       elif fname == "status":
704         row.append(job.CalcStatus())
705       elif fname == "ops":
706         row.append([op.input.__getstate__() for op in job.ops])
707       elif fname == "opresult":
708         row.append([op.result for op in job.ops])
709       elif fname == "opstatus":
710         row.append([op.status for op in job.ops])
711       elif fname == "oplog":
712         row.append([op.log for op in job.ops])
713       elif fname == "opstart":
714         row.append([op.start_timestamp for op in job.ops])
715       elif fname == "opend":
716         row.append([op.end_timestamp for op in job.ops])
717       elif fname == "received_ts":
718         row.append(job.received_timestamp)
719       elif fname == "start_ts":
720         row.append(job.start_timestamp)
721       elif fname == "end_ts":
722         row.append(job.end_timestamp)
723       elif fname == "summary":
724         row.append([op.input.Summary() for op in job.ops])
725       else:
726         raise errors.OpExecError("Invalid job query field '%s'" % fname)
727     return row
728
729   @utils.LockedMethod
730   @_RequireOpenQueue
731   def QueryJobs(self, job_ids, fields):
732     """Returns a list of jobs in queue.
733
734     Args:
735     - job_ids: Sequence of job identifiers or None for all
736     - fields: Names of fields to return
737
738     """
739     jobs = []
740
741     for job in self._GetJobsUnlocked(job_ids):
742       if job is None:
743         jobs.append(None)
744       else:
745         jobs.append(self._GetJobInfoUnlocked(job, fields))
746
747     return jobs
748
749   @utils.LockedMethod
750   @_RequireOpenQueue
751   def Shutdown(self):
752     """Stops the job queue.
753
754     """
755     self._wpool.TerminateWorkers()
756
757     self._queue_lock.Close()
758     self._queue_lock = None