Source code for module.snmpbooster_poller

# -*- coding: utf-8 -*-

# Copyright (C) 2012-2014:
#    Thibault Cohen, thibault.cohen@savoirfairelinux.com
#
# This file is part of SNMP Booster Shinken Module.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with SNMP Booster Shinken Module.
# If not, see <http://www.gnu.org/licenses/>.


"""
This module contains the SnmpBoosterPoller class which is the part
of SNMP Booster loaded in the Poller
"""


import signal
import time
import shlex


from Queue import Empty, Queue
import sys

from datetime import datetime, timedelta

from shinken.log import logger
from shinken.util import to_int
from pyasn1.type.univ import OctetString

from snmpbooster import SnmpBooster
from libs.utils import parse_args, compute_value
from libs.result import set_output_and_status
from libs.checks import check_snmp, check_cache
from libs.snmpworker import SNMPWorker


[docs]class SnmpBoosterPoller(SnmpBooster): """ SNMP Poller module class Improve SNMP checks """ def __init__(self, mod_conf): SnmpBooster.__init__(self, mod_conf) self.max_prepared_tasks = to_int(getattr(mod_conf, 'max_prepared_tasks', 50)) self.checks_done = 0 self.task_queue = Queue() self.result_queue = Queue() self.last_checks_counted = 0
[docs] def get_new_checks(self): """ Get new checks if less than nb_checks_max If no new checks got and no check in queue, sleep for 1 sec REF: doc/shinken-action-queues.png (3) """ try: while True: try: msg = self.master_slave_queue.get(block=False) except IOError: # IOError: [Errno 104] Connection reset by peer msg = None if msg is not None: self.checks.append(msg.get_data()) except Empty: if len(self.checks) == 0: time.sleep(1)
[docs] def launch_new_checks(self): """ Launch checks that are in status REF: doc/shinken-action-queues.png (4) """ for chk in self.checks: now = time.time() if chk.status == 'queue': # Ok we launch it chk.status = 'launched' chk.check_time = now # Want the args of the commands so we parse it like a shell # shlex want str only clean_command = shlex.split(chk.command.encode('utf8', 'ignore')) # If the command seems good if len(clean_command) > 1: # we do not want the first member, check_snmp thing try: args = parse_args(clean_command[1:]) except Exception as exp: # if we get a parsing error error_message = ("[SnmpBooster] [code 1001]" "Command line { %s } parsing error: " "%s" % (chk.command.encode('utf8', 'ignore'), str(exp))) logger.error(error_message) # Check is now marked as done chk.status = 'done' # Get exit code chk.exit_status = 3 chk.get_outputs("Command line parsing error: `%s' - " "Please verify your check " "command" % str(exp), 8012) # Get execution time chk.execution_time = 0 continue # Ok we are good, we go on if args.get('real_check', False): # Make a SNMP check check_snmp(chk, args, self.db_client, self.task_queue, self.result_queue) #logger.debug("CHECK SNMP %(host)s:%(service)s" % args) else: # Make fake check (get datas from DB) check_cache(chk, args, self.db_client)
#logger.debug("CHECK cache %(host)s:%(service)s" % args) # Check the status of checks # if done, return message finished :) # REF: doc/shinken-action-queues.png (5)
[docs] def manage_finished_checks(self): """ This function handles finished check It gets output and exit_code and Add check to the return queue """ to_del = [] now = time.time() prev_log = self.last_checks_counted if now > prev_log + 5: logger.info("%s checks ongoing.." % len(self.checks)) self.last_checks_counted = now # First look for checks in timeout for chk in self.checks: if now > chk.check_time + 3600: logger.warning("check timeout: %s" % chk.command) chk.get_outputs("check timedout", 8012) chk.status = "done" chk.exit_status = 3 chk.execution_time = now - chk.check_time if not hasattr(chk, "result"): continue if chk.status == 'launched' and chk.result.get('state') != 'received': pass # TODO compore check.result['execution_time'] > timeout # chk.con.look_for_timeout() # Now we look for finished checks for chk in self.checks: # First manage check in error, bad formed if chk.status == 'done': if hasattr(chk, "result"): del chk.result to_del.append(chk) try: self.returns_queue.put(chk) except IOError, exp: logger.error("[SnmpBooster] [code 1002]" "[%d] Exiting: %s" % (str(self), exp)) # NOTE Do we really want to exit ??? sys.exit(2) continue # Then we check for good checks if not hasattr(chk, "result"): continue if chk.status == 'launched' and chk.result['state'] == 'received': result = chk.result # Format result # Launch trigger set_output_and_status(result) # Set status chk.status = 'done' # Get exit code chk.exit_status = result.get('exit_code', 3) chk.get_outputs(str(result.get('output', 'Output is missing')), 8012) # Get execution time chk.execution_time = result.get('execution_time', 0.0) # unlink our object from the original check if hasattr(chk, 'result'): del chk.result # and set this check for deleting # and try to send it to_del.append(chk) try: self.returns_queue.put(chk) except IOError, exp: logger.error("[SnmpBooster] [code 1003]" "FIX-ME-ID Exiting: %s" % exp) # NOTE Do we really want to exit ??? sys.exit(2) # And delete finished checks for chk in to_del: self.checks.remove(chk) # Count checks done self.checks_done += 1
[docs] def save_results(self): """ Save results to database """ while not self.result_queue.empty(): results = self.result_queue.get() for result in results.values(): # Check error snmp_error = result.get('error') # Get key from task key = result.get('key') if snmp_error is None: # We don't got a SNMP error # Clean raw_value: if result.get('type') in ['DERIVE', 'GAUGE', 'COUNTER']: if isinstance(result.get('value'), OctetString): result['value'] = raw_value = float(str(result.get('value'))) else: result['value'] = raw_value = float(result.get('value')) elif result.get('type') in ['DERIVE64', 'COUNTER64']: result['value'] = raw_value = float(result.get('value')) elif result.get('type') in ['TEXT', 'STRING']: result['value'] = raw_value = str(result.get('value')) else: logger.error("[SnmpBooster] [code 1004] [%s, %s] " "Value type is not in 'TEXT', 'STRING', " "'DERIVE', 'GAUGE', 'COUNTER', 'DERIVE64'" ", 'COUNTER64'" % (key.get('host'), key.get('service'), )) continue # Compute value before saving if key.get('oid_type') == 'ds_oid': # add max value result['ds_max'] = None if results.get(result['ds_max_oid']) is not None: result['ds_max'] = results.get(result['ds_max_oid']).get('value') # add min value result['ds_min'] = None if results.get(result['ds_min_oid']) is not None: result['ds_min'] = results.get(result['ds_min_oid']).get('value') try: value = compute_value(result) except Exception as exp: logger.warning("[SnmpBooster] [code 1005]" " [%s, %s] " "%s" % (key.get('host'), key.get('service'), str(exp))) value = None else: # For oid_type == ds_max or ds_min # No calculation or transformation needed # So value is raw_value value = raw_value else: # We got a SNMP error raw_value = None value = None # Save to database new_data = {"ds": {} } for ds_name in key.get('ds_names'): new_data["ds"][ds_name] = {} new_data["ds"][ds_name][key.get('oid_type') + "_value_last"] = result.get('value_last') new_data["ds"][ds_name][key.get('oid_type') + "_value"] = raw_value new_data["ds"][ds_name][key.get('oid_type') + "_value_computed"] = value new_data["ds"][ds_name][key.get('oid_type') + "_value_computed_last"] = result.get('value_last_computed') new_data["ds"][ds_name]["error"] = snmp_error new_data["check_time"] = result.get('check_time') new_data["check_time_last"] = result.get('check_time_last') self.db_client.update_service(key.get('host'), key.get('service'), new_data) # Remove task from queue self.result_queue.task_done()
# id = id of the worker # master_slave_queue = Global Queue Master->Slave # m = Queue Slave->Master # return_queue = queue managed by manager # control_queue = Control Queue for the worker
[docs] def work(self, master_slave_queue, returns_queue, control_queue): """ Main loop of SNMP Booster """ logger.info("[SnmpBooster] [code 1006] Module SNMP Booster started!") # restore default signal handler for the workers: signal.signal(signal.SIGTERM, signal.SIG_DFL) timeout = 1.0 self.checks = [] self.returns_queue = returns_queue self.master_slave_queue = master_slave_queue self.t_each_loop = time.time() self.snmpworker = SNMPWorker(self.task_queue, self.max_prepared_tasks) self.snmpworker.start() dt_start = datetime.now() dt_mid = dt_start.replace(hour=12, minute=0, second=0, microsecond=0) if dt_mid < dt_start: dt_mid = dt_mid + timedelta(days=1) while True: now = datetime.now() if 0 and now > dt_mid: logger.info('worker leaving..') break cmsg = None # Check snmp worker status if not self.snmpworker.is_alive(): # The snmpworker seems down ... # We respawn one self.snmpworker.join() self.snmpworker = SNMPWorker(self.task_queue, self.max_prepared_tasks) # and start it self.snmpworker.start() # If we are diyin (big problem!) we do not # take new jobs, we just finished the current one if not self.i_am_dying: # Get new checks to do self.get_new_checks() # Launch checks self.launch_new_checks() # Save collected datas from checks in mongodb self.save_results() # Prepare checks output self.manage_finished_checks() # Now get order from master try: cmsg = control_queue.get(block=False) if cmsg.get_type() == 'Die': # TODO : What is self.id undefined variable # logger.info("[SnmpBooster] [%d] # Dad say we are dying..." % self.id) logger.info("[SnmpBooster] [code 1007] FIX-ME-ID Parent " "requests termination.") break except Empty: pass # TODO : better time management time.sleep(.1)