Skip to content

SIEM API

siem_logger

SIEMLogger

Stochastic SIEM event generator.

Source code in netforge_rl\siem\siem_logger.py
class SIEMLogger:
    """Stochastic SIEM event generator."""

    def __init__(self, seed: int | None = None):
        self._rng = random.Random(seed)

    def log_action(
        self,
        action_name: str,
        effect: 'ActionEffect',
        global_state: 'GlobalNetworkState',
        agent_id: str,
        target_ip: str | None = None,
    ) -> str | None:
        """Potentially generate a SIEM log line for this action's outcome."""
        p_threshold = P_LOG_ON_SUCCESS if effect.success else P_LOG_ON_FAILURE
        if self._rng.random() > p_threshold:
            return None  # This action was not detected / logged

        # Pick a source IP — prefer the agent's known foothold in DMZ
        src_ip = self._infer_src_ip(agent_id, global_state)
        tgt_ip = target_ip or src_ip

        log_line = self._generate_event(action_name, src_ip, tgt_ip)
        if log_line:
            # Determine subnet for filtering
            host = global_state.all_hosts.get(tgt_ip)
            subnet = host.subnet_cidr if host else 'unknown'
            self._push_to_buffer(log_line, subnet, global_state)
        return log_line

    def log_background_noise(self, global_state: 'GlobalNetworkState') -> None:
        """Inject benign background network activity every tick."""
        if self._rng.random() > P_BACKGROUND_NOISE:
            return

        # Pick two random live hosts and generate a benign connection event
        live_hosts = [
            h
            for h in global_state.all_hosts.values()
            if h.status == 'online' and '169.254' not in h.ip
        ]
        if len(live_hosts) < 2:
            return

        src, dst = self._rng.sample(live_hosts, 2)
        # Sample a benign template from the default bucket
        templates = ACTION_EVENT_MAP.get('_default', [])
        if not templates:
            return
        weights, callables = zip(*templates)
        total = sum(weights)
        norm_weights = [w / total for w in weights]
        chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
        log_line = chosen(src.ip, dst.ip)
        self._push_to_buffer(f'[BACKGROUND] {log_line}', src.subnet_cidr, global_state)

    def get_recent_logs(
        self,
        global_state: 'GlobalNetworkState',
        n: int = 8,
    ) -> list[str]:
        """Return the N most recent SIEM log lines from the buffer."""
        return [entry[0] for entry in global_state.siem_log_buffer[-n:]]

    def get_filtered_logs(
        self,
        global_state: 'GlobalNetworkState',
        subnet_tag: str | None = None,
        n: int = 8,
    ) -> list[str]:
        """Return the N most recent logs filtered by subnet mapping."""
        mapping = {
            'dmz': 'DMZ',
            'internal': 'Corporate',
            'restricted': 'Secure',
        }
        target_name = mapping.get(subnet_tag) if subnet_tag else None

        if not target_name:
            return self.get_recent_logs(global_state, n)

        filtered = [
            entry[0]
            for entry in global_state.siem_log_buffer
            if global_state.get_subnet_name(entry[1]) == target_name
        ]
        return filtered[-n:]

    def _generate_event(self, action_name: str, src_ip: str, tgt_ip: str) -> str | None:
        templates = ACTION_EVENT_MAP.get(action_name, ACTION_EVENT_MAP['_default'])
        if not templates:
            return None
        weights, callables = zip(*templates)
        total = sum(weights)
        norm_weights = [w / total for w in weights]
        chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
        try:
            return chosen(src_ip, tgt_ip)
        except Exception:
            return None

    def _infer_src_ip(self, agent_id: str, global_state: 'GlobalNetworkState') -> str:
        """Best-guess the agent's active source IP from known compromised hosts."""
        known = global_state.agent_knowledge.get(agent_id, set())
        for ip in known:
            host = global_state.all_hosts.get(ip)
            if host and host.privilege in ('User', 'Root'):
                return ip
        # Fallback — first known IP
        if known:
            return next(iter(known))
        return '10.0.0.1'

    def _push_to_buffer(
        self,
        log_line: str,
        subnet_cidr: str,
        global_state: 'GlobalNetworkState',
    ) -> None:
        global_state.siem_log_buffer.append((log_line, subnet_cidr))
        # Rolling window — evict oldest entries beyond max
        if len(global_state.siem_log_buffer) > SIEM_BUFFER_MAX:
            global_state.siem_log_buffer.pop(0)

log_action

log_action(
    action_name: str,
    effect: 'ActionEffect',
    global_state: 'GlobalNetworkState',
    agent_id: str,
    target_ip: str | None = None,
) -> str | None

Potentially generate a SIEM log line for this action's outcome.

Source code in netforge_rl\siem\siem_logger.py
def log_action(
    self,
    action_name: str,
    effect: 'ActionEffect',
    global_state: 'GlobalNetworkState',
    agent_id: str,
    target_ip: str | None = None,
) -> str | None:
    """Potentially generate a SIEM log line for this action's outcome."""
    p_threshold = P_LOG_ON_SUCCESS if effect.success else P_LOG_ON_FAILURE
    if self._rng.random() > p_threshold:
        return None  # This action was not detected / logged

    # Pick a source IP — prefer the agent's known foothold in DMZ
    src_ip = self._infer_src_ip(agent_id, global_state)
    tgt_ip = target_ip or src_ip

    log_line = self._generate_event(action_name, src_ip, tgt_ip)
    if log_line:
        # Determine subnet for filtering
        host = global_state.all_hosts.get(tgt_ip)
        subnet = host.subnet_cidr if host else 'unknown'
        self._push_to_buffer(log_line, subnet, global_state)
    return log_line

log_background_noise

log_background_noise(
    global_state: 'GlobalNetworkState',
) -> None

Inject benign background network activity every tick.

Source code in netforge_rl\siem\siem_logger.py
def log_background_noise(self, global_state: 'GlobalNetworkState') -> None:
    """Inject benign background network activity every tick."""
    if self._rng.random() > P_BACKGROUND_NOISE:
        return

    # Pick two random live hosts and generate a benign connection event
    live_hosts = [
        h
        for h in global_state.all_hosts.values()
        if h.status == 'online' and '169.254' not in h.ip
    ]
    if len(live_hosts) < 2:
        return

    src, dst = self._rng.sample(live_hosts, 2)
    # Sample a benign template from the default bucket
    templates = ACTION_EVENT_MAP.get('_default', [])
    if not templates:
        return
    weights, callables = zip(*templates)
    total = sum(weights)
    norm_weights = [w / total for w in weights]
    chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
    log_line = chosen(src.ip, dst.ip)
    self._push_to_buffer(f'[BACKGROUND] {log_line}', src.subnet_cidr, global_state)

get_recent_logs

get_recent_logs(
    global_state: 'GlobalNetworkState', n: int = 8
) -> list[str]

Return the N most recent SIEM log lines from the buffer.

Source code in netforge_rl\siem\siem_logger.py
def get_recent_logs(
    self,
    global_state: 'GlobalNetworkState',
    n: int = 8,
) -> list[str]:
    """Return the N most recent SIEM log lines from the buffer."""
    return [entry[0] for entry in global_state.siem_log_buffer[-n:]]

get_filtered_logs

get_filtered_logs(
    global_state: 'GlobalNetworkState',
    subnet_tag: str | None = None,
    n: int = 8,
) -> list[str]

Return the N most recent logs filtered by subnet mapping.

Source code in netforge_rl\siem\siem_logger.py
def get_filtered_logs(
    self,
    global_state: 'GlobalNetworkState',
    subnet_tag: str | None = None,
    n: int = 8,
) -> list[str]:
    """Return the N most recent logs filtered by subnet mapping."""
    mapping = {
        'dmz': 'DMZ',
        'internal': 'Corporate',
        'restricted': 'Secure',
    }
    target_name = mapping.get(subnet_tag) if subnet_tag else None

    if not target_name:
        return self.get_recent_logs(global_state, n)

    filtered = [
        entry[0]
        for entry in global_state.siem_log_buffer
        if global_state.get_subnet_name(entry[1]) == target_name
    ]
    return filtered[-n:]