import json
import os
import enum
import datetime
import threading
import time
from typing import Dict, Any, Optional, Union, List
from pathlib import Path

import requests
from flask import Flask, request, jsonify
import multiprocessing as mp
from queue import Queue, Empty
import logging


class LogType(enum.Enum):
    """Enum for different types of logs."""

    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    DEBUG = "DEBUG"
    CRITICAL = "CRITICAL"


class LogServer:
    """
    HTTP-based log server that accepts log entries via a REST API
    and writes them to date-based files.
    """

    DEFAULT_HOST = "127.0.0.1"
    DEFAULT_PORT = 9020
    DEFAULT_DIR = "logger"

    def __init__(
        self,
        log_dir: str = DEFAULT_DIR,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
    ):
        """
        Initialize the log server.

        Args:
            log_dir (str): Directory to store log files
            host (str): Host to bind the HTTP server
            port (int): Port to bind the HTTP server
        """
        self.log_dir = Path(log_dir)
        self.host = host
        self.port = port

        # Create the log directory if it doesn't exist
        os.makedirs(self.log_dir, exist_ok=True)

        # Set up the Flask app
        self.app = Flask("LogServer")

        # Keep Flask's request logging minimal
        flask_logger = logging.getLogger("werkzeug")
        flask_logger.setLevel(logging.ERROR)

        # Set up routes
        self.app.add_url_rule("/log", "log", self.handle_log, methods=["POST"])
        self.app.add_url_rule("/health", "health", self.health_check, methods=["GET"])

        # Queue for async processing
        self.log_queue = Queue()
        self.processing_thread = None
        self.should_exit = threading.Event()

        # Currently open log file and its date
        self.current_date = None
        self.current_file = None

    def start(self):
        """Start the log server."""
        # Start the processing thread
        self.processing_thread = threading.Thread(target=self.process_logs)
        self.processing_thread.daemon = True
        self.processing_thread.start()

        print(f"Log server starting on {self.host}:{self.port}")
        print("Press Ctrl+C to stop the server")

        # Run the Flask app; debug mode stays off so the interactive
        # Werkzeug debugger is never exposed by a long-running service
        self.app.run(
            host=self.host,
            port=self.port,
            threaded=True,
            debug=False,
            use_reloader=False,
        )

    def health_check(self):
        """Simple health check endpoint."""
        return jsonify({"status": "ok", "message": "Log server is running"})

    def handle_log(self):
        """Handle a log request."""
        try:
            # Get the log data from the request
            log_data = request.json
            print(f"Received log request: {log_data}")

            # Validate the log data
            if not log_data or not isinstance(log_data, dict):
                return (
                    jsonify({"status": "error", "message": "Invalid log data format"}),
                    400,
                )

            # Add to the queue for processing
            self.log_queue.put(log_data)
            return jsonify({"status": "ok", "message": "Log entry received"}), 200
        except Exception as e:
            print(f"Error handling log request: {e}")
            return jsonify({"status": "error", "message": str(e)}), 500

    def process_logs(self):
        """Process logs from the queue in a separate thread."""
        while not self.should_exit.is_set():
            try:
                # Get a log entry from the queue
                try:
                    log_data = self.log_queue.get(timeout=1.0)
                    self.write_log(log_data)
                    self.log_queue.task_done()
                except Empty:
                    # No log entries in the queue; keep polling
                    continue
            except Exception as e:
                print(f"Error processing logs: {e}")

    def write_log(self, log_data: Dict[str, Any]):
        """Write a log entry to the appropriate date-based file."""
        try:
            # Check whether a new file is needed for a new day
            log_time = datetime.datetime.fromisoformat(log_data["time"])
            log_date = log_time.date()

            if log_date != self.current_date:
                log_filename = f"app-{log_date.strftime('%Y-%m-%d')}.log"
                log_path = self.log_dir / log_filename

                if self.current_file:
                    self.current_file.close()

                self.current_file = open(log_path, "a", encoding="utf-8")
                # Record the date only after the open succeeds, so a failed
                # open is retried on the next entry
                self.current_date = log_date
                print(f"Opened new log file: {log_path}")

            # Write the log entry as one JSON object per line
            json_line = json.dumps(log_data) + "\n"
            self.current_file.write(json_line)
            self.current_file.flush()
        except Exception as e:
            print(f"Error writing log: {e}")

    def shutdown(self):
        """Shut down the log server."""
        self.should_exit.set()
        if self.processing_thread and self.processing_thread.is_alive():
            self.processing_thread.join(timeout=2.0)
        if self.current_file:
            self.current_file.close()
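
# A client can also exercise the /log endpoint directly. The payload below
# mirrors what AsyncLogger.log() produces; only "time" is strictly required
# (write_log parses it as an ISO-8601 string), while the remaining fields are
# conventions of this module rather than an enforced schema. A minimal sketch,
# assuming a server on the default host and port:
#
#   import datetime, requests
#   requests.post(
#       "http://127.0.0.1:9020/log",
#       json={
#           "time": datetime.datetime.now().isoformat(),
#           "logType": "INFO",
#           "app": "demo",            # illustrative app name
#           "user": "system",
#           "title": "hello",
#           "info": {"detail": 42},   # illustrative payload
#       },
#       timeout=5,
#   )
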

class AsyncLogger:
    """
    Client for the log server. Sends log messages via HTTP requests.
    Includes a local queue for batching and retry logic.
    """

    def __init__(
        self,
        app_name: str,
        default_user: str = "system",
        server_url: Optional[str] = None,
        host: str = LogServer.DEFAULT_HOST,
        port: int = LogServer.DEFAULT_PORT,
        batch_size: int = 10,
        flush_interval: float = 1.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
    ):
        """
        Initialize the logger client.

        Args:
            app_name (str): Name of the application using the logger
            default_user (str): Default user for logs when not specified
            server_url (str): Full URL to the log server (overrides host/port if provided)
            host (str): Host of the log server
            port (int): Port of the log server
            batch_size (int): Number of logs to batch before sending
            flush_interval (float): Maximum time to wait before sending logs
            max_retries (int): Maximum number of retry attempts
            retry_delay (float): Initial delay between retries (will be exponential)
        """
        self.app_name = app_name
        self.default_user = default_user

        if server_url:
            self.server_url = server_url
        else:
            self.server_url = f"http://{host}:{port}/log"

        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Set up a session for connection pooling
        self.session = requests.Session()

        # Queue for batching logs
        self.log_queue = Queue()
        self.queue_lock = threading.Lock()

        # Flag to signal shutdown
        self.should_exit = threading.Event()

        # Background thread for sending logs
        self.flush_thread = threading.Thread(target=self._flush_worker)
        self.flush_thread.daemon = True
        self.flush_thread.start()

    def log(
        self,
        log_type: Union[LogType, str],
        title: str,
        info: Any = None,
        user: Optional[str] = None,
    ):
        """
        Log a message.

        Args:
            log_type (LogType or str): Type of log
            title (str): Title of the log message
            info (Any, optional): Additional information to log
            user (str, optional): User associated with the log
        """
        # Convert string log_type to enum if needed
        if isinstance(log_type, str):
            try:
                log_type = LogType[log_type.upper()]
            except KeyError:
                log_type = LogType.INFO

        # Format the log entry
        log_data = {
            "time": datetime.datetime.now().isoformat(),
            "logType": (
                log_type.value if isinstance(log_type, LogType) else str(log_type)
            ),
            "app": self.app_name,
            "user": user or self.default_user,
            "title": title,
            "info": info,
        }

        # Add to queue for batching
        with self.queue_lock:
            self.log_queue.put(log_data)

        # If queue reaches batch size, trigger a flush
        if self.log_queue.qsize() >= self.batch_size:
            self._flush_logs()

    def _flush_worker(self):
        """Worker thread that periodically flushes the log queue."""
        while not self.should_exit.is_set():
            # Sleep for the flush interval, then flush any pending logs
            time.sleep(self.flush_interval)
            self._flush_logs()

    def _flush_logs(self):
        """Flush any pending logs to the server."""
        logs_to_send = []

        # Gather logs from the queue
        with self.queue_lock:
            while not self.log_queue.empty() and len(logs_to_send) < self.batch_size:
                try:
                    logs_to_send.append(self.log_queue.get_nowait())
                    self.log_queue.task_done()
                except Empty:
                    break

        if logs_to_send:
            # If only one log, send it directly; otherwise send as a batch
            if len(logs_to_send) == 1:
                self._send_log(logs_to_send[0])
            else:
                self._send_batch(logs_to_send)

    def _send_log(self, log_data: Dict[str, Any]):
        """Send a single log to the server with retry logic."""
        for attempt in range(self.max_retries):
            try:
                response = self.session.post(self.server_url, json=log_data, timeout=5)
                if response.status_code == 200:
                    return True
                else:
                    print(
                        f"Error sending log: HTTP {response.status_code} - {response.text}"
                    )
            except Exception as e:
                print(f"Error sending log: {e}")

            # If we get here, the request failed
            if attempt < self.max_retries - 1:
                # Wait with exponential backoff
                wait_time = self.retry_delay * (2**attempt)
                time.sleep(wait_time)

        # If we exhausted retries, re-queue the log if not shutting down
        if not self.should_exit.is_set():
            with self.queue_lock:
                self.log_queue.put(log_data)
        return False

    def _send_batch(self, logs: List[Dict[str, Any]]):
        """Send a batch of logs to the server."""
        # For simplicity, we'll just send them one by one;
        # in a production system, you might implement a batch endpoint
        for log_data in logs:
            self._send_log(log_data)

    # Convenience methods for different log types

    def info(self, title: str, info: Any = None, user: Optional[str] = None):
        """Log an INFO level message."""
        self.log(LogType.INFO, title, info, user)

    def warning(self, title: str, info: Any = None, user: Optional[str] = None):
        """Log a WARNING level message."""
        self.log(LogType.WARNING, title, info, user)

    def error(self, title: str, info: Any = None, user: Optional[str] = None):
        """Log an ERROR level message."""
        self.log(LogType.ERROR, title, info, user)

    def debug(self, title: str, info: Any = None, user: Optional[str] = None):
        """Log a DEBUG level message."""
        self.log(LogType.DEBUG, title, info, user)

    def critical(self, title: str, info: Any = None, user: Optional[str] = None):
        """Log a CRITICAL level message."""
        self.log(LogType.CRITICAL, title, info, user)

    def shutdown(self):
        """Clean up resources and flush any remaining logs."""
        self.should_exit.set()

        # Flush any remaining logs
        self._flush_logs()

        # Wait for the flush thread to exit
        if self.flush_thread.is_alive():
            self.flush_thread.join(timeout=2.0)

        # Close the session
        self.session.close()
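
# Quick usage sketch (assumes a LogServer is already listening on the default
# host/port; the app name and payloads are illustrative):
#
#   log = AsyncLogger(app_name="payments")
#   log.info("Order placed", {"order_id": 42}, user="alice")
#   log.error("Charge failed", {"order_id": 42, "code": "card_declined"})
#   log.shutdown()  # flush anything still queued before exiting
#
# With the defaults (max_retries=3, retry_delay=1.0), a failing send waits
# retry_delay * 2**attempt seconds between attempts (1s, then 2s), and the
# entry is re-queued after the third failure.
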

class Logger:
    """Process-wide singleton that shares a single AsyncLogger instance."""

    _instance = None
    mlogger = None

    def __new__(cls, app="server"):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        if cls.mlogger is None:
            cls.mlogger = AsyncLogger(app)
        return cls._instance


default_log_host = LogServer.DEFAULT_HOST
default_log_port = LogServer.DEFAULT_PORT


def start_log_server(
    log_dir=LogServer.DEFAULT_DIR, host=default_log_host, port=default_log_port
):
    """
    Start the log server in a separate process.

    Args:
        log_dir (str): Directory to store log files
        host (str): Host to bind the server to
        port (int): Port to bind the server to

    Returns:
        mp.Process: The process running the log server
    """
    server_process = mp.Process(
        target=_run_log_server, args=(log_dir, host, port), daemon=True
    )
    server_process.start()

    # Give the server a moment to come up before clients connect
    time.sleep(2)
    return server_process


def _run_log_server(log_dir, host, port):
    """Target function for the server process."""
    server = LogServer(log_dir=log_dir, host=host, port=port)
    server.start()
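
# Embedding sketch: a host application can spawn the server and log through
# the Logger singleton (the "billing" app name is illustrative):
#
#   server_proc = start_log_server()            # daemon child process
#   Logger("billing").mlogger.info("started")   # every Logger() shares one AsyncLogger
#   ...
#   server_proc.terminate()
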

# Example usage
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Run either the log server or an example client"
    )
    parser.add_argument(
        "mode", choices=["server", "client"], help="Run as server or client"
    )
    parser.add_argument(
        "--log-dir", default=LogServer.DEFAULT_DIR, help="Directory to store log files"
    )
    parser.add_argument(
        "--host", default=default_log_host, help="Host for the log server"
    )
    parser.add_argument(
        "--port", type=int, default=default_log_port, help="Port for the log server"
    )
    args = parser.parse_args()

    if args.mode == "server":
        # Run as server
        server = LogServer(log_dir=args.log_dir, host=args.host, port=args.port)
        server.start()
    else:
        # Run as an example client
        try:
            # Create a logger to simulate an application component
            logger1 = AsyncLogger(app_name="testapp", host=args.host, port=args.port)

            # Log some messages
            logger1.info("Test Log", user="none")
            logger1.debug("Processing request", {"request_id": "abc123"})

            # Wait to ensure the logs are processed
            time.sleep(3)
        finally:
            # Force-flush any queued logs
            if "logger1" in locals():
                logger1.shutdown()
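
# Command-line sketch, assuming this module is saved as log_server.py (the
# filename is illustrative). Run the server and the example client in two
# terminals; entries land in logger/app-YYYY-MM-DD.log as one JSON object
# per line:
#
#   python log_server.py server --log-dir logger --port 9020
#   python log_server.py client --port 9020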