403Webshell
Server IP : 192.64.118.117  /  Your IP : 18.220.97.0
Web Server : LiteSpeed
System : Linux premium56.web-hosting.com 4.18.0-513.24.1.lve.1.el8.x86_64 #1 SMP Thu May 9 15:10:09 UTC 2024 x86_64
User : thecgapy ( 1160)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /lib64/nagios/plugins/nccustom/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib64/nagios/plugins/nccustom/check-unexpected-systemd-services.py
#!/usr/libexec/platform-python
# -*- coding: utf-8 -*-
"""
Nagios plugin to check for unexpected systemd unit files.
"""
import sys
import os
from pathlib import Path
from datetime import datetime, timedelta
import argparse
from typing import Set, List, Dict, Tuple, Optional

# Absolute directory containing this script; relative inputs are anchored here
SCRIPT_DIR = Path(__file__).resolve().parent

# Exit statuses reported back to Nagios
OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3

# Whitelist files are expected to sit next to the script itself
WHITELIST_FILES: List[Path] = [
    SCRIPT_DIR.joinpath(whitelist_name)
    for whitelist_name in (
        "systemd_services_whitelist",
        "systemd_targets_whitelist",
        "systemd_scopes_whitelist",
    )
]
# Log file that findings are appended to
LOG_PATH = Path("/var/log/unexpected-systemd-services.log")
# Directory whose unit files get the age-based (transient) treatment
TRANSIENT_DIR = Path("/run/systemd/transient")


def log_to_file(message: str, severity: str):
    """Append a single timestamped record to the log file at ``LOG_PATH``.

    The record has the form ``YYYY-MM-DD HH:MM:SS - SEVERITY - message``,
    with the severity upper-cased and the timestamp taken from the local
    clock.

    Args:
        message: Text to record.
        severity: Severity label; written in upper case.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with LOG_PATH.open('a') as log:
        log.write("{} - {} - {}\n".format(stamp, severity.upper(), message))


def parse_args():
    """Parse the plugin's command-line options from ``sys.argv``.

    Returns:
        argparse.Namespace with the attributes ``pattern`` (required),
        ``dirs_file`` (required), ``minutes`` (int, default 5) and
        ``verbose`` (bool flag).
    """
    parser = argparse.ArgumentParser(
        description="Nagios check for unexpected systemd units",
    )
    # (flags, keyword settings) pairs, registered in this order so the
    # generated --help output keeps the same layout.
    option_table = (
        (('-p', '--pattern'),
         dict(required=True,
              help="Comma-separated extensions (e.g. 'service,scope')")),
        (('-d', '--dirs-file'),
         dict(required=True,
              help="File listing directories to scan")),
        (('-m', '--minutes'),
         dict(type=int, default=5,
              help="Transient age threshold in minutes")),
        (('-v', '--verbose'),
         dict(action='store_true',
              help="Enable verbose output")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()


def load_whitelist() -> Set[str]:
    """Collect the allowed unit names from every file in ``WHITELIST_FILES``.

    Each non-blank line, stripped of surrounding whitespace, is one entry.

    Returns:
        The union of entries from all whitelist files.

    Exits:
        With ``CRITICAL`` (after printing a message) when any whitelist
        file cannot be read, or when no entries were loaded at all.
    """
    entries: Set[str] = set()
    for path in WHITELIST_FILES:
        try:
            with path.open() as handle:
                stripped_lines = (raw.strip() for raw in handle)
                entries.update(text for text in stripped_lines if text)
        except Exception as e:
            print(f"CRITICAL: cannot read whitelist {path}: {e}")
            sys.exit(CRITICAL)

    if entries:
        return entries

    # An empty whitelist would flag every unit, so treat it as a failure.
    print("CRITICAL: no whitelist entries loaded")
    sys.exit(CRITICAL)


def scan_dir(directory: Path, exts: List[str]) -> Set[str]:
    """Return the names of files in ``directory`` ending with one of ``exts``.

    Only direct children are considered (no recursion), and only entries
    for which ``DirEntry.is_file()`` is true.

    Args:
        directory: Directory to list.
        exts: Suffixes to match, e.g. ``['.service', '.scope']``.

    Returns:
        Set of matching file names (names only, not full paths).

    Exits:
        With ``CRITICAL`` (after printing a message) when the directory
        cannot be scanned.
    """
    suffixes = tuple(exts)  # str.endswith accepts a tuple of candidates
    try:
        return {
            item.name
            for item in os.scandir(directory)
            if item.is_file() and item.name.endswith(suffixes)
        }
    except OSError as e:
        print(f"CRITICAL: cannot scan {directory}: {e}")
        sys.exit(CRITICAL)


def handle_transient(pattern_exts: List[str], allowed: Set[str], age_th: timedelta) -> Tuple[Set[str], List[Tuple[str, str]]]:
    """Classify non-whitelisted unit files in ``TRANSIENT_DIR`` by age.

    A file is considered when it is a regular file, its name is not in
    ``allowed`` and it ends with one of ``pattern_exts``. Its age is the
    difference between now and its ``st_ctime``; if the stat call fails
    the age is treated as zero.

    Args:
        pattern_exts: Suffixes to match, e.g. ``['.service']``.
        allowed: Whitelisted unit file names.
        age_th: Age below which a file is skipped rather than reported.

    Returns:
        ``(skipped, critical)``: ``skipped`` is the set of names younger
        than ``age_th`` (each one is also logged as WARNING);
        ``critical`` is a list of ``(name, age_str)`` for the rest, where
        ``age_str`` is the age with fractional seconds dropped.

    Exits:
        With ``CRITICAL`` (after printing a message) when an ``OSError``
        is raised while processing the directory.
    """
    suffixes = tuple(pattern_exts)
    reference = datetime.now()
    threshold_min = int(age_th.total_seconds() // 60)
    young: Set[str] = set()
    stale: List[Tuple[str, str]] = []  # (name, age_str) pairs

    try:
        for item in os.scandir(TRANSIENT_DIR):
            if not item.is_file():
                continue
            name = item.name
            if name in allowed or not name.endswith(suffixes):
                continue

            # Best effort: a file whose stat fails is treated as brand new.
            try:
                changed_at = datetime.fromtimestamp(item.stat().st_ctime)
            except Exception:
                changed_at = reference

            age = reference - changed_at
            age_str = str(age).partition('.')[0]

            if age >= age_th:
                # Over-threshold units are handed back to the caller;
                # they are not written to the log from here.
                stale.append((name, age_str))
                continue

            young.add(name)
            log_to_file(f"File {TRANSIENT_DIR}/{name} age={age_str} < {threshold_min}m", 'WARNING')
    except OSError as e:
        print(f"CRITICAL: cannot scan transient dir {TRANSIENT_DIR}: {e}")
        sys.exit(CRITICAL)

    return young, stale


def _parse_extensions(pattern: str) -> List[str]:
    """Convert a comma-separated pattern into dotted suffixes.

    Surrounding whitespace and leading dots are ignored and empty
    segments are dropped, so ``'service, .scope,'`` becomes
    ``['.service', '.scope']``.
    """
    exts: List[str] = []
    for piece in pattern.split(','):
        piece = piece.strip().lstrip('.')
        if piece:
            exts.append('.' + piece)
    return exts


def main():
    """Run the check and exit with the matching Nagios status.

    Steps:
      1. Parse arguments and make sure the log file exists with mode 0600.
      2. Load the whitelist and turn ``--pattern`` into file suffixes.
      3. Read the directories listed in ``--dirs-file`` (relative paths
         are resolved against the script directory).
      4. Scan each directory for matching unit files that are not
         whitelisted. In ``TRANSIENT_DIR`` units younger than
         ``--minutes`` are skipped; elsewhere every hit is logged and
         reported.

    Exits with ``CRITICAL`` when setup fails, when ``--pattern`` yields no
    usable extension, or when unexpected units are found; otherwise with
    ``OK``. The ``--verbose`` flag is accepted but not used here.
    """
    args = parse_args()

    # Prepare the log file before anything might need to write to it.
    try:
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        LOG_PATH.touch(exist_ok=True)
        os.chmod(str(LOG_PATH), 0o600)
    except Exception as e:
        print(f"CRITICAL: log setup failed: {e}")
        sys.exit(CRITICAL)

    allowed = load_whitelist()

    # A pattern such as '.service' or 'service,' used to produce suffixes
    # ('..service', '.') that match nothing useful, so the check silently
    # reported OK. Normalise the input and fail loudly if nothing is left.
    exts = _parse_extensions(args.pattern)
    if not exts:
        print(f"CRITICAL: no usable extensions in pattern '{args.pattern}'")
        sys.exit(CRITICAL)

    age_th = timedelta(minutes=args.minutes)

    # Read the directory list, one path per non-blank line.
    dirs_file = Path(args.dirs_file)
    if not dirs_file.is_absolute():
        dirs_file = SCRIPT_DIR / dirs_file
    try:
        with dirs_file.open() as f:
            # Strip trailing slashes, but keep a bare '/' as the root
            # directory instead of collapsing it to an empty string.
            dirs = [line.strip().rstrip('/') or '/' for line in f if line.strip()]
    except Exception as e:
        print(f"CRITICAL: cannot read dirs file: {e}")
        sys.exit(CRITICAL)

    unexpected: Dict[Path, List[Tuple[str, Optional[str]]]] = {}
    for d in dirs:
        dpath = Path(d)
        if not dpath.is_absolute():
            dpath = SCRIPT_DIR / dpath
        dpath = dpath.resolve()
        services = scan_dir(dpath, exts)
        bad = services - allowed
        details: List[Tuple[str, Optional[str]]] = []
        if dpath == TRANSIENT_DIR:
            # Transient units: ignore those still younger than the threshold
            # and report only the ones that outlived it.
            skipped, critical_list = handle_transient(exts, allowed, age_th)
            bad.difference_update(skipped)
            for name, age_str in critical_list:
                if name in bad:
                    details.append((name, age_str))
        else:
            for name in sorted(bad):
                details.append((name, None))
                log_to_file(f"DIR={dpath} OBJECT={name}", 'CRITICAL')
        if details:
            unexpected[dpath] = details

    if unexpected:
        print("CRITICAL: Unexpected systemd units found")
        for d, items in unexpected.items():
            formatted = [f"'{name}'" for name, _ in items]
            print(f"Directory: {d}")
            print(f"Units: [{', '.join(formatted)}]")
        sys.exit(CRITICAL)
    else:
        print("OK: No unexpected systemd units detected")
        sys.exit(OK)

# Run the check only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

Youez - 2016 - github.com/yon3zu
LinuXploit