1
2
3
4
5
6
7
8
9 """
10 Services Twisted de collection et de publication de données.
11 """
12
13 import locale, gettext, os, pwd, shutil, random
14 from glob import glob
15
16
17 from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR
18 APP = 'zephir-agents'
19 DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n')
20 gettext.install(APP, DIR, unicode=False)
21
22
23 from twisted.application import internet, service
24 from twisted.internet import utils, reactor
25 from twisted.web import resource, server, static, util, xmlrpc
26
27 from zephir.monitor.agentmanager import config as cfg
28 from zephir.monitor.agentmanager.util import ensure_dirs, md5file, md5files, log
29 from zephir.monitor.agentmanager.web_resources import ZephirServerResource
30 from zephir.monitor.agentmanager.clientmanager import ClientManager
31
32 try:
33 from creole.fonctionseole import get_module_name
34 mod_name = get_module_name()
35 except:
36 mod_name = ""
37
38 try:
39 import zephir.zephir_conf.zephir_conf as conf_zeph
40 from zephir.lib_zephir import zephir_proxy, convert, zephir_dir, update_sudoers
41 from zephir.lib_zephir import log as zeph_log
42 registered = 1
43 except:
44
45 registered = 0
46
48 """Main Twisted service for Zephir apps"""
49
50 - def __init__(self, config, root_resource=None, serve_static=False):
51 """config will be completed by default values"""
52 service.MultiService.__init__(self)
53 self.config = cfg.DEFAULT_CONFIG.copy()
54 self.config.update(config)
55 self.updater = self.publisher = None
56
57 if registered:
58 update_sudoers()
59
60 if root_resource is None:
61 self.root_resource = resource.Resource()
62 webserver = internet.TCPServer(self.config['webserver_port'],
63 server.Site(self.root_resource))
64 webserver.setServiceParent(service.IServiceCollection(self))
65 else:
66 self.root_resource = root_resource
67
68 if serve_static:
69 self.root_resource.putChild('static',
70 static.File(self.config['static_web_dir']))
71
72
73
74
76 assert self.updater is None
77 self.updater = UpdaterService(self.config, self, self.root_resource)
78 return self
79
81 assert self.publisher is None
82 self.publisher = PublisherService(self.config, self, self.root_resource)
83 return self
84
86 assert self.updater is None
87 assert self.publisher is None
88 self.updater = UpdaterService(self.config, self, self.root_resource)
89 self.publisher = PublisherService(self.config, self, self.root_resource,
90 show_clients_page = False,
91 live_agents={self.config['host_ref']: self.updater.agents})
92 return self
93
94
95
96
98 """Schedules measures, data serialisation and upload."""
99
100 - def __init__(self, config, parent, root_resource):
101 """config should be complete"""
102 service.MultiService.__init__(self)
103 xmlrpc.XMLRPC.__init__(self)
104 self.config = config
105 self.module = ""
106
107 if os.path.exists('/etc/eole/version'):
108 f = file('/etc/eole/version')
109 self.module = f.read().split('\n')[0]
110 f.close()
111
112 self.update_static_data()
113
114 loc, enc = locale.getdefaultlocale()
115 log.msg(_('default locale: %s encoding: %s') % (loc, enc))
116 if enc == 'utf':
117 log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % enc)
118 self.agents = self.load_agents()
119
120 self.setServiceParent(service.IServiceCollection(parent))
121 root_resource.putChild('xmlrpc', self)
122
124 """initialize zephir services"""
125 service.MultiService.startService(self)
126 reactor.callLater(2,self.schedule_all)
127 if registered != 0:
128
129
130 self.setup_uucp()
131
132 if os.path.isfile(os.path.join(zephir_dir,'reboot.lck')):
133 try:
134 zeph_log('REBOOT',0,'redémarrage du serveur terminé')
135 os.unlink(os.path.join(zephir_dir,'reboot.lck'))
136 except:
137 pass
138
140 """Charge tous les agents du répertoire de configurations."""
141 log.msg(_("Loading agents from %s...") % self.config['config_dir'])
142 loaded_agents = {}
143 list_agents = glob(os.path.join(self.config['config_dir'], "*.agent"))
144 list_agents.extend(glob(os.path.join(self.config['config_dir'],self.module,"*.agent")))
145 for f in list_agents:
146 log.msg(_(" from %s:") % os.path.basename(f))
147 h = { 'AGENTS': None }
148 execfile(f, globals(), h)
149 assert h.has_key('AGENTS')
150 for a in h['AGENTS']:
151 assert not loaded_agents.has_key(a.name)
152
153 a.init_data(os.path.join(self.config['state_dir'],
154 self.config['host_ref'],
155 a.name))
156 a.manager = self
157 a.archive()
158 loaded_agents[a.name] = a
159 log.msg(_(" %s, period %d") % (a.name, a.period))
160 log.msg(_("Loaded."))
161 return loaded_agents
162
163
164
165
167 """Planifie les mesures périodiques d'un agent."""
168 assert self.agents.has_key(agent_name)
169 if self.agents[agent_name].period > 0:
170 timer = internet.TimerService(self.agents[agent_name].period,
171 self.wakeup_for_measure, agent_name)
172 timer.setName(agent_name)
173 timer.setServiceParent(service.IServiceCollection(self))
174
175
177 """Callback pour les mesures planifiées."""
178 assert self.agents.has_key(agent_name)
179
180 self.agents[agent_name].scheduled_measure()
181
182
184 """Planifie tous les agents chargés.
185 Démarre le cycle de mesures périodiques de chaque agent
186 chargé. La première mesure est prise immédiatement.
187 """
188 for agent_name in self.agents.keys():
189
190
191 for action_dir in (os.path.join(self.config['action_dir'],'eole'), self.config['action_dir']):
192 f_actions = os.path.join(action_dir, "%s.actions" % agent_name)
193 if os.path.isfile(f_actions):
194 actions = {}
195 execfile(f_actions, globals(), actions)
196 for item in actions.keys():
197 if item.startswith('action_'):
198 setattr(self.agents[agent_name], item, actions[item])
199
200 self.schedule(agent_name)
201
202
204 assert self.agents.has_key(agent_name)
205 return self.getServiceNamed(agent_name)
206
207
208
209
211 ensure_dirs(self.config['uucp_dir'])
212 self.update_static_data()
213
214 try:
215 reload(conf_zeph)
216
217
218
219
220 if not mod_name.startswith('zephir'):
221 for st_dir in os.listdir(self.config['state_dir']):
222 if st_dir != str(conf_zeph.id_serveur):
223 shutil.rmtree(os.path.join(self.config['state_dir'],st_dir))
224
225 period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
226 except:
227 period = 0
228
229 if period < 30:
230 period = self.config['upload_period']
231 log.msg(_('Using default period : %s seconds') % period)
232
233
234 delay = random.randrange(30,period)
235 reactor.callLater(delay,self.wakeup_for_upload)
236
238 original = os.path.join(self.config['config_dir'], 'site.cfg')
239 if os.path.isfile(original):
240 destination = cfg.client_data_dir(self.config, self.config['host_ref'])
241 ensure_dirs(destination)
242 need_copy = False
243 try:
244 org_mtime = os.path.getmtime(original)
245 dest_mtime = os.path.getmtime(os.path.join(destination, 'site.cfg'))
246 except OSError:
247 need_copy = True
248 if need_copy or (org_mtime > dest_mtime):
249 shutil.copy(original, destination)
250
252
253 try:
254 reload(conf_zeph)
255 period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
256 except:
257 period = 0
258
259 if period < 30:
260 period = self.config['upload_period']
261 log.msg(_('Using default period : %s seconds') % period)
262
263
264 if recall:
265 reactor.callLater(period,self.wakeup_for_upload)
266
267
268 for agent in self.agents.values():
269 agent.archive()
270
271 self.update_static_data()
272
273 try:
274 assert conf_zeph.id_serveur != 0
275 client_dir = os.path.join(self.config['tmp_data_dir'],str(conf_zeph.id_serveur))
276 except:
277 client_dir = os.path.join(self.config['tmp_data_dir'],self.config['host_ref'])
278 try:
279
280 if os.path.isdir(client_dir):
281 shutil.rmtree(client_dir)
282 os.makedirs(client_dir)
283 except:
284 pass
285 args = ['-Rf',os.path.abspath(os.path.join(cfg.client_data_dir(self.config, self.config['host_ref']),'site.cfg'))]
286 ignore_file = os.path.abspath(os.path.join(self.config['state_dir'],'ignore_list'))
287 if os.path.exists(ignore_file):
288 args.append(ignore_file)
289
290
291 for agent_name in self.agents.keys():
292 args.append(os.path.abspath(cfg.agent_data_dir(self.config, self.config['host_ref'],agent_name)))
293 args.append(os.path.abspath(client_dir))
294 res = utils.getProcessOutput('/bin/cp', args = args)
295 res.addCallbacks(self._make_archive,
296 lambda x: log.msg(_("/!\ copy failed (%s)\n"
297 "data: %s")
298 % (x, self.config['state_dir'])))
299
301
302 rep_src = "/etc/eole"
303 data = []
304 for src, dst, pattern in md5files[cfg.distrib_version]:
305 if src == 'variables.eol':
306
307 orig_eol = os.path.join(rep_src,'config.eol')
308 var_eol = os.path.join(rep_src,'variables.eol')
309 f_var = file(var_eol,'w')
310 if os.path.isfile(orig_eol):
311 for line in file(orig_eol).readlines():
312 if not "valprec =" in line:
313 f_var.write(line)
314 f_var.close()
315 if os.path.isdir(os.path.join(rep_src,src)):
316 fics = os.listdir(os.path.join(rep_src,src))
317 fics = [(os.path.join(src,fic),os.path.join(dst,fic)) for fic in fics]
318 else:
319 fics = [(src,dst)]
320 for fic, fic_dst in fics:
321 if os.path.isfile(os.path.join(rep_src,fic)):
322 if (pattern is None) or fic.endswith(pattern):
323 md5res = md5file(os.path.join(rep_src,fic))
324 data.append("%s %s\n" % (md5res, fic_dst))
325 try:
326 assert conf_zeph.id_serveur != 0
327 outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % str(conf_zeph.id_serveur)), "w")
328 except:
329 outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % self.config['host_ref']), "w")
330 outf.writelines(data)
331 outf.close()
332
334 """génère une liste des paquets installés
335 """
336 try:
337 assert conf_zeph.id_serveur != 0
338 cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % str(conf_zeph.id_serveur)))
339 except:
340 cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % self.config['host_ref']))
341 os.system(cmd_pkg)
342
344 self._check_md5()
345 self._get_packages()
346
347 try:
348 assert conf_zeph.id_serveur != 0
349 tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % str(conf_zeph.id_serveur))
350 except:
351 tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % self.config['host_ref'])
352 tar_cwd = os.path.dirname(os.path.abspath(self.config['tmp_data_dir']))
353 tar_dir = os.path.basename(os.path.abspath(self.config['tmp_data_dir']))
354 res = utils.getProcessOutput('/bin/tar',
355 args = ('czf', tarball,
356 '--exclude', 'private',
357 '-C', tar_cwd,
358 tar_dir))
359 res.addCallbacks(self._try_chown,
360 lambda x: log.msg(_("/!\ archiving failed (%s)\n"
361 "data: %s\narchive: %s")
362 % (x, self.config['state_dir'], tarball)),
363 callbackArgs = [tarball])
364
366 try:
367 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
368 uid = os.getuid()
369 os.chown(tarball, uucp_uid, uucp_gid)
370 except OSError, e:
371 log.msg("/!\ chown error, check authorizations (%s)" % e)
372
373
374 try:
375 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
376 os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid)
377 except:
378 log.msg("/!\ chown error on deffered_logs")
379 os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')
380
381
382
383
385 """@return: Liste des agents chargés"""
386 return self.agents.keys()
387 xmlrpc_list_agents.signature = [['array']]
388
390 """@return: Liste des agents chargés et structure d'affichage"""
391 try:
392 menu = {}
393 for name, agent in self.agents.items():
394 if agent.section != None:
395 if not menu.has_key(agent.section):
396 menu[agent.section] = []
397 menu[agent.section].append((name, agent.description))
398 return menu
399 except Exception, e:
400 log.msg(e)
401 xmlrpc_agents_menu.signature = [['struct']]
402
404 """
405 @return: Les statuts des agents listés dans un dictionnaire
406 C{{nom:status}}. Le status est lui-même un dictionnaire avec
407 pour clés C{'level'} et C{'message'}. Seuls les noms d'agents
408 effectivement chargés apparaîtront parmi les clés du
409 dictionnaire.
410 """
411 result = {}
412 if len(agent_name_list) == 0:
413 agent_name_list = self.agents.keys()
414 for agent_name in agent_name_list:
415 if self.agents.has_key(agent_name):
416 result[agent_name] = self.agents[agent_name].check_status().to_dict()
417 return result
418 xmlrpc_status_for_agents.signature = [['string', 'struct']]
419
421 if len(agent_name_list) == 0:
422 agent_name_list = self.agents.keys()
423 for agent_name in agent_name_list:
424 if self.agents.has_key(agent_name):
425 self.agents[agent_name].reset_max_status()
426 return "ok"
427
431
432
434 """Serves the web interface for current agent data"""
435
436 - def __init__(self, config, parent, root_resource,
437 live_agents=None,
438 show_clients_page=True):
439 """config should be complete"""
440 service.MultiService.__init__(self)
441 self.config = config
442 self.show_clients_page = show_clients_page
443 self.manager = ClientManager(self.config, live_agents)
444
445 self.setServiceParent(service.IServiceCollection(parent))
446
447 rsrc = ZephirServerResource(self.config, self.manager)
448 root_resource.putChild('agents', rsrc)
449 default_page = './agents/'
450 if not self.show_clients_page:
451 default_page += self.config['host_ref'] + '/'
452 root_resource.putChild('', util.Redirect(default_page))
453
454
455
456
457
458
459
460