ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

OSSIM-agent源代码分析(四)

2021-10-30 22:35:05  阅读:147  来源: 互联网

标签:string self value replace agent groups var 源代码 OSSIM


2021SC@SDUSC
OSSIM-agent源代码分析(四)

OSSIM-agent-config.py下

简述

OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式(standardized way)有序的发送给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行标准化处理,这样Server就可以依一种统一的方式来处理这些信息,并且也简化了Server的处理过程

agent的配置文件是config.py,作为大量数据的标准化处理位置,配置文件的对各种数据、文件的处理方式的

相关方法

在plugin.cfg文件中显示,各种常量的设置

    _NEEDED_CONFIG_ENTRIES = {
        'config': ['type', 'source', 'enable']
    }
    _EXIT_IF_MALFORMED_CONFIG = False
    TRANSLATION_SECTION = 'translation'
    TRANSLATION_FUNCTION = 'translate'
    TRANSLATION_DEFAULT = '_DEFAULT_'
    SECTIONS_NOT_RULES = ["config", "info", TRANSLATION_SECTION]

    CONCAT_FUNCTION = 'CONCAT'
    _MAP_REPLACE_TRANSLATIONS = 4
    _MAP_REPLACE_USER_FUNCTIONS = 8
    _MAP_REPLACE_CUSTOM_USER_FUNCTIONS = 16
    _MAP_REPLACE_CONCAT = 32      #00100000
    _MAP_REPLACE_TRANSLATE2 = 64  #1000000

检查所选部分是否为可以的翻译部分

    __regexIsTranslationSection = re.compile("translation-\S+", re.UNICODE)
    __regex_check_for_translate2_function = re.compile("\{translate2\((?P<variable>\$[^\)]+),(?P<translate_section>\$[^\)]+)\)\}", re.UNICODE)

设置配置、翻译等具体的规则

    def rules(self):
        rules = {}
        for section in sorted(self.sections()):
            regexp = self.get(section,"regexp")
            if self.get("config","source") == "log" and (regexp is None or regexp == ""):
                continue
            if Plugin.__regexIsTranslationSection.match(section.lower()) is not None:
                if section.lower() not in Plugin.SECTIONS_NOT_RULES:
                    logger.info("Loading new translation section... %s" % section.lower())
                    Plugin.SECTIONS_NOT_RULES.append(section.lower())
            if section.lower() not in Plugin.SECTIONS_NOT_RULES:
                rules[section] = self.hitems(section,True)
        return rules

通过正则表达式和位置变换进行变量替换

    def _replace_array_variables(self, value, groups):
        for i in range(2):
            search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
            rvalue = value
            if search != []:
                for string in search:
                    var = string[2:-1]
                    var_position = 0
                    try:
                        var_position= int(var)
                        if var_position < len(groups):
                            rvalue = rvalue.replace(string, str(groups[var_position]))
                    except ValueError,e:
                        rvalue = value
        return rvalue

替换变量、用户功能和自定义用户功能

 def get_replace_array_value(self, value, groups):  
        rvalue = ""
        rvalue = self._replace_array_variables(value, groups)
        if rvalue==value:
            rvalue = self._replace_user_array_functions(value, groups)
        if rvalue == value:
            rvalue = self._replace_custom_user_function_array(value,groups)
        return rvalue

使用位置参数替换自定义用户函数,并且检查所有变量是否有替换,以及调用函数在ParserUtil.py

    def _replace_custom_user_function_array(self,value,groups):
        search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE)
        if search != []:
            for string in search:
                (string_matched, func, variables) = string
                vars = split_variables(variables)
                for var in vars:
                    var_pos =0
                    try:
                        var_pos = int(var)
                    except TypeError:
                        logger.warning("Can not replace '%s'" % var)
                        return value
                    if var_pos >= len(groups):
                        logger.warning("Can not replace '%s' " % var)
                        return value
                func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id"))
                if func_name != Plugin.TRANSLATION_FUNCTION and \
                   hasattr(Plugin, func_name):
                    args = ",".join(["groups[%s]"%s  for s in vars])
                    try:
                        cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args)
                        exec cmd
                    except TypeError, e:
                        logger.error(e)
                else:
                    logger.warning( "Function '%s' is not implemented" % (func))
                    value = value.replace(string_matched, str(groups[var]))
        return value

替换位置函数,检查所有变量是否有替换,然后调用ParserUtil.py函数、getattr等函数执行替换功能

def _replace_user_array_functions(self, value, groups):

        if value is None:
            return None
        search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
        if search != []:
            for string in search:
                (string_matched, func, variables) = string
                vars = split_variables(variables)

                for var in vars:

                    if len(groups) - 1 < int(var):
                        return value
                if func != Plugin.TRANSLATION_FUNCTION and \
                   hasattr(ParserUtil, func):
                   
                    f = getattr(ParserUtil, func)

                    args = ""

                    for v in vars:
   
                        args += "str(groups[%s])," % (str(v))

                    try:
                        exec "value = value.replace(string_matched," + \
                            "str(f(" + args + ")))"
                    except TypeError, e:
                        logger.error(e)

                else:
                    
                    logger.debug("Tranlation fucntion...")
                    for v in vars:
                     .
                        if int(v) > (len(groups) - 1):
                            logger.debug("Error var:%d, is greatter than groups size. (%d)" % (int(v), len(groups)))
                        else:
                            var_value = groups[int(v)] 
                            if self.has_section(Plugin.TRANSLATION_SECTION):
                                if self.has_option(Plugin.TRANSLATION_SECTION, var_value):
                                    value = self.get(Plugin.TRANSLATION_SECTION, var_value)

        return value


通过正则表达式,查找配置参数中的_CFG(section,option)值,并将其替换为全局配置文件中的值

   def replace_config(self, conf):

        for section in sorted(self.sections()):
            for option in self.options(section):
                regexp = self.get(section, option)
                search = re.findall("(\\\\_CFG\(([\w-]+),([\w-]+)\))", regexp, re.UNICODE)
                if search != []:
                    for string in search:
                        (all, arg1, arg2) = string
                        if conf.has_option(arg1, arg2):
                            value = conf.get(arg1, arg2)
                            regexp = regexp.replace(all, value)
                            self.set(section, option, regexp)

总规则下,regexp条目中替换处理每一个反斜线

 def replace_aliases(self, aliases)
        for rule in self.rules().iterkeys():

            regexp = self.get(rule, 'regexp')
            search = re.findall("\\\\\w\w+", regexp, re.UNICODE)

            if search != []:
                for string in search:

                    repl = string[1:]
                    if aliases.has_option("regexp", repl):
                        value = aliases.get("regexp", repl)
                        regexp = regexp.replace(string, value)
                        self.set(rule, "regexp", regexp)

从get_replace_value()调用,通过正则替换变量

    def _replace_variables(self, value, groups, rounds=2):

        for i in range(rounds):
            search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
            if search != []:
                for string in search:
                    var = string[2:-1]
                    if groups.has_key(var):
                        value = value.replace(string, str(groups[var]))

        return value

确定替换变量是否成功,如果失败,则跳过get_replace_value()

    def _replace_variables_assess(self, value):
        ret = 0
        for i in range(2):
            search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
            if search != []:
                ret = i
        return ret

通过搜索字符串方式,替换指定插件及其对应插件

    def _replace_translations(self, value, groups):
        regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})"
        search = re.findall(regexp, value, re.UNICODE)
        if search != []:
            for string in search:
                (string_matched, func, var) = string
                if groups.has_key(var):
                    if self.has_section(Plugin.TRANSLATION_SECTION):
                        if self.has_option(Plugin.TRANSLATION_SECTION,
                                           groups[var]):
                            value = self.get(Plugin.TRANSLATION_SECTION,
                                             groups[var])
                        else:
                            logger.warning("Can not translate '%s' value" % \
                                (groups[var]))
                            if self.has_option(Plugin.TRANSLATION_SECTION,
                                               Plugin.TRANSLATION_DEFAULT):
                                value = self.get(Plugin.TRANSLATION_SECTION,
                                                 Plugin.TRANSLATION_DEFAULT)
                            else:
                                value = groups[var]
                    else:
                        logger.warning("There is no translation section")
                        value = groups[var]

        return value

同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()

    def _replace_translations_assess(self, value):
        regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})"
        search = re.findall(regexp, value, re.UNICODE)
        if search != []:
            return self._MAP_REPLACE_TRANSLATIONS

        return 0

函数被指定为{f($v)},并且从get_replace_value()调用,执行替换功能

    def _replace_user_functions(self, value, groups):

        search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
        if search != []:
            for string in search:
                (string_matched, func, variables) = string
                vars = split_variables(variables)
                for var in vars:

                    if not groups.has_key(var):
                        logger.warning("Can not replace '%s'" % (value))
                        return value

                if func != Plugin.TRANSLATION_FUNCTION and \
                   hasattr(ParserUtil, func):

                  
                    f = getattr(ParserUtil, func)
                   
                    args = ""
                    for i in (range(len(vars))):
                        args += "groups[vars[%s]]," % (str(i))
 
                    try:
                        cmd = "value = value.replace(string_matched," + \
                              "str(f(" + args + ")))"
                        exec cmd
                    except TypeError, e:
                        logger.error(e)

                else:
                    logger.warning(
                        "Function '%s' is not implemented" % (func))

                    value = value.replace(string_matched, \
                                          str(groups[var]))

        return value

同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()

    def _replace_user_functions_assess(self, value):
        search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
        if search != []:
            return self._MAP_REPLACE_USER_FUNCTIONS
        
        return 0

检查所有变量是否有替换,如果没有一直替换到全部替换为止

  def _replace_custom_user_functions(self,value,groups):
        search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE)
        if search != []:
            for string in search:
                (string_matched, func, variables) = string
                vars = split_variables(variables)

                for var in vars:

                    if not groups.has_key(var):
                        logger.warning("Can not replace '%s'" % (var))
                        return value
                func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id"))
                if func_name != Plugin.TRANSLATION_FUNCTION and \
                   hasattr(Plugin, func_name):
                   
                    args = ""
                    for i in (range(len(vars))):
                        args += "groups[vars[%s]]," % (str(i))

                    try:
                        cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args)
                        exec cmd
                    except TypeError, e:
                        logger.error(e)

                else:
                    logger.warning(
                        "Function '%s' is not implemented" % (func))

                    value = value.replace(string_matched, \
                                          str(groups[var]))

        return value

替换数组变量连接的值

def __replaceConcatFunction(self,value,groups):

        concat =""
        m =  re.match("\$CONCAT\((?P<params>.*)\)",value)
        if m:
            mdict = m.groupdict()
            if mdict.has_key('params'):
                paramlist = mdict['params'].split(',')
                for param in paramlist:
                    if param.startswith('$'):#assume variable
                        if groups.has_key(param[1:]):
                            concat+=groups[param[1:]]
                        else:
                            concat+=param
                    else:
                        concat+=param
        return concat

检查是否有必要进行连接函数替换

 def __checkReplaceConcatFunction(self,value):
        ret = 0
        if re.match("\$CONCAT\((.*)\)",value):
            ret = self._MAP_REPLACE_CONCAT
        return ret

替换传递过来的对象中给定的值

 def __replace_translate2_function(self,value,groups):
        replaced_value =""
        m =  self.__regex_check_for_translate2_function.match(value)
        if m:
            mdict = m.groupdict()
            if 'variable' in mdict and 'translate_section' in mdict:
                variable = mdict['variable'].replace('$','')
                translate_section = mdict['translate_section'].replace('$','')
                if variable in groups:
                    if self.has_section(translate_section):
                        if self.has_option(translate_section,groups[variable]):
                            replaced_value = self.get(translate_section, groups[variable])
                            return replaced_value
                        else:
                            logger.warning("Translate2 variable(%s) not found on the translate section %s" % (groups[variable],translate_section))
                    else:
                        logger.warning("Translate2 translation_section(%s) not found" % translate_section)
        replaced_value = value

检查是否需要换translate2

   def __check_for_translate2_function(self,value):
        """Check whether we need to make a translate2 replacement or not. 
        @param value: the string in the rule
        """
        ret = 0
        if Plugin.__regex_check_for_translate2_function.match(value) is not None:
            ret = self._MAP_REPLACE_TRANSLATE2
        return ret

替换concat函数、自定义用户函数、用户函数、翻译、变量等

   def get_replace_value(self, value, groups, replace=15):
        if replace > 0:
            if replace & self._MAP_REPLACE_TRANSLATE2:
                value = self.__replace_translate2_function(value, groups)

            if replace & 3:
                value = self._replace_variables(value, groups, (replace & 3))

            if replace & 4:
                value = self._replace_translations(value, groups)

            if replace & 8:
                value = self._replace_user_functions(value, groups)

            if replace & self._MAP_REPLACE_CUSTOM_USER_FUNCTIONS:
                value = self._replace_custom_user_functions(value,groups)

            if replace & self._MAP_REPLACE_CONCAT:
                value = self.__replaceConcatFunction(value,groups)

        return value

类CommandLineOptions,用于配置各种数据

class CommandLineOptions:

    def __init__(self):

        self.__options = None

        parser = OptionParser(
            usage="%prog [-v] [-q] [-d] [-f] [-g] [-c config_file]",
            version="OSSIM (Open Source Security Information Management) " + \
                      "- Agent ")

        parser.add_option("-v", "--verbose", dest="verbose",
                          action="count",
                          help="verbose mode, makes lot of noise")
        parser.add_option("-d", "--daemon", dest="daemon", action="store_true",
                          help="Run agent in daemon mode")
        parser.add_option("-f", "--force", dest="force", action="store_true",
                          help="Force startup overriding pidfile")
        parser.add_option("-s", "--stats", dest="stats", type='choice', choices=['all', 'clients', 'plugins'], default=None,
                          help="Get stats about the agent")
        parser.add_option("-c", "--config", dest="config_file", action="store",
                          help="read config from FILE", metavar="FILE")
        (self.__options, args) = parser.parse_args()

        if len(args) > 1:
            parser.error("incorrect number of arguments")

        if self.__options.verbose and self.__options.daemon:
            parser.error("incompatible options -v -d")


    def get_options(self):
        return self.__options

进行数组等变量创建

def split_variables(string):
    return re.findall("(?:\$([^,\s]+))+", string)


def split_sids(string, separator=','):

    list = list_tmp = []

    list = string.split(separator)

    for sid in list:
        a = sid.split('-')
        if len(a) == 2:
            list.remove(sid)
            for i in range(int(a[0]), int(a[1]) + 1):
                list_tmp.append(str(i))

    list.extend(list_tmp)
    return list

标签:string,self,value,replace,agent,groups,var,源代码,OSSIM
来源: https://blog.csdn.net/aunt_y/article/details/121057735

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有