Chapter 33: Logging and Detection
What to Log
import json
import logging
from datetime import datetime, timezone
from typing import Optional


class SecurityLogger:
    """Emits security events as one JSON object per log record."""

    def __init__(self):
        self.logger = logging.getLogger('security')

    @staticmethod
    def _now() -> str:
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        return datetime.now(timezone.utc).isoformat()

    def log_auth_event(self, event_type: str, user_id: str,
                       ip: str, success: bool, extra: Optional[dict] = None):
        """Log an authentication event."""
        event = {
            'timestamp': self._now(),
            'event_type': event_type,  # 'login', 'logout', 'password_change'
            'user_id': user_id,
            'ip_address': ip,
            'success': success,
            'user_agent': extra.get('user_agent') if extra else None,
        }
        self.logger.info(json.dumps(event))

    def log_access(self, user_id: str, resource: str,
                   action: str, allowed: bool):
        """Log an access-control decision."""
        event = {
            'timestamp': self._now(),
            'event_type': 'access_control',
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'allowed': allowed,
        }
        self.logger.info(json.dumps(event))

    def log_suspicious(self, event_type: str, details: dict):
        """Log suspicious activity."""
        event = {
            # Caller-supplied details go first so they cannot overwrite the fixed fields below
            **details,
            'timestamp': self._now(),
            'event_type': f'suspicious_{event_type}',
            'severity': 'high',
        }
        self.logger.warning(json.dumps(event))
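The class above only calls logging.getLogger('security'); it does not attach a handler or set a level. With Python's default logging configuration the logger inherits the root level WARNING, so the info() calls in log_auth_event and log_access would be dropped. The following is a minimal sketch of one way to wire the logger up. The helper name configure_security_logger and the in-memory stream are assumptions made for illustration, not part of the original class.

```python
import io
import json
import logging


def configure_security_logger(stream) -> logging.Logger:
    """Route the 'security' logger to `stream`, one JSON document per line."""
    logger = logging.getLogger("security")
    logger.setLevel(logging.INFO)   # without this, INFO events are filtered out
    logger.propagate = False        # keep security events out of root handlers
    logger.handlers.clear()         # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream)
    # The message is already JSON, so emit it unchanged.
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return logger


# Usage: capture output in memory; a real service would pass a file or socket stream.
buffer = io.StringIO()
security_log = configure_security_logger(buffer)
security_log.info(json.dumps({"event_type": "login", "user_id": "u-42", "success": False}))

event = json.loads(buffer.getvalue())
print(event["event_type"])
```

Keeping the formatter at "%(message)s" means each line stays valid JSON, which is what a downstream JSON parser expects.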
Security Events You Must Log
Authentication:
- Login success/failure (with username, IP, user-agent)
- Logout
- Password change/reset
- MFA events
Authorization:
- Access denied events
- Privilege escalation attempts
- Admin actions
Application:
- Input validation failures (potential injection attempts)
- Rate limit exceeded
- Large data exports
- Unusual patterns (e.g., one user requesting 1,000 profiles)
Infrastructure:
- Deployment events
- Configuration changes
- Certificate expiry warnings
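The "unusual patterns" item in the list above (one user requesting 1,000 profiles) can be detected by counting distinct resources per user inside a time window. The sketch below is one possible approach; the class name MassAccessDetector, its thresholds, and the resource paths are hypothetical and would need tuning for a real system.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Tuple


class MassAccessDetector:
    """Flags a user who touches too many distinct resources within a time window."""

    def __init__(self, max_distinct: int = 1000, window_seconds: float = 60.0):
        self.max_distinct = max_distinct
        self.window_seconds = window_seconds
        # user_id -> queue of (timestamp, resource), oldest first
        self._events: Dict[str, Deque[Tuple[float, str]]] = defaultdict(deque)

    def record(self, user_id: str, resource: str, now: float) -> bool:
        """Record one access; return True when the user exceeds the threshold."""
        events = self._events[user_id]
        events.append((now, resource))
        # Drop accesses that have fallen out of the window.
        while events and now - events[0][0] > self.window_seconds:
            events.popleft()
        # Recounting on every call is O(n); acceptable for a sketch.
        distinct = {res for _, res in events}
        return len(distinct) > self.max_distinct


# Usage with a deliberately small threshold so the effect is visible.
detector = MassAccessDetector(max_distinct=3, window_seconds=60.0)
flags = [detector.record("u1", f"/profiles/{i}", now=float(i)) for i in range(5)]
print(flags)
```

When record returns True, the application could emit a suspicious event through its security logger so the alert reaches the central pipeline.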
What Not to Log
# NEVER log:
logger.info(f"User {username} logged in with password {password}")  # ← logs the password!
logger.debug(f"JWT token: {token}")  # ← logs the token!
logger.info(f"Credit card: {card_number}")  # ← logs PII!
# Mask sensitive data instead:
logger.info(f"User {username} logged in")  # OK
logger.info(f"Payment processed for card ending in {card_number[-4:]}")  # OK
logger.info(f"Token: {token[:8]}...")  # OK only while debugging
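Masking by hand at every call site is easy to forget. A complementary approach is to redact structured events centrally before they are serialized. The following is a minimal sketch; SENSITIVE_KEYS, redact, and mask_card are hypothetical names, and the key list is illustrative rather than complete.

```python
import re

# Illustrative list of field names whose values must never reach the logs.
SENSITIVE_KEYS = {"password", "token", "card_number", "authorization"}


def redact(event: dict) -> dict:
    """Return a copy of `event` with sensitive values replaced, including nested dicts."""
    clean = {}
    for key, value in event.items():
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "[REDACTED]"
        elif isinstance(value, dict):
            clean[key] = redact(value)
        else:
            clean[key] = value
    return clean


def mask_card(card_number: str) -> str:
    """Keep only the last four digits of a card number."""
    digits = re.sub(r"\D", "", card_number)
    if len(digits) <= 4:
        return "*" * len(digits)  # too short to reveal anything safely
    return "*" * (len(digits) - 4) + digits[-4:]


print(redact({"user": "alice", "password": "hunter2"}))
print(mask_card("4111 1111 1111 1111"))
```

Calling redact on the event dict just before json.dumps keeps the rule in one place instead of relying on every caller to remember it.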
ELK Stack for Security Monitoring
# Elasticsearch + Logstash + Kibana
# Logstash pipeline
input {
  beats {
    port => 5044
  }
}
filter {
  # Parse JSON security events
  json {
    source => "message"
  }
  # Tag suspicious events (event_type starts with "suspicious_")
  if [event_type] =~ /^suspicious_/ {
    mutate {
      add_tag => ["alert"]
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "security-%{+YYYY.MM.dd}"
  }
  # Send alerts
  if "alert" in [tags] {
    http {
      url => "https://hooks.slack.com/..."
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "Security Alert: %{event_type} - %{message}"
      }
    }
  }
}
Detection Rules
# Prometheus alerting rules for security events (Alertmanager routes the resulting alerts)
# Note: rate() returns a per-second average, so the thresholds below are per-second values;
# use increase(...[5m]) instead to compare against a count over the window.
groups:
  - name: security
    rules:
      - alert: BruteForceDetected
        expr: |
          sum(rate(auth_failures_total[5m])) by (ip_address) > 10
        for: 2m
        annotations:
          summary: "Brute force: {{ $labels.ip_address }}"
      - alert: UnusualDataAccess
        expr: |
          sum(rate(api_requests_total{endpoint="/api/users"}[1m])) > 100
        for: 1m
        annotations:
          summary: "Mass user data access detected"
      - alert: SuspiciousAdminAction
        expr: |
          rate(admin_actions_total{action="delete"}[5m]) > 5
        annotations:
          summary: "High rate of admin delete actions"
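The thresholds in the rules above are compared against rate(), which yields a per-second average rather than a count. The arithmetic below illustrates what that means for the brute-force rule. The helper simple_rate is a hypothetical, simplified stand-in: the real rate() function also handles counter resets and extrapolates to the window edges.

```python
def simple_rate(first: float, last: float, window_seconds: float) -> float:
    """Per-second average increase of a counter between two samples.

    Simplified on purpose: ignores counter resets and extrapolation.
    """
    return (last - first) / window_seconds


WINDOW = 300.0   # the [5m] range used in the rule
THRESHOLD = 10   # the "> 10" in the rule

# 11 failed logins from one IP in five minutes: about 0.037 per second.
slow_attack = simple_rate(0, 11, WINDOW)

# 3001 failed logins from one IP in five minutes: just over 10 per second.
fast_attack = simple_rate(0, 3001, WINDOW)

print(slow_attack > THRESHOLD)  # False
print(fast_attack > THRESHOLD)  # True
```

Under these assumptions, a slow password-guessing attempt stays far below a per-second threshold of 10, so a count over the window may match the intent of the rule more closely.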
Summary
- Log enough to reconstruct an attack: timestamp, user, IP, action, resource, success/failure.
- Do not log passwords, tokens, credit cards, or PII.
- Centralize logging with ELK or Loki.
- Alert on brute force, mass data access, and suspicious admin actions.
- A SIEM correlates events from multiple sources.