a96947883073171613e2915754f524002273648e
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import os
25 import logging
26 import threading
27 import errno
28 import re
29 import time
30
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import jstore
39 from ganeti import rpc
40
41
42 JOBQUEUE_THREADS = 5
43
44
45 class _QueuedOpCode(object):
46   """Encasulates an opcode object.
47
48   Access is synchronized by the '_lock' attribute.
49
50   The 'log' attribute holds the execution log and consists of tuples
51   of the form (timestamp, level, message).
52
53   """
54   def __new__(cls, *args, **kwargs):
55     obj = object.__new__(cls, *args, **kwargs)
56     # Create a special lock for logging
57     obj._log_lock = threading.Lock()
58     return obj
59
60   def __init__(self, op):
61     self.input = op
62     self.status = constants.OP_STATUS_QUEUED
63     self.result = None
64     self.log = []
65
66   @classmethod
67   def Restore(cls, state):
68     obj = _QueuedOpCode.__new__(cls)
69     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
70     obj.status = state["status"]
71     obj.result = state["result"]
72     obj.log = state["log"]
73     return obj
74
75   def Serialize(self):
76     self._log_lock.acquire()
77     try:
78       return {
79         "input": self.input.__getstate__(),
80         "status": self.status,
81         "result": self.result,
82         "log": self.log,
83         }
84     finally:
85       self._log_lock.release()
86
87   def Log(self, *args):
88     """Append a log entry.
89
90     """
91     assert len(args) < 3
92
93     if len(args) == 1:
94       log_type = constants.ELOG_MESSAGE
95       log_msg = args[0]
96     else:
97       log_type, log_msg = args
98
99     self._log_lock.acquire()
100     try:
101       self.log.append((time.time(), log_type, log_msg))
102     finally:
103       self._log_lock.release()
104
105   def RetrieveLog(self, start_at=0):
106     """Retrieve (a part of) the execution log.
107
108     """
109     self._log_lock.acquire()
110     try:
111       return self.log[start_at:]
112     finally:
113       self._log_lock.release()
114
115
116 class _QueuedJob(object):
117   """In-memory job representation.
118
119   This is what we use to track the user-submitted jobs.
120
121   """
122   def __init__(self, queue, job_id, ops):
123     if not ops:
124       # TODO
125       raise Exception("No opcodes")
126
127     self.queue = queue
128     self.id = job_id
129     self.ops = [_QueuedOpCode(op) for op in ops]
130     self.run_op_index = -1
131
132   @classmethod
133   def Restore(cls, queue, state):
134     obj = _QueuedJob.__new__(cls)
135     obj.queue = queue
136     obj.id = state["id"]
137     obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
138     obj.run_op_index = state["run_op_index"]
139     return obj
140
141   def Serialize(self):
142     return {
143       "id": self.id,
144       "ops": [op.Serialize() for op in self.ops],
145       "run_op_index": self.run_op_index,
146       }
147
148   def CalcStatus(self):
149     status = constants.JOB_STATUS_QUEUED
150
151     all_success = True
152     for op in self.ops:
153       if op.status == constants.OP_STATUS_SUCCESS:
154         continue
155
156       all_success = False
157
158       if op.status == constants.OP_STATUS_QUEUED:
159         pass
160       elif op.status == constants.OP_STATUS_RUNNING:
161         status = constants.JOB_STATUS_RUNNING
162       elif op.status == constants.OP_STATUS_ERROR:
163         status = constants.JOB_STATUS_ERROR
164         # The whole job fails if one opcode failed
165         break
166       elif op.status == constants.OP_STATUS_CANCELED:
167         status = constants.OP_STATUS_CANCELED
168         break
169
170     if all_success:
171       status = constants.JOB_STATUS_SUCCESS
172
173     return status
174
175
176 class _JobQueueWorker(workerpool.BaseWorker):
177   def RunTask(self, job):
178     """Job executor.
179
180     This functions processes a job.
181
182     """
183     logging.debug("Worker %s processing job %s",
184                   self.worker_id, job.id)
185     proc = mcpu.Processor(self.pool.queue.context)
186     queue = job.queue
187     try:
188       try:
189         count = len(job.ops)
190         for idx, op in enumerate(job.ops):
191           try:
192             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
193
194             queue.acquire()
195             try:
196               job.run_op_index = idx
197               op.status = constants.OP_STATUS_RUNNING
198               op.result = None
199               queue.UpdateJobUnlocked(job)
200
201               input_opcode = op.input
202             finally:
203               queue.release()
204
205             result = proc.ExecOpCode(input_opcode, op.Log)
206
207             queue.acquire()
208             try:
209               op.status = constants.OP_STATUS_SUCCESS
210               op.result = result
211               queue.UpdateJobUnlocked(job)
212             finally:
213               queue.release()
214
215             logging.debug("Op %s/%s: Successfully finished %s",
216                           idx + 1, count, op)
217           except Exception, err:
218             queue.acquire()
219             try:
220               try:
221                 op.status = constants.OP_STATUS_ERROR
222                 op.result = str(err)
223                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
224               finally:
225                 queue.UpdateJobUnlocked(job)
226             finally:
227               queue.release()
228             raise
229
230       except errors.GenericError, err:
231         logging.exception("Ganeti exception")
232       except:
233         logging.exception("Unhandled exception")
234     finally:
235       queue.acquire()
236       try:
237         job_id = job.id
238         status = job.CalcStatus()
239       finally:
240         queue.release()
241       logging.debug("Worker %s finished job %s, status = %s",
242                     self.worker_id, job_id, status)
243
244
245 class _JobQueueWorkerPool(workerpool.WorkerPool):
246   def __init__(self, queue):
247     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
248                                               _JobQueueWorker)
249     self.queue = queue
250
251
252 class JobQueue(object):
253   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
254
255   def _RequireOpenQueue(fn):
256     """Decorator for "public" functions.
257
258     This function should be used for all "public" functions. That is, functions
259     usually called from other classes.
260
261     Important: Use this decorator only after utils.LockedMethod!
262
263     Example:
264       @utils.LockedMethod
265       @_RequireOpenQueue
266       def Example(self):
267         pass
268
269     """
270     def wrapper(self, *args, **kwargs):
271       assert self._queue_lock is not None, "Queue should be open"
272       return fn(self, *args, **kwargs)
273     return wrapper
274
275   def __init__(self, context):
276     self.context = context
277     self._memcache = {}
278     self._my_hostname = utils.HostInfo().name
279
280     # Locking
281     self._lock = threading.Lock()
282     self.acquire = self._lock.acquire
283     self.release = self._lock.release
284
285     # Initialize
286     self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)
287
288     # Read serial file
289     self._last_serial = jstore.ReadSerial()
290     assert self._last_serial is not None, ("Serial file was modified between"
291                                            " check in jstore and here")
292
293     # Get initial list of nodes
294     self._nodes = set(self.context.cfg.GetNodeList())
295
296     # Remove master node
297     try:
298       self._nodes.remove(self._my_hostname)
299     except ValueError:
300       pass
301
302     # TODO: Check consistency across nodes
303
304     # Setup worker pool
305     self._wpool = _JobQueueWorkerPool(self)
306
307     # We need to lock here because WorkerPool.AddTask() may start a job while
308     # we're still doing our work.
309     self.acquire()
310     try:
311       for job in self._GetJobsUnlocked(None):
312         status = job.CalcStatus()
313
314         if status in (constants.JOB_STATUS_QUEUED, ):
315           self._wpool.AddTask(job)
316
317         elif status in (constants.JOB_STATUS_RUNNING, ):
318           logging.warning("Unfinished job %s found: %s", job.id, job)
319           try:
320             for op in job.ops:
321               op.status = constants.OP_STATUS_ERROR
322               op.result = "Unclean master daemon shutdown"
323           finally:
324             self.UpdateJobUnlocked(job)
325     finally:
326       self.release()
327
328   @utils.LockedMethod
329   @_RequireOpenQueue
330   def AddNode(self, node_name):
331     assert node_name != self._my_hostname
332
333     # TODO: Clean queue directory on added node
334
335     # Upload the whole queue excluding archived jobs
336     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
337
338     # Upload current serial file
339     files.append(constants.JOB_QUEUE_SERIAL_FILE)
340
341     for file_name in files:
342       result = rpc.call_upload_file([node_name], file_name)
343       if not result[node_name]:
344         logging.error("Failed to upload %s to %s", file_name, node_name)
345
346     self._nodes.add(node_name)
347
348   @utils.LockedMethod
349   @_RequireOpenQueue
350   def RemoveNode(self, node_name):
351     try:
352       # The queue is removed by the "leave node" RPC call.
353       self._nodes.remove(node_name)
354     except KeyError:
355       pass
356
357   def _WriteAndReplicateFileUnlocked(self, file_name, data):
358     """Writes a file locally and then replicates it to all nodes.
359
360     """
361     utils.WriteFile(file_name, data=data)
362
363     failed_nodes = 0
364     result = rpc.call_upload_file(self._nodes, file_name)
365     for node in self._nodes:
366       if not result[node]:
367         failed_nodes += 1
368         logging.error("Copy of job queue file to node %s failed", node)
369
370     # TODO: check failed_nodes
371
372   def _FormatJobID(self, job_id):
373     if not isinstance(job_id, (int, long)):
374       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
375     if job_id < 0:
376       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
377
378     return str(job_id)
379
380   def _NewSerialUnlocked(self):
381     """Generates a new job identifier.
382
383     Job identifiers are unique during the lifetime of a cluster.
384
385     Returns: A string representing the job identifier.
386
387     """
388     # New number
389     serial = self._last_serial + 1
390
391     # Write to file
392     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
393                                         "%s\n" % serial)
394
395     # Keep it only if we were able to write the file
396     self._last_serial = serial
397
398     return self._FormatJobID(serial)
399
400   @staticmethod
401   def _GetJobPath(job_id):
402     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
403
404   @staticmethod
405   def _GetArchivedJobPath(job_id):
406     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
407
408   @classmethod
409   def _ExtractJobID(cls, name):
410     m = cls._RE_JOB_FILE.match(name)
411     if m:
412       return m.group(1)
413     else:
414       return None
415
416   def _GetJobIDsUnlocked(self, archived=False):
417     """Return all known job IDs.
418
419     If the parameter archived is True, archived jobs IDs will be
420     included. Currently this argument is unused.
421
422     The method only looks at disk because it's a requirement that all
423     jobs are present on disk (so in the _memcache we don't have any
424     extra IDs).
425
426     """
427     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
428     jlist.sort()
429     return jlist
430
431   def _ListJobFiles(self):
432     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
433             if self._RE_JOB_FILE.match(name)]
434
435   def _LoadJobUnlocked(self, job_id):
436     if job_id in self._memcache:
437       logging.debug("Found job %s in memcache", job_id)
438       return self._memcache[job_id]
439
440     filepath = self._GetJobPath(job_id)
441     logging.debug("Loading job from %s", filepath)
442     try:
443       fd = open(filepath, "r")
444     except IOError, err:
445       if err.errno in (errno.ENOENT, ):
446         return None
447       raise
448     try:
449       data = serializer.LoadJson(fd.read())
450     finally:
451       fd.close()
452
453     job = _QueuedJob.Restore(self, data)
454     self._memcache[job_id] = job
455     logging.debug("Added job %s to the cache", job_id)
456     return job
457
458   def _GetJobsUnlocked(self, job_ids):
459     if not job_ids:
460       job_ids = self._GetJobIDsUnlocked()
461
462     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
463
464   @utils.LockedMethod
465   @_RequireOpenQueue
466   def SubmitJob(self, ops):
467     """Create and store a new job.
468
469     This enters the job into our job queue and also puts it on the new
470     queue, in order for it to be picked up by the queue processors.
471
472     @type ops: list
473     @param ops: The list of OpCodes that will become the new job.
474
475     """
476     # Get job identifier
477     job_id = self._NewSerialUnlocked()
478     job = _QueuedJob(self, job_id, ops)
479
480     # Write to disk
481     self.UpdateJobUnlocked(job)
482
483     logging.debug("Added new job %s to the cache", job_id)
484     self._memcache[job_id] = job
485
486     # Add to worker pool
487     self._wpool.AddTask(job)
488
489     return job.id
490
491   @_RequireOpenQueue
492   def UpdateJobUnlocked(self, job):
493     filename = self._GetJobPath(job.id)
494     data = serializer.DumpJson(job.Serialize(), indent=False)
495     logging.debug("Writing job %s to %s", job.id, filename)
496     self._WriteAndReplicateFileUnlocked(filename, data)
497     self._CleanCacheUnlocked([job.id])
498
499   def _CleanCacheUnlocked(self, exclude):
500     """Clean the memory cache.
501
502     The exceptions argument contains job IDs that should not be
503     cleaned.
504
505     """
506     assert isinstance(exclude, list)
507
508     for job in self._memcache.values():
509       if job.id in exclude:
510         continue
511       if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
512                                   constants.JOB_STATUS_RUNNING):
513         logging.debug("Cleaning job %s from the cache", job.id)
514         try:
515           del self._memcache[job.id]
516         except KeyError:
517           pass
518
519   @utils.LockedMethod
520   @_RequireOpenQueue
521   def CancelJob(self, job_id):
522     """Cancels a job.
523
524     @type job_id: string
525     @param job_id: Job ID of job to be cancelled.
526
527     """
528     logging.debug("Cancelling job %s", job_id)
529
530     job = self._LoadJobUnlocked(job_id)
531     if not job:
532       logging.debug("Job %s not found", job_id)
533       return
534
535     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
536       logging.debug("Job %s is no longer in the queue", job.id)
537       return
538
539     try:
540       for op in job.ops:
541         op.status = constants.OP_STATUS_ERROR
542         op.result = "Job cancelled by request"
543     finally:
544       self.UpdateJobUnlocked(job)
545
546   @utils.LockedMethod
547   @_RequireOpenQueue
548   def ArchiveJob(self, job_id):
549     """Archives a job.
550
551     @type job_id: string
552     @param job_id: Job ID of job to be archived.
553
554     """
555     logging.debug("Archiving job %s", job_id)
556
557     job = self._LoadJobUnlocked(job_id)
558     if not job:
559       logging.debug("Job %s not found", job_id)
560       return
561
562     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
563                                 constants.JOB_STATUS_SUCCESS,
564                                 constants.JOB_STATUS_ERROR):
565       logging.debug("Job %s is not yet done", job.id)
566       return
567
568     try:
569       old = self._GetJobPath(job.id)
570       new = self._GetArchivedJobPath(job.id)
571
572       os.rename(old, new)
573
574       logging.debug("Successfully archived job %s", job.id)
575     finally:
576       # Cleaning the cache because we don't know what os.rename actually did
577       # and to be on the safe side.
578       self._CleanCacheUnlocked([])
579
580   def _GetJobInfoUnlocked(self, job, fields):
581     row = []
582     for fname in fields:
583       if fname == "id":
584         row.append(job.id)
585       elif fname == "status":
586         row.append(job.CalcStatus())
587       elif fname == "ops":
588         row.append([op.input.__getstate__() for op in job.ops])
589       elif fname == "opresult":
590         row.append([op.result for op in job.ops])
591       elif fname == "opstatus":
592         row.append([op.status for op in job.ops])
593       elif fname == "ticker":
594         ji = job.run_op_index
595         if ji < 0:
596           lmsg = None
597         else:
598           lmsg = job.ops[ji].RetrieveLog(-1)
599           # message might be empty here
600           if lmsg:
601             lmsg = lmsg[0]
602           else:
603             lmsg = None
604         row.append(lmsg)
605       else:
606         raise errors.OpExecError("Invalid job query field '%s'" % fname)
607     return row
608
609   @utils.LockedMethod
610   @_RequireOpenQueue
611   def QueryJobs(self, job_ids, fields):
612     """Returns a list of jobs in queue.
613
614     Args:
615     - job_ids: Sequence of job identifiers or None for all
616     - fields: Names of fields to return
617
618     """
619     jobs = []
620
621     for job in self._GetJobsUnlocked(job_ids):
622       if job is None:
623         jobs.append(None)
624       else:
625         jobs.append(self._GetJobInfoUnlocked(job, fields))
626
627     return jobs
628
629   @utils.LockedMethod
630   @_RequireOpenQueue
631   def Shutdown(self):
632     """Stops the job queue.
633
634     """
635     self._wpool.TerminateWorkers()
636
637     self._queue_lock.Close()
638     self._queue_lock = None