Source code for shinken.http_daemon

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# Copyright (C) 2009-2012:
#     Gabes Jean,
#     Gerhard Lausser,
#     Gregory Starck,
#     Hartmut Goebel,
# This file is part of Shinken.
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with Shinken.  If not, see <>.

import select
import errno
import time
import socket
import select
import copy
import cPickle
import inspect
import json
import zlib
import threading
    import ssl
except ImportError:
    ssl = None

    from cherrypy import wsgiserver as cheery_wsgiserver
except ImportError:
    cheery_wsgiserver = None

from wsgiref import simple_server

from log import logger

# Let's load bottlecore! :)
from shinken.webui import bottlecore as bottle

[docs]class InvalidWorkDir(Exception): pass
[docs]class PortNotFree(Exception): pass # CherryPy is allowing us to have a HTTP 1.1 server, and so have a KeepAlive
[docs]class CherryPyServer(bottle.ServerAdapter):
[docs] def run(self, handler): # pragma: no cover daemon_thread_pool_size = self.options['daemon_thread_pool_size'] server = cheery_wsgiserver.CherryPyWSGIServer((, self.port), handler, numthreads=daemon_thread_pool_size, shutdown_timeout=1)'Initializing a CherryPy backend with %d threads' % daemon_thread_pool_size) use_ssl = self.options['use_ssl'] ca_cert = self.options['ca_cert'] ssl_cert = self.options['ssl_cert'] ssl_key = self.options['ssl_key'] if use_ssl: server.ssl_certificate = ssl_cert server.ssl_private_key = ssl_key return server
[docs]class CherryPyBackend(object): def __init__(self, host, port, use_ssl, ca_cert, ssl_key, ssl_cert, hard_ssl_name_check, daemon_thread_pool_size): self.port = port self.use_ssl = use_ssl try: self.srv =, port=port, server=CherryPyServer, quiet=False, use_ssl=use_ssl, ca_cert=ca_cert, ssl_key=ssl_key, ssl_cert=ssl_cert, daemon_thread_pool_size=daemon_thread_pool_size) except socket.error, exp: msg = "Error: Sorry, the port %d is not free: %s" % (self.port, str(exp)) raise PortNotFree(msg) except Exception, e: # must be a problem with pyro workdir: raise InvalidWorkDir(e) # When call, it do not have a socket
[docs] def get_sockets(self): return [] # We stop our processing, but also try to hard close our socket as cherrypy is not doing it...
[docs] def stop(self): #TODO: find why, but in ssl mode the stop() is locking, so bailout before if self.use_ssl: return try: self.srv.stop() except Exception, exp: logger.warning('Cannot stop the CherryPy backend : %s' % exp) # Will run and LOCK
[docs] def run(self): try: self.srv.start() except socket.error, exp: msg = "Error: Sorry, the port %d is not free: %s" % (self.port, str(exp)) raise PortNotFree(msg) finally: self.srv.stop() # WSGIRef is the default HTTP server, it CAN manage HTTPS, but at a Huge cost for the client, because it's only HTTP1.0 # so no Keep-Alive, and in HTTPS it's just a nightmare
[docs]class WSGIREFAdapter (bottle.ServerAdapter):
[docs] def run (self, handler): daemon_thread_pool_size = self.options['daemon_thread_pool_size'] from wsgiref.simple_server import WSGIRequestHandler LoggerHandler = WSGIRequestHandler if self.quiet: class QuietHandler(WSGIRequestHandler): def log_request(*args, **kw): pass LoggerHandler = QuietHandler srv = simple_server.make_server(, self.port, handler, handler_class=LoggerHandler)'Initializing a wsgiref backend with %d threads' % daemon_thread_pool_size) use_ssl = self.options['use_ssl'] ca_cert = self.options['ca_cert'] ssl_cert = self.options['ssl_cert'] ssl_key = self.options['ssl_key'] if use_ssl: if not ssl: logger.error("Missing python-openssl librairy, please install it to open a https backend") raise Exception("Missing python-openssl librairy, please install it to open a https backend") srv.socket = ssl.wrap_socket(srv.socket, keyfile=ssl_key, certfile=ssl_cert, server_side=True) return srv
[docs]class WSGIREFBackend(object): def __init__(self, host, port, use_ssl, ca_cert, ssl_key, ssl_cert, hard_ssl_name_check, daemon_thread_pool_size): self.daemon_thread_pool_size = daemon_thread_pool_size try: self.srv =, port=port, server=WSGIREFAdapter, quiet=True, use_ssl=use_ssl, ca_cert=ca_cert, ssl_key=ssl_key, ssl_cert=ssl_cert, daemon_thread_pool_size=daemon_thread_pool_size) except socket.error, exp: msg = "Error: Sorry, the port %d is not free: %s" % (port, str(exp)) raise PortNotFree(msg) except Exception, e: # must be a problem with pyro workdir: raise e
[docs] def get_sockets(self): if self.srv.socket: return [self.srv.socket] else: return []
[docs] def get_socks_activity(self, socks, timeout): try: ins, _, _ =, [], [], timeout) except select.error, e: errnum, _ = e if errnum == errno.EINTR: return [] raise return ins # We are asking us to stop, so we close our sockets
[docs] def stop(self): for s in self.get_sockets(): try: s.close() except: pass self.srv.socket = None # Manually manage the number of threads
[docs] def run(self): # Ok create the thread nb_threads = self.daemon_thread_pool_size # Keep a list of our running threads threads = []'Using a %d http pool size' % nb_threads) while True: # We must not run too much threads, so we will loop until # we got at least one free slot available free_slots = 0 while free_slots <= 0: to_del = [t for t in threads if not t.is_alive()] _ = [t.join() for t in to_del] for t in to_del: threads.remove(t) free_slots = nb_threads - len(threads) if free_slots <= 0: time.sleep(0.01) socks = self.get_sockets() # Blocking for 0.1 s max here ins = self.get_socks_activity(socks, 0.1) if len(ins) == 0: # trivial case: no fd activity: continue # If we got activity, Go for a new thread! for sock in socks: if sock in ins: # GO! t = threading.Thread(None, target=self.handle_one_request_thread, name='http-request', args=(sock,)) # We don't want to hang the master thread just because this one is still alive t.daemon = True t.start() threads.append(t)
[docs] def handle_one_request_thread(self, sock): self.srv.handle_request()
[docs]class HTTPDaemon(object): def __init__(self, host, port, http_backend, use_ssl, ca_cert, ssl_key, ssl_cert, hard_ssl_name_check, daemon_thread_pool_size): self.port = port = host # Port = 0 means "I don't want HTTP server" if self.port == 0: return self.use_ssl = use_ssl self.registered_fun = {} self.registered_fun_names = [] self.registered_fun_defaults = {} protocol = 'http' if use_ssl: protocol = 'https' self.uri = '%s://%s:%s' % (protocol,, self.port)"Opening HTTP socket at %s" % self.uri) # Hack the BaseHTTPServer so only IP will be looked by wsgiref, and not names __import__('BaseHTTPServer').BaseHTTPRequestHandler.address_string = lambda x:x.client_address[0] if http_backend == 'cherrypy' or http_backend == 'auto' and cheery_wsgiserver: self.srv = CherryPyBackend(host, port, use_ssl, ca_cert, ssl_key, ssl_cert, hard_ssl_name_check, daemon_thread_pool_size) else: self.srv = WSGIREFBackend(host, port, use_ssl, ca_cert, ssl_key, ssl_cert, hard_ssl_name_check, daemon_thread_pool_size) self.lock = threading.RLock() #@bottle.error(code=500) #def error500(err): # print err.__dict__ # return 'FUCKING ERROR 500', str(err) # Get the server socket but not if disabled or closed
[docs] def get_sockets(self): if self.port == 0 or self.srv is None: return [] return self.srv.get_sockets()
[docs] def run(self):
[docs] def register(self, obj): methods = inspect.getmembers(obj, predicate=inspect.ismethod) merge = [fname for (fname, f) in methods if fname in self.registered_fun_names ] if merge != []: methods_in = [m.__name__ for m in obj.__class__.__dict__.values() if inspect.isfunction(m)] methods = [m for m in methods if m[0] in methods_in] print "picking only bound methods of class and not parents" print "List to register :%s" % methods for (fname, f) in methods: if fname.startswith('_'): continue # Get the args of the function to catch them in the queries argspec = inspect.getargspec(f) args = argspec.args varargs = argspec.varargs keywords = argspec.keywords defaults = argspec.defaults # If we got some defauts, save arg=value so we can lookup # for them after if defaults: default_args = zip(argspec.args[-len(argspec.defaults):],argspec.defaults) _d = {} for (argname, defavalue) in default_args: _d[argname] = defavalue self.registered_fun_defaults[fname] = _d # remove useless self in args, because we alredy got a bonded method f if 'self' in args: args.remove('self') print "Registering", fname, args, obj self.registered_fun_names.append(fname) self.registered_fun[fname] = (f) # WARNING : we MUST do a 2 levels function here, or the f_wrapper # will be uniq and so will link to the last function again # and again def register_callback(fname, args, f, obj, lock): def f_wrapper(): t0 = time.time() args_time = aqu_lock_time = calling_time = json_time = 0 need_lock = getattr(f, 'need_lock', True) # Warning : put the bottle.response set inside the wrapper # because outside it will break bottle d = {} method = getattr(f, 'method', 'get').lower() for aname in args: v = None if method == 'post': v = bottle.request.forms.get(aname, None) # Post args are zlibed and cPickled if v is not None: v = zlib.decompress(v) v = cPickle.loads(v) elif method == 'get': v = bottle.request.GET.get(aname, None) if v is None: # Maybe we got a default value? default_args = self.registered_fun_defaults.get(fname, {}) if not aname in default_args: raise Exception('Missing argument %s' % aname) v = default_args[aname] d[aname] = v args_time = time.time() - t0 if need_lock: logger.debug("HTTP: calling lock for %s" % fname) lock.acquire() aqu_lock_time = time.time() - t0 try: ret = f(**d) # Always call the lock release if need finally: # Ok now we can release the lock if need_lock: lock.release() calling_time = time.time() - t0 encode = getattr(f, 'encode', 'json').lower() j = json.dumps(ret) json_time = time.time() - t0 logger.debug("Debug perf: %s [args:%s] [aqu_lock:%s] [calling:%s] [json:%s]" % ( fname, args_time, aqu_lock_time, calling_time, json_time) ) return j # Ok now really put the route in place bottle.route('/'+fname, callback=f_wrapper, method=getattr(f, 'method', 'get').upper()) # and the name with - instead of _ if need fname_dash = fname.replace('_', '-') if fname_dash != fname: bottle.route('/'+fname_dash, callback=f_wrapper, method=getattr(f, 'method', 'get').upper()) register_callback(fname, args, f, obj, self.lock) # Add a simple / page def slash(): return "OK" bottle.route('/', callback=slash)
[docs] def unregister(self, obj): return
[docs] def handleRequests(self, s): self.srv.handle_request()
[docs] def create_uri(address, port, obj_name, use_ssl=False): return "PYRO:%s@%s:%d" % (obj_name, address, port)
[docs] def set_timeout(con, timeout): con._pyroTimeout = timeout # Close all sockets and delete the server object to be sure # no one is still alive
[docs] def shutdown(self): self.srv.stop() self.srv = None
[docs] def get_socks_activity(self, timeout): try: ins, _, _ =, [], [], timeout) except select.error, e: errnum, _ = e if errnum == errno.EINTR: return [] raise return ins
daemon_inst = None
