Package zephir :: Package monitor :: Package agentmanager :: Module zephirservice
[hide private]
[frames | no frames]

Source Code for Module zephir.monitor.agentmanager.zephirservice

  1  # -*- coding: UTF-8 -*- 
  2  ########################################################################### 
  3  # Eole NG - 2007 
  4  # Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon) 
  5  # Licence CeCill  cf /root/LicenceEole.txt 
  6  # eole@ac-dijon.fr 
  7  ########################################################################### 
  8   
  9  """ 
 10  Services Twisted de collection et de publication de données. 
 11  """ 
 12   
 13  import locale, gettext, os, pwd, shutil, random 
 14  from glob import glob 
 15   
 16  # install locales early 
 17  from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR 
 18  APP = 'zephir-agents' 
 19  DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n') 
 20  gettext.install(APP, DIR, unicode=False) 
 21   
 22   
 23  from twisted.python import log 
 24  from twisted.application import internet, service 
 25  from twisted.internet import utils, reactor 
 26  from twisted.web import resource, server, static, util, xmlrpc 
 27   
 28  from zephir.monitor.agentmanager import config as cfg 
 29  from zephir.monitor.agentmanager.util import ensure_dirs, md5file, md5files 
 30  from zephir.monitor.agentmanager.web_resources import ZephirServerResource 
 31  from zephir.monitor.agentmanager.clientmanager import ClientManager 
 32   
 33  try: 
 34      import zephir.zephir_conf.zephir_conf as conf_zeph 
 35      from zephir.lib_zephir import zephir, convert, zephir_dir 
 36      from zephir.lib_zephir import log as zeph_log 
 37      registered = 1 
 38  except: 
 39      # serveur non enregistré sur zephir 
 40      registered = 0 
 41   
class ZephirService(service.MultiService):
    """Main Twisted service for Zephir apps.

    Holds the merged configuration and the root web resource, and acts as
    the parent service for the optional updater and publisher sub-services
    created by the ``with_*`` factory methods.
    """

    def __init__(self, config, root_resource=None, serve_static=False):
        """Build the service.

        @param config: configuration overrides; missing keys are completed
            with the values of C{cfg.DEFAULT_CONFIG}.
        @param root_resource: existing web resource to attach children to.
            When C{None}, a new root resource is created and served on
            C{config['webserver_port']}.
        @param serve_static: when true, publish C{config['static_web_dir']}
            under the C{static} child of the root resource.
        """
        service.MultiService.__init__(self)
        self.config = cfg.DEFAULT_CONFIG.copy()
        self.config.update(config)
        self.updater = self.publisher = None
        # parent web server
        if root_resource is None:
            self.root_resource = resource.Resource()
            webserver = internet.TCPServer(self.config['webserver_port'],
                                           server.Site(self.root_resource))
            webserver.setServiceParent(service.IServiceCollection(self))
        else:
            self.root_resource = root_resource
        # serve global static files
        if serve_static:
            self.root_resource.putChild('static',
                                        static.File(self.config['static_web_dir']))

    # subservices factory methods

    def with_updater(self):
        """Attach an L{UpdaterService}; may only be called once.

        @return: C{self}, so that calls can be chained.
        """
        assert self.updater is None
        self.updater = UpdaterService(self.config, self, self.root_resource)
        return self

    def with_publisher(self):
        """Attach a L{PublisherService}; may only be called once.

        @return: C{self}, so that calls can be chained.
        """
        assert self.publisher is None
        self.publisher = PublisherService(self.config, self, self.root_resource)
        return self

    # NOTE(review): the `def` line of this method was missing from the
    # listing this file was recovered from; the name below is reconstructed
    # and must be confirmed against the callers.
    def with_updater_and_publisher(self):
        """Attach both sub-services, the publisher reading the updater's agents.

        The publisher is created without the clients page and is given the
        updater's agents as live agents for C{config['host_ref']}.

        @return: C{self}, so that calls can be chained.
        """
        assert self.updater is None
        assert self.publisher is None
        self.updater = UpdaterService(self.config, self, self.root_resource)
        self.publisher = PublisherService(self.config, self, self.root_resource,
                                          show_clients_page = False,
                                          live_agents={self.config['host_ref']: self.updater.agents})
        return self

class UpdaterService(service.MultiService, xmlrpc.XMLRPC):
    """Schedules measures, data serialisation and upload."""

    def __init__(self, config, parent, root_resource):
        """Load the agents and attach the service to its parent.

        @param config: complete configuration dictionary (no defaults are
            applied here).
        @param parent: service this updater is added to.
        @param root_resource: web resource under which this object is
            published as the C{xmlrpc} child.
        """
        service.MultiService.__init__(self)
        xmlrpc.XMLRPC.__init__(self)
        self.config = config
        self.module = ""
        # detect Eole module: first line of /etc/eole/version
        if os.path.exists('/etc/eole/version'):
            version_file = open('/etc/eole/version')
            try:
                self.module = version_file.read().split('\n')[0]
            finally:
                version_file.close()
        # updates site.cfg file
        self.update_static_data()
        # start subservices
        loc, enc = locale.getdefaultlocale()
        log.msg(_('default locale: %s encoding: %s') % (loc, enc))
        if enc == 'utf':
            log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % enc)
        self.agents = self.load_agents()
        # attach to parent service
        self.setServiceParent(service.IServiceCollection(parent))
        root_resource.putChild('xmlrpc', self)

    def startService(self):
        """Start the sub-services and schedule measures and uploads."""
        service.MultiService.startService(self)
        reactor.callLater(2, self.schedule_all)
        if registered != 0:
            # registered on Zephir: start building and sending archives
            self.setup_uucp()
            # if a reboot had been requested, report that the restart went
            # fine and drop the lock file
            reboot_lock = os.path.join(zephir_dir, 'reboot.lck')
            if os.path.isfile(reboot_lock):
                try:
                    zeph_log('REBOOT', 0, 'redémarrage du serveur terminé')
                    os.unlink(reboot_lock)
                except Exception:
                    # best effort: keep the service running, but leave a trace
                    log.msg("/!\\ could not acknowledge reboot (%s)" % reboot_lock)

    def load_agents(self):
        """Load every agent declared in the configuration directory.

        Each C{*.agent} file of C{config['config_dir']} (and of its
        sub-directory named after the detected Eole module, if any) is
        executed and must define C{AGENTS}, an iterable of agent objects.

        @return: dictionary C{{agent name: agent}}.
        """
        log.msg(_("Loading agents from %s...") % self.config['config_dir'])
        loaded_agents = {}
        list_agents = glob(os.path.join(self.config['config_dir'], "*.agent"))
        # With an empty module name the second pattern would be identical to
        # the first one and every agent would be loaded twice, tripping the
        # duplicate-name assertion below.
        if self.module:
            list_agents.extend(glob(os.path.join(self.config['config_dir'], self.module, "*.agent")))
        for f in list_agents:
            log.msg(_(" from %s:") % os.path.basename(f))
            h = { 'AGENTS': None }
            # NOTE(review): agent files are executed as Python code; they
            # must come from a trusted location.
            execfile(f, globals(), h)
            assert 'AGENTS' in h
            for a in h['AGENTS']:
                assert a.name not in loaded_agents
                # init agent data and do a first archive
                a.init_data(os.path.join(self.config['state_dir'],
                                         self.config['host_ref'],
                                         a.name))
                a.manager = self
                a.archive()
                loaded_agents[a.name] = a  # /!\ keys would be overwritten
                log.msg(_(" %s, period %d") % (a.name, a.period))
        log.msg(_("Loaded."))
        return loaded_agents

    # scheduling measures

    def schedule(self, agent_name):
        """Schedule the periodic measures of one agent.

        Agents whose period is not strictly positive are not scheduled.
        """
        assert agent_name in self.agents
        if self.agents[agent_name].period > 0:
            timer = internet.TimerService(self.agents[agent_name].period,
                                          self.wakeup_for_measure, agent_name)
            timer.setName(agent_name)
            timer.setServiceParent(service.IServiceCollection(self))

    def wakeup_for_measure(self, agent_name):
        """Timer callback: run the scheduled measure of one agent."""
        assert agent_name in self.agents
        # log.debug("Doing scheduled measure on " + agent_name)
        self.agents[agent_name].scheduled_measure()

    def schedule_all(self):
        """Schedule every loaded agent.

        Starts the periodic measure cycle of each loaded agent.  No explicit
        first measure is requested here (that call is disabled).
        """
        for agent_name in self.agents.keys():
            # self.wakeup_for_measure(agent_name) # first measure at launch
            self.schedule(agent_name)

    def timer_for_agent_named(self, agent_name):
        """Return the timer sub-service registered under C{agent_name}."""
        assert agent_name in self.agents
        return self.getServiceNamed(agent_name)

    # data upload to zephir server

    def setup_uucp(self):
        """Prepare the uucp directory and schedule the first upload."""
        ensure_dirs(self.config['uucp_dir'])
        self.update_static_data()
        # fetch the connection delay from zephir
        try:
            reload(conf_zeph)
            # remove stale statistics directories, e.g. after a 'manual'
            # zephir unregistration.  Only real directories are removed:
            # rmtree on a plain file (such as 'ignore_list') raises, which
            # used to abort the clean-up and skip the delay lookup below.
            current_id = str(conf_zeph.id_serveur)
            for entry in os.listdir(self.config['state_dir']):
                stale = os.path.join(self.config['state_dir'], entry)
                if (entry != current_id and os.path.isdir(stale)
                        and not os.path.islink(stale)):
                    shutil.rmtree(stale)
            # ask zephir for the connection delay
            period = convert(zephir.serveurs.get_timeout(conf_zeph.id_serveur)[1])
        except Exception:
            period = 0

        if period == 0:
            period = self.config['upload_period']
        # add a random offset (between 30 seconds and period) to the first
        # run, to avoid too many simultaneous connections when the service
        # is restarted from crontab
        if period > 30:
            delay = random.randrange(30, period)
        else:
            # randrange(30, period) raises ValueError for an empty range
            delay = period
        reactor.callLater(delay, self.wakeup_for_upload)

    def update_static_data(self):
        """Copy C{site.cfg} to the client data directory when it is newer.

        Nothing is done when C{config_dir/site.cfg} does not exist.
        """
        original = os.path.join(self.config['config_dir'], 'site.cfg')
        if os.path.isfile(original):
            destination = cfg.client_data_dir(self.config, self.config['host_ref'])
            ensure_dirs(destination)
            need_copy = False
            try:
                org_mtime = os.path.getmtime(original)
                dest_mtime = os.path.getmtime(os.path.join(destination, 'site.cfg'))
            except OSError:
                # typically: no copy in the destination yet
                need_copy = True
            if need_copy or (org_mtime > dest_mtime):
                shutil.copy(original, destination)

    def wakeup_for_upload(self, recall=True):
        """Archive the agents' data and start the copy/tar/upload chain.

        @param recall: when true, schedule the next call after the upload
            period (delay given by zephir, or C{config['upload_period']}).
        """
        # re-read the connection delay from zephir
        try:
            reload(conf_zeph)
            period = convert(zephir.serveurs.get_timeout(conf_zeph.id_serveur)[1])
        except Exception:
            period = 0
        if period == 0:
            period = self.config['upload_period']
        # schedule the next run after the requested delay
        if recall:
            reactor.callLater(period, self.wakeup_for_upload)

        for agent in self.agents.values():
            agent.archive()
            # agent.reset_max_status()
        self.update_static_data()
        # reset the temporary directory the data is copied into
        client_dir = os.path.join(self.config['tmp_data_dir'], self.config['host_ref'])
        try:
            if os.path.isdir(client_dir):
                shutil.rmtree(client_dir)
            os.makedirs(client_dir)
        except Exception:
            # best effort, the copy below reports its own failures
            log.msg("/!\\ could not reset temporary directory %s" % client_dir)
        args = ['-Rf', os.path.abspath(os.path.join(cfg.client_data_dir(self.config, self.config['host_ref']), 'site.cfg'))]
        ignore_file = os.path.abspath(os.path.join(self.config['state_dir'], 'ignore_list'))
        if os.path.exists(ignore_file):
            args.append(ignore_file)
        # only copy the data of instantiated agents; this avoids uploading
        # e.g. rvp statistics when that service has been disabled
        for agent_name in self.agents.keys():
            args.append(os.path.abspath(cfg.agent_data_dir(self.config, self.config['host_ref'], agent_name)))
        args.append(os.path.abspath(client_dir))
        res = utils.getProcessOutput('/bin/cp', args = args)
        res.addCallbacks(self._make_archive,
                         lambda x: log.msg(_("/!\\ copy failed (%s)\n"
                                             "data: %s")
                                           % (x, self.config['state_dir'])))

    def _check_md5(self):
        """Write the md5 sums of the files listed in C{md5files}.

        Sums are computed for files below C{/etc/eole} and written to
        C{config<host_ref>.md5} in the temporary data directory.
        """
        rep_src = "/etc/eole"
        data = []
        for src, dst, pattern in md5files:
            if os.path.isdir(os.path.join(rep_src, src)):
                fics = os.listdir(os.path.join(rep_src, src))
                fics = [(os.path.join(src, fic), os.path.join(dst, fic)) for fic in fics]
            else:
                fics = [(src, dst)]
            for fic, fic_dst in fics:
                if os.path.isfile(os.path.join(rep_src, fic)):
                    if (pattern is None) or fic.endswith(pattern):
                        md5res = md5file(os.path.join(rep_src, fic))
                        data.append("%s  %s\n" % (md5res, fic_dst))
        outf = open(os.path.join(self.config['tmp_data_dir'], "config%s.md5" % self.config['host_ref']), "w")
        try:
            outf.writelines(data)
        finally:
            outf.close()

    def _make_archive(self, *args):
        """Callback of the copy step: compress the data to send."""
        self._check_md5()
        tarball = os.path.join(self.config['uucp_dir'],
                               'site%s.tar' % self.config['host_ref'])
        res = utils.getProcessOutput('/bin/tar',
                                     args = ('czf', tarball,
                                             '--exclude', 'private',
                                             self.config['tmp_data_dir']))
        res.addCallbacks(self._try_chown,
                         lambda x: log.msg(_("/!\\ archiving failed (%s)\n"
                                             "data: %s\narchive: %s")
                                           % (x, self.config['state_dir'], tarball)),
                         callbackArgs = [tarball])

    def _try_chown(self, tar_output, tarball):
        """Callback of the tar step: hand the archive to uucp and upload.

        @param tar_output: output of the tar process (unused).
        @param tarball: path of the archive that has just been written.
        """
        try:
            uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
            # give both owner and group to uucp so that it can read the file
            os.chown(tarball, uucp_uid, uucp_gid)
        except (OSError, KeyError) as e:
            # KeyError: no 'uucp' account on this host
            log.msg("/!\\ chown error, check authorizations (%s)" % e)
        # upload uucp
        # also chown the deffered_logs file in case it belongs to root
        try:
            uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
            os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid)
        except Exception:
            log.msg("/!\\ chown error on deffered_logs")
        # NOTE(review): `&>` is a bash-ism; a plain POSIX sh reads it as
        # "run in background, then truncate /dev/null".  Left unchanged.
        os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')

    # xmlrpc methods

    def xmlrpc_list_agents(self):
        """@return: list of the loaded agents' names"""
        return list(self.agents.keys())
    xmlrpc_list_agents.signature = [['array']]

    def xmlrpc_agents_menu(self):
        """@return: loaded agents grouped by display section, as
        C{{section: [(name, description), ...]}}.  Agents whose section is
        C{None} are left out."""
        try:
            menu = {}
            for name, agent in self.agents.items():
                if agent.section is not None:
                    menu.setdefault(agent.section, []).append((name, agent.description))
            return menu
        except Exception as e:
            # NOTE(review): nothing is returned in this case, so the caller
            # presumably gets a serialisation fault rather than a menu.
            log.msg(e)
    xmlrpc_agents_menu.signature = [['struct']]

    def xmlrpc_status_for_agents(self, agent_name_list=None):
        """
        @param agent_name_list: names of the agents to query; when empty or
        omitted, every loaded agent is queried.
        @return: the statuses of the listed agents as a dictionary
        C{{name: status}}.  Each status is itself a dictionary with the
        keys C{'level'} and C{'message'}.  Only the names of agents that
        are actually loaded appear among the keys.
        """
        result = {}
        if not agent_name_list:
            agent_name_list = self.agents.keys()
        for agent_name in agent_name_list:
            if agent_name in self.agents:
                result[agent_name] = self.agents[agent_name].check_status().to_dict()
        return result
    # [return type, argument types...]: the list argument is optional
    xmlrpc_status_for_agents.signature = [['struct'], ['struct', 'array']]

    def xmlrpc_reset_max_status_for_agents(self, agent_name_list=None):
        """Reset the maximum status of the listed agents.

        @param agent_name_list: names of the agents to reset; when empty or
        omitted, every loaded agent is reset.  Unknown names are ignored.
        @return: the string C{"ok"}.
        """
        if not agent_name_list:
            agent_name_list = self.agents.keys()
        for agent_name in agent_name_list:
            if agent_name in self.agents:
                self.agents[agent_name].reset_max_status()
        return "ok"

    # NOTE(review): the `def` line of this method was missing from the
    # listing this file was recovered from; the name below is reconstructed
    # and must be confirmed against the XML-RPC clients.
    def xmlrpc_archive_for_upload(self):
        """Run one archive/upload cycle now, without rescheduling.

        @return: the string C{"ok"}.
        """
        self.wakeup_for_upload(False)
        return "ok"

class PublisherService(service.MultiService):
    """Serves the web interface for current agent data"""

    def __init__(self, config, parent, root_resource,
                 live_agents=None,
                 show_clients_page=True):
        """Create the client manager and publish the agents pages.

        @param config: complete configuration dictionary.
        @param parent: service this publisher is added to.
        @param root_resource: web resource receiving the C{agents} child
            and the default redirection.
        @param live_agents: passed as is to L{ClientManager}.
        @param show_clients_page: when false, the default page redirects
            straight to the page of C{config['host_ref']}.
        """
        service.MultiService.__init__(self)
        self.config = config
        self.show_clients_page = show_clients_page
        self.manager = ClientManager(self.config, live_agents)
        # attach to parent service
        self.setServiceParent(service.IServiceCollection(parent))
        # run webserver
        agents_resource = ZephirServerResource(self.config, self.manager)
        root_resource.putChild('agents', agents_resource)
        if self.show_clients_page:
            landing_page = './agents/'
        else:
            landing_page = './agents/' + self.config['host_ref'] + '/'
        root_resource.putChild('', util.Redirect(landing_page))

#TODO
# update resources: loading host structures, manager -> agent dict
# connect publisher and updater to zephir service (web server, config...)

# client manager: list of host_ref, {host_ref => agent_manager}
# agent manager: structure, {name => agent_data}