Decrypting Crypto Dynamics: Real-Time Data Processing and Visualization

DolphinDBDolphinDB
6 min read

DolphinDB offers a comprehensive solution for real-time data processing and visualization in the cryptocurrency market. By integrating time-series databases with a robust stream processing framework, the system supports both historical data analysis and real-time stream processing.

We introduced techniques for multi-source market data access, integration and storage in the previous chapter. This chapter explores DolphinDB’s stream processing capabilities, demonstrates real-time metric calculations, and introduces visualization tools for effective data representation.

Download our whitepaper for a new crypto management experience: Cryptocurrency Solutions — Dolphindb

3.1 Stream Processing in DolphinDB

Stream processing is the continuous collection, cleansing, analysis, and storage of dynamic data generated by business systems. DolphinDB has developed specialized streaming engines optimized for these scenarios, utilizing incremental computation to enhance real-time performance.

3.1.1 Stream Processing Framework

DolphinDB’s modular and extensible stream processing framework leverages core components such as stream tables and streaming engines as reusable modules. These components can be flexibly combined in a publish-subscribe manner to form powerful processing pipelines. The system provides extensive built-in functions as operators for streaming engines, simplifying application development. Intricate financial algorithms can be implemented with user-defined functions.

Figure 3–1 Stream Processing Framework

3.1.2 Streaming Engines

Streaming engines serve as encapsulated, independent computation units and are core components of DolphinDB’s real-time data processing solutions. Computation is triggered when data is ingested into the engine, with results output to a target table. DolphinDB offers over a dozen built-in streaming engines, providing flexible computation methods and rich capabilities to address diverse real-time data processing needs in the cryptocurrency market.

Figure 3–2 Streaming Engines

3.2 Real-Time Metrics Calculation

3.2.1 Aggregating Data into OHLC Bars

The time series engine aggregates data within moving windows, automatically determining windows based on specified parameters. It applies aggregation rules on grouped data within each window and outputs a single result.

The following example demonstrates the use of a time series engine to generate 1-minute OHLC data and volume-weighted average price (VWAP) from aggregate trade data (aggTrades, which is as shown below):

The following script creates a time series engine with 1-minute moving windows. The ingested data is grouped by SecurityID. The metrics parameter specifies aggregation rules within each window: calculating open, high, low and close prices, as well as volume-weighted average price. Built-in functions like first natively support incremental computations within the engine, updating intermediate results upon each record arrival instead of batch calculations after windows close.

// Create stream table OHLCStream 
colNames = `tradeTime`instrument`open`high`low`close`volume`amount
colTypes = [TIMESTAMP, SYMBOL, DECIMAL128(18), DECIMAL128(18), 
DECIMAL128(18), DECIMAL128(18), DECIMAL128(8), DECIMAL128(18)]
enableTableShareAndPersistence(table=streamTable(1000:0, colNames, colTypes), 
tableName=`OHLCStream, cacheSize=120000)
go
//  Create time series engine
aggrMetrics = <[
    first(price),
    max(price),
    min(price),
    last(price),
    sum(quantity),
    sum(price*quantity)
]>
tsEngine = createTimeSeriesEngine(
    name="OHLC",
    windowSize=60000,
    step=60000,
    metrics=aggrMetrics,
    dummyTable=objByName("aggTrades"),
    outputTable=objByName("OHLCStream"),
    timeColumn="tradeTime",
    keyColumn="code"
    )
// Subscribe to aggTrades
subscribeTable(tableName="aggTrades", actionName="OHLCStream", 
handler=getStreamEngine("OHLC"), msgAsTable=true);

3.2.2 Calculating Technical Analysis Indicators

The reactive state engine responds instantly to incoming data, triggering computation upon the arrival of a new record and outputting results to the target table. DolphinDB optimizes performance for operators like moving, cumulative, order-sensitive, and TopN functions using incremental algorithms.

The following script creates a reactive state engine that subscribes to the real-time OHLCStream table created in the previous section. The parameter metrics specifies DolphinDB built-in technical analysis indicators, such as MACD, CCI, EMA and GEMA, as defined in the ta module. A user-defined state function prevMacd is presented to demonstrate user-defined metrics.

// Create stream table factorStream
colNames = `instrument`tradeTime`open`high`low`close`volume
`amount`DIF`DEA`MACD`prevDIF`prevDEA`prevMACD`CCI`prevCCI`msumAmount
colTypes = [SYMBOL, TIMESTAMP, DECIMAL128(18), DECIMAL128(18), DECIMAL128(18), 
DECIMAL128(18), DECIMAL128(8), DECIMAL128(18), DOUBLE, DOUBLE, DOUBLE, 
DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
enableTableShareAndPersistence(table=streamTable(1000:0, colNames, colTypes), 
tableName=`factorStream, cacheSize=120000)
go
// Use ta module
use ta
@state
def prevMacd(close, fastPeriod=12, slowPeriod=26, signalPeriod=9){
 if (fastPeriod == 0 && slowPeriod == 0) {
  inSlowPeriod = 26
  close_ = talibNull(close, talib(mcount, close, 15))[0]
  fastResult = gema(close_, 12, 0.15)
  slowResult = gema(close, 26, 0.075)
  diff = fastResult - slowResult
 }
 else {
  inSlowPeriod = max(fastPeriod, slowPeriod)
  inFastPeriod = min(fastPeriod, slowPeriod)
  diffPeriod = inSlowPeriod - inFastPeriod
  diff = ema(talibNull(close, talib(mcount, close, diffPeriod+1))[0], 
  inFastPeriod) - ema(close, inSlowPeriod)
 }
 dea = ema(diff, signalPeriod)
 return (prev(diff), prev(dea), prev(diff - dea))
}
//  Create reactive state engine and define metrics
facotrMetrics=<[
    tradeTime,
    open,
    high,
    low,
    close,
    volume,
    amount,
    macd(double(close)) as `DIF`DEA`MACD,
    prevMacd(double(close)) as `prevDIF`prevDEA`prevMACD,
    cci(high, low, close) as `CCI,
    prev(cci(high, low, close)) as `prevCCI,
    msum(amount, 5) as `msumAmount
]>
createReactiveStateEngine(
    name="calFactor",
    metrics=facotrMetrics,
    dummyTable=objByName("OHLCStream"),
    outputTable=objByName("factorStream"),
    keyColumn="instrument",
    keepOrder = true
    )
// Subscribe to OHLCStream
subscribeTable(tableName="OHLCStream", actionName="calFactor", 
handler=getStreamEngine("calFactor"), msgAsTable=true);

3.2.3 Generating Buy/Sell Signal

DolphinDB provides robust capabilities for monitoring metric value changes and generating buy/sell signals. The following example demonstrates how to generate buy and sell signals for crypto trading by monitoring the value changes in MACD and CCI indicators.

Buy Signal:

  • When the DIF line crosses above the DEA line (a “golden cross”), the CCI exceeds 100, and the cumulative trading volume in the past 5 minutes exceeds 20 million USDT.

  • Or when a golden cross occurs and the CCI crosses above -100.

Sell Signal: When the DIF line crosses below the DEA line (a “death cross”).

An example script is as follows:

// Create stream table signalStream
colNames = `instrument`tradeTime`Signal
colTypes = [SYMBOL, TIMESTAMP, INT]
enableTableShareAndPersistence(table=streamTable(1000:0, colNames, colTypes), 
tableName=`signalStream, cacheSize=120000)
go
// Define signal generation strategy
def macdAndCci(mutable signalStream, msg){
    result = 
    select instrument,
        tradeTime,
        iif(DIF>DEA and prevDIF<prevDEA 
            and CCI>100 and prevCCI<=100 and msumAmount>=200000, 
            1, 
            iif(DIF>DEA and prevDIF<prevDEA and CCI>-100 and prevCCI<=-100, 1,
                iif(DIF<DEA and prevDIF>prevDEA and msumAmount>=20000000, -1, 0))) 
                as Signal
    from msg
    context by instrument
    tableInsert(signalStream, result)
}
// Subscribe to factorStream
subscribeTable(
 tableName="factorStream",
 actionName="macdAndCci",
 offset=0,
 handler=macdAndCci{signalStream,},
 msgAsTable=true,
 batchSize=500,
 throttle=0.01,
 reconnect=true)

3.3 Data Visualization Tools

DolphinDB integrates with two popular visualization platforms TimeStored Pulse and Grafana, allowing users to create interactive, real-time visualizations of cryptocurrency market data and calculation results.

3.3.1 TimeStored Pulse

DolphinDB has partnered with TimeStored for interactive data visualization. Users in Pulse can connect directly to DolphinDB databases by specifying the data source type and write queries to display selected data. The image below displays technical indicators, buy/sell signals and 1-minute OHLC data for BTC/USTD.

Fig 3–3 Visualization with Pulse

3.3.2 Grafana

DolphinDB provides Grafana datasource plugins, enabling users to connect to DolphinDB and write queries to visualize selected data in customizable panels. The following image demonstrates a Grafana panel displaying technical indicators, buy/sell signals and 1-minute OHLC data for BTC/USTD.

Fig 3–4 Visualization with Grafana

In conclusion, DolphinDB’s stream processing framework, coupled with its real-time metric calculation capabilities and integration with visualization tools, provides a robust solution for cryptocurrency market analysis and decision-making. By leveraging these features, users can gain valuable insights and respond quickly to market changes in the fast-paced world of digital assets.

Download Whitepaper Here: Cryptocurrency Solutions — Dolphindb

Email us for more information or to schedule a demo: sales@dolphindb.com

Thanks for your reading! To keep up with our latest news, please follow our Twitter @DolphinDB_Inc and Linkedin. You can also join our Slack to chat with the author!

0
Subscribe to my newsletter

Read articles from DolphinDB directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

DolphinDB
DolphinDB