class SIEMLogger:
"""Stochastic SIEM event generator."""
def __init__(self, seed: int | None = None):
self._rng = random.Random(seed)
def log_action(
self,
action_name: str,
effect: 'ActionEffect',
global_state: 'GlobalNetworkState',
agent_id: str,
target_ip: str | None = None,
) -> str | None:
"""Potentially generate a SIEM log line for this action's outcome."""
p_threshold = P_LOG_ON_SUCCESS if effect.success else P_LOG_ON_FAILURE
if self._rng.random() > p_threshold:
return None # This action was not detected / logged
# Pick a source IP — prefer the agent's known foothold in DMZ
src_ip = self._infer_src_ip(agent_id, global_state)
tgt_ip = target_ip or src_ip
log_line = self._generate_event(action_name, src_ip, tgt_ip)
if log_line:
# Determine subnet for filtering
host = global_state.all_hosts.get(tgt_ip)
subnet = host.subnet_cidr if host else 'unknown'
self._push_to_buffer(log_line, subnet, global_state)
return log_line
def log_background_noise(self, global_state: 'GlobalNetworkState') -> None:
"""Inject benign background network activity every tick."""
if self._rng.random() > P_BACKGROUND_NOISE:
return
# Pick two random live hosts and generate a benign connection event
live_hosts = [
h
for h in global_state.all_hosts.values()
if h.status == 'online' and '169.254' not in h.ip
]
if len(live_hosts) < 2:
return
src, dst = self._rng.sample(live_hosts, 2)
# Sample a benign template from the default bucket
templates = ACTION_EVENT_MAP.get('_default', [])
if not templates:
return
weights, callables = zip(*templates)
total = sum(weights)
norm_weights = [w / total for w in weights]
chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
log_line = chosen(src.ip, dst.ip)
self._push_to_buffer(f'[BACKGROUND] {log_line}', src.subnet_cidr, global_state)
def get_recent_logs(
self,
global_state: 'GlobalNetworkState',
n: int = 8,
) -> list[str]:
"""Return the N most recent SIEM log lines from the buffer."""
return [entry[0] for entry in global_state.siem_log_buffer[-n:]]
def get_filtered_logs(
self,
global_state: 'GlobalNetworkState',
subnet_tag: str | None = None,
n: int = 8,
) -> list[str]:
"""Return the N most recent logs filtered by subnet mapping."""
mapping = {
'dmz': 'DMZ',
'internal': 'Corporate',
'restricted': 'Secure',
}
target_name = mapping.get(subnet_tag) if subnet_tag else None
if not target_name:
return self.get_recent_logs(global_state, n)
filtered = [
entry[0]
for entry in global_state.siem_log_buffer
if global_state.get_subnet_name(entry[1]) == target_name
]
return filtered[-n:]
def _generate_event(self, action_name: str, src_ip: str, tgt_ip: str) -> str | None:
templates = ACTION_EVENT_MAP.get(action_name, ACTION_EVENT_MAP['_default'])
if not templates:
return None
weights, callables = zip(*templates)
total = sum(weights)
norm_weights = [w / total for w in weights]
chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
try:
return chosen(src_ip, tgt_ip)
except Exception:
return None
def _infer_src_ip(self, agent_id: str, global_state: 'GlobalNetworkState') -> str:
"""Best-guess the agent's active source IP from known compromised hosts."""
known = global_state.agent_knowledge.get(agent_id, set())
for ip in known:
host = global_state.all_hosts.get(ip)
if host and host.privilege in ('User', 'Root'):
return ip
# Fallback — first known IP
if known:
return next(iter(known))
return '10.0.0.1'
def _push_to_buffer(
self,
log_line: str,
subnet_cidr: str,
global_state: 'GlobalNetworkState',
) -> None:
global_state.siem_log_buffer.append((log_line, subnet_cidr))
# Rolling window — evict oldest entries beyond max
if len(global_state.siem_log_buffer) > SIEM_BUFFER_MAX:
global_state.siem_log_buffer.pop(0)