Simplify Your Crypto Data Workflow: Market Data Access, Integration and Storage Solutions
In the cryptocurrency market, quantitative trading often requires real-time access to market data. Crypto APIs have emerged as essential tools for connecting to exchanges and third-party services to access live market data streams.
This chapter builds on the framework established in the previous chapter and explores two primary methods of accessing crypto market data:
Interacting with exchange APIs using DolphinDB plugins.
Using the integrated CCXT Library in Python and writing data to DolphinDB databases via the DolphinDB Python API.
We will also discuss techniques for integrating multi-source market data and optimizing storage in DolphinDB databases, with a focus on partitioning designs tailored for different types of market data.
Download the whole whitepaper here: Cryptocurrency Solutions — DolphinDB
2.1 Accessing Data with DolphinDB Plugins
Binance, a leading crypto exchange, offers a comprehensive API ecosystem that empowers developers to interact directly with their platform. This section demonstrates how to access market streams from Binance using two primary types of APIs:
WebSocket API: Provides real-time streams, allowing clients to subscribe to specific data feeds. This method is ideal for continuous market monitoring and immediate updates.
REST API: Suitable for periodic data retrieval, where clients can actively request data at set intervals. This approach requires polling for real-time updates.
The following example demonstrates how to access market streams from the Binance exchange by connecting to the Binance WebSocket API to obtain aggregate trade streams and using the REST API to request order book data. The obtained data is subsequently saved into a DolphinDB DFS database for efficient management and analysis.
2.1.1 Prerequisites
Installing DolphinDB plugins
To initiate the data access process, log in as an administrator in DolphinDB and execute the following commands to install and load the WebSocket and httpClient plugins.
// Install and load the WebSocket plugin
installPlugin("WebSocket")
loadPlugin("WebSocket")
// Install and load the httpClient plugin
installPlugin("httpClient")
loadPlugin("httpClient")
Enabling stream persistence
To prevent memory issues due to oversized stream tables, this solution enables streaming persistence by configuring the persistenceDir on the data node that publishes data streams. This can be done by editing the relevant configuration file (dolphindb.cfg for a single node, or cluster.cfg for a cluster) or through the Web interface:
persistenceDir=/home/DolphinDB/Data/Persistence
2.1.2 WebSocket API: Aggregate Trade Streams for USDT-Margined Futures
This section demonstrates how to subscribe to Binance’s WebSocket API for aggregate trade streams of USDT-margined futures.
(1) Creating Databases and Tables
Below is a sample of the data structure of aggregate trade streams provided by Binance. For details, see Binance > Aggregate Trade Streams.
{
"e": "aggTrade", // Event type
"E": 123456789, // Event time
"s": "BTCUSDT", // Symbol
"a": 5933014, // Aggregate trade ID
"p": "0.001", // Price
"q": "100", // Quantity
"f": 100, // First trade ID
"l": 105, // Last trade ID
"T": 123456785, // Trade time
"m": true, // Is the buyer the market maker?
}
Based on the given data structure, we create the database “dfs://binance” with a composite partitioning scheme using value partition on date and hash partition on symbol.
dbName = "dfs://binance"
tbName = "aggTrades"
if (existsDatabase(dbName)){
dropDatabase(dbName)
}
colNames =`eventTime`code`id`price`quantity`firstId`lastId`tradeTime`marketMaker`currentTime
colTypes = [TIMESTAMP,SYMBOL,LONG,DOUBLE,DOUBLE,LONG,LONG,TIMESTAMP,BOOL,TIMESTAMP]
dbDate = database(directory="", partitionType=VALUE, partitionScheme=2012.01.01..2012.01.05)
dbSym = database(directory="", partitionType=HASH, partitionScheme=[SYMBOL, 50])
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate, dbSym], engine='TSDB')
t = table(1000000:0, colNames, colTypes)
pt = createPartitionedTable(dbHandle=db, table=t, tableName=tbName,
partitionColumns=`tradeTime`code, sortColumns=`code`tradeTime)
(2) Creating Persisted Stream Tables
Though WebSockets maintain low-latency, persistent real-time communication, in live trading, network fluctuations and server failures may cause interruptions, leading to potential data loss. To ensure data stability and high availability, we use keyedStreamTable to create a keyed stream table and establish multiple subscriptions to the same data source that write to the stream table simultaneously. The keyed stream table retains only the first record for each primary key, avoiding duplicate data. This approach ensures that if one subscription is interrupted, downstream tasks remain unaffected as the other subscriptions continue to provide data.
colNames =`eventTime`code`id`price`quantity`firstId`lastId`tradeTime`marketMaker`currentTime
colTypes = [TIMESTAMP,SYMBOL,LONG,DOUBLE,DOUBLE,LONG,LONG,TIMESTAMP,BOOL,TIMESTAMP]
keyedAggTradesTable = keyedStreamTable(`eventTime`code`id, 10000:0,
colNames, colTypes)
enableTableShareAndPersistence(table=keyedAggTradesTable,
tableName=`keyedAggTradesTableST, cacheSize=1200000)
go
Subscribe to the keyed stream table to save the streaming data to a DFS table.
dbName = "dfs://binance"
tbName = "aggTrades"
aggTradesPT = loadTable(dbName, tbName)
subscribeTable(tableName=`keyedAggTradesTableST,
actionName="keyedAggTradesTableST", msgAsTable=true,
handler=tableInsert{aggTradesPT},batchSize=10000, throttle=1)
(3) Establishing WebSocket Connection
Use DolphinDB’s WebSocket plugin to establish a connection to the Binance WebSocket API. First, define a parseAggTrades
function to parse incoming data into a table:
def parseAggTrades(rawData){
rawDataDict = parseExpr(rawData).eval()
if (rawDataDict["e"]!=null){
eventTime=timestamp([rawDataDict["E"]])
code=symbol([rawDataDict["s"]])
id=long([rawDataDict["a"]])
price=double([rawDataDict["p"]])
quantity=double([rawDataDict["q"]])
firstId=long([rawDataDict["f"]])
lastId=long([rawDataDict["l"]])
tradeTime=timestamp([rawDataDict["T"]])
marketMaker=bool([rawDataDict["m"]])
currentTime=[gmtime(now())]
res=table(eventTime, code, id, price, quantity,
firstId, lastId, tradeTime, marketMaker, currentTime)
return res
}
return null
}
Then define callback functions required by the WebSocket connection. The onMessage
function calls parseAggTrades
to parse and insert data into the previously defined stream table. Use method WebSocket::createSubJob
to create the subscription connection to Binance, establishing two connections for high availability:
def onOpen(ws){
writeLog("WebSocket open to receive data: ")
}
def onMessage(mutable dataStreamTable, ws, msgTable){
for (msg in msgTable[`msg]){
res = parseAggTrades(msg)
dataStreamTable.append!(res)
}
}
def onError(ws, error){
writeLog("WebSocket failed to receive data: " + error.string())
}
def onClose(ws, statusCode, msg){
writeLog("connection is close,
statusCode: " + statusCode.string() + ", " + msg.string())
}
url = "wss://fstream.binance.com/stream?streams=btcusdt@aggTrade
/ethusdt@aggTrade/enausdt@aggTrade/usdcusdt@aggTrade/solusdt@aggTrade
/dogeusdt@aggTrade/bnbusdt@aggTrade/xrpusdt@aggTrade"
config = dict(STRING, ANY)
ws = WebSocket::createSubJob(url, onOpen, onMessage{keyedAggTradesTableST},
onError, onClose, "aggTradeWebSocket1", config)
ws = WebSocket::createSubJob(url, onOpen, onMessage{keyedAggTradesTableST},
onError, onClose, "aggTradeWebSocket2", config)
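Once the connections are established, a quick sanity check can confirm that data is flowing into the keyed stream table. A minimal sketch, using the table and subscription defined above:
// Check how many aggregate trade records have been received
select count(*) from keyedAggTradesTableST
// Inspect a few of the received records
select * from keyedAggTradesTableST limit 10
// Check the publish status of the stream tables on this node
getStreamingStat().pubTables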
2.1.3 REST API: Order Book for USDT-Margined Futures
This section demonstrates how to access order book data for USDT-margined futures using Binance’s REST API.
(1) Creating Databases and Tables
Below is a sample of the data structure of the order book provided by Binance. For details, see Binance > Order Book.
{
"lastUpdateId": 1027024,
"E": 1589436922972, // Message output time
"T": 1589436922959, // Transaction time
"bids": [
[
"4.00000000", // PRICE
"431.00000000" // QTY
]
],
"asks": [
[
"4.00000200",
"12.00000000"
]
]
}
Based on the given data structure, we store the aggregate trades and order book data in the same database "dfs://binance" so that they share the same partitioning scheme. Load the database handle with db = database(dbName)
and create a new table "restDepth" in it. Data in the same partition of the two tables is stored on the same node, ensuring efficient table joins.
dbName = "dfs://binance"
tbName = "restDepth"
colNames = `eventTime`transactionTime`code`lastUpdateId`bidPrice`bidQty`askPrice`askQty`currentTime
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP]
t = table(10000:0, colNames, colTypes)
db = database(dbName)
pt = createPartitionedTable(dbHandle=db, table=t, tableName=tbName,
partitionColumns=`eventTime`code, sortColumns=`code`eventTime)
(2) Creating Persisted Stream Tables
When accessing the REST API, data is retrieved by polling the exchange periodically. We create a persisted stream table, and each data request ingests data into it.
enableTableShareAndPersistence(table=streamTable(100000:0, colNames, colTypes),
tableName=`restDepthST, cacheSize=12000)
go
Subscribe to the stream table to save the streaming data to a DFS table.
restDepthPT = loadTable(dbName, tbName)
subscribeTable(tableName=`restDepthST, actionName="insertDB",
msgAsTable=true, handler=tableInsert{restDepthPT}, batchSize=10000, throttle=1)
(3) Requesting Data Using REST API Polling
Use DolphinDB’s httpClient plugin to send requests to the server every second for order book data. First, define a getDepth
function to send requests, parse the data and insert it into the stream table.
def getDepth(mutable restDepthST, code, baseUrl){
do{
try{
param = dict(STRING, STRING)
param['symbol'] = code;
// res = httpClient::httpGet(baseUrl,param,10000)
config = dict(STRING, ANY)
config[`proxy] = "http://192.198.1.31:8899/"
res = httpClient::httpGet(baseUrl,param,10000,,config)
rawDataDict = parseExpr(res.text).eval()
b = double(rawDataDict.bids)
bTransposed = matrix(b).transpose()
bidPrice = bTransposed[0]
bidQty = bTransposed[1]
a = double(rawDataDict.asks)
aTransposed = matrix(a).transpose()
askPrice = aTransposed[0]
askQty = aTransposed[1]
eventTime = timestamp(rawDataDict["E"])
transactionTime = timestamp(rawDataDict["T"])
lastUpdateId = long(rawDataDict["lastUpdateId"])
currentTime = gmtime(now())
resTable = table(eventTime as eventTime, transactionTime as transactionTime,
code as code, lastUpdateId as lastUpdateId, [bidPrice] as bidPrice,
[bidQty] as bidQty, [askPrice] as askPrice,
[askQty] as askQty, currentTime as currentTime)
restDepthST.append!(resTable)
sleep(1000)
}
catch(ex){
print(ex)
sleep(5000)
continue
}
}while(true)
}
Then, submit the job to run in the background using submitJob
, allowing it to poll for data without interrupting interactive tasks.
// Set the base URL and submit the task
baseUrl = "https://fapi.binance.com/fapi/v1/depth"
submitJob("getDepth_BTCUSDT","getDepth_BTCUSDT", getDepth, restDepthST, "btcusdt", baseUrl) baseUrl)
2.2 Accessing Data With Python CCXT Library
The crypto exchange ecosystem is characterized by diverse API interfaces and data formats, presenting significant challenges for developers to implement cross-exchange trading and data retrieval strategies. CCXT is a Python library that addresses these challenges by offering a unified API interface, enabling developers to seamlessly integrate multiple exchanges without navigating the intricacies of individual API discrepancies.
The DolphinDB Python API bridges Python programs and the DolphinDB server, enabling bi-directional data transfer and script execution. Users can retrieve market data from various exchanges using the CCXT library and write it to DolphinDB databases through the DolphinDB Python API. This approach leverages DolphinDB’s excellent computational performance and powerful storage capabilities to accelerate data handling and analysis.
This section provides an example demonstrating how to use CCXT to fetch real-time BTC mid-prices from Binance, OKX, Huobi, and Gate exchanges in Python, and write the received data to DolphinDB.
(1) Environment Setup
Install and load the required packages in Python:
import ccxt
import dolphindb as ddb
import os
import time
import websocket
import json
import requests
import numpy as np
# Set up the proxy if required
os.environ['http_proxy'] = 'http://127.0.0.1:7890'
os.environ['https_proxy'] = 'http://127.0.0.1:7890'
# Connect to DolphinDB server
s = ddb.session()
s.connect(host="DolphinDB server IP", port="DolphinDB server port",
userid="username", password="password")
(2) Creating Persisted Stream Tables
Create a persisted stream data table in DolphinDB to store the data obtained from exchanges.
Python script:
script = """
name = `timestamp`BinanceMid`OkxMid`HuobiMid`GateMid
type = [LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
enableTableShareAndPersistence(table=streamTable(1:0, name, type),
tableName="BTCUSDTCROSSEXCHANGE", cacheSize=1000, preCache=1000)
"""
s.run(script)
DolphinDB script:
name = `timestamp`BinanceMid`OkxMid`HuobiMid`GateMid
type = [LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
enableTableShareAndPersistence(table=streamTable(1:0, name, type),
tableName="BTCUSDTCROSSEXCHANGE", cacheSize=1000, preCache=1000)
(3) CCXT Exchange Instances and Data Fetching Function
Create CCXT exchange instances in Python and define the function to fetch the order book data.
# Proxy server URL
proxy_url = 'http://127.0.0.1:7890'
# Initialize exchange objects with proxy and timeout settings
exchangeBinance = ccxt.binance({
'proxies': {
'http': proxy_url,
'https': proxy_url,
},
'timeout': 3000,
})
exchangeOkx = ccxt.okx({
'proxies': {
'http': proxy_url,
'https': proxy_url,
},
'timeout': 3000,
})
exchangeHuobi = ccxt.huobi({
'proxies': {
'http': proxy_url,
'https': proxy_url,
},
'timeout': 3000,
})
exchangeGate = ccxt.gate({
'proxies': {
'http': proxy_url,
'https': proxy_url,
},
'timeout': 3000,
})
def fetch_order_book_with_retry(symbol, ddbTableName, retries=3):
for attempt in range(retries):
try:
# Fetch order book data and calculate mid-price
orderBookBinance = exchangeBinance.fetch_order_book(symbol)
timestamp = time.time()
BinanceMid = (orderBookBinance['bids'][0][0] +
orderBookBinance['asks'][0][0])/2
orderBookOkx = exchangeOkx.fetch_order_book(symbol)
OkxMid = (orderBookOkx['bids'][0][0] +
orderBookOkx['asks'][0][0])/2
orderBookHuobi = exchangeHuobi.fetch_order_book(symbol)
HuobiMid = (orderBookHuobi['bids'][0][0] +
orderBookHuobi['asks'][0][0])/2
orderBookGate = exchangeGate.fetch_order_book(symbol)
GateMid = (orderBookGate['bids'][0][0] +
orderBookGate['asks'][0][0])/2
insertData = [int(1000*timestamp),round(BinanceMid,2),
round(OkxMid,2),round(HuobiMid,2),round(GateMid,2)]
s.run("tableInsert{"+ddbTableName+"}",insertData)
except ccxt.RequestTimeout:
print(f"Request timed out, retrying... ({attempt + 1}/{retries})")
time.sleep(2)
except ccxt.NetworkError as e:
print(f"Network error: {str(e)}")
break
except ccxt.ExchangeError as e:
print(f"Exchange error: {str(e)}")
break
return None
(4) Continuous Data Fetching
Use a loop in Python to fetch real-time market data every second:
while True:
try:
fetch_order_book_with_retry('BTC/USDT', "BTCUSDTCROSSEXCHANGE")
time.sleep(1)
except Exception as e:
print(e)
time.sleep(5)
Data written to the stream table can be checked with the following select query (returning the first 10 records):
select * from BTCUSDTCROSSEXCHANGE limit 10
(5) Creating a DFS database in DolphinDB to store the data:
dbName = "dfs://mid_cross_market"
tbName = "btc"
colNames = `timestamp`BinanceMid`OkxMid`HuobiMid`GateMid
colTypes = [LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
db = database(dbName, VALUE, 1..10, engine='TSDB')
// dropDatabase("dfs://mid_cross_market")
t = table(1000000:0, colNames, colTypes)
pt = createPartitionedTable(db, t, tbName,
partitionColumns=`timestamp, sortColumns=`timestamp)
btcCross = loadTable(dbName, tbName)
subscribeTable(tableName="BTCUSDTCROSSEXCHANGE",
actionName="insertDBBTC", offset=-1, handler=btcCross,
msgAsTable=true, batchSize=5000, throttle=0.5)
The following script can be used to cancel the subscription and drop the stream table:
unsubscribeTable(tableName="BTCUSDTCROSSEXCHANGE", actionName="insertDBBTC")
undef(`BTCUSDTCROSSEXCHANGE, SHARED)
To query the data stored in a DFS database, the table must first be loaded:
dbName = "dfs://mid_cross_market"
tbName = "btc"
btcCross = loadTable(dbName, tbName)
select * from btcCross limit 10
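With the mid-prices from all four exchanges stored side by side, cross-market metrics can be computed directly with DolphinDB SQL. A minimal sketch computing the Binance-OKX spread and the maximum deviation across all four exchanges per record:
// Per-record spread between Binance and OKX, plus the max deviation across exchanges
select timestamp, BinanceMid - OkxMid as binanceOkxSpread,
    rowMax(BinanceMid, OkxMid, HuobiMid, GateMid) - rowMin(BinanceMid, OkxMid, HuobiMid, GateMid) as maxDeviation
from btcCross limit 10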
2.3 Integrating Multi-Source Market Data
This section presents an advanced multi-source market data solution that integrates the data retrieved using two methods described in Section 2.1. The integration process addresses the challenges posed by varying subscriptions and latencies, implementing sophisticated rules for data alignment and cleaning to ensure cohesive integration into a unified table.
2.3.1 Workflow
Accessing Market Streams from Binance Exchange
For each trading pair, market data is accessed via both WebSocket API (with subscription frequency at 100 ms) and REST API (with polling interval of 50 ms).
Processing Data from WebSocket API
Subscribe to WebSocket API data streams.
Ingest received data to a shared stream table.
Subscribe to the stream table and write data to a DFS partitioned table.
Processing Data from REST API
Set up HTTP requests to fetch data from REST API.
Ingest the data to a shared stream table.
Subscribe to the stream table and write data to a DFS partitioned table.
Data Cleaning and Integration
Create a shared dictionary to keep the latest timestamp for each trading pair.
Define rules for filtering data with latest timestamps, and integrate selected streams into a shared stream table.
Upon receiving new data, compare the timestamps of incoming data with those recorded in the dictionary to ensure only the latest data is written.
Subscribe to the stream table and write data to a DFS partitioned table.
Fig 2–1 Integrated Multi-Source Data Access
The core of this integration strategy lies in maintaining only the latest records for each trading pair. Therefore, we employ a shared dictionary to store the latest timestamps for each trading pair. The subscription to multi-source data specifies filtering rules based on the dictionary, and only the latest records can be saved to the integrated table.
2.3.2 Implementation
The stream tables wssDepthST and restDepthST are used to receive market data from the Binance WebSocket and REST APIs. Due to differences in subscription methods, frequencies, and delays, rules for aligning and cleaning such multi-source market data are required before integration. These rules compare the timestamp of each incoming record with the one recorded in the shared dictionary to determine whether the record provides the most recent information.
Note: Binance provides multiple base endpoints, each with distinct performance and stability characteristics. We recommend that users test the performance and stability of each base endpoint from their designated server in their specific business context and determine the optimal number of data sources and retrieval methods based on their needs. The proposed approach combines the strengths of both methods to enhance performance and stability by subscribing to data on different base URLs using the same account and setting different processing frequencies.
(1) Create a shared dictionary for maintaining the latest timestamps for each trading pair to track the most recent update time.
latestTs=syncDict(SYMBOL,TIMESTAMP)
latestTs['btcusdt'] = 2024.05.16 06:39:17.513
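In practice, the dictionary needs an initial entry for every trading pair to be integrated; otherwise the timestamp comparison in the handler below yields NULL for an unseen pair and its first record may be dropped. A minimal sketch, where the pair list is an assumption and should match the subscribed streams:
// Initialize the latest-timestamp dictionary for all monitored pairs (example list)
symbols = `btcusdt`ethusdt`solusdt`bnbusdt
for (sym in symbols) {
    latestTs[sym] = 1970.01.01 00:00:00.000
}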
(2) Set rules for filtering and ingesting data from restDepthST and wssDepthST into the table latestDepthST. By extracting the symbol and event time, the rules check whether the record carries the latest timestamp. If so, the shared dictionary is updated and the new record is appended to latestDepthST.
def toMerged(mutable latestTs, routeName, msg){
symbolID = exec code from msg limit 1
Timets = exec eventTime from msg limit 1
lts = latestTs[symbolID]
if(Timets > lts){
latestTs[symbolID] = Timets
appendTestTb = select code, eventTime, lastUpdateId, bidPrice, bidQty, askPrice, askQty, currentTime, routeName as dataroute from msg
objByName("latestDepthST").append!(appendTestTb)
}
}
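The handler appends to a shared stream table latestDepthST, which must exist before the subscriptions below are created. A minimal sketch, assuming its columns match those selected in toMerged, with an extra dataroute column recording the data source:
// Columns match the select statement in toMerged
colNames = `code`eventTime`lastUpdateId`bidPrice`bidQty`askPrice`askQty`currentTime`dataroute
colTypes = [SYMBOL, TIMESTAMP, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP, STRING]
enableTableShareAndPersistence(table=streamTable(100000:0, colNames, colTypes),
    tableName=`latestDepthST, cacheSize=12000)
go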
(3) Subscribe to wssDepthST and restDepthST and specify the handler with filtering rules.
subscribeTable(tableName="restDepthST", actionName="restTolive", offset=-1, handler=toMerged{latestTs, 'restapi'}, msgAsTable=true, batchSize=1, throttle=0.001)
subscribeTable(tableName="wssDepthST", actionName="wssTolive", offset=-1, handler=toMerged{latestTs, 'wssapi'}, msgAsTable=true, batchSize=1, throttle=0.001)
This method allows subscriptions to multiple sources to be integrated into one table. Rules should be specified based on the format of the obtained market data.
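To complete the last step of the workflow, the integrated stream table can in turn be subscribed to and persisted to a DFS partitioned table. A sketch under the assumption that a partitioned table named "latestDepth" with a matching schema has already been created in the "dfs://binance" database:
// "latestDepth" is a hypothetical DFS table assumed to share the schema of latestDepthST
latestDepthPT = loadTable("dfs://binance", "latestDepth")
subscribeTable(tableName="latestDepthST", actionName="insertDBLatest", offset=-1,
    msgAsTable=true, handler=tableInsert{latestDepthPT}, batchSize=10000, throttle=1)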
2.3.3 Performance Test
To assess the efficacy of our multi-source integration approach, we conducted a 24-hour data coverage test, comparing the performance of WebSocket and REST methods both separately and in combination. The volume of data received serves as a proxy for average refresh rates, with larger volumes indicating higher refresh frequencies.
In our test environment, we obtained BTC/USDT spot order book data using two methods:
WebSocket with a 100-ms frequency (maximum available speed)
REST polling with a 50-ms interval
We then filtered the incoming data and inserted only the latest records into a table. The 50-ms polling interval strikes a balance between update frequency and bandwidth when acquiring market data for multiple currency pairs; since it is shorter than the 100-ms WebSocket push interval, the REST polling method yielded a higher data volume.
Records obtained after 24 hours:
WebSocket subscription: 644,808
REST polling: 1,383,480
Duplicate records: 10,020
The latest market table latestDepthST receives 1,753,067 new records in total, with the specific composition as follows:
Table 2–1 Table latestDepthST Composition
From the results, the number of new records added to latestDepthST exceeds the number obtained through either WebSocket or REST alone. Moreover, the higher proportion of REST data in the table indicates that this multi-source integration approach increases the frequency and timeliness of data acquisition compared with a single access method. Additionally, since the data obtained from the WebSocket and REST APIs is pushed from different servers, this method helps mitigate the risk of a single server failure. This further demonstrates that multi-source market data access obtains market streams more quickly while reducing the risks associated with network fluctuations and exchange service failures.
2.4 Storing Market Data in DolphinDB
Efficient data partitioning is crucial for minimizing system response latency and maximizing data throughput. This section presents optimized partitioning schemes and sample scripts for crypto market data storage, using Binance exchange data as a reference. The database designs employ both OLAP and TSDB storage engines, taking into account the unique characteristics of crypto market data.
2.4.1 Key Considerations in Database Design
Composite Partitioning: Crypto trading typically involves multiple trading pairs across various exchanges, with significant fluctuations in trading volume between different pairs. To address these characteristics, this solution adopts a three-dimensional composite partitioning strategy based on date, exchange, and trading pair.
Date-based partitioning: Facilitates efficient management of historical data and accelerates date-range queries in the 24/7 crypto market environment.
Exchange-based partitioning: Expedites data queries and computations within specified exchanges.
Trading pair-based partitioning: Hashing trading pairs ensures balanced data distribution across partitions, optimizing query performance.
Co-location Mechanism: With the co-location mechanism, tables (for different market data types) within the same database employ consistent partitioning schemes, and corresponding partitions of different tables are stored on the same node for optimized table joins.
High-Precision Data Types: DECIMAL data types are used instead of floating-point types to maintain the extremely high precision required by crypto market data, preventing precision loss or calculation errors due to rounding (see the conversion example after the note below).
Note: Each exchange’s data encompasses approximately 300 trading pairs and 10 accounts.
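For illustration, the string prices and quantities returned by an exchange can be converted to DECIMAL128 values before storage; the scales below follow the schema used in the next section:
// Convert exchange string fields to high-precision decimals
price = decimal128("63850.123456789012345678", 18)    // price with scale 18
qty = decimal128("0.00012345", 8)                     // quantity with scale 8
// Conversion also works on whole vectors
prices = decimal128(["63850.1", "63850.2", "63850.3"], 18)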
2.4.2 Example Script
The following script creates the database and table for the order book data of USDT-margined futures based on the proposed partitioning scheme.
dbName = "dfs://marketData"
tbName = "orderBookFutures"
dbDate = database("", VALUE, 2012.01.01..2012.01.30)
dbEx = database("", VALUE, `Binance`OKX)
dbSym = database("", HASH, [SYMBOL, 50])
db = database(dbName, COMPO, [dbDate, dbEx, dbSym], engine='TSDB')
colNames = `exchange`transactionTime`symbol`bidPrice`bidQty`askPrice`askQty
colTypes = [SYMBOL, TIMESTAMP, SYMBOL, DECIMAL128(18)[],
DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]
t = table(1000000:0, colNames, colTypes)
pt = createPartitionedTable(db, t, tbName,
partitionColumns=`transactionTime`exchange`symbol,
sortColumns=`symbol`transactionTime)
Note that the bidPrice, bidQty, askPrice and askQty columns are specified as array vectors of DECIMAL128 type for conveniently storing multi-level market data in one field. DECIMAL128(18) is used for prices and DECIMAL128(8) for trading volumes to ensure precision across various exchanges. While DECIMAL128 provides higher storage precision, it also consumes more storage and computational resources. Users should adjust data storage formats based on their specific trading requirements and exchange documentation.
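Despite being stored as array vectors, these columns remain easy to query. For example, the best (level-1) bid and ask can be extracted by indexing each row; a sketch against the table created above:
orderBookFutures = loadTable("dfs://marketData", "orderBookFutures")
// bidPrice[0] / askPrice[0] take the first element of each row's array vector
select symbol, transactionTime, bidPrice[0] as bestBid, askPrice[0] as bestAsk
    from orderBookFutures limit 10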
2.5 Resampling OHLC Data
This section demonstrates how to perform frequency resampling to convert OHLC data (also known as Kline or candlestick data) to a specified frequency. The example below illustrates the process of converting second-level OHLC data to minute-level:
dbName0 = "dfs://binance_spot_k_1min_level"
tbName0 = "kline"
colNames = `symbol`open_time`open`high`low`close`volume
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
t = table(1000000:0, colNames, colTypes)
db = database(dbName0, VALUE, 2024.01.01..2024.12.30, engine='TSDB')
pt = createPartitionedTable(db, t, tbName0,
partitionColumns=`open_time, sortColumns=`symbol`open_time)
dbName = "dfs://binance_spot_k_1s_level"
tableName = "kline"
tb = loadTable(dbName,tableName)
t = select symbol,open_time,open,high,low,close,volume from tb;
time = t['open_time']
barMinutes = 1
OHLC = select first(open_time) as open_time, first(open) as open,
max(high) as high, min(low) as low, last(close) as close,
sum(volume) as volume from t group by symbol,
bar(time, barMinutes*60*1000) as barStart
minKline = select symbol,open_time,open,high,low,close,volume from OHLC
tb0 = loadTable(dbName0,tbName0)
tableInsert(tb0,minKline)
This chapter presents a thorough solution for real-time market data access, integration and storage. The methods offer flexibility and scalability, allowing users to tailor the system to various exchanges and market conditions. This helps build a sophisticated market data infrastructure that can support advanced trading algorithms and in-depth market research in crypto quant trading.
In the next chapter, we will explore DolphinDB’s stream processing capabilities, demonstrate real-time metric calculations, and introduce visualization tools for effective data representation.
Download Whitepaper Here: Cryptocurrency Solutions — DolphinDB
Email us for more information or to schedule a demo: sales@dolphindb.com
Thanks for reading! To keep up with our latest news, please follow our Twitter @DolphinDB_Inc and LinkedIn. You can also join our Slack to chat with the author!