Detect Spoofing in Real-Time with Databento and RisingWave

Heng Ma
10 min read

Introduction

Financial markets operate at lightning speed, with vast sums exchanged every second. In this dynamic environment, market manipulation—where individuals artificially influence prices or deceive others—poses a significant risk. To safeguard market integrity, real-time trade surveillance is essential for detecting manipulative behaviors swiftly and mitigating potential harm.

In our previous post, we demonstrated how to ingest live market data using Databento and calculate real-time metrics like VWAP with RisingWave. Building on that foundation, this post will guide you through creating a simplified real-time trade surveillance system. We'll focus on detecting spoofing, a common market manipulation tactic, using the combined power of Databento's high-quality data and RisingWave's real-time processing capabilities.

Why Real-Time Surveillance Matters

Traditional end-of-day surveillance methods are no longer sufficient in today’s fast-paced financial markets. Real-time surveillance provides several critical benefits:

  • Early Detection: Identifies manipulative patterns as they occur, enabling faster responses.

  • Proactive Compliance: Demonstrates a commitment to meeting regulatory requirements.

  • Reduced Risk: Minimizes financial losses and reputational damage.

  • Enhanced Market Integrity: Contributes to a more transparent and trustworthy trading environment.

The Tools: Databento and RisingWave

Databento: Your Market Data Source

Sourcing high-quality market data can be challenging, especially when dealing with multiple financial exchanges. Databento offers a powerful but lightweight solution with comprehensive coverage for live and historical market data, designed to be:

  • Easy to use: A small API surface and straightforward protocols enable users to get started in minutes.

  • Fast: 6.1-microsecond normalization latency and close to zero data gaps with FPGA-based capture.

  • Versatile: Supports multiple asset classes and venues with a unified message format. Listen to every order book message, or to hundreds of thousands of symbols at once.

RisingWave: Your Real-Time Data Processing and Management Platform

RisingWave is a unified real-time data processing and management platform. It allows users to ingest, process, and query streaming data using familiar SQL. For this demonstration, we'll particularly leverage RisingWave's materialized views, which continuously and incrementally compute results as new data arrives, enabling real-time analysis without constant re-computation. Additionally, its Python SDK simplifies integration into Python-based applications.
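
As a quick illustration of the idea, the sketch below defines a toy materialized view over a hypothetical trades table; RisingWave keeps the per-symbol aggregates up to date incrementally as new rows arrive (the table and column names here are invented purely for illustration):

-- Illustrative only: assumes a hypothetical "trades" table with (symbol, price, size, ts_event)
CREATE MATERIALIZED VIEW volume_by_symbol AS
SELECT
    symbol,
    SUM(size)     AS total_volume,    -- maintained incrementally as new rows arrive
    MAX(ts_event) AS last_trade_time
FROM trades
GROUP BY symbol;

-- Queries against the view always return the latest, continuously maintained result
SELECT * FROM volume_by_symbol WHERE symbol = 'ES.n.0';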

Building a Simplified Surveillance System

In this section, we’ll build a system to detect spoofing. For demonstration purposes, we'll show how to ingest a batch of historical data using a Python script. However, the core detection logic within RisingWave (using a materialized view) is designed for continuous processing. In a real-world scenario, you would adapt the ingestion part to stream live data, and RisingWave would provide continuously updated spoofing alerts.

Architecture Overview

The following diagram illustrates the high-level data flow for our system:

This diagram shows the Python script fetching historical Market By Order (MBO) and Market By Price (MBP-1) data from Databento and inserting it into base tables in RisingWave. Inside RisingWave, the spoofing_alerts materialized view continuously processes data from these tables to detect patterns. An optional alert subscriber can then receive real-time updates from this materialized view and take actions accordingly.

Prerequisites

To follow along, ensure you have the following:

  1. A Databento account and API key.

  2. A running RisingWave instance (e.g., local Docker deployment accessible at localhost:4566).

  3. Python 3.9+ and the uv package manager (installation guide).

Set up your Python environment:

uv venv -p 3.9 # Or your preferred Python 3.9+ version
source .venv/bin/activate # On Windows: .venv\Scripts\activate
uv pip install databento risingwave-py pandas

Step 1: Creating Base Tables in RisingWave

We need tables to store the incoming market data. We'll use market by order (MBO) data for individual order events (including trades) and market by price (MBP-1) data specifically to get the current best bid and offer (BBO).

These two data types serve different purposes: MBO provides the granular order actions needed for pattern detection, while MBP-1 gives us the BBO reference point efficiently.

Connect to your RisingWave instance (using psql or another SQL client) and create the tables:

-- Create a table for MBO (Market by Order) data
CREATE TABLE IF NOT EXISTS market_data (
    ts_event TIMESTAMP,
    symbol VARCHAR,
    exchange VARCHAR,
    side VARCHAR,
    price DOUBLE PRECISION,
    size INTEGER,
    event_type VARCHAR,
    order_id VARCHAR
);

-- Create a table for BBO (Best Bid and Offer) data
CREATE TABLE IF NOT EXISTS bbo (
    ts_event TIMESTAMP,
    symbol VARCHAR,
    exchange VARCHAR,
    bid_px_00 DOUBLE PRECISION,
    ask_px_00 DOUBLE PRECISION
);
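
To confirm the tables were created, you can run the usual catalog commands from the same SQL client (the exact output will vary with your deployment):

SHOW TABLES;
DESCRIBE market_data;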

Step 2: Setting up Continuous Spoofing Detection Logic in RisingWave

Spoofing involves placing large, deceptive orders near the best bid or offer (BBO) with no intention of executing them, then quickly canceling them once they have shifted other participants' perception of supply and demand.

Now, we define a materialized view in RisingWave named spoofing_alerts. This view joins the MBO data (market_data) with the BBO data (bbo) and applies our spoofing detection logic. Crucially, RisingWave will keep this view updated automatically and incrementally as new data arrives in the base tables.

-- Spoofing detection SQL with very loose thresholds for testing
CREATE MATERIALIZED VIEW spoofing_alerts AS
WITH order_events AS (
    SELECT
        md.ts_event,
        md.symbol,
        md.exchange,
        md.side,
        md.price,
        md.size,
        md.event_type,
        t.bid_px_00,
        t.ask_px_00
    FROM market_data md
    LEFT JOIN bbo t
      ON CAST(md.symbol AS VARCHAR) = t.symbol
     AND md.exchange = t.exchange
     AND t.ts_event >= md.ts_event - INTERVAL '5 seconds' -- Adjust the interval as needed
     AND t.ts_event <= md.ts_event
    WHERE md.event_type IN ('add', 'cancel')
),
potential_spoofing AS (
    SELECT
        symbol,
        exchange,
        window_start,
        COUNT(*) AS num_cancellations
    FROM TUMBLE(order_events, ts_event, INTERVAL '5 second') -- Adjust the interval as needed
    WHERE
        event_type = 'cancel'
        AND size > 1  -- Adjust the size threshold as needed
        AND (
            (side = 'bid' AND ABS(price - bid_px_00/1000000000) <= (10 * 0.25)) OR  
            (side = 'ask' AND ABS(price - ask_px_00/1000000000) <= (10 * 0.25))  -- Adjust the price proximity rules as needed
        )
    GROUP BY symbol, exchange, window_start
    HAVING COUNT(*) > 1       -- Adjust cancellation threshold as needed
)
SELECT
    window_start,
    symbol,
    exchange,
    num_cancellations
FROM potential_spoofing;
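
Besides subscribing to changes (shown later), you can query the view ad hoc at any time. For example, once data has been ingested, the following lists the windows with the most cancellations:

SELECT window_start, symbol, num_cancellations
FROM spoofing_alerts
ORDER BY num_cancellations DESC
LIMIT 10;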

Understanding the Parameters

The spoofing detection logic uses several parameters that must be tuned for real-world effectiveness:

  1. Size Threshold (size > 1):

    • Demo Value: We use 1 in this example purely to increase the likelihood of seeing some results during testing with limited data.

    • WARNING: This value is far too low for detecting actual spoofing in a liquid market such as ES futures. Real spoofing involves significantly larger orders intended to mislead. Increase this number for realistic detection.

  2. Proximity to BBO (Ticks) (10 * 0.25):

    • Logic: We check whether the order's price is within a certain distance of the BBO. Here, 10 * 0.25 means within 10 ticks for ES.n.0 (tick size 0.25). The division by 1,000,000,000 converts Databento's fixed-point MBP-1 prices, which are integers scaled by 1e9, into the same decimal units as the MBO prices we insert.

    • Tuning: The number of ticks (e.g., 5) needs adjustment based on the asset's typical spread and desired sensitivity; a stricter example follows this list.
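
For detection that is closer to realistic, one option is to keep the permissive view for experimentation and define a second, stricter view alongside it. The sketch below reuses the same pattern with tighter placeholder thresholds; the specific values (order size above 100, within 2 ticks, more than 20 cancellations per window) are illustrative only and would need calibration against real data:

-- Illustrative only: a stricter variant of the same detection pattern
CREATE MATERIALIZED VIEW spoofing_alerts_strict AS
WITH order_events AS (
    SELECT
        md.ts_event, md.symbol, md.exchange, md.side,
        md.price, md.size, md.event_type,
        t.bid_px_00, t.ask_px_00
    FROM market_data md
    LEFT JOIN bbo t
      ON md.symbol = t.symbol
     AND md.exchange = t.exchange
     AND t.ts_event >= md.ts_event - INTERVAL '5 seconds'
     AND t.ts_event <= md.ts_event
    WHERE md.event_type IN ('add', 'cancel')
)
SELECT
    window_start,
    symbol,
    exchange,
    COUNT(*) AS num_cancellations
FROM TUMBLE(order_events, ts_event, INTERVAL '5 seconds')
WHERE
    event_type = 'cancel'
    AND size > 100                 -- placeholder: only large orders
    AND (
        (side = 'bid' AND ABS(price - bid_px_00/1000000000) <= (2 * 0.25)) OR
        (side = 'ask' AND ABS(price - ask_px_00/1000000000) <= (2 * 0.25))   -- within 2 ticks
    )
GROUP BY symbol, exchange, window_start
HAVING COUNT(*) > 20;              -- placeholder: many cancellations in one window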

Note: At this point, the spoofing_alerts materialized view exists but is empty because no data has been ingested yet. Once we load data in the next step, RisingWave will automatically compute its results and keep them updated.

Step 3: Ingesting Historical Market Data from Databento

Now, we'll run a Python script to fetch a batch of historical MBO and MBP-1 data from Databento for a specific time window and insert it into the market_data and bbo tables in RisingWave. This one-time load populates our base tables, allowing the spoofing_alerts materialized view to compute its initial results.

Save the following code as ingest_data.py:

import databento as db
from risingwave import RisingWave, RisingWaveConnOptions
import logging
import os

# --- Setup logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Global client setup ---
api_key = os.environ.get("DATABENTO_API_KEY")
if api_key is None:
    logger.error("Please set the DATABENTO_API_KEY environment variable.")
    exit(1)
hist = db.Historical()

# --- RisingWave Connection Setup ---
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="", database="dev"
    )
)

# --- Action Mapping (for MBO events) ---
action_map = {
    "A": "add",
    "M": "modify",
    "C": "cancel",
    "R": "clear",
    "T": "trade",
    "F": "fill",
    "N": "none",
}

def process_market_data(dataset, symbol, start_time):
    """Process market data directly from Databento"""
    logger.info(f"Processing market data for {symbols} from {start_time}")

    # Fetch MBO data
    logger.info("Fetching MBO data...")
    mbo_data = hist.timeseries.get_range(
        dataset=dataset,
        schema="mbo",
        stype_in="continuous",
        symbols=[symbol],
        start=start_time,
        limit=10000,
    )

    # Fetch BBO data
    logger.info("Fetching BBO data...")
    bbo_data = hist.timeseries.get_range(
        dataset=dataset,
        schema="mbp-1",
        stype_in="continuous",
        symbols=[symbol],
        start=start_time,
        limit=10000,
    )

    # Process records directly
    with rw.getconn() as conn:
        # Process MBO records one by one
        mbo_count = 0
        for record in mbo_data:
            try:
                params = {
                    "ts_event": record.pretty_ts_event,
                    "symbol": symbol,
                    "exchange": "GLBX",
                    "side": "bid" if record.side == "B" else "ask",
                    "price": record.pretty_price,
                    "size": record.size,
                    "event_type": action_map[record.action],
                    "order_id": str(record.order_id),
                }
                conn.execute(
                    """
                    INSERT INTO market_data
                    (ts_event, symbol, exchange, side, price, size, event_type, order_id)
                    VALUES 
                    (:ts_event, :symbol, :exchange, :side, :price, :size, :event_type, :order_id)
                    """,
                    params,
                )
                mbo_count += 1
                if mbo_count % 1000 == 0:
                    logger.info(f"Processed {mbo_count} MBO records")
            except Exception as e:
                logger.error(f"Error processing MBO record: {e}")
                continue

        # Process BBO records one by one
        bbo_count = 0
        for record in bbo_data:
            try:
                params = {
                    "ts_event": record.pretty_ts_event,
                    "symbol": symbol,
                    "exchange": "GLBX",
                    "bid_px_00": record.levels[0].bid_px,
                    "ask_px_00": record.levels[0].ask_px
                }
                conn.execute(
                    """
                    INSERT INTO bbo 
                    (ts_event, symbol, exchange, bid_px_00, ask_px_00)
                    VALUES 
                    (:ts_event, :symbol, :exchange, :bid_px_00, :ask_px_00)
                    """,
                    params,
                )
                bbo_count += 1
                if bbo_count % 1000 == 0:
                    logger.info(f"Processed {bbo_count} BBO records")
            except Exception as e:
                logger.error(f"Error processing BBO record: {e}")
                continue

    logger.info(f"Data processing complete. Processed {mbo_count} MBO records and {tbbo_count} BBO records")

def main():
    dataset = "GLBX.MDP3"
    symbol = "ES.n.0"
    start_time = "2024-03-19"

    logger.info(f"Starting data processing from {start_time}")
    process_market_data(dataset, symbol, start_time)
    logger.info("Done!")

if __name__ == "__main__":
    main()

Run this script using python ingest_data.py. It will connect to Databento, fetch the specified historical data, and insert it record-by-record into your RisingWave tables.
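
As a quick sanity check after the load, you can count the rows that landed in each base table and peek at the BBO data, dividing the raw fixed-point prices by 1e9 to read them as decimals:

SELECT COUNT(*) FROM market_data;
SELECT COUNT(*) FROM bbo;

SELECT ts_event, symbol,
       bid_px_00 / 1000000000 AS bid,
       ask_px_00 / 1000000000 AS ask
FROM bbo
LIMIT 5;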

Optional: Subscribing to Alerts

After ingesting data, the spoofing_alerts materialized view in RisingWave will contain the results based on the historical data. For a real-time system, you would typically have a separate application continuously subscribing to changes in this view.
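
If you prefer to stay in SQL, RisingWave can also expose this change stream through a subscription. A minimal sketch, assuming a RisingWave version that supports CREATE SUBSCRIPTION (the retention value is illustrative):

-- Retain the view's change log so consumers can read it
CREATE SUBSCRIPTION spoofing_alerts_sub
FROM spoofing_alerts
WITH (retention = '1D');

-- Consume changes with a subscription cursor
DECLARE alerts_cur SUBSCRIPTION CURSOR FOR spoofing_alerts_sub;
FETCH NEXT FROM alerts_cur;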

The following script (subscribe_alerts.py) demonstrates how to connect to RisingWave and print any new spoofing alerts pushed by the spoofing_alerts materialized view. To receive alerts as soon as spoofing activity is detected, run this script before ingesting market data.

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd
import signal
import sys
import threading
import logging
import os

# --- Setup logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- RisingWave Connection Setup ---
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="", database="dev"
    )
)

def signal_handler(sig, frame):
    """Handle graceful shutdown."""
    logger.info('Shutting down...')
    sys.exit(0)

def handle_spoofing_alerts(event_df: pd.DataFrame) -> None:
    """Handles spoofing alerts and logs them."""
    try:
        event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
        if event_df.empty:
            return
        event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
        logger.info("\\nSpoofing Alert:")
        logger.info(event_df)

    except Exception as e:
        logger.error(f"Error processing alert: {e}", exc_info=True)
        # Consider adding a retry mechanism here for transient errors

def main():
    """Main function to subscribe to alerts."""
    signal.signal(signal.SIGINT, signal_handler)  # Handle Ctrl+C gracefully

    # Check if the materialized view exists before subscribing
    try:
        with rw.getconn() as conn:
            # Query the catalog to confirm the materialized view exists
            result = conn.execute("""
                SELECT count(*) FROM pg_matviews 
                WHERE matviewname = 'spoofing_alerts';
            """)

            if result and result[0][0] == 0:
                logger.error("Materialized view 'spoofing_alerts' does not exist. Run spoofing_alerts.sql first.")
                sys.exit(1)
    except Exception as e:
        logger.error(f"Error checking for materialized view: {e}")
        sys.exit(1)

    # Subscribe to alerts in a separate thread.
    threading.Thread(
        target=lambda: rw.on_change(
            subscribe_from="spoofing_alerts",
            handler=handle_spoofing_alerts,
            output_format=OutputFormat.DATAFRAME,
        ),
        daemon=True  # Allow the thread to exit when the main thread exits
    ).start()

    logger.info("Subscribed to spoofing_alerts. Waiting for events...")
    signal.pause() # Wait for a signal (like Ctrl+C)

if __name__ == "__main__":
    main()

Run this script using python subscribe_alerts.py. It will connect and wait. If the materialized view produces results (either from the initial batch load or if you were streaming live data), this script will print them.

Sample Output

If the spoofing_alerts view detects events based on the ingested data and the defined thresholds, the subscribe_alerts.py script might output something similar to:

Spoofing Alert Output:

Spoofing Alert:
INFO:__main__:                window_start  symbol exchange  num_cancellations
0  2024-03-19 00:00:40+00:00  ES.n.0     GLBX                625
1  2024-03-19 00:00:45+00:00  ES.n.0     GLBX                 40

Note: These are sample outputs. Actual results depend entirely on the specific market data ingested and the tuning of the detection parameters in the SQL query. With the very low thresholds used for testing, you might see more alerts than expected for actual spoofing.

Conclusion

We've demonstrated how to build a basic trade surveillance system for spoofing detection by integrating Databento's market data with RisingWave's real-time SQL processing. We ingested historical data and used a materialized view in RisingWave to continuously identify suspicious cancellation patterns based on size and proximity to the BBO.

While this example uses batch ingestion for simplicity, the true power lies in RisingWave's ability to perform these complex analyses incrementally on live data streams. By adapting the ingestion to use live data and carefully tuning the detection parameters, you can create robust, real-time surveillance systems that help maintain market integrity.
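
One way to act on alerts in such a system is to push them to downstream services by attaching a sink to the materialized view. A hedged sketch, assuming a Kafka broker at localhost:9092 and a topic named spoofing-alerts (both hypothetical):

CREATE SINK spoofing_alerts_sink
FROM spoofing_alerts
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'spoofing-alerts',
    force_append_only = 'true'   -- emit inserts only; updates and deletes from the view are dropped
) FORMAT PLAIN ENCODE JSON;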

To extend this system, consider adding more sophisticated detection rules, integrating machine learning models via user-defined functions, or building interactive dashboards for visualizing alerts. Remember to always consult Databento documentation and RisingWave documentation for detailed information. Thorough testing with realistic data and careful parameter tuning are crucial for the accuracy and reliability of any trade surveillance system.
