Source code for centreonapi.webservice

# -*- coding: utf-8 -*-

import requests
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning



[docs]class Webservice(object): """ Class for call Centreon Web Rest webservices """ __instance = None def __new__(cls): """ Constructor with singleton for webservices """ if Webservice.__instance is None: Webservice.__instance = object.__new__(cls) Webservice.__instance.url = None Webservice.__instance.authuser = None Webservice.__instance.authpass = None Webservice.__instance.auth_token = None Webservice.__instance.check_ssl = True return Webservice.__instance
[docs] def load(self, url, username, password, check_ssl=True): """ Load configuration for webservices :param url: The Centreon Web URL :type url: String :param username: The username for connect to Centreon Web webservices :type username: String :param password: The password for connect to Centreon Web webservices :type password: String """ self.url = url self.authuser = username self.authpass = password self.check_ssl = check_ssl
[docs] def isLoaded(self): """ Test if webservices configuration is loaded :return: If the webservices configuration is loaded :rtype: Boolean """ if self.url is None: return False return True
[docs] def auth(self): """ Authenticate to the webservices """ request = requests.post( self.url + '/api/index.php?action=authenticate', data={ 'username': self.authuser, 'password': self.authpass }, verify=self.check_ssl ) request.raise_for_status() data = request.json() self.auth_token = data['authToken']
[docs] def call_clapi(self, action=None, obj=None, values=None): """ Call an endpoint of Centreon Web for clapi wrapper :param action: The clapi action :type action: String :param obj: The clapi object :type obj: String :param values: The values for the call :type values: mixed :return: The response of call :rtype: dict """ if self.auth_token is None: self.auth() data = {} if action is not None: data['action'] = action if obj is not None: data['object'] = obj if values is not None: data['values'] = values request = requests.post( self.url + '/api/index.php?action=action&object=centreon_clapi', headers={ 'Content-Type': 'application/json', 'centreon-auth-token': self.auth_token }, data=json.dumps(data), verify=self.check_ssl ) try: request.raise_for_status() return True, request.json() except requests.HTTPError as e: return False, json.dumps(['error', {'message': str(e)}, {'code': str(e.response.status_code), 'reason': str(e.response.reason), 'text': e.response.text}])
[docs] def centreon_realtime(self, action=None, obj=None, values=None): if self.auth_token is None: self.auth() data = {} obj_supported = ('hosts', 'services') if action is not None: data['action'] = action if obj is not None: if obj in obj_supported: data['object'] = obj else: raise ValueError("Only support <hosts> or <services>") if not self.check_ssl: # urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) requests.packages.urllib3\ .disable_warnings(InsecureRequestWarning) request = requests.get( self.url + '/api/index.php?object=centreon_realtime_' + obj + '&action=' + action, headers={ 'Content-Type': 'application/json', 'centreon-auth-token': self.auth_token }, params=values, verify=self.check_ssl ) request.raise_for_status() return request.json()
[docs] @staticmethod def getInstance(url=None, username=None, password=None, check_ssl=True): """ Get an unique instance of the webservices :param url: The Centreon Web URL :type url: String :param username: The username for connect to Centreon Web webservices :type username: String :param password: The password for connect to Centreon Web webservices :type password: String """ instance = Webservice() if instance.isLoaded(): return instance if url is None or username is None or password is None: raise KeyError('Missing parameters to load the Webservice') instance.load(url, username, password, check_ssl) return instance