Package zephir :: Package backend :: Module schedule
[frames] | no frames]

Source Code for Module zephir.backend.schedule

  1  # -*- coding: UTF-8 -*- 
  2  ########################################################################### 
  3  # Eole NG - 2007 
  4  # Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon) 
  5  # Licence CeCill  cf /root/LicenceEole.txt 
  6  # eole@ac-dijon.fr 
  7  # 
  8  # schedule.py 
  9  # 
 10  # Classes de gestion de la périodicité des tâches 
 11  # 
 12  ########################################################################### 
 13  from zephir.backend import config 
 14  from zephir.backend.config import log 
 15  from zephir.backend.lib_backend import cx_pool 
 16   
 17  from twisted.internet import task 
 18  import datetime, calendar, traceback 
 19   
20 -class Task:
21 """Classe représentant une tâche à effectuer à un moment donné 22 23 sous classes définies: 24 25 WeekdayTask (weekdays, hour, min) 26 MonthdayTask (monthdays, hour, min) 27 DayTask(hour, min) 28 IntervalTask (interval) 29 SingleTask (month, day, hour, min) 30 """ 31
32 - def __init__(self, id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date=None):
33 self.id_task = id_task 34 self.name = name 35 self.cmd = cmd 36 self.month = month 37 self.day = day 38 self.min = min 39 self.hour = hour 40 self.week_day = week_day 41 self.periodicity = periodicity 42 self.exec_date = exec_date
43
44 - def __cmp__(self, b):
45 if self.__class__.__name__ == b.__class__.__name__: 46 return self.id_task.__cmp__(b.id_task) 47 else: 48 return self.periodicity.__cmp__(b.periodicity)
49
50 - def __str__(self):
51 return "%d (%s) -> %s" % (self.id_task, self.name, self.__repr__())
52
53 - def update_date(self):
54 # sauvegarde de la prochaine date d'exécution 55 cu = cx_pool.create() 56 try: 57 query = """update tasks set exec_date=%s where id_task=%s""" 58 params = (self.exec_date.strftime('%c'),self.id_task) 59 cu.execute(query, params) 60 cx_pool.commit(cu) 61 except: 62 import traceback 63 traceback.print_exc() 64 cx_pool.rollback(cu)
65
66 - def check_time(self):
67 """méthode à redéfinir dans les classes filles 68 doit retourner True si il est temps d'exécuter la tâche, False sinon. 69 Dans le cas ou True est retourné, l'attribut exec_date doit être mis à jour pour 70 refléter la prochaine date d'exécution. 71 """ 72 pass
73
74 - def run(self, serveurs):
75 """met en place l'appel de la tache pour tous les serveurs concernés""" 76 # mise en place de la commande 77 log.msg("tâche %d (%s) : execution de '%s' sur les serveurs %s" % (self.id_task, self.name, self.cmd, ','.join([str(serv) for serv in serveurs]))) 78 # lancement de la commande 79 # mise à jour de la date de prochaine exécution en base 80 if not self.single_use: 81 self.update_date()
82
83 -class WeeklyTask (Task):
84 85 single_use = False 86
87 - def __repr__(self):
88 return """WeeklyTask : %s (%s - %d:%d)""" % (self.cmd, calendar.day_name[self.week_day], self.hour, self.min)
89
90 - def check_time(self):
91 now = datetime.datetime(*datetime.datetime.now().timetuple()[:5]) 92 if self.exec_date is None: 93 if now.isoweekday == self.week_day: 94 # date de la première exécution 95 self.exec_date = datetime.datetime(now.year, now.month, now.day, self.hour, self.min) 96 else: 97 return False 98 if now >= self.exec_date: 99 # date de la prochaine exécution (+1 semaine) 100 delta = datetime.timedelta(days=7) 101 self.exec_date = self.exec_date + delta 102 # on a atteint (ou dépassé) la date d'exécution 103 return True 104 return False
105
106 -class MonthlyTask (Task):
107 108 single_use = False 109
110 - def __repr__(self):
111 return """MonthlyTask : %s (%s - %d:%d)""" % (self.cmd, self.day, self.hour, self.min)
112
113 - def check_time(self):
114 now = datetime.datetime(*datetime.datetime.now().timetuple()[:5]) 115 if self.exec_date is None: 116 if now.day == self.day: 117 # date de la première exécution 118 self.exec_date = datetime.datetime(now.year, now.month, self.day, self.hour, self.min) 119 else: 120 return False 121 if now >= self.exec_date: 122 # date de la prochaine exécution (+1 mois) 123 month_days = calendar.month_range(now.year,now.month)[1] 124 delta = datetime.timedelta(days=month_days) 125 self.exec_date = self.exec_date + delta 126 # on a atteint (ou dépassé) la date d'exécution 127 return True 128 return False
129
130 -class DailyTask (Task):
131 132 single_use = False 133
134 - def __repr__(self):
135 return """DailyTask : %s (%d:%d)""" % (self.cmd, self.hour, self.min)
136
137 - def check_time(self):
138 now = datetime.datetime(*datetime.datetime.now().timetuple()[:5]) 139 if self.exec_date is None: 140 # date de la première exécution 141 self.exec_date = datetime.datetime(now.year, now.month, now.day, self.hour, self.min) 142 if now >= self.exec_date: 143 # date de la prochaine exécution (+1 jour) 144 delta = datetime.timedelta(days=1) 145 self.exec_date = self.exec_date + delta 146 # on a atteint (ou dépassé) la date d'exécution 147 return True 148 return False
149
150 -class LoopingTask (Task):
151 152 single_use = False 153
154 - def __repr__(self):
155 return """LoopingTask : %s (%d min)""" % (self.cmd, self.day*24*60 + self.hour*60 + self.min)
156
157 - def check_time(self):
158 now = datetime.datetime(*datetime.datetime.now().timetuple()[:5]) 159 if self.exec_date is None: 160 # date de la première exécution 161 self.exec_date = now 162 if now >= self.exec_date: 163 # date de la prochaine exécution (+1 jour) 164 delta = datetime.timedelta(days=self.day or 0, hours=self.hour or 0, minutes=self.min or 0) 165 self.exec_date = now + delta 166 # on a atteint (ou dépassé) la date d'exécution 167 return True 168 return False
169
170 -class SingleTask (Task):
171 172 single_use = True 173
174 - def __repr__(self):
175 return """SingleTask : %s (%d %s - %d:%d)""" % (self.cmd, self.day, calendar.month_name[self.month], self.hour, self.min)
176
177 - def __init__(self, id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date=None):
178 """Pour ce type de tâche, la date d'exécution est 179 calculée une seule fois à l'initialisation 180 (évite les problèmes en cas de changement d'année) 181 """ 182 self.id_task = id_task 183 self.name = name 184 self.cmd = cmd 185 self.month = month 186 self.day = day 187 self.hour = hour 188 self.min = min 189 self.week_day = week_day 190 self.periodicity = periodicity 191 now = datetime.datetime(*datetime.datetime.now().timetuple()[:5]) 192 if exec_date is None: 193 exec_date = datetime.datetime(now.year, month, day, hour, min) 194 if exec_date < now: 195 exec_date = datetime.datetime(now.year + 1, month, day, hour, min) 196 self.exec_date = exec_date 197 self.update_date()
198
199 - def check_time(self):
200 now = datetime.datetime(*datetime.datetime.now().timetuple()[:5]) 201 if now >= self.exec_date: 202 # on a atteint (ou dépassé) la date d'exécution 203 return True 204 return False
205
206 -class TaskScheduler:
207 """Classe utilitaire pour permettre une gestion des taches uucp à la mode 'crontab' 208 """ 209
210 - def __init__(self, s_pool):
211 self.s_pool = s_pool 212 self.loop = task.LoopingCall(self.check_tasks) 213 self.tasks = {} 214 self.targets = {} 215 self.running=False 216 self.period_to_task = {0:SingleTask, 217 1:LoopingTask, 218 2:DailyTask, 219 3:WeeklyTask, 220 4:MonthlyTask}
221
222 - def start(self):
223 """démarre la boucle du scheduler 224 """ 225 self.load_tasks() 226 log.msg("Scheduler mainloop starting (delay : %d)" % config.SCHEDULER_DELAY) 227 self.loop.start(config.SCHEDULER_DELAY,now=False)
228
229 - def get_tasks(self, id_res=None, type_res='serveur', all=False):
230 """retourne la liste des tâches programmées pour un ou plusieurs serveur(s) 231 all : dans le cas d'un serveur, retourne aussi les taĉhes associées à ses groupes 232 """ 233 cu = cx_pool.create() 234 # recherche des tâches associées si ressource spécifiée 235 tasks = [] 236 query = """select id_res, type_res, id_task from task_targets""" 237 cu.execute(query) 238 targets = cu.fetchall() 239 cx_pool.close(cu) 240 if id_res is not None: 241 # récupération des associations tâche/ressource 242 groupes = [] 243 id_res = int(id_res) 244 # tâches spécifiques à la ressource 245 for res, type, id_task in targets: 246 if type == type_res and res == id_res and id_task not in tasks: 247 tasks.append(id_task) 248 if type_res == 'serveur' and all: 249 # serveur : on recherche les groupes possibles 250 for gr_id, groupe in self.s_pool._groupes_serveurs.items(): 251 if id_res in groupe[1]: 252 groupes.append(gr_id) 253 for res, type, id_task in targets: 254 if type == 'groupe' and res in groupes and id_task not in tasks: 255 tasks.append(id_task) 256 else: 257 tasks = self.tasks.keys() 258 # envoi de la liste des taches 259 res = [] 260 for id_task in tasks: 261 res.append(self.tasks[id_task]) 262 # tri du résultat par periodicité et id de tâche 263 res.sort() 264 # return [(task.id_task, task.name, task.cmd, task.month, task.day, task.hour, task.min, task.week_day, task.exec_date) for task in res] 265 return res
266
267 - def load_tasks(self):
268 """Charge les tâches définies dans la base de données""" 269 self.tasks = {} 270 self.targets = {} 271 query = """select id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date from tasks""" 272 cu = cx_pool.create() 273 cu.execute(query) 274 # récupération de l'id de la tâche ajoutée 275 tasks = cu.fetchall() 276 # instanciation des tâches 277 for task in tasks: 278 id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date = task 279 self.init_task(id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date) 280 # chargement des associations serveur(groupe) / tâche 281 query = """select id_res, type_res, id_task from task_targets""" 282 cu.execute(query) 283 targets = cu.fetchall() 284 cx_pool.close(cu) 285 for target in targets: 286 id_res, type_res, id_task = target 287 if not id_task in self.targets: 288 self.targets[id_task] = {} 289 liste_res = self.targets[id_task].get(type_res,[]) 290 liste_res.append(id_res) 291 self.targets[id_task][type_res] = liste_res
292
293 - def init_task(self, id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date=None):
294 """initialise un objet tâche dans le gestionnaire 295 """ 296 # par défaut ou si periodicité = 0 : tâche unique 297 task_class = self.period_to_task.get(periodicity, SingleTask) 298 new_t = task_class(id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date) 299 self.tasks[id_task] = new_t
300
301 - def add_task(self, name, cmd, month, day, hour, min, week_day, periodicity):
302 """programme une tâche sur un/plusieur(s) serveurs 303 periodicity : 0 = tâche unique 304 1 = tâche répétitive (délai : day, hour, min) -> lancée au début de la boucle 305 2 = tâche journalière 306 3 = tâche hebdomadaire 307 4 = tâche mensuelle 308 """ 309 # ajout de la tâche en base 310 cu = cx_pool.create() 311 try: 312 query = """insert into tasks (name, cmd, month, day, hour, min, week_day, periodicity) values (E%s, E%s, %s, %s, %s, %s, %s, %s)""" 313 params = (name, cmd, int(month), int(day), int(hour), int(min), int(week_day), int(periodicity)) 314 cu.execute(query, params) 315 # récupération de l'id de la tâche ajoutée 316 query = """select id_task from tasks where name=%s and cmd=%s and """ 317 params = [name, cmd] 318 for arg in [('month',month), ('day',day), ('hour',hour), ('min',min), ('week_day',week_day)]: 319 if arg[1] in [None, 'null']: 320 query += "%s is null and " % arg[0] 321 else: 322 query += arg[0] + "=%s and " 323 params.append(int(arg[1])) 324 query += """periodicity=%s order by id_task desc""" 325 params.append(int(periodicity)) 326 cu.execute(query, params) 327 id_task = int(cu.fetchone()[0]) 328 cx_pool.commit(cu) 329 except: 330 traceback.print_exc() 331 cx_pool.rollback(cu) 332 self.init_task(id_task, name, cmd, month, day, hour, min, week_day, periodicity) 333 return id_task
334
335 - def assign_task(self, cred_user, id_task, id_res, type_res):
336 """attribue une tâche à un(des) serveur(s) 337 @cred_user : utilisateur ayant demandé l'association: permet de vérifier les droits d'accès aux ressources 338 @id_task : numéro de tâche à associer à la ressource 339 @id_res : identifiant de la ressource à associer (serveur ou groupe) 340 @type_res : type de la ressource (serveur par défaut) 341 """ 342 try: 343 id_task = int(id_task) 344 id_res = int(id_res) 345 except: 346 return 0, """Identifiant de ressource de type incorrect""" 347 # association de la tâche aux serveurs 348 if id_task not in self.tasks: 349 return 0, """identifiant de tâche inconnu""" 350 try: 351 if type_res == 'groupe': 352 gr = self.s_pool.get_groupes(cred_user, id_res) 353 elif type_res == 'serveur': 354 serv = self.s_pool.get(cred_user, id_res) 355 else: 356 return 0, """type de ressource non reconnu (utiliser 'groupe' ou 'serveur')""" 357 except KeyError, ResourceAuthError: 358 return 0, """Permissions insuffisantes ou %s inexistant""" % type_res 359 if id_task not in self.targets: 360 self.targets[id_task] = {} 361 task_res = self.targets[id_task].get(type_res,[]) 362 if id_res in task_res: 363 return 0, """Tâche déjà assignée à cette ressource""" 364 # insertion dans la base 365 cu = cx_pool.create() 366 query = """insert into task_targets (id_res, type_res, id_task) values (%s, %s, %s)""" 367 params = (int(id_res), type_res, int(id_task)) 368 try: 369 cu.execute(query, params) 370 cx_pool.commit(cu) 371 except: 372 traceback.print_exc() 373 cx_pool.rollback(cu) 374 return 0, """Erreur de mise à jour de la base de données""" 375 # mise à jour en mémoire 376 task_res.append(id_res) 377 self.targets[id_task][type_res] = task_res 378 return 1, "OK"
379
380 - def unassign_task(self, cred_user, id_task, id_res, type_res):
381 """Supprime une association tâche/serveur(ou groupe) 382 """ 383 try: 384 id_task = int(id_task) 385 id_res = int(id_res) 386 except: 387 return 0, """Identifiant de ressource de type incorrect""" 388 # vérification des droits d'accès au serveur ou groupe 389 if id_task not in self.tasks: 390 return 0, """identifiant de tâche inconnu""" 391 try: 392 if type_res == 'groupe': 393 gr = self.s_pool.get_groupes(cred_user, id_res) 394 elif type_res == 'serveur': 395 serv = self.s_pool.get(cred_user, id_res) 396 else: 397 return 0, """type de ressource non reconnu (utiliser 'groupe' ou 'serveur')""" 398 except KeyError, ResourceAuthError: 399 return 0, """Permissions insuffisantes ou %s inexistant""" % type_res 400 assoc_ok = False 401 if id_task in self.targets: 402 task_res = self.targets[id_task].get(type_res,[]) 403 if id_res in task_res: 404 assoc_ok = True 405 if not assoc_ok: 406 return 0, """Tâche non assignée à cette ressource""" 407 # suppression dans la base 408 cu = cx_pool.create() 409 query = """delete from task_targets where id_res=%s and type_res=%s and id_task=%s""" 410 params = (int(id_res), type_res, int(id_task)) 411 try: 412 cu.execute(query, params) 413 cx_pool.commit(cu) 414 except: 415 traceback.print_exc() 416 cx_pool.rollback(cu) 417 return 0, """Erreur de mise à jour de la base de données""" 418 # mise à jour en mémoire 419 self.targets[id_task][type_res].remove(id_res) 420 return 1, "OK"
421
422 - def del_task(self, id_task):
423 """supprime une tâche programmée. 424 si serveurs est spécifié, la tâche sera supprimée seulement sur ce(s) serveur(s) 425 """ 426 cu = cx_pool.create() 427 try: 428 query = """delete from tasks where id_task=%d""" 429 del(self.tasks[int(id_task)]) 430 cu.execute(query, (int(id_task),)) 431 cx_pool.commit(cu) 432 return True 433 except: 434 traceback.print_exc() 435 cx_pool.rollback(cu) 436 return False
437
438 - def check_tasks(self, serveur=None):
439 """met en place les tâches a exécuter pour l'itération en cours 440 """ 441 for id_task, task in self.tasks.items(): 442 # on ne vérifie que les tâches associées à des serveurs / groupes 443 if id_task in self.targets: 444 if task.check_time(): 445 serveurs = [] 446 # recherche des serveurs affectés 447 for type_res in self.targets.get(id_task,[]): 448 for res in self.targets[id_task][type_res]: 449 # récupération de l'identifiant des serveurs suivant le type de ressource 450 if type_res == 'groupe': 451 gr_serv = self.s_pool._groupes_serveurs[res][1] 452 for serv in gr_serv: 453 if serv not in serveurs: serveurs.append(serv) 454 else: 455 serveurs.append(res) 456 # exécution de la tâche 457 task.run(serveurs) 458 # la tâche a été mise en place 459 if task.single_use == True: 460 # tâche unique : on la supprime après exécution 461 self.del_task(id_task) 462 # après un premier passage de la boucle, self.running passe à True 463 # (permet de démarrer les tâches basées sur un délai) 464 if not self.running: 465 self.running = True
466