1
2
3
4
5
6
7
8
9
10
11
12
13 from zephir.backend import config
14 from zephir.backend.config import log
15 from zephir.backend.lib_backend import cx_pool
16
17 from twisted.internet import task
18 import datetime, calendar, traceback
19
21 """Classe représentant une tâche à effectuer à un moment donné
22
23 sous classes définies:
24
25 WeekdayTask (weekdays, hour, min)
26 MonthdayTask (monthdays, hour, min)
27 DayTask(hour, min)
28 IntervalTask (interval)
29 SingleTask (month, day, hour, min)
30 """
31
def __init__(self, id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date=None):
    """Store the description of a scheduled task on the instance.

    Each argument is copied, unchanged and unvalidated, onto the attribute
    of the same name. ``exec_date`` defaults to None.
    """
    # Identity of the task.
    self.id_task = id_task
    self.name = name
    self.cmd = cmd
    # Scheduling fields, kept exactly as received.
    self.month = month
    self.day = day
    self.hour = hour
    self.min = min
    self.week_day = week_day
    self.periodicity = periodicity
    # Left as None unless the caller already knows the date.
    self.exec_date = exec_date
43
45 if self.__class__.__name__ == b.__class__.__name__:
46 return self.id_task.__cmp__(b.id_task)
47 else:
48 return self.periodicity.__cmp__(b.periodicity)
49
51 return "%d (%s) -> %s" % (self.id_task, self.name, self.__repr__())
52
54
def update_date(self):
    """Write this task's ``exec_date`` to its row of the ``tasks`` table.

    Runs an UPDATE keyed on ``id_task`` and commits it. On failure the
    traceback is printed and the transaction is rolled back; the error is
    not propagated to the caller.
    """
    import traceback

    cu = cx_pool.create()
    try:
        query = """update tasks set exec_date=%s where id_task=%s"""
        # NOTE(review): '%c' is locale-dependent; presumably the database
        # parses the resulting text under the deployment locale - confirm.
        params = (self.exec_date.strftime('%c'), self.id_task)
        cu.execute(query, params)
        cx_pool.commit(cu)
    except Exception:
        # Best-effort persistence: report and undo, but keep the scheduler
        # running. KeyboardInterrupt/SystemExit are no longer swallowed.
        traceback.print_exc()
        cx_pool.rollback(cu)
65
def check_time(self):
    """Hook to be overridden by subclasses.

    An override must return True when it is time to run the task and
    False otherwise. When it returns True it must also update the
    ``exec_date`` attribute to the next execution date.

    This default implementation does nothing and returns None.
    """
    pass
73
def run(self, serveurs):
    """Log the execution of the task on the given servers, then reschedule.

    serveurs: iterable of server identifiers; each one is rendered with
    ``str()`` in the log line.

    The visible body only logs the request. Unless the task is single-use,
    it then persists the execution date through ``update_date()``.
    """
    targets = ','.join([str(serv) for serv in serveurs])
    log.msg("tâche %d (%s) : execution de '%s' sur les serveurs %s" % (self.id_task, self.name, self.cmd, targets))

    # Single-use tasks are not rescheduled, so their date is not re-saved.
    if not self.single_use:
        self.update_date()
82
84
85 single_use = False
86
88 return """WeeklyTask : %s (%s - %d:%d)""" % (self.cmd, calendar.day_name[self.week_day], self.hour, self.min)
89
def check_time(self):
    """Tell whether the weekly task is due, and schedule the next run.

    Returns True when the current time (truncated to the minute) has
    reached ``exec_date``; ``exec_date`` is then moved 7 days forward.
    Returns False otherwise.

    When ``exec_date`` is None it is initialised to today at
    ``hour:min``, but only if today is the configured ``week_day``.
    """
    now = datetime.datetime.now().replace(second=0, microsecond=0)
    if self.exec_date is None:
        # The original compared the bound method ``now.isoweekday`` to
        # week_day, which is never equal, so the task could never start.
        # weekday() (Monday == 0) is used because week_day also indexes
        # calendar.day_name elsewhere in this file.
        # NOTE(review): confirm week_day is stored as 0-6 with Monday == 0.
        if now.weekday() != self.week_day:
            return False
        self.exec_date = datetime.datetime(now.year, now.month, now.day, self.hour, self.min)
    if now >= self.exec_date:
        # Due: plan the same slot next week.
        self.exec_date = self.exec_date + datetime.timedelta(days=7)
        return True
    return False
105
107
108 single_use = False
109
111 return """MonthlyTask : %s (%s - %d:%d)""" % (self.cmd, self.day, self.hour, self.min)
112
def check_time(self):
    """Tell whether the monthly task is due, and schedule the next run.

    Returns True when the current time (truncated to the minute) has
    reached ``exec_date``; ``exec_date`` is then advanced by the number
    of days in its own month. Returns False otherwise.

    When ``exec_date`` is None it is initialised to today at
    ``hour:min``, but only if today's day of month equals ``day``.
    """
    now = datetime.datetime.now().replace(second=0, microsecond=0)
    if self.exec_date is None:
        if now.day != self.day:
            return False
        self.exec_date = datetime.datetime(now.year, now.month, self.day, self.hour, self.min)
    if now >= self.exec_date:
        # calendar.monthrange (not ``month_range``) returns
        # (first_weekday, number_of_days). The length is taken from the
        # month being left so a stale date still lands on the same day.
        # NOTE(review): for day > 28 the date drifts into the following
        # month when the next month is shorter.
        month_days = calendar.monthrange(self.exec_date.year, self.exec_date.month)[1]
        self.exec_date = self.exec_date + datetime.timedelta(days=month_days)
        return True
    return False
129
131
132 single_use = False
133
135 return """DailyTask : %s (%d:%d)""" % (self.cmd, self.hour, self.min)
136
def check_time(self):
    """Tell whether the daily task is due, and schedule the next run.

    When ``exec_date`` is None it is first set to today at ``hour:min``.
    Returns True when the current time (truncated to the minute) has
    reached ``exec_date``; ``exec_date`` is then moved one day forward.
    Returns False otherwise.
    """
    now = datetime.datetime.now().replace(second=0, microsecond=0)
    if self.exec_date is None:
        self.exec_date = datetime.datetime(now.year, now.month, now.day, self.hour, self.min)
    if now >= self.exec_date:
        # Due: plan the same time tomorrow.
        self.exec_date = self.exec_date + datetime.timedelta(days=1)
        return True
    return False
149
151
152 single_use = False
153
155 return """LoopingTask : %s (%d min)""" % (self.cmd, self.day*24*60 + self.hour*60 + self.min)
156
def check_time(self):
    """Tell whether the interval task is due, and schedule the next run.

    When ``exec_date`` is None it is set to the current time, so the task
    is due on the first check. Returns True when the current time
    (truncated to the minute) has reached ``exec_date``; ``exec_date``
    then becomes now plus the interval built from ``day``, ``hour`` and
    ``min`` (a None field counts as 0). Returns False otherwise.
    """
    now = datetime.datetime.now().replace(second=0, microsecond=0)
    if self.exec_date is None:
        self.exec_date = now
    if now >= self.exec_date:
        # The next run is counted from now, not from the previous date.
        delta = datetime.timedelta(days=self.day or 0, hours=self.hour or 0, minutes=self.min or 0)
        self.exec_date = now + delta
        return True
    return False
169
171
172 single_use = True
173
175 return """SingleTask : %s (%d %s - %d:%d)""" % (self.cmd, self.day, calendar.month_name[self.month], self.hour, self.min)
176
def __init__(self, id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date=None):
    """Store the task description and fix its execution date once.

    For this kind of task the execution date is computed a single time,
    at initialisation, which avoids problems when the year changes.

    When ``exec_date`` is None it is built from ``month``, ``day``,
    ``hour`` and ``min`` in the current year, or in the next year if that
    moment is already past. The resulting date is then saved through
    ``update_date()``.
    """
    self.id_task = id_task
    self.name = name
    self.cmd = cmd
    self.month = month
    self.day = day
    self.hour = hour
    self.min = min
    self.week_day = week_day
    self.periodicity = periodicity

    if exec_date is None:
        now = datetime.datetime.now().replace(second=0, microsecond=0)
        exec_date = datetime.datetime(now.year, month, day, hour, min)
        if exec_date < now:
            # Already past this year: schedule the same slot next year.
            exec_date = datetime.datetime(now.year + 1, month, day, hour, min)
    self.exec_date = exec_date
    self.update_date()
198
def check_time(self):
    """Return True once the current time has reached ``exec_date``.

    The current time is truncated to the minute before the comparison.
    ``exec_date`` is not modified here.
    """
    now = datetime.datetime.now().replace(second=0, microsecond=0)
    return now >= self.exec_date
205
207 """Classe utilitaire pour permettre une gestion des taches uucp à la mode 'crontab'
208 """
209
221
228
def get_tasks(self, id_res=None, type_res='serveur', all=False):
    """Return the sorted list of tasks scheduled for a resource.

    id_res: resource identifier (converted with ``int``); when None,
        every task known to the manager is returned.
    type_res: resource type to match in ``task_targets``.
    all: for a 'serveur' resource, also return the tasks attached to the
        groups that contain this server.

    Returns the task objects taken from ``self.tasks``, sorted in place
    with ``list.sort()``.
    """
    if id_res is None:
        # No filtering: the database is not needed at all.
        task_ids = list(self.tasks.keys())
    else:
        id_res = int(id_res)
        cu = cx_pool.create()
        try:
            cu.execute("""select id_res, type_res, id_task from task_targets""")
            targets = cu.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cx_pool.close(cu)

        task_ids = []
        # Tasks attached directly to the resource.
        for res, res_type, id_task in targets:
            if res_type == type_res and res == id_res and id_task not in task_ids:
                task_ids.append(id_task)
        if type_res == 'serveur' and all:
            # Groups whose entry at index 1 contains this server.
            # NOTE(review): presumably index 1 holds the member server
            # ids - confirm against the server pool.
            groupes = [gr_id for gr_id, groupe in self.s_pool._groupes_serveurs.items()
                       if id_res in groupe[1]]
            for res, res_type, id_task in targets:
                if res_type == 'groupe' and res in groupes and id_task not in task_ids:
                    task_ids.append(id_task)

    found = [self.tasks[id_task] for id_task in task_ids]
    found.sort()
    return found
266
268 """Charge les tâches définies dans la base de données"""
269 self.tasks = {}
270 self.targets = {}
271 query = """select id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date from tasks"""
272 cu = cx_pool.create()
273 cu.execute(query)
274
275 tasks = cu.fetchall()
276
277 for task in tasks:
278 id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date = task
279 self.init_task(id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date)
280
281 query = """select id_res, type_res, id_task from task_targets"""
282 cu.execute(query)
283 targets = cu.fetchall()
284 cx_pool.close(cu)
285 for target in targets:
286 id_res, type_res, id_task = target
287 if not id_task in self.targets:
288 self.targets[id_task] = {}
289 liste_res = self.targets[id_task].get(type_res,[])
290 liste_res.append(id_res)
291 self.targets[id_task][type_res] = liste_res
292
def init_task(self, id_task, name, cmd, month, day, hour, min, week_day, periodicity, exec_date=None):
    """Create a task object and register it in the manager.

    The class is looked up in ``self.period_to_task`` from
    ``periodicity``; ``SingleTask`` is used when there is no entry. The
    new object is stored in ``self.tasks`` under ``id_task``, replacing
    any previous entry.
    """
    task_class = self.period_to_task.get(periodicity)
    if task_class is None:
        task_class = SingleTask
    self.tasks[id_task] = task_class(id_task, name, cmd, month, day, hour, min,
                                     week_day, periodicity, exec_date)
300
def add_task(self, name, cmd, month, day, hour, min, week_day, periodicity):
    """Record a new task in the database and register it in the manager.

    periodicity : 0 = one-shot task
                  1 = repeating task (delay: day, hour, min) -> started
                      at the beginning of the loop
                  2 = daily task
                  3 = weekly task
                  4 = monthly task

    A scheduling field given as None or 'null' is stored as NULL.

    Returns the identifier of the new task. If the database operation
    fails, the traceback is printed, the transaction is rolled back and
    the exception is re-raised.
    """
    def _int_or_none(value):
        # Same NULL convention as the lookup query built below.
        if value in (None, 'null'):
            return None
        return int(value)

    fields = [('month', month), ('day', day), ('hour', hour), ('min', min), ('week_day', week_day)]

    cu = cx_pool.create()
    try:
        query = """insert into tasks (name, cmd, month, day, hour, min, week_day, periodicity) values (E%s, E%s, %s, %s, %s, %s, %s, %s)"""
        params = [name, cmd]
        params.extend(_int_or_none(value) for _, value in fields)
        params.append(int(periodicity))
        cu.execute(query, tuple(params))

        # Read back the id of the row just inserted: the most recent row
        # carrying the same values.
        query = """select id_task from tasks where name=%s and cmd=%s and """
        params = [name, cmd]
        for column, value in fields:
            if value in [None, 'null']:
                query += "%s is null and " % column
            else:
                query += column + "=%s and "
                params.append(int(value))
        query += """periodicity=%s order by id_task desc"""
        params.append(int(periodicity))
        cu.execute(query, params)
        id_task = int(cu.fetchone()[0])
        cx_pool.commit(cu)
    except Exception:
        traceback.print_exc()
        cx_pool.rollback(cu)
        # The original fell through to an unbound ``id_task`` here, which
        # hid the real error behind an UnboundLocalError.
        raise
    self.init_task(id_task, name, cmd, month, day, hour, min, week_day, periodicity)
    return id_task
334
def assign_task(self, cred_user, id_task, id_res, type_res):
    """Attach a task to a server or to a group of servers.

    @cred_user : user asking for the association; passed to the server
                 pool lookups so access to the resource can be checked
    @id_task : number of the task to attach
    @id_res : identifier of the resource to attach (server or group)
    @type_res : resource type, 'groupe' or 'serveur'

    Returns ``(1, "OK")`` on success, or ``(0, message)`` when an
    identifier is invalid, the task or resource is unknown, the
    association already exists or the database update fails.
    """
    try:
        id_task = int(id_task)
        id_res = int(id_res)
    except (TypeError, ValueError):
        return 0, """Identifiant de ressource de type incorrect"""

    if id_task not in self.tasks:
        return 0, """identifiant de tâche inconnu"""

    # The lookups are only used to check that the resource exists and is
    # reachable by this user; their results are not needed.
    try:
        if type_res == 'groupe':
            self.s_pool.get_groupes(cred_user, id_res)
        elif type_res == 'serveur':
            self.s_pool.get(cred_user, id_res)
        else:
            return 0, """type de ressource non reconnu (utiliser 'groupe' ou 'serveur')"""
    except KeyError:
        # NOTE(review): the original ``except KeyError, ResourceAuthError``
        # caught only KeyError (Python 2 bound the exception to that
        # name). ResourceAuthError is not imported in the visible part of
        # the file; to catch it as well, import it and use
        # ``except (KeyError, ResourceAuthError):``.
        return 0, """Permissions insuffisantes ou %s inexistant""" % type_res

    if id_task not in self.targets:
        self.targets[id_task] = {}
    task_res = self.targets[id_task].get(type_res, [])
    if id_res in task_res:
        return 0, """Tâche déjà assignée à cette ressource"""

    cu = cx_pool.create()
    query = """insert into task_targets (id_res, type_res, id_task) values (%s, %s, %s)"""
    params = (int(id_res), type_res, int(id_task))
    try:
        cu.execute(query, params)
        cx_pool.commit(cu)
    except Exception:
        traceback.print_exc()
        cx_pool.rollback(cu)
        return 0, """Erreur de mise à jour de la base de données"""

    # Update the in-memory copy only once the database accepted the row.
    task_res.append(id_res)
    self.targets[id_task][type_res] = task_res
    return 1, "OK"
379
381 """Supprime une association tâche/serveur(ou groupe)
382 """
383 try:
384 id_task = int(id_task)
385 id_res = int(id_res)
386 except:
387 return 0, """Identifiant de ressource de type incorrect"""
388
389 if id_task not in self.tasks:
390 return 0, """identifiant de tâche inconnu"""
391 try:
392 if type_res == 'groupe':
393 gr = self.s_pool.get_groupes(cred_user, id_res)
394 elif type_res == 'serveur':
395 serv = self.s_pool.get(cred_user, id_res)
396 else:
397 return 0, """type de ressource non reconnu (utiliser 'groupe' ou 'serveur')"""
398 except KeyError, ResourceAuthError:
399 return 0, """Permissions insuffisantes ou %s inexistant""" % type_res
400 assoc_ok = False
401 if id_task in self.targets:
402 task_res = self.targets[id_task].get(type_res,[])
403 if id_res in task_res:
404 assoc_ok = True
405 if not assoc_ok:
406 return 0, """Tâche non assignée à cette ressource"""
407
408 cu = cx_pool.create()
409 query = """delete from task_targets where id_res=%s and type_res=%s and id_task=%s"""
410 params = (int(id_res), type_res, int(id_task))
411 try:
412 cu.execute(query, params)
413 cx_pool.commit(cu)
414 except:
415 traceback.print_exc()
416 cx_pool.rollback(cu)
417 return 0, """Erreur de mise à jour de la base de données"""
418
419 self.targets[id_task][type_res].remove(id_res)
420 return 1, "OK"
421
423 """supprime une tâche programmée.
424 si serveurs est spécifié, la tâche sera supprimée seulement sur ce(s) serveur(s)
425 """
426 cu = cx_pool.create()
427 try:
428 query = """delete from tasks where id_task=%d"""
429 del(self.tasks[int(id_task)])
430 cu.execute(query, (int(id_task),))
431 cx_pool.commit(cu)
432 return True
433 except:
434 traceback.print_exc()
435 cx_pool.rollback(cu)
436 return False
437
439 """met en place les tâches a exécuter pour l'itération en cours
440 """
441 for id_task, task in self.tasks.items():
442
443 if id_task in self.targets:
444 if task.check_time():
445 serveurs = []
446
447 for type_res in self.targets.get(id_task,[]):
448 for res in self.targets[id_task][type_res]:
449
450 if type_res == 'groupe':
451 gr_serv = self.s_pool._groupes_serveurs[res][1]
452 for serv in gr_serv:
453 if serv not in serveurs: serveurs.append(serv)
454 else:
455 serveurs.append(res)
456
457 task.run(serveurs)
458
459 if task.single_use == True:
460
461 self.del_task(id_task)
462
463
464 if not self.running:
465 self.running = True
466