Package zephir :: Package monitor :: Package agentmanager :: Module agent
[frames] | no frames]

Source Code for Module zephir.monitor.agentmanager.agent

  1  # -*- coding: UTF-8 -*- 
  2  ########################################################################### 
  3  # Eole NG - 2007 
  4  # Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon) 
  5  # Licence CeCill  cf /root/LicenceEole.txt 
  6  # eole@ac-dijon.fr 
  7  ########################################################################### 
  8   
  9  try: _ # localized string fetch function 
 10  except NameError: _ = str 
 11   
 12  import os, traceback 
 13  from datetime import timedelta 
 14  from ConfigParser import ConfigParser 
 15  from twisted.internet import defer 
 16  from twisted.web.microdom import * 
 17  # from twisted.persisted import marmalade 
 18  from twisted.persisted import aot 
 19   
 20  from zephir.monitor.agentmanager import config as cfg, rrd, status, util 
 21  from zephir.monitor.agentmanager.data import * 
 22  from zephir.monitor.agentmanager.util import log 
 23  from zephir.lib_zephir import log as zephir_log, is_locked 
 24   
 25  action_map = {0:'action_unknown',1:'action_ok',2:'action_warn',3:'action_error',4:'action_dependant'} 
 26   
27 -class AgentData:
28 """Persistance et accès aux données d'un agent. 29 30 Cette classe contient la « charge utile » d'un agent ; avant envoi 31 au serveur Zephir tous les agents sont convertis en une instance 32 de C{L{AgentData}}. 33 34 Attributs : 35 36 - C{name} : nom de l'agent 37 38 - C{period} : intervalle de mesures en secondes ; C{period=0} 39 désactive les mesures automatiques. 40 41 - C{description} : description de cette instance d'agent, 42 apparaissant sur la page de l'agent. 43 44 - C{max_status} : L{status<zephir.monitor.agentmanager.status>} le plus grave ; réinitialisé à chaque 45 envoi au serveur Zephir ou sur demande via XML-RPC. 46 47 - C{last_measure_date} : date de la dernière mesure. 48 49 - C{data} : liste d'objets représentant les données associées à 50 l'agent (cf. C{L{zephir.monitor.agentmanager.data}}). 51 """ 52
53 - def __init__(self, name, period, description, section, 54 max_status, max_status_date, last_status, last_measure_date, data, measure_data={}):
55 self.name = name 56 self.period = period 57 self.description = description 58 self.section = section 59 self.max_status = max_status 60 self.last_status = last_status 61 self.max_status_date = max_status_date 62 self.last_measure_date = last_measure_date 63 self.data = data 64 self.measure_data = measure_data
65 66
67 - def from_agent(self, agent):
68 """I{Factory Method} 69 70 @param agent: un agent concret 71 72 @return: une copie de C{agent} ne contenant plus que les 73 données utiles au serveur Zephir (instance de C{AgentData} 74 """ 75 result = self(agent.name, agent.period, agent.description, agent.section, 76 agent.max_status, agent.max_status_date, agent.last_status, agent.last_measure_date, agent.data, agent.measure_data) 77 return result
78 from_agent = classmethod(from_agent) 79 80
81 - def from_archive(self, archive_dir):
82 """I{Factory Method} 83 84 @param archive_dir: le chemin d'un répertoire contenant les 85 données d'un agent 86 87 @return: une instance de C{AgentData} chargée depuis le 88 système de fichiers 89 """ 90 xml_filename = os.path.join(archive_dir, 'agent.xml') 91 xml_file = file(xml_filename, 'r') 92 try: 93 # passage de persisted.marmalade à persisted.aot 94 try: 95 result = aot.unjellyFromSource(xml_file) 96 except: 97 xml_file.seek(0) 98 from twisted.persisted import marmalade 99 result = marmalade.unjellyFromXML(xml_file) 100 except Exception, e: 101 error_agent = LoadErrorAgent( 102 os.path.basename(archive_dir), 103 title = _("Loading archive %s failed") % archive_dir, 104 message = _("Loading agent from archive %s failed:\n%s") % (archive_dir, e)) 105 result = AgentData.from_agent(error_agent) 106 xml_file.close() 107 return result
108 from_archive = classmethod(from_archive) 109 110
111 - def archive(self, archive_dir):
112 """Sérialise l'agent sur disque, cf. L{from_archive}""" 113 xml_filename = os.path.join(archive_dir, 'agent.xml') 114 xml_file = file(xml_filename, 'w') 115 # écriture avec aot (marmalade deprecated) 116 aot.jellyToSource(self, xml_file) 117 # marmalade.jellyToXML(self, xml_file) 118 xml_file.close()
119
120 - def ensure_data_uptodate(self):
121 """Met à jour les données de l'agent sur disque""" 122 pass
123 124 125 126 127 STATUS_GRAPH_OPTIONS = [ 128 "-send-7days", "-l0", "-u1", "-g", 129 "-w112", "-h10", "-xHOUR:6:DAY:1:DAY:1:0:%d", 130 "CDEF:unknown=status,0,EQ", 131 "CDEF:ok=status,1,EQ", 132 "CDEF:warn=status,2,EQ", 133 "CDEF:error=status,3,EQ", 134 "AREA:unknown#666666", 135 "AREA:ok#33BB33", 136 "AREA:warn#CC6600", 137 "AREA:error#BB3333", 138 ] 139 140 STATUS_GRAPH_OPTIONS_MONTHLY = [ 141 "-send-1month", "-l0", "-u1", "-g", 142 "-w300", "-h10", "-xDAY:1:WEEK:1:DAY:7:0:%d", 143 "CDEF:unknown=status,0,EQ", 144 "CDEF:ok=status,1,EQ", 145 "CDEF:warn=status,2,EQ", 146 "CDEF:error=status,3,EQ", 147 "AREA:unknown#666666", 148 "AREA:ok#33BB33", 149 "AREA:warn#CC6600", 150 "AREA:error#BB3333", 151 ] 152
153 -def no_action(agent, old_status, new_status):
154 log.msg('%s : status changed to %s' % (agent.name, new_status))
155
156 -class Agent(AgentData):
157 """Classe abstraite des agents. 158 159 Un agent concret est une sous-classe d'C{L{Agent}} implémentant 160 (en particulier) la méthode C{L{measure()}}. 161 """ 162
163 - def __init__(self, name, 164 period = 60, 165 fields = ['value'], 166 description = None, 167 section = None, 168 modules = None, 169 requires = [], 170 **params):
171 if description is None: 172 description = self.__class__.__doc__ 173 AgentData.__init__(self, name, period, description, section, 174 status.Unknown(), "", status.Unknown(), "", [], {}) 175 self.fields = fields 176 self.data_needs_update = False 177 # those will be set by loader, eg UpdaterService 178 self.archive_dir = None 179 self.status_rrd = None 180 self.manager = None 181 # dépendances fonctionnelles 182 self.requires = requires 183 self.modules = modules 184 self.last_measure = None
185
186 - def init_data(self, archive_dir):
187 """Mémorise et initialise le répertoire d'archivage 188 189 Cette méthode sera appelée par le framework après chargement 190 de l'agent, afin de terminer les initialisations pour 191 lesquelles l'agent a besoin de connaître l'emplacement de ses 192 données sur disque. 193 """ 194 self.archive_dir = archive_dir 195 self.ensure_datadirs() 196 if self.period != 0: # period = 0 => no agent measures scheduled 197 status_period, xff = self.period, 0.75 198 else: 199 status_period, xff = 60, 1 200 201 # on récupère le dernier problème et sa date dans l'archive si elle est présente 202 xml_file = os.path.join(self.archive_dir, 'agent.xml') 203 if os.path.exists(xml_file): 204 try: 205 a = AgentData("temp", 60, "agent temporaire",status.Unknown(), "", status.Unknown(), "", [], {}) 206 a = a.from_archive(self.archive_dir) 207 self.max_status = a.max_status 208 self.max_status_date = a.max_status_date 209 del a 210 except: 211 pass 212 213 statusname = os.path.join(self.archive_dir, 'status') 214 self.status_rrd = rrd.Database(statusname + '.rrd', 215 step = status_period) 216 self.status_rrd.new_datasource( # status history 217 name = "status", 218 min_bound = 0, max_bound = len(status.STATUS_ORDER)) 219 if status_period > 3600: 220 # 1 record per week on 1 year 221 self.status_rrd.new_archive( 222 rows = 52, steps = (3600*24*7) / status_period, 223 consolidation = 'MAX', xfiles_factor = 0.75) 224 else: 225 # 1 record per 12 hours on 1 year 226 self.status_rrd.new_archive( 227 rows = 730, steps = (3600 * 12) / status_period, 228 consolidation = 'MAX', xfiles_factor = 0.75) 229 # 1 record per 2hour/pixel on 30 days 230 self.status_rrd.new_archive( 231 rows = 12*30, steps = 7200 / status_period, 232 consolidation = 'MAX', xfiles_factor = 0.75) 233 # 1 record per hour/pixel on 1 week 234 self.status_rrd.new_archive( 235 rows = 24*7, steps = 3600 / status_period, 236 consolidation = 'MAX', xfiles_factor = 0.75) 237 # 1 record per period on 1 day 238 self.status_rrd.new_archive( 239 rows = 24*3600/status_period, steps = 1, 240 consolidation = 'MAX', xfiles_factor = 0.75) 241 self.status_rrd.new_graph(pngname = statusname + '.png', 242 vnamedefs = { "status": ("status", 'MAX') }, 243 options = STATUS_GRAPH_OPTIONS) 244 self.status_rrd.new_graph(pngname = statusname + '_long.png', 245 vnamedefs = { "status": ("status", 'MAX') }, 246 options = STATUS_GRAPH_OPTIONS_MONTHLY) 247 # vérification ajoutée pour recréer les anciennes archives (1 semaine de données seulement) 248 try: 249 rrd_info = self.status_rrd.info() 250 except: 251 # fichier rrd illisible, on le supprime 252 os.unlink(statusname + '.rrd') 253 if rrd_info: 254 if len(rrd_info['rra']) == 2: 255 # suppression du fichier 256 os.unlink(statusname + '.rrd') 257 self.status_rrd.create()
258 259 # Méthodes à spécialiser dans les agents concrets 260
261 - def measure(self):
262 """Prend concrètement une mesure. 263 264 Pour implémenter un agent, il faut implémenter au moins cette 265 méthode. 266 267 @return: Résultat de la mesure, un dictionnaire C{{champ: 268 valeur}} ou un objet C{L{twisted.internet.defer.Deferred}} 269 renvoyant ce dictionnaire. 270 """ 271 raise NotImplementedError
272
273 - def check_status(self):
274 """Renvoie le diagnostic de fonctionnement de l'agent. 275 276 L'implémentation par défaut dans C{L{Agent}} renvoie un statut 277 neutre. Les agents concrets doivent donc redéfinir cette 278 méthode pour annoncer un diagnostic utile. 279 """ 280 log.msg(_("Warning: agent class %s does not redefine method check_status()") 281 % self.__class__.__name__) 282 return status.Unknown()
283
284 - def update_status(self):
285 new_status = None 286 # si possible, on vérifie le dernier état des agents dont on dépend 287 if self.manager is not None: 288 for agent in self.requires: 289 try: 290 if self.manager.agents[agent].last_status == status.Error(): 291 new_status = status.Dependant() 292 except: 293 # agent non chargé ou état inconnu 294 pass 295 if new_status is None: 296 new_status = self.check_status() 297 # mise à jour de l'état 298 self.set_status(new_status)
299
300 - def set_status(self, s, reset = False):
301 """Mémorise le statut et met à jour C{statut_max} 302 303 @param s: statut actuel 304 @param reset: réinitialise C{max_status} à C{s} si C{reset==True} 305 """ 306 # Si on détecte un nouveau problème, on met à jour max_status et max_status_date 307 if s > self.last_status and s not in [status.OK(),status.Dependant()]: 308 self.max_status = s 309 self.max_status_date = self.last_measure_date 310 if self.last_status != s: 311 # l'état a changé, on execute éventuellement des actions 312 self.take_action(self.last_status, s) 313 self.last_status = s
314
315 - def reset_max_status(self):
316 """Réinitialise C{max_status} à la valeur courante du status 317 """ 318 self.set_status(self.check_status(), reset = True)
319 320 321 # Gestion des mesures
322 - def scheduled_measure(self):
323 """Déclenche une mesure programmée. 324 325 Prend une mesure et mémorise le résultat et l'heure. 326 """ 327 now = util.utcnow() 328 try: 329 m = self.measure() 330 except Exception, e: 331 print "erreur mesure (%s)" % self.name 332 traceback.print_exc() 333 self.handle_measure_exception(e) 334 else: 335 if isinstance(m, defer.Deferred): 336 m.addCallbacks( 337 lambda m: self.save_measure(Measure(now, m)), 338 lambda f: self.handle_measure_exception(f.trap(Exception))) 339 else: 340 self.save_measure(Measure(now, m))
341 342
343 - def save_measure(self, measure):
344 """Mémorise une mesure donnée. 345 346 Méthode à redéfinir dans les sous-classes concrètes de C{L{Agent}}. 347 (callback de succès pour C{L{scheduled_measure()}}) 348 """ 349 self.last_measure_date = measure.get_strdate() 350 self.data_needs_update = True 351 self.last_measure = measure 352 self.update_status() 353 self.status_rrd.update({'status': self.last_status.num_level()}, 354 util.utcnow())
355
356 - def check_action(self, action_name):
357 """retourne la liste des actions autorisées/interdites pour cet agent 358 """ 359 authorized = False 360 if self.manager: 361 for cfg_level in ["_eole", "_acad", ""]: 362 cfg_file = os.path.join(self.manager.config['action_dir'], 'actions%s.cfg' % cfg_level) 363 if os.path.isfile(cfg_file): 364 action_mngr = ConfigParser() 365 action_mngr.read(cfg_file) 366 try: 367 authorized = eval(action_mngr.get(self.name, action_name)) 368 except: 369 pass 370 return authorized
371
372 - def take_action(self, old_status, new_status):
373 """exécute des actions en cas de changement de status 374 """ 375 # si un fichier /var/run/actions existe, on n'exécute aurcune action 376 if not is_locked('actions'): 377 # recherche des actions prévues pour l'agent dans cet état 378 action_name = action_map.get(new_status.num_level(), None) 379 if action_name: 380 if self.check_action(action_name): 381 action_func = getattr(self, action_name, no_action) 382 # exécution des actions si besoin 383 msg = action_func(self, old_status, new_status) 384 # si un message est renvoyé, on le loggue sur zephir 385 if msg: 386 # action,etat,msg 387 log.msg(msg) 388 zephir_log('SURVEILLANCE',0,msg) 389 else: 390 log.msg('status not defined in action_map')
391
392 - def handle_measure_exception(self, exc):
393 """Callback d'erreur pour C{L{scheduled_measure()}} 394 """ 395 log.msg(_("/!\ Agent %s, exception during measure: %s") 396 % (self.name, str(exc))) 397 self.set_status(status.Error(str(exc)))
398 399
400 - def archive(self):
401 """Crée l'archive de l'agent sur disque 402 """ 403 self.ensure_data_uptodate() 404 AgentData.from_agent(self).archive(self.archive_dir)
405 406
407 - def ensure_data_uptodate(self):
408 self.ensure_datadirs() 409 if self.data_needs_update: 410 # would need locking here if multithreaded (archive must have 411 # up-to-date datas) 412 # for d in self.data: 413 # d.ensure_uptodate() 414 self.write_data() 415 self.update_status() 416 self.data_needs_update = False;
417
418 - def write_data(self):
419 """Écrit les données générées par l'agent sur disque 420 421 Méthode à redéfinir si nécessaire dans les sous-classes. 422 """ 423 additional_args = [] 424 this_morning = util.utcnow().replace(hour = 0, minute = 0, second = 0, microsecond = 0) 425 for i in range(7): 426 t = this_morning - i*timedelta(days = 1) 427 additional_args += "VRULE:%s#000000" % rrd.rrd_date_from_datetime(t) 428 self.status_rrd.graph_all()
429 430 # convenience method
431 - def ensure_datadirs(self):
432 """Méthode de convenance, cf C{L{zephir.monitor.agentmanager.util.ensure_dir}} 433 """ 434 assert self.archive_dir is not None 435 util.ensure_dirs(self.archive_dir)
436 437
438 -class TableAgent(Agent):
439 """Agent concret mémorisant ses mesures dans une table. 440 441 Les valeurs mesurées peuvent être non-numériques. 442 """ 443
444 - def __init__(self, name, 445 max_measures=100, 446 **params):
447 """ 448 @param max_measures: nombre maximal de mesures mémorisées 449 """ 450 Agent.__init__(self, name, **params) 451 assert max_measures >= 1 452 self.max_measures = max_measures 453 self.measures = [] 454 self.data = [MeasuresData(self.measures, self.fields)]
455 456
457 - def save_measure(self, measure):
458 """Maintient la table de mesures triée et en dessous de la taille 459 maximale (cf. C{Agent.save_measure}). 460 """ 461 Agent.save_measure(self, measure) 462 # drop old measures to keep list size under max_measures 463 # can't slice because the list must be modified in place 464 for x in range(max(0, len(self.measures) - self.max_measures +1)): 465 self.measures.pop(0) 466 self.measures.append(measure) 467 self.measures.sort()
468 469 470 471
472 -class MultiRRDAgent(Agent):
473 """Classe abstraite pour les agents utilisant RRDtool. 474 475 Les valeurs mesurées étant visualisées sous forme de graphes, 476 elles doivent être numériques. 477 478 Les agents de cette classe maintiennent plusieurs bases de données RRD et 479 génèrent des graphes au format PNG de leurs données. 480 """ 481
482 - def __init__(self, name, 483 datasources, archives, graphs, 484 **params):
485 """ 486 Les paramètres C{datasources}, C{archives} et C{graphs} sont 487 des listes de paramètres pour la configuration d'une L{base 488 RRD<agentmanager.rrd.Database>}. 489 """ 490 Agent.__init__(self, name, **params) 491 self.datasources = datasources 492 self.archives = archives 493 self.graphs = graphs 494 self.data = []
495 496
497 - def init_data(self, archive_dir):
498 """Crée et initialise la base RRD dans C{archive_dir}. 499 """ 500 Agent.init_data(self, archive_dir) 501 self.rrd = {} 502 arch_names = self.archives.keys() 503 arch_names.sort() 504 for name in arch_names: 505 rrdname = name + '.rrd' 506 self.rrd[name] = rrd.Database(os.path.join(self.archive_dir, rrdname), 507 step = self.period) 508 self.data.append(RRDFileData(rrdname)) 509 for ds in self.datasources[name]: self.rrd[name].new_datasource(**ds) 510 for rra in self.archives[name]: self.rrd[name].new_archive(**rra) 511 for g in self.graphs[name]: 512 self.data.append(ImgFileData(g['pngname'])) 513 self.data.append(HTMLData('<br>')) 514 g['pngname'] = os.path.join(self.archive_dir, g['pngname']) 515 self.rrd[name].new_graph(**g) 516 self.rrd[name].create()
517
518 - def save_measure(self, measure):
519 Agent.save_measure(self, measure) 520 # on ne prend que les mesures déclarées comme datasources 521 for name in self.archives.keys(): 522 rrd_values = {} 523 for datasource in self.datasources[name]: 524 if measure.value.has_key(datasource['name']): 525 rrd_values[datasource['name']] = measure.value[datasource['name']] 526 # update des bases rrd 527 self.rrd[name].update(rrd_values, measure.get_date())
528
529 - def write_data(self):
530 Agent.write_data(self) 531 for name in self.archives.keys(): 532 self.rrd[name].graph_all()
533 534
535 -class RRDAgent(Agent):
536 """Classe abstraite pour les agents utilisant RRDtool. 537 538 Les valeurs mesurées étant visualisées sous forme de graphes, 539 elles doivent être numériques. 540 541 Les agents de cette classe maintiennent une base de données RRD et 542 génèrent des graphes au format PNG de leurs données. 543 """ 544
545 - def __init__(self, name, 546 datasources, archives, graphs, 547 **params):
548 """ 549 Les paramètres C{datasources}, C{archives} et C{graphs} sont 550 des listes de paramètres pour la configuration d'une L{base 551 RRD<agentmanager.rrd.Database>}. 552 """ 553 Agent.__init__(self, name, **params) 554 self.datasources = datasources 555 self.archives = archives 556 self.graphs = graphs 557 self.data = []
558 559
560 - def init_data(self, archive_dir):
561 """Crée et initialise la base RRD dans C{archive_dir}. 562 """ 563 Agent.init_data(self, archive_dir) 564 rrdname = self.name + '.rrd' 565 self.rrd = rrd.Database(os.path.join(self.archive_dir, rrdname), 566 step = self.period) 567 self.data.append(RRDFileData(rrdname)) 568 for ds in self.datasources: self.rrd.new_datasource(**ds) 569 for rra in self.archives: self.rrd.new_archive(**rra) 570 for g in self.graphs: 571 self.data.append(ImgFileData(g['pngname'])) 572 self.data.append(HTMLData('<br>')) 573 g['pngname'] = os.path.join(self.archive_dir, g['pngname']) 574 self.rrd.new_graph(**g) 575 self.rrd.create()
576 577
578 - def save_measure(self, measure):
579 Agent.save_measure(self, measure) 580 # on ne prend que les mesures déclarées comme datasources 581 rrd_values = {} 582 for datasource in self.datasources: 583 rrd_values[datasource['name']] = measure.value[datasource['name']] 584 # update des bases rrd 585 self.rrd.update(rrd_values, measure.get_date())
586 587
588 - def write_data(self):
589 Agent.write_data(self) 590 self.rrd.graph_all()
591 592 593 594
595 -class LoadErrorAgent(Agent):
596 """Pseudo-agent représentant une erreur de chargement d'un fichier 597 de configuration. 598 """ 599 600 HTML = """<p class="error">%s</p>""" 601
602 - def __init__(self, name, 603 title=_("Error while loading agent"), 604 message="", 605 **params):
606 Agent.__init__(self, name, **params) 607 self.max_status = status.Error(title) 608 m = '<br />'.join(message.splitlines()) 609 self.html = HTMLData(LoadErrorAgent.HTML % m) 610 self.data = [self.html]
611
612 - def check_status(self):
613 return self.max_status()
614 615 616 617 # def test_main(): 618 # test_support.run_unittest(UserStringTest) 619 620 # if __name__ == "__main__": 621 # test_main() 622