AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.133.159.125
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/library/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /home/internationaljou/public_html/admin/js/BROKY_ADMIN/alfasymlink/root/lib/panopta-agent/library/log_matcher.py
import re
from datetime import datetime, timedelta
import logging
import os


class LogMatcher(object):
    """
    Counts how often an expression occurs in a batch of log lines and
    aggregates those counts over a time window.

    Every result is a ``(datetime, count)`` tuple. The matcher refuses to
    produce counts while no inode is known for the log file.
    """

    def __init__(self, inode):
        """
        @param inode: Inode stored for the log file, or None when no inode
        is known yet.
        """
        self._inode = inode

    def _inode_is_known(self):
        """
        Return True when an inode is available. Otherwise log the reason
        at info level and return False.
        """
        try:
            self._valid_inode()
        except ValueError:
            # Same mechanism the original code used to get hold of the
            # active exception (presumably kept for compatibility with
            # interpreters lacking "except ... as" -- confirm).
            import sys
            _, error, _ = sys.exc_info()
            logging.info(error)
            return False
        return True

    def match(self, lines, expression, results=None):
        """
        Count the lines in which expression is found and return the
        results with the new ``(datetime.now(), count)`` entry appended.

        Returns an empty list (past results are not carried over) when no
        inode is known.

        @param lines: Iterable of strings.
        @param expression: Regular expression searched for in each line.
        @param results: List of past results
        """
        if not results:
            results = []
        if not self._inode_is_known():
            return []

        # Test the match object itself. The previous
        # "match and line or False" form silently dropped matching lines
        # that were empty strings, because "" is falsy.
        match_count = 0
        for line in lines:
            if re.search(expression, line) is not None:
                match_count += 1
        results.append((datetime.now(), match_count))
        return results

    def match_in_column(self, lines, expression, column, results=None):
        """
        Count the lines whose whitespace-separated column matches the
        expression and return the results with the new
        ``(datetime.now(), count)`` entry appended.

        Lines that do not have the requested column (blank or short lines)
        are skipped instead of raising IndexError.

        Returns an empty list (past results are not carried over) when no
        inode is known.

        @param lines: Iterable of lines to split and search.
        @param expression: Regular expression to match against the
        specified column
        @param column: Index of the column, after splitting the line on
        whitespace.
        @param results: List of past results
        """
        if not results:
            results = []
        if not self._inode_is_known():
            return []

        match_count = 0
        for line in lines:
            fields = line.split()
            try:
                data_point = fields[column]
            except IndexError:
                # The line has no such column, so it cannot match.
                continue
            if re.search(expression, data_point) is not None:
                match_count += 1
        results.append((datetime.now(), match_count))
        return results

    def _valid_inode(self):
        """
        Validate that we have an inode. If we don't, that means we are
        running the check for the first time and don't have enough
        information to calculate the matches.

        Raises ValueError when the inode is None.
        """
        if self._inode is None:
            raise ValueError('Inode is None. Returning None')

    def calculate_metric(self, results, timescale):
        """
        Sum the counts of all results that are younger than timescale
        minutes and return them together with the surviving results.

        @param results: List of past results, as (datetime, count) tuples.
        @param timescale: Size of the window, in minutes.
        @return: Tuple (total, valid_results), or (None, []) when the
        matcher has no inode.
        """
        # Nothing to report without an inode, so skip the windowing work.
        if not self._inode:
            return None, []

        # One reference time so every result is judged against the same
        # instant.
        now = datetime.now()
        total_sum = 0
        valid_results = []
        for timestamp, result in results:
            if now - timestamp < timedelta(minutes=timescale):
                total_sum += result
                valid_results.append((timestamp, result))
        return total_sum, valid_results

    @staticmethod
    def get_file_lines(last_known_line_number, source, current_inode,
                       stored_inode):
        """
        Grab the lines from the last known line number to the
        end of the file.

        @param last_known_line_number: Only lines whose 0-based index is
        greater than this value are returned.
        @param source: Path of the file to read.
        @param current_inode: Not used by this method.
        @param stored_inode: When None the file is only scanned and no
        lines are returned.
        @return: Tuple (total_lines, expected_lines). total_lines is the
        0-based index of the last line read, and 0 for an empty file.
        """
        # NOTE(review): because total_lines is a 0-based index, an empty
        # file and a one-line file both report 0. Left unchanged because
        # callers presumably persist this value -- confirm before fixing.
        expected_lines = []
        index = 0
        opened_file = open(source, 'r')
        try:
            if stored_inode is None:
                for index, _ in enumerate(opened_file):
                    pass
            else:
                for index, line in enumerate(opened_file):
                    if index > last_known_line_number:
                        expected_lines.append(line)
        finally:
            # Release the handle even when reading fails part-way through.
            opened_file.close()
        total_lines = index

        return total_lines, expected_lines

    @staticmethod
    def get_file_inode(source):
        """
        Return the inode number (``st_ino``) of the file at source.
        """
        return os.stat(source).st_ino

Anon7 - 2022
AnonSec Team