How to discover the health of Elektron Real Time Infrastructure using the EMA Java API

This article describes how downstream applications discover the health of Elektron Real Time Infrastructure, through the consumption of SPS (Service Provider Status) messages provided by the Elektron Real Time CHE (Collection Head End). It is intended for Java Developers using EMA to develop an application for monitoring health of Elektron Real Time Infrastructure. However, the general techniques outlined below could be applied to any of our real-time streaming APIs i.e. RFA C++/Java/.NET, ETA C++/Java and EMA C++. The users should have a basic understanding of how to consume data using EMA Java which can be learnt from EMA Consumer Tutorials in Thomson Reuters Developer Community web site.

Overview

Elektron Pulse is a real-time monitoring capability built into Elektron products to provide full transparency into the health of Thomson Reuters infrastructure; helping customers to improve accuracy of data and manage business risk. It is an innovative approach into service management, with full end-to-end service monitoring and operational status for Elektron to radically improve support levels and service manageability- fit for purpose in today’s high frequency, low latency trading environment.

Elektron Pulse provides real-time service information on infrastructural and distribution failures, the health is advertised through a feed, with the flexibility to be programmatically consumed into their monitoring system, or viewed on an interactive GUI provided by Thomson Reuters. Elektron Pulse displays Service Provider Status (SPS) messages provided on the Elektron Network.

In Elektron, the Collection Head End (CHE) is used to deliver market data from the venue. The CHE is composed of multiple sub-systems known as “line handler”. Each line handler is responsible for part of the venue data coverage. Each line handler advertises its data healthiness by publishing SPS message. In Elektron Pulse, the information of data healthiness is aggregated. A top-level SPS record acts as starting point for client to discover all CHE subProvider SPS. The high level overview is as follow:

The top level SPS RIC points to provider SPS records, which shows individual venues within the same region. Each provider SPS record represents one venue.

  • A line handler publishes venue data, and one venue may have multiple line handlers which contribute to the venue. A provider SPS RICs representing a venue, points to a subProvider SPS(s) which represents a CHE line handler.
  • Subprovider SPS RIC shows the health of one CHE line handler of a venue. A line handler publishes the venues data, and one venue may be contributed to by multiple line handlers.
  • By aggregating the subProvider SPS information, we can get the health of a venue

Solution

The application can consume Elektron Real Time Infrastructure Health Status by the steps below:

  1. Subscribe to snapshot of a top-level SPS RIC and get the list of Provider SPS in a refresh message.
  2. Subscribe to snapshot of a Provider SPS and get the list of SubProvider SPS in a refresh message.
  3. Make a streaming Subscribtion of a SubProvider SPS and get it's health status in real time.

Data content from subscribing top-level SPS and Provider SPS are linkage of records which the application does not need to monitor in real time, because they do not change. Therefore, the application should subscribe to them as snapshot requests; only refresh message is returned with no update message.

An EMA Java application to consume Elektron Real Time Infrastructure Health Status is designed as per the figure below:

We will implement the application based on example210__MarketByOrder__Streaming. The example application includes source code to decode Map and field list data that we need in the application. The application will subscribe to a top-level SPS RIC, then show the Provider SPS list and wait for a Provider SPS which the user inputs. Next, subscribe to the input Provider SPS and show the SubProvider SPS list. Wait for a SubProvider SPS which the user inputs. Finally, subscribe to the input SubProvider SPS to get its health in real-time.

Solution Code

The EMA Java package can be downloaded at Elektron SDK - Java Downloads. The source code of  example210__MarketByOrder__Streaming is in \Ema\Src\examples\java\com\thomsonreuters\ema\examples\training\consumer\series200\example210__MarketByOrder__Streaming. We will modify the file named Consumer.java. The steps to implement the above behaviour are the following:

1. Declare the following variables used in processing SPS data in AppClient class:

  • Vector SPSList - keep the list of Provider SPS/SubProvider SPS
  • int level - keep the current level that the application is processing so the application can act accordingly. For example: if level=1 or level=2, print the list of Provider SPS/SubProvider SPS and wait for a Provider SPS/SubProvider SPS RIC that the user inputs.
  • boolean snapshot – a flag to request snapshot(true) or streaming(false).
  • String spsRIC – holds the Provider SPS/SubProvider SPS RIC that the user inputs
  • BufferedReader br – read an input Provider SPS/SubProvider SPS RIC
  • OmmConsumer consumer – subscribe to a Provider SPS/SubProvider SPS RIC
       class AppClient implements OmmConsumerClient    
       {
           private Vector<String> SPSList;
           private int level;
           private boolean snapshot;
           String spsRIC;
           BufferedReader br;
           OmmConsumer consumer;
           ...
       }

2. Modify AppClient's Constructor to accept an OmmConsumer object and initialize the variables declared in the step 1. OmmConsumer provides the method for subscribing to Provider/SubProvider SPS.

       class AppClient implements OmmConsumerClient
       {
           ...

           public AppClient(OmmConsumer cons) {
               consumer=cons;
               SPSList = new Vector<String>();
               level=0;
               snapshot=true;
               spsRIC=null;
               br =  new BufferedReader(new InputStreamReader(System.in));
       }

3. Modify main(String[] args) to subscribe to a snapshot of a top level SPS with SPS domain(11) or EmaRdm.MMT_SERVICE_PROVIDER_STATUS

       try
       {
           consumer  = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig().host("adsmachine").username("user"));
           AppClient appClient = new AppClient(consumer);
           //the RIC can be  .[SPSEMEA, .[SPSASIA, .[SPSAMER
           consumer.registerClient(EmaFactory.createReqMsg().domainType(11).serviceName("ELEKTRON_SERVICE").name(".[SPSEMEA").interestAfterRefresh(false), appClient);
           Thread.sleep(600000); // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()
       }

4. After subscribing to a top level SPS, the application receives a refresh message, of which the payload is a map container. Each map entry contains a Provider SPS info and the key is a Provider SPS RIC i.e. .[SPSCHE-EDX as shown below:

       <mapEntry flags="0x00" action="ADD" key=".[SPSCHE-EDX" >
           <fieldList flags="0x08 (HAS_STANDARD_DATA)">
               <fieldEntry fieldId="6456" data="0B"/>
               <fieldEntry fieldId="6457" data="01"/>
               <fieldEntry fieldId="6458" data="0A"/>
               <fieldEntry fieldId="6459" data="00"/>
           </fieldList>
       </mapEntry>

To get the list of Provider SPS RIC, we have to get the key from each map entry. Likewise, to get the list of SubProvider SPS RIC, we have to get the key in each map entry of a refresh message returned after subscribing to an input Provider SPS RIC e.g. .[SPSCHE-EDX as shown below:

       <mapEntry flags="0x00" action="ADD" key=".[SPSEDXL1" >
           <fieldList flags="0x08 (HAS_STANDARD_DATA)">
               <fieldEntry fieldId="6456" data="0B"/>
               <fieldEntry fieldId="6457" data="01"/>
               <fieldEntry fieldId="6458" data="0A"/>
               <fieldEntry fieldId="6459" data="00"/>
           </fieldList>
       </mapEntry>

The map entry above contains a SubProvider SPS info and the key is a SubProvider SPS RIC i.e. .[SPSEDXL1. Therefore, we have to add a method named getKeyString(..) in AppClient class. The method is used to get the key in each map entry of Provider SPS and SubProvider SPS as explained above. Then, we will be able to get the list of Provider SPS and SubProvider SPS RIC.

       String getKeyString(OmmBuffer ommBuf) {
           ByteBuffer buffer =  ommBuf.asHex();
           StringBuilder asString = new StringBuilder();
           int length =buffer.limit();
           for (int i = buffer.position(); i < length; i++)
           {
              byte b = buffer.get(i);
              asString.append((char)b);
           }
           return asString.toString();
       }

5. In the decode(..) method which decodes Map, call the method getKeyString(..) to get the keys from the map of a refresh message. Then, extract and add the keys which are the Provider/SubProvider SPS RICs/names to the SPS list.

       void decode(Map map)
       {
           ...
           SPSList.clear();
           for (MapEntry mapEntry : map)
           {
               if (DataTypes.BUFFER == mapEntry.key().dataType())
               {
                   System.out.println("Action: " + mapEntry.mapActionAsString() + " key value: " + EmaUtility.asHexString(mapEntry.key().buffer().buffer()));
                   String keyStr=getKeyString(mapEntry.key().buffer());
                   System.out.println("key="+keyStr);
                   SPSList.add(keyStr);
               }
               ...

6. Add a method named requestNextlevel() in AppClient class. The method is used to verify if it should print the list of Provider/SubProvider SPS and wait for the user input after it receives a refresh message or not, as detailed below:

  • level 1 : print List of Provider SPS RICs and flag to subscribe an input RIC as a snapshot. Then, wait for a user input Provider SPS RIC and returns true to inform the application to subcribe the user input RIC.
  • level 2 : print List of SubProvider SPS RICs and flag to subscribe to an input RIC streaming. Then, wait for a user input SubProvider SPS RIC and returns true to inform the application to subcribe to the user input RIC.
  • level 3 or higher: returns false to inform the application not to subcribe to any RIC.
       public boolean requestNextlevel() {
           String key="";
           if(level==1) {
               key = "Provider SPS";
               snapshot=true;
           } else if(level==2) {
               key = "SubProvider SPS";
               snapshot=false;
           }

           if(level<3) {
               System.out.println("List of " + key + ": " );
               for(String SPS : SPSList) {
                   System.out.print(SPS + ",");
               }   
               System.out.println("\nEnter a " + key+ ": " );
               try {
                   spsRIC = br.readLine();
               }catch(IOException ie) {
                   ie.printStackTrace();
                   System.exit(1);
               }
               return true;
           } 
           else 
               return false;
       }

7. In onRefreshMsg(..) method, call the method requestNextlevel() after decoding a refresh message. The requestNextlevel() method prints the list of Provider/SubProvider SPS and wait for input. Finally, print the FieldList data which is the health status of the venue line handler(SubProvider SPS).

       public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
       {
           ...
           if (DataType.DataTypes.MAP == refreshMsg.payload().dataType()) {
               decode(refreshMsg.payload().map());
           }
           System.out.println();
           +level;
           if(requestNextlevel()){
               consumer.registerClient(EmaFactory.createReqMsg().domainType(11).serviceName("API_ELEKTRON_EPD_RSSL").name(spsRIC).interestAfterRefresh(!snapshot), this);
           } else {
               System.out.println("=======Health Status of " + spsRIC + "=============="); 
           }
           if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType())
               decode(refreshMsg.payload().fieldList());
           ...
       }

The application source code is available at Github. You can run the application using the following command line:

   java -cp D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\ema.jar;D:\Elektron-SDK1.1.0.java.rrg\Eta\Libs\upa.jar;D:\Elektron-SDK1.1.0.java.rrg\Eta\Libs\upaValueAdd.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\org.apache.commons.collections.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\commons-configuration-1.10.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\commons-lang-2.6.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\commons-logging-1.2.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\SLF4J\slf4j-1.7.12\slf4j-api-1.7.12.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\SLF4J\slf4j-jdk14-1.7.12.jar;. com.thomsonreuters.ema.examples.Consumer

The example output when you run the application:

The application will show a refresh containing map of Provider SPS RICs in EMEA Top level SPS. Then, enter a Provider SPS RIC from the list e.g. .[SPSCHE-EDX

The application will show a list of SubProvider SPS RICs under .[SPSCHE-EDX. Next, enter a SubProvider SPS RIC from the list e.g. .[SPSEDXL1

The application will show the health status of a SubProvider. In this example, SubProvider .[SPSEDXL1 has SPS_PV_STS field(field id 6479) is 1 means Line Handler/Provider is UP and SPS_FD_STS field(field id 6474) is 5 means the input feed connection status is UP. Please refer to the next section for further details.

The data content in a SubProvider

Unlike the Top and Provider level SPS items which carry linkage of records, the data content in SubProvider level SPS item returns FieldLists that reflect the health status of the venue line handlers. In Elektron, data of a venue is published by one or multiple line handlers, and data in each line handler should be discrete. Below is the extract of FIDs in the subProvider level item which client should be aware of.

CATEGORY ACRONYM FIELD ID DATA TYPE COMMENTS
Provider Information SPS_DESCR 6471 RMTES_STRING Description of the venue line handler
Timestamps SPS_TME_MS 6472 UINT64 GMT timestamp in milliseconds within a day, from the provider at the point of SPS transmission
SPS Connectivity Detection Criteria SPS_FREQ 6473 UINT64 Frequency of this SPS publication in milliseconds
SPS Fail Threshold SPS_FAIL_T 6478 UINT64 Used to inform SPS consumers how many consecutive missed heartbeats constitute a failure
Venue Information L_H_STAT 8920 Enum Indication of Market activity for a Line Handler, used for SPS
Value Display Meaning
0 “ “ Closed. No Market activity expected
1 “OPEN” Open. Market activity taking place, including pre/post market activity
Line Handler Status SPS_PV_STS 6479 Enum Provider status. Enum values are –
Value Display Meaning
0 “ ” Undefined
1 “UP” Up
2 “DOWN” Down
3 “UNAV” Unavailable
Input Feed Connectivity Status SPS_FD_STS 6474 Enum Represent input feed connection status –
3 “UNDF” Undefined. The status is Down.
4 “TCPO” TCP Session unexpectedly offline. The status is Down.
5 “UDPU” All UDP Channel pairs up. The status is Up.
6 “UDPN” All UDP Channel up, some not redundant. The status is Up.
7 “UDPS” Some UDP Channel pairs down. The status is Down.
8 “UDPD” All UDP Channel pairs down. The status is Down.
9 “UNMO” Unmonitored. The status is Up.

*All other ENUM values undefined and should be ignored by applications

Input Gap ARB_GAP_PD 6475 UINT64 Sampling period in seconds of the gap count (x)
Counters ARB_GAPOUT 5198 UINT64 Current number of outstanding arbitrator Gaps
TTL_GAPOUT 5524 UINT64 Total Number of daily post arbitration sequence gaps, not individual messages.
Overall Pulse Status VENUE_STAT 6599 Enum Represent health group status of this publisher path. Enum values are –
Value Display Meaning
0 “ ” Undefined
1 “HEALTH” Confirmed Healthy
2 “STALE” Confirmed Stale
3 “SUSPCT” Suspected. Have problem but may not affect service, E.g. loss resiliency

1 – Service status is healthy

2 – Service status is stale.

3 – Service status is suspected. Non-Service Affecting

Conclusion

If you are planning to develop an application to monitor/discover the health of Elektron Real Time Infrastructure, you can use this article as a guide. EMA Java is one of the APIs(apart from ETA, RFA) which can be used to retrieve SPS messages. Implementing EMA Java to monitor the health of Elektron Real time is relatively quick and easy as shown in this article.

References

For further details, please check out the following resources: