DEV Community

DolphinDB

Integrating Crypto Market Data From Multiple Sources

Cryptocurrency trading professionals typically use Python, Java, Rust, or C++ for exchange API access. Single-source data access presents inherent limitations, such as data loss from server downtime, delayed updates, and limited data quality and refresh rates, which can potentially impact trading decisions.

This article introduces a DolphinDB solution for accessing market streams (using the Binance exchange as an example) through both WebSocket and REST APIs. A performance test shows that the integrated approach improves data continuity and coverage and delivers more timely and frequent market updates. With data refreshed at shorter intervals, traders can respond to the market faster and make better-informed decisions.

1. Background

Binance offers two ways to access market streams:

  • REST API: Suitable for retrieving static data but requires polling for real-time updates.

  • WebSocket API: Designed for accessing real-time market streams. By maintaining a persistent connection, it enables low-latency retrieval of the latest data.

Binance provides multiple base endpoints, each with distinct performance and stability characteristics. We recommend that users test the performance and stability of each base endpoint on the designated server in their specific business context, and then determine the optimal number of data sources and retrieval methods based on their needs. The proposed approach combines the strengths of both methods to enhance performance and stability: it subscribes to data on different base URLs using the same account and sets a different processing frequency for each.
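As a rough illustration of the endpoint test recommended above, the following Python sketch ranks candidate endpoints by median request latency. It is not part of the DolphinDB solution: the function names, the probe callables, and the endpoint labels are hypothetical, and in practice each probe would issue one real request to one base endpoint.

```python
import statistics
import time


def median_latency_ms(call, samples=5, clock=time.perf_counter):
    """Time `call` several times and return the median duration in milliseconds."""
    timings = []
    for _ in range(samples):
        start = clock()
        call()
        timings.append((clock() - start) * 1000.0)
    return statistics.median(timings)


def rank_endpoints(probes, samples=5, clock=time.perf_counter):
    """Return (name, median latency in ms) pairs, fastest endpoint first.

    `probes` maps a hypothetical endpoint label to a zero-argument callable
    that performs one request against that endpoint.
    """
    results = {
        name: median_latency_ms(call, samples, clock)
        for name, call in probes.items()
    }
    return sorted(results.items(), key=lambda item: item[1])
```

The median is used instead of the mean so that a single slow request does not dominate the ranking. Stability (error rate, gaps) would need to be tracked separately over a longer window.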

2. Implementation Overview

The integrated market data solution comprises the following components.

[Figure: components of the integrated market data solution]

Accessing Market Streams from Binance Exchange

For each cryptocurrency pair, market data is accessed from Binance via both the WebSocket API (with an update interval of 100 ms) and the REST API (with a polling interval of 50 ms).

Processing Data from WebSocket API

  • Subscribing to the data from WebSocket API, and writing it to a shared stream table.

  • Persisting the stream table to a DFS partitioned table.

Processing Data from REST API

  • Setting up HTTP requests to fetch data from REST API.

  • Writing the data to a shared stream table.

  • Persisting the stream table to a DFS partitioned table.

Cleaning and Integrating Received Data

  • Creating a shared dictionary to maintain the latest timestamp for each trading pair.

  • Defining rules for filtering data with latest timestamps, and integrating selected streams into a shared stream table.

  • Upon receiving new data, comparing the timestamps of the incoming data with those stored in the dictionary to ensure only the latest data is written.

  • Persisting the stream table to a DFS partitioned table.

3. Accessing Market Streams

This section outlines the steps for accessing market streams from Binance’s WebSocket and REST APIs using the DolphinDB WebSocket and HttpClient plugins.

3.1 Accessing Data From WebSocket API

We establish a WebSocket subscription task using the DolphinDB WebSocket plugin to access real-time market depth data from Binance WebSocket API.

Step 1: Install and load the DolphinDB WebSocket plugin. This step can be skipped if the plugin is already loaded.

installPlugin("WebSocket")
loadPlugin("WebSocket")

Step 2: Create a shared stream table wssDepthST and persist it to disk.

colNames = `type`eventTime`transactionTime`code`firstId`lastId`finalId`bidPrice`bidQty`askPrice`askQty`currentTime
colTypes = [SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP]
enableTableShareAndPersistence(table=streamTable(10000:0, colNames, colTypes), tableName=`wssDepthST, cacheSize=12000)

Step 3: Define functions for handling market data.

The market depth data accessed through WebSocket API is formatted as follows:

{"e": "depthUpdate",  "E": 1571889248277, "T": 1571889248276, "s": "BTCUSDT", "U": 390497796, "u": 390497878, "pu": 390497794,
 "b": [["7403.89", "0.002"], ["7403.90", "3.906"], ["7404.00", "1.428"] ,["7404.85", "5.239"], ["7405.43","2.562"]],
 "a": [["7405.96","3.340" ], ["7406.63", "4.525"], ["7407.08", "2.475"], ["7407.15", "4.800"], ["7407.20","0.175"]]}

The following script defines functions for processing the incoming data in DolphinDB.

// Define parseDepth for parsing market depth data
def parseDepth(rawData){
    // Messages from the combined stream endpoint wrap the payload in a "data" field
    rawDataDict = parseExpr(rawData).eval().data
    if (rawDataDict["e"]=="depthUpdate"){
        // Process bid data: convert [price, qty] pairs to a matrix, then split into rows
        b = double(rawDataDict.b)
        bTransposed = matrix(b).transpose()
        bidPrice = bTransposed[0].enlist()
        bidQty = bTransposed[1].enlist()
        // Process ask data
        a = double(rawDataDict.a)
        aTransposed = matrix(a).transpose()
        askPrice = aTransposed[0].enlist()
        askQty = aTransposed[1].enlist()
        // Extract other relevant fields
        eventType = rawDataDict["e"]
        eventTime = timestamp(rawDataDict["E"])
        transactionTime = timestamp(rawDataDict["T"])
        code = rawDataDict["s"]
        firstId = rawDataDict["U"]
        lastId = rawDataDict["u"]
        finalId = rawDataDict["pu"]
        currentTime = gmtime(now())
        // Construct a table whose column order matches wssDepthST
        return table(eventType, eventTime, transactionTime, code, firstId, 
          lastId, finalId, bidPrice, bidQty, askPrice, askQty, currentTime)
    }
}
// WebSocket event handlers
def onOpen(ws){
    writeLog("WebSocket opened to receive data")
}

def onMessage(mutable streamTable, ws, dataTable){
    for (data in dataTable[`msg]){
        res = parseDepth(data)
        streamTable.append!(res)
    }
}

def onError(ws, error){
    writeLog("WebSocket failed to receive data: " + error.string())
}

def onClose(ws, statusCode, msg){
    writeLog("Connection is closed, status code: " + statusCode.string() + ", " + 
      msg.string())
}
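For readers who want to check the parsing logic outside DolphinDB, here is a minimal Python sketch of the same transformation. It is an illustration only: the function name parse_depth and the dictionary layout are assumptions, and the field names simply mirror the columns of wssDepthST. It accepts both the bare payload shown above and the combined-stream form that wraps the payload in a "data" field.

```python
import json


def parse_depth(raw):
    """Parse one depth message into a flat record; return None for other events."""
    message = json.loads(raw)
    # Combined-stream messages look like {"stream": ..., "data": {...}};
    # fall back to the message itself when there is no wrapper.
    payload = message.get("data", message)
    if payload.get("e") != "depthUpdate":
        return None
    # Each level is a [price, quantity] pair of strings.
    # Split the pairs into parallel lists of floats.
    bids, asks = payload["b"], payload["a"]
    return {
        "type": payload["e"],
        "eventTime": payload["E"],        # epoch milliseconds
        "transactionTime": payload["T"],  # epoch milliseconds
        "code": payload["s"],
        "firstId": payload["U"],
        "lastId": payload["u"],
        "finalId": payload["pu"],
        "bidPrice": [float(price) for price, _ in bids],
        "bidQty": [float(qty) for _, qty in bids],
        "askPrice": [float(price) for price, _ in asks],
        "askQty": [float(qty) for _, qty in asks],
    }
```

Returning None for non-depth events lets the caller skip such messages instead of appending an empty result.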

Step 4: Create a WebSocket subscription task.

// Connect to Binance WebSocket API
url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms"
config = dict(STRING, ANY)
// Create a WebSocket subscription task
ws = WebSocket::createSubJob(url, onOpen, onMessage{streamTable=wssDepthST}, onError, onClose, "wssDepth", config)
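The URL above uses Binance's combined stream form, in which stream names are lowercase and several streams can be joined with "/". As a small sketch in Python (the helper name combined_stream_url and its parameters are hypothetical, not part of the DolphinDB plugin), such a URL could be assembled for several trading pairs like this:

```python
def combined_stream_url(symbols, depth=20, interval_ms=100,
                        base="wss://fstream.binance.com"):
    """Build a combined-stream URL for partial book depth streams.

    Stream names are lowercase, e.g. "btcusdt@depth20@100ms", and
    multiple streams are separated by "/".
    """
    streams = [f"{symbol.lower()}@depth{depth}@{interval_ms}ms" for symbol in symbols]
    return f"{base}/stream?streams=" + "/".join(streams)
```

Calling it with a single pair reproduces the URL used in Step 4; passing more pairs yields one connection that carries all of their depth streams.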

Step 5: Manage the subscription.

After establishing the subscription, use getSubJobStat to view the subscription status:

// Check subscription status
WebSocket::getSubJobStat()

To cancel the subscription, use cancelSubJob:

// Cancel subscription
WebSocket::cancelSubJob("wssDepth")

For the complete script for Binance WebSocket data access, refer to Appendix binanceDepthWS.

3.2 Accessing Data From REST API

We create a task to send periodic HTTP requests through the HttpClient Plugin for accessing the latest market depth data from Binance REST API.

Step 1: Install and load the DolphinDB HttpClient plugin. This step can be skipped if the plugin is already loaded.

installPlugin("httpClient")
loadPlugin("httpClient")

Step 2: Create a shared stream table restDepthST and persist it to disk.

colNames = `eventTime`transactionTime`code`lastUpdateId`bidPrice`bidQty`askPrice`askQty`currentTime
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, LONG, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], TIMESTAMP]
enableTableShareAndPersistence(table=streamTable(100000:0, colNames, colTypes), tableName=`restDepthST, cacheSize=12000)

Step 3: Define functions for handling market data.

The market depth data accessed through REST API is formatted as follows:

{"lastUpdateId": 1027024,"E": 1589436922972,"T": 1589436922959,
 "bids": [["4.00000000", "431.00000000"]],
 "asks": [["4.00000200", "12.00000000"]]}

Define a getDepth function that sends HTTP requests in a loop, pausing 50 ms between requests, to continuously fetch the latest market data. The retrieved data is parsed and ingested into a stream table.

// Define getDepth to fetch market data
def getDepth(mutable restDepthST, code, baseUrl){
    do{
        try{
            // Request parameters: the trading pair to query
            param = dict(STRING, STRING)
            param['symbol'] = code
            res = httpClient::httpGet(baseUrl, param, 10000)

            // Parse the JSON response and split [price, qty] pairs into separate vectors
            rawDataDict = parseExpr(res.text).eval()
            b = double(rawDataDict.bids)
            bTransposed = matrix(b).transpose()
            bidPrice = bTransposed[0]
            bidQty = bTransposed[1]
            a = double(rawDataDict.asks)
            aTransposed = matrix(a).transpose()
            askPrice = aTransposed[0]
            askQty = aTransposed[1]

            eventTime = timestamp(rawDataDict["E"])
            transactionTime = timestamp(rawDataDict["T"])
            lastUpdateId = long(rawDataDict["lastUpdateId"])
            currentTime = gmtime(now())

            resTable = table(eventTime as eventTime, transactionTime as transactionTime, 
                code as code, lastUpdateId as lastUpdateId, [bidPrice] as bidPrice, 
                [bidQty] as bidQty, [askPrice] as askPrice, [askQty] as askQty, currentTime as currentTime)
            restDepthST.append!(resTable)
        }
        catch(ex){
            print(ex)
        }
        // Pause between requests, including after a failure, to avoid a tight retry loop
        sleep(50)
    }while(true)
}
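The control flow of such a polling loop can be sketched in Python as follows. This is an illustration under stated assumptions rather than the DolphinDB implementation: poll_depth, fetch, and sink are hypothetical names, fetch stands in for one HTTP request that returns a parsed record, and the max_polls and sleep parameters exist only so the loop can be exercised without a network.

```python
import time


def poll_depth(fetch, sink, interval_s=0.05, max_polls=None, sleep=time.sleep):
    """Call `fetch` repeatedly, append each result to `sink`, and count failures.

    The loop pauses `interval_s` seconds after every attempt, including failed
    ones, so a run of errors cannot turn into a tight retry loop.
    """
    polls = 0
    errors = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        try:
            sink.append(fetch())
        except Exception as ex:
            # Log and carry on; one failed request should not stop the feed.
            errors += 1
            print(f"poll failed: {ex}")
        sleep(interval_s)
    return errors
```

Note that the effective refresh interval is the pause plus the request round-trip time, so the number of records per second will be somewhat lower than the pause alone suggests.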

Step 4: Set the base endpoint and submit the job with the name getDepth_BTC.

baseUrl = "https://fapi.binance.com/fapi/v1/depth"
submitJob("getDepth_BTC","getDepth_BTC", getDepth, restDepthST, "btcusdt", baseUrl)

Step 5: Manage submitted jobs.

// View recent jobs
getRecentJobs()
// View intermediate job information
getJobMessage("getDepth_BTC")
// Cancel the job
cancelJob('getDepth_BTC')

For the complete script for Binance REST data access, refer to Appendix binanceDepthREST.

4. Integrating Multi-Source Data

The stream tables wssDepthST and restDepthST receive market data from the Binance WebSocket and REST APIs. Because the two access methods differ in delivery mechanism and delay, rules for aligning and cleaning the multi-source data are required before integration. These rules compare the timestamp of each incoming record with the latest timestamp already recorded for the same trading pair to determine whether the record carries newer information.

Specifically, we implement data integration by creating a shared dictionary that stores the latest timestamp for each cryptocurrency pair and by using a stream table latestDepthST to store the integrated market data. Subscriptions to wssDepthST and restDepthST apply a handler that writes a record to latestDepthST only when its timestamp is the latest.

Step 1: Create a shared dictionary for maintaining the latest timestamps for each trading pair to track the most recent update time.

latestTs=syncDict(SYMBOL,TIMESTAMP)
latestTs['btcusdt'] = 2024.05.16 06:39:17.513

Step 2: Set rules for filtering and ingesting data from restDepthST and wssDepthST into table latestDepthST. By extracting data and event times, the rules check if the record has the latest timestamp. If true, the shared dictionary is updated and new records are appended to latestDepthST.

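def toMerged(mutable latestTs, routeName, msg){
    // Take the symbol and event time of the first record in the batch (batchSize=1)
    symbolID = exec code from msg limit 1
    Timets = exec eventTime from msg limit 1
    lts = latestTs[symbolID]
    // Append only if the record is newer than the last one seen for this symbol
    if(Timets > lts){
        latestTs[symbolID] = Timets
        // wssDepthST stores the update ID in lastId, restDepthST in lastUpdateId
        if(routeName == 'wssapi'){
            appendTestTb = select code, eventTime, lastId as lastUpdateId, bidPrice, bidQty, askPrice, askQty, currentTime, routeName as dataroute from msg
        } else {
            appendTestTb = select code, eventTime, lastUpdateId, bidPrice, bidQty, askPrice, askQty, currentTime, routeName as dataroute from msg
        }
        objByName("latestDepthST").append!(appendTestTb)
    }
}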

Step 3: Subscribe to wssDepthST and restDepthST and specify the handler with filtering rules.

subscribeTable(tableName="restDepthST", actionName="restTolive", offset=-1, handler=toMerged{latestTs, 'restapi'}, msgAsTable=true, batchSize=1, throttle=0.001)
subscribeTable(tableName="wssDepthST", actionName="wssTolive", offset=-1, handler=toMerged{latestTs, 'wssapi'}, msgAsTable=true, batchSize=1, throttle=0.001)

This method allows subscriptions to multiple sources to be integrated into one table. Rules should be specified based on the format of the obtained market data.
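The filtering rule itself is independent of DolphinDB and can be sketched in a few lines of Python. The class below is a hypothetical illustration, not the article's implementation: LatestMerger and offer are invented names, records are plain dictionaries with code and eventTime fields, and normalizing the symbol to upper case is an added assumption so that both feeds share one key.

```python
import threading


class LatestMerger:
    """Keep only records that are newer than the last one seen per symbol."""

    def __init__(self):
        self._latest = {}              # symbol -> latest event time (epoch ms)
        self._lock = threading.Lock()  # both feeds may call offer() concurrently
        self.merged = []               # stands in for the merged table

    def offer(self, route, record):
        """Append `record` if it is the newest for its symbol; return True if kept."""
        code = record["code"].upper()  # assumption: one key per pair across feeds
        event_time = record["eventTime"]
        with self._lock:
            last = self._latest.get(code)
            if last is not None and event_time <= last:
                return False           # stale or duplicate: drop it
            self._latest[code] = event_time
            self.merged.append({**record, "code": code, "dataroute": route})
            return True
```

Holding the lock across the comparison and the update keeps the check and the write atomic, so two feeds delivering the same timestamp at the same moment cannot both pass the check.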

5. Performance Test

We test the data coverage of using WebSocket and REST APIs separately and together by evaluating the data volume received within 24 hours. Larger data volume indicates higher refresh rates on average.

In our test environment, we obtain BTCUSDT spot depth data in two ways: a WebSocket subscription with a 100 ms update interval (the fastest available) and REST polling with a 50 ms interval. We then filter the records and insert the latest market data into a table. The 50 ms polling interval balances update frequency against bandwidth limits when acquiring market data for multiple currency pairs. Because it is shorter than the 100 ms WebSocket interval, REST polling yields the larger data volume.

After 24 hours of data acquisition:

  • REST polling: 1,383,480 records
  • WebSocket subscription: 644,808 records
  • Duplicate records: 10,020

The latest market table latestDepthST receives 1,753,067 new records, with the specific composition as follows:

[Figure: composition of the new records in latestDepthST by data source]

The results show that the number of new records added to latestDepthST is greater than the number obtained through either WebSocket or REST alone. Moreover, the higher proportion of REST data in the table indicates that multi-source integration can increase the frequency and timeliness of data acquisition compared to a single access method. Additionally, since the WebSocket and REST data are served by different servers, this method helps avoid problems caused by a single server failure. Multi-source access therefore obtains market streams more quickly and reduces the risks associated with network fluctuations and exchange service failures.
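To make the refresh-rate comparison concrete, the record counts reported above can be converted into average intervals between records over the 24-hour window. The short Python calculation below uses only those published counts; the variable and function names are illustrative.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # length of the test window in milliseconds

# Record counts reported for the 24-hour test
counts = {
    "REST polling": 1_383_480,
    "WebSocket subscription": 644_808,
    "latestDepthST (merged)": 1_753_067,
}


def avg_interval_ms(records):
    """Average time between consecutive records over 24 hours, in milliseconds."""
    return MS_PER_DAY / records


for source, records in counts.items():
    print(f"{source}: {avg_interval_ms(records):.1f} ms between records")

# Relative gain of the merged table over the best single source (REST polling)
gain = counts["latestDepthST (merged)"] / counts["REST polling"] - 1
print(f"merged vs REST alone: {gain:.1%} more records")
```

This works out to roughly 62.5 ms between records for REST polling, 134.0 ms for WebSocket, and 49.3 ms for the merged table, which is about 26.7% more records than REST polling alone.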

6. Conclusion

This article introduces a DolphinDB-based solution for accessing and integrating multi-source market streams in the cryptocurrency market. By integrating multiple data sources and implementing efficient data cleansing and storage, DolphinDB not only improves the refresh frequency and timeliness of data acquisition but also ensures data continuity and reliability. The test results show that this approach increases the amount of market data obtained and shortens the data refresh interval, providing traders with more reliable and timely market data.

In summary, DolphinDB’s multi-source market data integration solution significantly enhances the cryptocurrency market data acquisition process, providing traders with a superior framework for strategic trading decisions and activities.

7. Appendix
