This is the third post in a series about the new Solace/kdb+ API. I suggest you read the previous posts about streaming data between kdb+ and PubSub+ and bringing the power of pub/sub messaging to kdb+.

A typical kdb+ architecture consists of several tightly coupled q processes and possibly a Java feed handler. The feed handler is responsible for capturing market data and sending it to a ticker plant, which then routes it to different real-time subscribers. The most popular real-time subscriber is the RDB (real-time database), which keeps the raw real-time data in memory and then persists it to disk at the end of the day.

The second most popular real-time subscriber is usually a bar generation process that, just like the RDB, subscribes to real-time updates from the ticker plant. However, instead of saving the raw updates, this process computes real-time analytics. These stats are usually computed every minute, but the interval can differ depending on the individual use case. The bar generation process then persists the data to disk, sends it to another process interested in it, or both.

In this post, I would like to show you how your bar generation process can consume streaming data from Solace’s PubSub+ Event Broker, generate minutely statistics, and then publish those stats back to PubSub+ on dynamic topics.

You can find the code of the q process here.

Market Data Simulator

I will be using a market-data-simulator (github | blog post) that I wrote previously to generate simulated market data. It’s a simple application that can be configured to generate L1 market data for certain stocks.


The simulator publishes data to hierarchical PubSub+ topics whose final levels are the country, the exchange, and the symbol.

For example, AAPL’s data will be published on EQ/marketData/v1/US/NASDAQ/AAPL and IBM’s data will be published on EQ/marketData/v1/US/NYSE/IBM.

By default, the simulator is configured to publish data for multiple stocks from 4 exchanges: NYSE, NASDAQ, LSE, and SGX.

Solace PubSub+ Event Broker

For the market data simulator to work, we need to have a running instance of Solace’s PubSub+ Event Broker. There are two easy ways to get started:

  1. Install PubSub+ locally via docker
  2. Get started with an event broker for free at Solace PubSub+ Event Broker: Cloud

I will be using a local PubSub+ instance.

Consuming Raw Market Data Updates

To consume this data, we can either subscribe directly to the topics or create a queue and map the topic(s) to it. In production, working with high volume market data, you would typically subscribe to topics but for our use case, we will be subscribing to a queue.

We can use Solace’s rich hierarchical topics and wildcard filtering to select which topics we are interested in. For example, we can receive data only for US exchanges by adding this topic subscription to our queue: EQ/marketData/v1/US/>. We can also choose to subscribe to updates from a certain exchange only: EQ/marketData/v1/*/LSE/>.
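To make the wildcard rules concrete, here is a minimal sketch of how such subscriptions select topics. It is written in Python rather than q so that it runs on its own, and it is a simplified model of the matching rules, not the broker's implementation: `*` matches the rest of a single level, and `>` at the final level matches one or more remaining levels. The function name `topic_matches` and the LSE topic used below are made up for illustration.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Return True if `topic` matches a Solace-style `subscription`.

    Simplified model: '*' matches the remainder of one level (so 'abc*'
    is a prefix match) and '>' as the last level matches one or more
    remaining levels.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, sub in enumerate(sub_levels):
        if sub == ">" and i == len(sub_levels) - 1:
            # '>' needs at least one more topic level to match
            return len(topic_levels) > i
        if i >= len(topic_levels):
            return False
        if sub.endswith("*"):
            # Single-level wildcard, optionally with a literal prefix
            if not topic_levels[i].startswith(sub[:-1]):
                return False
        elif sub != topic_levels[i]:
            return False
    # Without a trailing '>', the level counts must be equal
    return len(sub_levels) == len(topic_levels)


us_only = "EQ/marketData/v1/US/>"
lse_only = "EQ/marketData/v1/*/LSE/>"

print(topic_matches(us_only, "EQ/marketData/v1/US/NASDAQ/AAPL"))
print(topic_matches(lse_only, "EQ/marketData/v1/US/NASDAQ/AAPL"))
# Hypothetical LSE topic, used only to exercise the second subscription
print(topic_matches(lse_only, "EQ/marketData/v1/UK/LSE/VOD"))
```

Because the filtering happens on the broker, the consuming process never sees messages for topics its queue did not subscribe to.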

The script creates a queue called market_data and maps a topic to it: EQ/marketData/v1/US/>.

We will be using Kx’s Solace interface to kdb+ to communicate with our PubSub+ instance.

// Market Data queue that we would like to subscribe to

-1"### Creating endpoint";

-1"### Mapping topic: ", (string topicToMap), " to queue";

My q bar generation process, gen_stats.q, loads the sol_init.q file, which contains the connection settings and other information required to establish a connection with my local PubSub+ instance.

I defined a global table, prices, for keeping track of the latest updates:

prices:flip (`date`time`sym`exchange`currency`askPrice`askSize`bidPrice`bidSize`tradePrice`tradeSize)!(`date$();`time$();`symbol$();`symbol$();`symbol$();`float$();`float$();`float$();`float$();`float$();`float$());

Then, I defined our callback function, subUpdate, which will be triggered every time there is a new message. In this function, I parse the incoming message and insert it into our prices table.
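As a rough illustration of what such a callback does, the sketch below parses one JSON payload of the kind the simulator publishes and appends it to an in-memory list that stands in for the prices table. It is written in Python, not q, and the names `parse_update`, `on_message`, and `COLUMNS` are assumptions made for this example; the actual subUpdate function in gen_stats.q may be structured differently.

```python
import json
from datetime import date, time

# Same column order as the prices table
COLUMNS = ["date", "time", "sym", "exchange", "currency",
           "askPrice", "askSize", "bidPrice", "bidSize",
           "tradePrice", "tradeSize"]

prices = []  # stand-in for the global prices table


def parse_update(payload: bytes) -> dict:
    """Turn one JSON market data message into a row keyed by COLUMNS."""
    msg = json.loads(payload)
    row = {
        "date": date.fromisoformat(msg["date"]),
        # Drop the UTC offset so the value behaves like a plain local time
        "time": time.fromisoformat(msg["time"]).replace(tzinfo=None),
        "sym": msg["symbol"],
        "exchange": msg["exchange"],
        "currency": msg["currency"],
    }
    # All numeric columns are stored as floats, as in the table schema
    for col in COLUMNS[5:]:
        row[col] = float(msg[col])
    return row


def on_message(payload: bytes) -> None:
    """Callback invoked once per incoming message."""
    prices.append(parse_update(payload))


sample = (b'{"date":"2020-06-08","symbol":"AAPL","askPrice":260.145,'
          b'"bidSize":790,"tradeSize":340,"exchange":"NASDAQ",'
          b'"currency":"USD","time":"15:42:13.805925-04:00",'
          b'"tradePrice":256.93332,"askSize":760,"bidPrice":253.72165}')
on_message(sample)
print(prices[-1])
```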

Then, I defined a function, updateStats, that will be triggered every minute via .z.ts. This function selects rows from prices that arrived within the last minute and computes stats such as highPrice, lowPrice, highAskPrice, lowAskPrice, vwap, etc.

// Generate minutely stats on data from last min
`prices set rawTable:select from rawTable where time>.z.T-00:01;
// wavg takes the weights on the left, so VWAP is tradeSize wavg tradePrice
stats:select lowAskSize: min askSize,highAskSize: max askSize,
  lowBidPrice: min bidPrice,highBidPrice: max bidPrice,
  lowBidSize: min bidSize,highBidSize: max bidSize,
  lowTradePrice: min tradePrice,highTradePrice: max tradePrice,
  lowTradeSize: min tradeSize,highTradeSize: max tradeSize,
  lowAskPrice: min askPrice,highAskPrice: max askPrice,
  vwap:tradeSize wavg tradePrice
  by date, sym, time:1 xbar time.minute from rawTable;
// Keep only the most recent minute bar
stats:select from stats where time=max time;
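For readers less familiar with q, the same grouping logic can be sketched in plain Python. This is only an illustration under stated assumptions: it computes a subset of the stats, the function names `vwap` and `minute_stats` are made up for this example, and the sample rows are invented values rather than simulator output. VWAP here is the average trade price weighted by trade size.

```python
from collections import defaultdict
from datetime import time


def vwap(trade_prices, trade_sizes):
    """Volume-weighted average price: sum(price * size) / sum(size)."""
    total_size = sum(trade_sizes)
    if total_size == 0:
        return float("nan")
    return sum(p * s for p, s in zip(trade_prices, trade_sizes)) / total_size


def minute_stats(rows):
    """Group rows by (date, sym, minute) and compute a few bar stats."""
    buckets = defaultdict(list)
    for r in rows:
        # Truncate the timestamp to the minute, like 1 xbar time.minute
        minute = r["time"].replace(second=0, microsecond=0)
        buckets[(r["date"], r["sym"], minute)].append(r)

    stats = {}
    for key, bucket in buckets.items():
        trade_prices = [r["tradePrice"] for r in bucket]
        trade_sizes = [r["tradeSize"] for r in bucket]
        stats[key] = {
            "lowTradePrice": min(trade_prices),
            "highTradePrice": max(trade_prices),
            "lowTradeSize": min(trade_sizes),
            "highTradeSize": max(trade_sizes),
            "vwap": vwap(trade_prices, trade_sizes),
        }
    return stats


# Invented rows, for illustration only
rows = [
    {"date": "2020.06.08", "sym": "AAPL", "time": time(15, 47, 10),
     "tradePrice": 10.0, "tradeSize": 100.0},
    {"date": "2020.06.08", "sym": "AAPL", "time": time(15, 47, 50),
     "tradePrice": 20.0, "tradeSize": 300.0},
]
print(minute_stats(rows))
```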

Then, for each symbol, it generates a dynamic topic and publishes that symbol’s stats to it. For example, stats for AAPL are published to EQ/stats/v1/AAPL and stats for IBM are published to EQ/stats/v1/IBM.
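The dynamic topic step is small enough to sketch directly. The Python below builds one topic per symbol and hands each (topic, payload) pair to a publish callable. Both the JSON payload layout and the `publish` parameter are assumptions made for this example; in the real process the publish call goes through the Solace interface to kdb+.

```python
import json


def stats_topic(sym: str) -> str:
    """Build the per-symbol stats topic, e.g. EQ/stats/v1/AAPL."""
    return f"EQ/stats/v1/{sym}"


def publish_stats(stats_by_sym: dict, publish) -> int:
    """Publish one message per symbol and return the number sent.

    `publish` is a hypothetical callable taking (topic, payload).
    """
    sent = 0
    for sym, stats in stats_by_sym.items():
        # Assumed payload layout: the symbol plus its stats as JSON
        payload = json.dumps({"sym": sym, **stats})
        publish(stats_topic(sym), payload)
        sent += 1
    return sent


# Invented stats values, for illustration only
latest = {"AAPL": {"vwap": 17.5}, "IBM": {"vwap": 99.75}}
publish_stats(latest, lambda topic, payload: print(topic, payload))
```

Encoding the symbol in the topic means a downstream consumer can subscribe to a single symbol, or use a wildcard to receive all of them, without the publisher knowing who is listening.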

On PubSub+, we have created another queue called stats, which attracts all stats messages through the subscription EQ/stats/>.

And that’s it!


Now that we understand the flow better, let’s run our market data simulator and bar generation process.
I have modified the simulator code a bit to log the messages it is publishing:

=======================================================
Publishing to topic: EQ/marketData/v1/US/NASDAQ/AAPL
Data: {"date":"2020-06-08","symbol":"AAPL","askPrice":260.145,"bidSize":790,"tradeSize":340,"exchange":"NASDAQ","currency":"USD","time":"15:42:13.805925-04:00","tradePrice":256.93332,"askSize":760,"bidPrice":253.72165}
=======================================================
=======================================================
Publishing to topic: EQ/marketData/v1/US/NASDAQ/FB
Data: {"date":"2020-06-08","symbol":"FB","askPrice":170.17534,"bidSize":560,"tradeSize":350,"exchange":"NASDAQ","currency":"USD","time":"15:42:13.825708-04:00","tradePrice":168.49043,"askSize":380,"bidPrice":166.80553}
=======================================================
=======================================================
Publishing to topic: EQ/marketData/v1/US/NASDAQ/INTC
Data: {"date":"2020-06-08","symbol":"INTC","askPrice":59.54003,"bidSize":520,"tradeSize":280,"exchange":"NASDAQ","currency":"USD","time":"15:42:13.840505-04:00","tradePrice":59.023575,"askSize":560,"bidPrice":58.50712}
=======================================================

As you can see, we are publishing messages to PubSub+ topics.

Now, it’s time to run our bar generation q process. You can see that our q process is receiving the raw updates:

q)prices
date       time         sym  exchange currency askPrice askSize bidPrice bidSize tradePrice tradeSize
-----------------------------------------------------------------------------------------------------
2020.06.08 15:47:50.732 AAPL NASDAQ   USD      249.9859 70      246.2641 460     248.125    180
2020.06.08 15:47:50.732 AAPL NASDAQ   USD      249.9859 70      246.2641 460     248.125    180
2020.06.08 15:47:50.945 FB   NASDAQ   USD      170.8511 780     169.999  50      170.425    380
2020.06.08 15:47:50.945 FB   NASDAQ   USD      170.8511 780     169.999  50      170.425    380
2020.06.08 15:47:50.952 INTC NASDAQ   USD      61.206   150     59.994   100     60.6       330
2020.06.08 15:47:50.952 INTC NASDAQ   USD      61.206   150     59.994   100     60.6       330
2020.06.08 15:47:50.955 IBM  NYSE     USD      99.99937 290     99.50063 260     99.75      490
2020.06.08 15:47:50.955 IBM  NYSE     USD      99.99937 290     99.50063 260     99.75      490
2020.06.08 15:47:50.960 AC   NYSE     USD      22.16531 300     21.99969 400     22.0825    430

Our market_data queue was created by the stats script. I also created a stats queue administratively from the web UI, with the appropriate subscription to capture the stats generated by our script:

You can also see that our stats queue is receiving minutely stats:

We can also use Solace’s Try Me! subscriber app to subscribe to these stats and inspect the data:

As we can see from the screenshot above, by subscribing to EQ/stats/>, we are receiving data sent to any topic that falls in that hierarchy such as EQ/stats/v1/XOM. The payload for our stats messages looks like this:



That’s a wrap! In this post, I showed you how you can consume raw streaming data from PubSub+ from within your q process, generate stats on the raw data, and then publish the stats back to PubSub+ using dynamic topics.

By using PubSub+ as your distribution layer, you have removed your stats process’s dependency on the ticker plant. Additionally, your kdb+ stack is now loosely coupled. You are also able to take advantage of PubSub+’s rich hierarchical topics and wildcard filtering.

Himanshu Gupta

Himanshu Gupta is a Solutions Architect at Solace. He has experience working at both buy and sell side as a tick data developer. In these roles, he worked with popular timeseries databases kdb+ and OneTick to store and analyze real-time and historical financial market data for cross asset classes.
