About this article

This article continues Implementing Elektron API application to work with ATS - Part 1. In this part, I explain how to update data, delete fields, and delete a RIC, with source code for each Elektron API. I also cover troubleshooting for common problems that can occur when working with ATS. I strongly recommend reading Part 1 first. It gives the background you need before working with ATS, e.g. Posting and ATS concepts, prerequisites, and an overview of the steps. It also explains how to add a RIC and fields, which must be done before your application can update data or delete fields and RICs as described in this Part 2.

Sample Posting with Updating data, Deleting Fields and RIC

Updating field values

You can update field values on ATS by sending a post message from a consumer application, as shown in step 3 in the figure above. Here is a sample post message that updates field ids 22 and 25 of the RIC named NEW.RIC with the values 430 and 460 respectively:

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="3" postUserId="18" postUserAddr="10.42.61.200" dataSize="24">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEW.RIC"/>
     <dataBody>
          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="13">
               <dataBody>
	            <fieldList flags="0x08 (HAS_STANDARD_DATA)">
		         <fieldEntry fieldId="22" data="0F2B"/>
			 <fieldEntry fieldId="25" data="0F2E"/>
		    </fieldList>
	       </dataBody>
	  </UPDATE>
     </dataBody>
</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId of 1 means the post message is sent on the login stream, i.e. off-stream posting. The message contains the postId, and the ACK flag is set to request an Ack message. It also contains the Visible Publisher Identifier (VPI), which consists of postUserId and postUserAddr.
  • The key name of the post message must be the RIC name whose data is going to be updated.
  • The payload of the post message is an Update of Market Price message. The payload of the Update message is a field list.  
  • The field list contains the updated fields, i.e. field id 22 (BID) and field id 25 (ASK), with their new values for this RIC.
  • Data values in the field list are shown in their OMM-encoded form.
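
To see how the encoded values relate to 430 and 460, the following minimal Ruby sketch decodes the hex data shown in the dump. It assumes that the first byte is the real hint, that the remaining bytes are a big-endian signed integer, and that hints 0 to 21 stand for the exponents -14 to 7. The helper name decode_real is hypothetical and not part of any Elektron API.

```ruby
# Decode the hex dump of an OMM Real field entry, e.g. "0F2B".
# Assumptions (not taken from the API documentation): byte 0 is the hint,
# the remaining bytes are a big-endian two's complement integer, and
# hints 0..21 map to the exponents -14..7.
def decode_real(hex)
  bytes = hex.delete(' ').scan(/../).map { |b| b.to_i(16) }
  hint = bytes.first
  raise ArgumentError, 'only exponent hints are handled in this sketch' if hint > 21

  raw = bytes.drop(1)
  mantissa = raw.inject(0) { |acc, b| (acc << 8) | b }
  # Treat the value as negative when the top bit of the first byte is set
  mantissa -= 1 << (8 * raw.length) if !raw.empty? && raw.first >= 0x80

  # Rational keeps negative exponents exact (no floating point rounding)
  mantissa * Rational(10)**(hint - 14)
end

puts decode_real('0F2B').to_i  # field id 22 in the dump
puts decode_real('0F2E').to_i  # field id 25 in the dump
```

Under these assumptions, 0F2B reads as 43 x 10^1 = 430 and 0F2E as 46 x 10^1 = 460, which is why the snippets below pass 43 and 46 with a positive exponent of 1.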

An example of a successful Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="3" text="[1]: Contribution Accepted" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEW.RIC"/>
     <dataBody>
     </dataBody>
</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 3, which matches the postId (3) of the post message, so this Ack is the result of the post message above.
  • There is no NAK (Negative Acknowledgement) code, which means ATS performed the requested operation, i.e. it updated the field values of the RIC successfully.
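
The two checks above can be sketched in Ruby for an application that uses the WebSocket API. This is only an illustration: it assumes the JSON Ack message carries an 'AckID' member and contains a 'NakCode' member only when the post is rejected. The method name ack_outcome and the sample JSON text are hypothetical.

```ruby
require 'json'

# Classify an Ack message against the postId that the application sent.
# Assumption: 'NakCode' is present only when the post was rejected.
def ack_outcome(ack_json, post_id)
  ack = JSON.parse(ack_json)
  return :not_an_ack unless ack['Type'] == 'Ack'
  return :other_post unless ack['AckID'] == post_id

  ack.key?('NakCode') ? :rejected : :accepted
end

# Hypothetical JSON form of the Ack message shown above
SAMPLE_ACK = '{"ID":1,"Type":"Ack","AckID":3,' \
             '"Text":"[1]: Contribution Accepted",' \
             '"Key":{"Service":267,"Name":"NEW.RIC"}}'

puts ack_outcome(SAMPLE_ACK, 3)
```

Matching on the ackId first matters when several posts are outstanding on the login stream, because each Ack only identifies its post through that id.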

Snippets of source code for each Elektron API that create the post message above to update data are shown below:

  • EMA Java:
 //Consumer.java in example341__MarketPrice__OffStreamPost folder
   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
   {
		...
		if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 
				refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
				refreshMsg.state().dataState() == OmmState.DataState.OK )
		{
				PostMsg postMsg = EmaFactory.createPostMsg();
				UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();
				FieldList nestedFieldList = EmaFactory.createFieldList();
				
				//FieldList is a collection in java
				nestedFieldList.add(EmaFactory.createFieldEntry().real(22, 43, OmmReal.MagnitudeType.EXPONENT_POS_1));
				nestedFieldList.add(EmaFactory.createFieldEntry().real(25, 46, OmmReal.MagnitudeType.EXPONENT_POS_1));
			
				
				nestedUpdateMsg.payload(nestedFieldList );
				
				
				((OmmConsumer)event.closure()).submit( postMsg.postId( 3 ).serviceId(267)
															.name( "NEW.RIC" ).solicitAck( true ).complete(true).publisherId(18,170540488)
															.payload(nestedUpdateMsg), event.handle() );
		}
		...
	}
  • EMA C++:
 /* Consumer.cpp in 341__MarketPrice__OffStreamPost */
   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )
   {
		...
		if ( refreshMsg.getDomainType() == MMT_LOGIN && 
			refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
			refreshMsg.getState().getDataState() == OmmState::OkEnum )
		{
			_pOmmConsumer->submit( PostMsg().postId(3).serviceId( 267 ).name( "NEW.RIC").solicitAck(true).publisherId(18,170540488).complete()
			.payload(UpdateMsg().payload( FieldList().addReal( 22, 43, OmmReal::ExponentPos1Enum ).addReal(25, 46, OmmReal::ExponentPos1Enum).complete() ) ), ommEvent.getHandle() );
		}
		...
   }
  • ETA Java:
 //PostHandler.java in Consumer folder
   //To declare variables used to create a post message for updating data on ATS
    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();
    protected FieldList fieldList = CodecFactory.createFieldList();
    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();
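    protected UInt tempUInt = CodecFactory.createUInt();
    protected Real tempReal = CodecFactory.createReal(); // holds the Real value of each field encoded below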
	...
	//To create a postMessage for updating data on ATS
    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.
    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)
    {
        // First encode message for payload
        postMsg.clear();

        // set-up message
        postMsg.msgClass(MsgClasses.POST);
        postMsg.streamId(1);
        postMsg.domainType(DomainTypes.MARKET_PRICE);
        postMsg.containerType(DataTypes.MSG);

        // Note: post message key not required for on-stream post
        postMsg.applyPostComplete();
        postMsg.applyAck();
       
        postMsg.applyHasPostId();
        postMsg.postId(3);

        postMsg.applyHasMsgKey();
        postMsg.msgKey().applyHasServiceId();
        postMsg.msgKey().serviceId(267);
        postMsg.msgKey().applyHasName();
        postMsg.msgKey().name().data("NEW.RIC");
        
        
        // populate post user info        
        postMsg.postUserInfo().userAddr("10.42.61.200");	
        postMsg.postUserInfo().userId(18);        	
       

        // encode post message
        encIter.clear();
        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        ret = postMsg.encodeInit(encIter, 0);
        if (ret != CodecReturnCodes.ENCODE_CONTAINER)
        {
            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        // get a buffer for nested market price update
        postNestedMsgPayLoad = CodecFactory.createBuffer();
        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

        // Although we are encoding RWF message, this code
        // encodes nested message into a separate buffer.
        // this is because MarketPrice.encode message is shared by all
        // applications, and it expects to encode the message into a stand alone
        // buffer.
        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        postMsgEncIter.clear();
        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

    	ret = encodeUpdateData(postMsgEncIter);
		if (ret != CodecReturnCodes.SUCCESS) {
			System.out.println("POST DATA TO ATS failed:  <" + CodecReturnCodes.toString(ret));
			return CodecReturnCodes.FAILURE;
		}
        shouldOffstreamPost = false; //set flag to send post message once
		
        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        // complete encode message
        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeMsgComplete() failed with return code: " + ret);
            return ret;
        }

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId() );

        return CodecReturnCodes.SUCCESS;
    }
	// To create a payload of the post message 
    private int encodeUpdateData(EncodeIterator encodeIter) {
    	fieldList.clear();
        fieldEntry.clear();

        tempUInt.clear();
        tempReal.clear();

        // set-up message
        Msg msg = encodeUpdateMsg();

        // encode message
        int ret = msg.encodeInit(encodeIter, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // encode field list
        fieldList.applyHasStandardData();
        ret = fieldList.encodeInit(encodeIter, null, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // encode fields common to refresh and updates
        DictionaryEntry dictionaryEntry = null;
        // BID
        fieldEntry.clear();
        dictionaryEntry = dictionary.entry(22);
        if (dictionaryEntry != null)
        {
        	fieldEntry.fieldId(22);
            fieldEntry.dataType(dictionaryEntry.rwfType());
            tempReal.clear();
            tempReal.value(43, RealHints.EXPONENT1);
            ret = fieldEntry.encode(encodeIter, tempReal);
            if (ret < CodecReturnCodes.SUCCESS)
            {
                    return ret;
            }
         }
         // ASK
        fieldEntry.clear();
        dictionaryEntry = dictionary.entry(25);
        if (dictionaryEntry != null)
        {
        	fieldEntry.fieldId(25);
            fieldEntry.dataType(dictionaryEntry.rwfType());
            tempReal.clear();
            tempReal.value(46, RealHints.EXPONENT1);
            ret = fieldEntry.encode(encodeIter, tempReal);
            if (ret < CodecReturnCodes.SUCCESS)
            {
                    return ret;
            }
         } 
        // complete encode field list
        ret = fieldList.encodeComplete(encodeIter, true);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // complete encode message
        return msg.encodeComplete(encodeIter, true);
    }
	// To create an update message which is in the payload of the post message
    public Msg encodeUpdateMsg()
    {
        updateMsg.clear();
        updateMsg.msgClass(MsgClasses.UPDATE);
        updateMsg.streamId(0);
        updateMsg.domainType(DomainTypes.MARKET_PRICE);
        updateMsg.containerType(DataTypes.FIELD_LIST);
        return updateMsg;
    }
    
  • ETA C:
 /* rsslPostHandler.c in Consumer folder */
	static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)
	{
		RsslRet ret = 0;
		RsslPostMsg postMsg = RSSL_INIT_POST_MSG;
		RsslDataDictionary* dictionary = getDictionary();
		RsslBool isSolicited = RSSL_FALSE; // ??? for post
		RsslUInt16 serviceId = (RsslUInt16)getServiceId();
		RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;
		RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;
		RsslBuffer hostName = RSSL_INIT_BUFFER;
		char hostNameBuf[] = "10.42.61.200";

		/* set-up message */
		postMsg.msgBase.msgClass = RSSL_MC_POST;
		postMsg.msgBase.streamId = 1;
		postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;
		postMsg.msgBase.containerType = RSSL_DT_MSG;

		// Note: post message key not required for on-stream post
		postMsg.flags = RSSL_PSMF_POST_COMPLETE
			| RSSL_PSMF_ACK // request ACK
			| RSSL_PSMF_HAS_POST_ID
			| RSSL_PSMF_HAS_MSG_KEY;

		postMsg.postId = 3;
	
		/* populate post user info */
		hostName.data = hostNameBuf;
		hostName.length = (RsslUInt32)strlen(hostName.data);
		if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)
		{
			printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",
				rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));
			return ret;
		}
		postMsg.postUserInfo.postUserId = 18;

		postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

		postMsg.msgBase.msgKey.name.data = (char *)"NEW.RIC";
		postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("NEW.RIC");

		postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

		// encode message 
		if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
		{
			printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
			return ret;
		}
		rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

		if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
			return ret;
		}

		ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");
			return RSSL_RET_FAILURE;
		}

		ret = encodeUpdateData(chnl, &payloadMsgBuf, dictionary);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: encodeUpdateData() failed\n");
			return RSSL_RET_FAILURE;
		}

		shouldOffstreamPost = FALSE; //set flag to send a post message once

		ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeComplete() failed\n");
			return RSSL_RET_FAILURE;
		}


		/* complete encode message */
		if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
			return ret;
		}

		msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);



		printf("\n\nSENDING POST WITH MESSAGE:\n");
		printf("	streamId = %d\n", postMsg.msgBase.streamId);
		printf("	postId   = %d\n", postMsg.postId);
	
		return RSSL_RET_SUCCESS;

	}
	RsslRet encodeUpdateData(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)
	{
		RsslRet ret = 0;
		RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;
		RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;
		RsslMsgBase* msgBase;
		RsslMsg* msg;
		RsslFieldList fList = RSSL_INIT_FIELD_LIST;
		RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;
		char errTxt[256];
		RsslBuffer errorText = { 255, (char*)errTxt };
		RsslBuffer tempBuffer;
		RsslReal tempReal = RSSL_INIT_REAL;
		RsslDictionaryEntry* dictionaryEntry = NULL;
		RsslEncodeIterator encodeIter;

		double price;

		/* clear encode iterator */
		rsslClearEncodeIterator(&encodeIter);

		/* set-up message */
		/* set message depending on whether refresh or update */

		msgBase = &updateMsg.msgBase;
		msgBase->msgClass = RSSL_MC_UPDATE;

		msg = (RsslMsg *)&updateMsg;

		msgBase->domainType = RSSL_DMT_MARKET_PRICE;
		msgBase->containerType = RSSL_DT_FIELD_LIST;

		/* encode message */
		if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
		{
			printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
			return ret;
		}
		rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
		if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
			return ret;
		}

		/* encode field list */
		fList.flags = RSSL_FLF_HAS_STANDARD_DATA;
		if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);
			return ret;
		}

		/* encode fields */


		rsslClearFieldEntry(&fEntry);
		dictionaryEntry = dictionary->entriesArray[22];
		if (dictionaryEntry)
		{
			fEntry.fieldId = 22;
			fEntry.dataType = dictionaryEntry->rwfType;
			rsslClearReal(&tempReal);
			price = 430;
			/* RSSL_RH_EXPONENT1 means 10^1, so 430 is encoded as 43 x 10^1 (data="0F2B" in the dump above) */
			rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT1);
			if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
			{
				printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
				return ret;
			}
		}

		rsslClearFieldEntry(&fEntry);
		dictionaryEntry = dictionary->entriesArray[25];
		if (dictionaryEntry)
		{
			fEntry.fieldId = 25;
			fEntry.dataType = dictionaryEntry->rwfType;
			rsslClearReal(&tempReal);
			price = 460;
			/* RSSL_RH_EXPONENT1 means 10^1, so 460 is encoded as 46 x 10^1 (data="0F2E" in the dump above) */
			rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT1);
			if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
			{
				printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
				return ret;
			}
		}

		/* complete encode field list */
		if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);
			return ret;
		}

		/* complete encode message */
		if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
			return ret;
		}
		msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

		return RSSL_RET_SUCCESS;
	}
  • WebSocket API in Ruby:
# market_price_posting.rb in ruby folder
	if message_type == 'Refresh' then
		message_domain = message_json['Domain']
		if message_domain != nil then
			if message_domain == 'Login' then
				mp_post_json_hash = {
					'ID' => 1,
					'Type' => 'Post',
					'Domain' => 'MarketPrice',
					'Ack' => true,
					'PostID' => 3,
					'PostUserInfo' =>  {
						'Address' => '10.42.61.200', # Use the IP address as the Post User Address.
						'UserID' => 18
					},
					'Key' => {
						'Name' => 'NEW.RIC', # ATS server contribution RIC name
						'Service' => 267 # ADS Service ID that connects to ATS server
					},
					'Message' => {
						'ID' => 0,
						'Type' => 'Update',
						'Domain' => 'MarketPrice',
						'Fields' => {'BID' => 430,'ASK' => 460 }
					}
				}
				ws.send mp_post_json_hash.to_json.to_s
			end
		end
	end

Deleting Fields

You can use the ATS command ATS_DELETE to delete fields by sending a post message from a consumer application, as shown in step 3 in the figure above. Here’s a sample post message that deletes field ids 13 and 25 from the RIC named NEW.RIC:

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="4" postUserId="18" postUserAddr="10.42.61.200" dataSize="34">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>
     <dataBody>
          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="23">
	       <dataBody>
	            <fieldList flags="0x08 (HAS_STANDARD_DATA)">
		         <fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>
			 <fieldEntry fieldId="13" data="0F01"/>
			 <fieldEntry fieldId="25" data="0F02"/>
		    </fieldList>
	       </dataBody>
	  </UPDATE>
     </dataBody>
</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId of 1 means the post message is sent on the login stream, i.e. off-stream posting. The message contains the postId, and the ACK flag is set to request an Ack message. It also contains the Visible Publisher Identifier (VPI), which consists of postUserId and postUserAddr.
  • The key name of the post message must be ATS_DELETE to tell ATS to delete the fields.
  • The payload of the post message is an Update of Market Price message. The payload of the Update message is a field list.  
  • The field list consists of:
    • Field id -1, which holds the name of the RIC/record whose fields are deleted.
    • The fields to be deleted from this RIC, i.e. field id 13 (LOW_1) and field id 25 (ASK).
    • Data values in the field list are shown in their OMM-encoded form.
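
The message dump shows postUserAddr as 10.42.61.200, while the EMA snippets in this article pass the integer 170540488 to publisherId. The short Ruby sketch below shows how the two relate, assuming the address is packed as a 32-bit integer with the first octet in the most significant byte. The method names are hypothetical helpers, not part of any Elektron API.

```ruby
# Pack a dotted IPv4 address into a single integer, first octet first.
def post_user_addr_to_i(dotted)
  dotted.split('.').map(&:to_i).inject(0) { |acc, octet| (acc << 8) | octet }
end

# Unpack the integer back into dotted notation.
def post_user_addr_to_s(value)
  [24, 16, 8, 0].map { |shift| (value >> shift) & 0xFF }.join('.')
end

puts post_user_addr_to_i('10.42.61.200')  # prints 170540488
puts post_user_addr_to_s(170540488)       # prints 10.42.61.200
```

A helper like this lets an application keep the address in readable form in its configuration and convert it only where an API expects the numeric form.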

An example of a successful Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="4" text="[3]: Delete Accepted" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>
     <dataBody>
     </dataBody>
</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 4, which matches the postId (4) of the post message, so this Ack is the result of the post message above.
  • There is no NAK (Negative Acknowledgement) code, which means ATS performed the requested operation, i.e. it deleted the fields from the RIC successfully.

Snippets of source code for each Elektron API that create the post message above to delete the fields are shown below:

  • EMA Java:
//Consumer.java in example341__MarketPrice__OffStreamPost folder
   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
   {
		...
		if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 
				refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
				refreshMsg.state().dataState() == OmmState.DataState.OK )
		{
				PostMsg postMsg = EmaFactory.createPostMsg();
				UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();
				FieldList nestedFieldList = EmaFactory.createFieldList();
				
				//FieldList is a collection in java
				nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));
				
				nestedFieldList.add(EmaFactory.createFieldEntry().real(13, 1, OmmReal.MagnitudeType.EXPONENT_POS_1));
				nestedFieldList.add(EmaFactory.createFieldEntry().real(25, 2, OmmReal.MagnitudeType.EXPONENT_POS_1));
			
				
				nestedUpdateMsg.payload(nestedFieldList );
				
				
				((OmmConsumer)event.closure()).submit( postMsg.postId( 4 ).serviceId(267)
							.name( "ATS_DELETE" ).solicitAck( true ).complete(true).publisherId(18,170540488)
							.payload(nestedUpdateMsg), event.handle() );
				
		}
		...
	}
  • EMA C++:
/* Consumer.cpp in 341__MarketPrice__OffStreamPost */
   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )
   {
		...
		if ( refreshMsg.getDomainType() == MMT_LOGIN && 
			refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
			refreshMsg.getState().getDataState() == OmmState::OkEnum )
		{
			_pOmmConsumer->submit( PostMsg().postId(4).serviceId( 267 ).name( "ATS_DELETE" ).solicitAck( true ).publisherId(18,170540488).complete()
			.payload(UpdateMsg().payload( FieldList().addAscii(-1, "NEW.RIC").addReal( 13, 1, OmmReal::ExponentPos1Enum ).addReal(25, 2, OmmReal::ExponentPos1Enum).complete() )), ommEvent.getHandle() );
		}
		...
   }
  • ETA Java:
//PostHandler.java in Consumer folder
    //To declare variables used to create a post message for deleting some fields of a RIC on ATS
    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();
    protected FieldList fieldList = CodecFactory.createFieldList();
    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();
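    protected UInt tempUInt = CodecFactory.createUInt();
    protected Real tempReal = CodecFactory.createReal(); // holds the Real value of each field encoded below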
	
	   //To create a postMessage for deleting fields on ATS
    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.
    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)
    {
    	
        // First encode message for payload
        postMsg.clear();

        // set-up message
        postMsg.msgClass(MsgClasses.POST);
        postMsg.streamId(1);
        postMsg.domainType(DomainTypes.MARKET_PRICE);
        postMsg.containerType(DataTypes.MSG);

        // Note: post message key not required for on-stream post
        postMsg.applyPostComplete();
        postMsg.applyAck();
       
        postMsg.applyHasPostId();
        postMsg.postId(4);

        postMsg.applyHasMsgKey();
        postMsg.msgKey().applyHasServiceId();
        postMsg.msgKey().serviceId(267);
        postMsg.msgKey().applyHasName();
        postMsg.msgKey().name().data("ATS_DELETE");
        
        
        // populate post user info        
        postMsg.postUserInfo().userAddr("10.42.61.200");	
        postMsg.postUserInfo().userId(18);        	
       

        // encode post message
        encIter.clear();
        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        ret = postMsg.encodeInit(encIter, 0);
        if (ret != CodecReturnCodes.ENCODE_CONTAINER)
        {
            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        // get a buffer for nested market price update
        postNestedMsgPayLoad = CodecFactory.createBuffer();
        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

        // Although we are encoding RWF message, this code
        // encodes nested message into a separate buffer.
        // this is because MarketPrice.encode message is shared by all
        // applications, and it expects to encode the message into a stand alone
        // buffer.
        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        postMsgEncIter.clear();
        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

    	ret = encodeDeleteFields(postMsgEncIter);
 		if (ret != CodecReturnCodes.SUCCESS) {
 			System.out.println("ATS_DELETE failed:  <" + CodecReturnCodes.toString(ret));
 			return CodecReturnCodes.FAILURE;
 		}
 		 shouldOffstreamPost = false; //set flag to send post message once
		
        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        // complete encode message
        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeMsgComplete() failed with return code: " + ret);
            return ret;
        }

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId() );

        return CodecReturnCodes.SUCCESS;
    }
	// To create a payload of the post message 
    private int encodeDeleteFields(EncodeIterator encodeIter) {
    	fieldList.clear();
        fieldEntry.clear();

        tempUInt.clear();
        tempReal.clear();

        // set-up message
        Msg msg = encodeUpdateMsg();

        // encode message
        int ret = msg.encodeInit(encodeIter, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // encode field list
        fieldList.applyHasStandardData();
        ret = fieldList.encodeInit(encodeIter, null, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }
        // X_RIC_NAME
        fieldEntry.clear();
        fieldEntry.fieldId(-1);
        fieldEntry.dataType(DataTypes.ASCII_STRING);
        Buffer aRIC = CodecFactory.createBuffer();
        aRIC.data("NEW.RIC");
        ret = fieldEntry.encode(encodeIter, aRIC);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }
        
       
        fieldEntry.clear();
        DictionaryEntry dictionaryEntry = dictionary.entry(13);
        if (dictionaryEntry != null)
        {
            fieldEntry.fieldId(13);
            fieldEntry.dataType(dictionaryEntry.rwfType());
            tempReal.clear();
            tempReal.value(1, RealHints.EXPONENT1);
            ret = fieldEntry.encode(encodeIter, tempReal);
            if (ret < CodecReturnCodes.SUCCESS)
            {
                return ret;
            }
        }

        fieldEntry.clear();
        dictionaryEntry = dictionary.entry(25);
        if (dictionaryEntry != null)
        {
            fieldEntry.fieldId(25);
            fieldEntry.dataType(dictionaryEntry.rwfType());
            tempReal.clear();
            tempReal.value(2, RealHints.EXPONENT1);
            ret = fieldEntry.encode(encodeIter, tempReal);
            if (ret < CodecReturnCodes.SUCCESS)
            {
                return ret;
            }
        }
        // complete encode field list
        ret = fieldList.encodeComplete(encodeIter, true);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // complete encode message
        return msg.encodeComplete(encodeIter, true);
    }
    
	// To create an update message which is in the payload of the post message
    public Msg encodeUpdateMsg()
    {
        updateMsg.clear();
        updateMsg.msgClass(MsgClasses.UPDATE);
        updateMsg.streamId(0);
        updateMsg.domainType(DomainTypes.MARKET_PRICE);
        updateMsg.containerType(DataTypes.FIELD_LIST);
        return updateMsg;
    }
    
  • ETA C:
/* rsslPostHandler.c in Consumer folder */
	static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)
	{
		RsslRet ret = 0;
		RsslPostMsg postMsg = RSSL_INIT_POST_MSG;
		RsslDataDictionary* dictionary = getDictionary();
		RsslBool isSolicited = RSSL_FALSE; // ??? for post
		RsslUInt16 serviceId = (RsslUInt16)getServiceId();
		RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;
		RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;
		RsslBuffer hostName = RSSL_INIT_BUFFER;
		char hostNameBuf[] = "10.42.61.200";

		/* set-up message */
		postMsg.msgBase.msgClass = RSSL_MC_POST;
		postMsg.msgBase.streamId = 1;
		postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;
		postMsg.msgBase.containerType = RSSL_DT_MSG;

		// Note: post message key not required for on-stream post
		postMsg.flags = RSSL_PSMF_POST_COMPLETE
			| RSSL_PSMF_ACK // request ACK
			| RSSL_PSMF_HAS_POST_ID
			| RSSL_PSMF_HAS_MSG_KEY;

		postMsg.postId = 4;
		

		/* populate post user info */
		hostName.data = hostNameBuf;
		hostName.length = (RsslUInt32)strlen(hostName.data);
		if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)
		{
			printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",
				rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));
			return ret;
		}
		postMsg.postUserInfo.postUserId = 18;

		postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

		postMsg.msgBase.msgKey.name.data = (char *)"ATS_DELETE";
		postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("ATS_DELETE");

		postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

		// encode message 
		if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
		{
			printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
			return ret;
		}
		rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

		if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
			return ret;
		}

		ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");
			return RSSL_RET_FAILURE;
		}

		ret = encodeDeleteFields(chnl, &payloadMsgBuf, dictionary);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: encodeDeleteFields() failed\n");
			return RSSL_RET_FAILURE;
		}

		shouldOffstreamPost = FALSE; //set flag to send a post message once

		ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeComplete() failed\n");
			return RSSL_RET_FAILURE;
		}


		/* complete encode message */
		if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
			return ret;
		}

		msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);



		printf("\n\nSENDING POST WITH MESSAGE:\n");
		printf("	streamId = %d\n", postMsg.msgBase.streamId);
		printf("	postId   = %d\n", postMsg.postId);

		return RSSL_RET_SUCCESS;

	}
	RsslRet encodeDeleteFields(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)
	{
		RsslRet ret = 0;
		RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;
		RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;
		RsslMsgBase* msgBase;
		RsslMsg* msg;
		RsslFieldList fList = RSSL_INIT_FIELD_LIST;
		RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;
		char errTxt[256];
		RsslBuffer errorText = { 255, (char*)errTxt };
		RsslBuffer tempBuffer;
		RsslReal tempReal = RSSL_INIT_REAL;
		RsslDictionaryEntry* dictionaryEntry = NULL;
		RsslEncodeIterator encodeIter;

		double price;

		/* clear encode iterator */
		rsslClearEncodeIterator(&encodeIter);

		/* set-up message */
		/* set message depending on whether refresh or update */

		msgBase = &updateMsg.msgBase;
		msgBase->msgClass = RSSL_MC_UPDATE;

		msg = (RsslMsg *)&updateMsg;

		msgBase->domainType = RSSL_DMT_MARKET_PRICE;
		msgBase->containerType = RSSL_DT_FIELD_LIST;

		/* encode message */
		if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
		{
			printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
			return ret;
		}
		rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
		if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
			return ret;
		}

		/* encode field list */
		fList.flags = RSSL_FLF_HAS_STANDARD_DATA;
		if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);
			return ret;
		}

		/* encode fields */
		/* X_RIC_NAME */
		rsslClearFieldEntry(&fEntry);

		fEntry.fieldId = -1;
		fEntry.dataType = RSSL_DT_ASCII_STRING;

		tempBuffer.data = (char *)"NEW.RIC";
		tempBuffer.length = (RsslUInt32)strlen("NEW.RIC");
		if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
			return ret;
		}

		/* LOW_1 (field id 13) */
		rsslClearFieldEntry(&fEntry);
		dictionaryEntry = dictionary->entriesArray[13];
		if (dictionaryEntry)
		{
			fEntry.fieldId = 13;
			fEntry.dataType = dictionaryEntry->rwfType;
			rsslClearReal(&tempReal);
			price = 1;
			rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);
			if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
			{
				printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
				return ret;
			}
		}

		/* ASK (field id 25) */
		rsslClearFieldEntry(&fEntry);
		dictionaryEntry = dictionary->entriesArray[25];
		if (dictionaryEntry)
		{
			fEntry.fieldId = 25;
			fEntry.dataType = dictionaryEntry->rwfType;
			rsslClearReal(&tempReal);
			price = 2;
			rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);
			if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
			{
				printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
				return ret;
			}
		}

		/* complete encode field list */
		if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);
			return ret;
		}

		/* complete encode message */
		if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
			return ret;
		}
		msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

		return RSSL_RET_SUCCESS;
	}
  • WebSocket API in Ruby:
	# market_price_posting.rb in ruby folder
	if message_type == 'Refresh' then
		message_domain = message_json['Domain']
		if message_domain != nil then
			if message_domain == 'Login' then
				mp_post_json_hash = {
					'ID' => 1,
					'Type' => 'Post',
					'Domain' => 'MarketPrice',
					'Ack' => true,
					'PostID' => 4,
					'PostUserInfo' =>  {
						'Address' => '10.42.61.200',
						'UserID' => 18
					},
					'Key' => {
						'Name' => 'ATS_DELETE', 
						'Service' => 267
					},
					'Message' => {
						'ID' => 0,
						'Type' => 'Update',
						'Domain' => 'MarketPrice',
						'Fields' => {'X_RIC_NAME' => 'NEW.RIC' ,'LOW_1' => 10 ,'ASK' => 20 }
					}
				}
				ws.send mp_post_json_hash.to_json.to_s
			end
		end
	end
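
The Ruby snippet above hard-codes the whole post message. As a minimal sketch, the same hash could be produced by a small helper so that only the ATS command, the fields and the post ID vary. The helper name build_ats_post and its keyword arguments are hypothetical and are not part of market_price_posting.rb; the default values are simply the ones used in this article.

```ruby
require 'json'

# Hypothetical helper (not part of market_price_posting.rb): builds an
# off-stream post whose key name is an ATS command and whose payload is a
# Market Price update carrying the given fields.
def build_ats_post(command:, fields:, post_id:, service: 267,
                   user_id: 18, address: '10.42.61.200')
  {
    'ID' => 1,            # login stream, i.e. off-stream posting
    'Type' => 'Post',
    'Domain' => 'MarketPrice',
    'Ack' => true,        # request an Ack message
    'PostID' => post_id,
    'PostUserInfo' => { 'Address' => address, 'UserID' => user_id },
    'Key' => { 'Name' => command, 'Service' => service },
    'Message' => {
      'ID' => 0,
      'Type' => 'Update',
      'Domain' => 'MarketPrice',
      'Fields' => fields
    }
  }
end

# Same content as the delete-fields post shown above.
delete_fields_post = build_ats_post(
  command: 'ATS_DELETE',
  fields: { 'X_RIC_NAME' => 'NEW.RIC', 'LOW_1' => 10, 'ASK' => 20 },
  post_id: 4
)
puts JSON.pretty_generate(delete_fields_post)
```

In the application, the resulting hash would be sent in the same way as in the snippet above, i.e. with ws.send and to_json.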

 

Deleting a RIC

You can delete a RIC/record with the ATS command ATS_DELETE_ALL by sending a post message from a consumer application, as shown in step 3 in the figure above. Here's a sample post message that deletes a RIC named NEW.RIC:

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="5" postUserId="18" postUserAddr="10.42.61.200" dataSize="24">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE_ALL"/>
     <dataBody>
          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="13">
	       <dataBody>
	            <fieldList flags="0x08 (HAS_STANDARD_DATA)">
		         <fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>
		    </fieldList>
	       </dataBody>
	  </UPDATE>
     </dataBody>
</POST>

Notice that:

  • The post message's domain type is Market Price. A streamId of 1 means the post message is sent on the login stream, i.e. off-stream posting. The message contains a postId and sets the ACK flag to request an Ack message. It also carries the Visible Publisher Identifier (VPI), which consists of postUserId and postUserAddr.
  • The key name of the post message must be ATS_DELETE_ALL to tell ATS to delete a RIC/record.
  • The payload of the post message is a Market Price Update message. The payload of the Update message is a field list.
  • The field list contains field id -1, which holds the name of the RIC/record to delete from ATS.
  • The data values shown in the field list are OMM-encoded, which is why they appear as hexadecimal bytes.
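
To check which RIC name a trace like the one above carries, the hexadecimal data of field id -1 can be turned back into text. The following is a minimal sketch; the helper name decode_hex_ascii is hypothetical, and it only applies to string fields such as the RIC name in field id -1. Numeric fields use a different encoding that this sketch does not decode.

```ruby
# Hypothetical helper: turns the hex dump of a string field
# (as printed in the XML trace) back into readable text.
def decode_hex_ascii(hex_dump)
  # Drop the spaces between byte groups, then pack the hex digits into bytes.
  [hex_dump.delete(' ')].pack('H*')
end

puts decode_hex_ascii('4E45 572E 5249 43')   # prints NEW.RIC
```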

An example of a successful Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="5" text="[3]: Delete Accepted" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE_ALL"/>
     <dataBody>
     </dataBody>
</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 5, which corresponds to the postId (5) of the post message. Hence, this is the result of the post message above.
  • There is no NAK (Negative Acknowledge) code, so ATS performed the operation requested by the post message successfully. That means ATS deleted the RIC/record.
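
The two checks above (the ackId matches the postId, and no NAK code is present) can be sketched for a WebSocket API consumer as follows. The JSON shape, including the AckID, NakCode and Text field names, is an assumption modelled on the XML trace above, and the helper name ack_result is hypothetical.

```ruby
require 'json'

# Hypothetical helper: returns [accepted, text] for an Ack message hash.
# accepted is false when the message is not the Ack for post_id
# or when it carries a NakCode.
def ack_result(ack, post_id)
  unless ack['Type'] == 'Ack' && ack['AckID'] == post_id
    return [false, 'not the Ack for this post']
  end
  [!ack.key?('NakCode'), ack['Text']]
end

# Assumed JSON form of the successful Ack shown above.
ack = JSON.parse('{"ID":1,"Type":"Ack","AckID":5,"Text":"[3]: Delete Accepted",' \
                 '"Key":{"Service":267,"Name":"ATS_DELETE_ALL"}}')
accepted, text = ack_result(ack, 5)
puts "accepted=#{accepted} text=#{text}"
```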

The snippets below show how to create the post message above for deleting a RIC with each Elektron API:

  • EMA Java:
//Consumer.java in example341__MarketPrice__OffStreamPost folder
   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
   {
		...
		if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 
					refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
					refreshMsg.state().dataState() == OmmState.DataState.OK )
		{
					PostMsg postMsg = EmaFactory.createPostMsg();
					UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();
					FieldList nestedFieldList = EmaFactory.createFieldList();
					
					//FieldList is a collection in java
					nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));
					
					
					nestedUpdateMsg.payload(nestedFieldList );
					
					
					((OmmConsumer)event.closure()).submit( postMsg.postId( 5 ).serviceId(267)
										.name( "ATS_DELETE_ALL" ).solicitAck( true ).complete(true).publisherId(18,170540488)
										.payload(nestedUpdateMsg), event.handle() );
		}
		...
	}
  • EMA C++:
 /* Consumer.cpp in 341__MarketPrice__OffStreamPost */
   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )
   {
		...
		if ( refreshMsg.getDomainType() == MMT_LOGIN && 
			refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
			refreshMsg.getState().getDataState() == OmmState::OkEnum )
		{
					
			_pOmmConsumer->submit(PostMsg().postId(5).serviceId( 267 ).name("ATS_DELETE_ALL").solicitAck(true).publisherId(18,170540488).complete()
			.payload(UpdateMsg().payload(FieldList().addAscii(-1, "NEW.RIC").complete())), ommEvent.getHandle());
		}
		...
   }
  • ETA Java:
//PostHandler.java in Consumer folder
    //To declare variables used to create a post message for deleting a RIC on ATS
    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();
    protected FieldList fieldList = CodecFactory.createFieldList();
    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();
    protected UInt tempUInt = CodecFactory.createUInt();
    
	 //To create a postMessage for deleting a RIC on ATS
    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.
    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)
    {
    	
        // First encode message for payload
        postMsg.clear();

        // set-up message
        postMsg.msgClass(MsgClasses.POST);
        postMsg.streamId(1);
        postMsg.domainType(DomainTypes.MARKET_PRICE);
        postMsg.containerType(DataTypes.MSG);

        // Note: post message key not required for on-stream post
        postMsg.applyPostComplete();
        postMsg.applyAck();
       
        postMsg.applyHasPostId();
        postMsg.postId(5);

        postMsg.applyHasMsgKey();
        postMsg.msgKey().applyHasServiceId();
        postMsg.msgKey().serviceId(267);
        postMsg.msgKey().applyHasName();
        postMsg.msgKey().name().data("ATS_DELETE_ALL");
        
        
        // populate post user info        
        postMsg.postUserInfo().userAddr("10.42.61.200");	
        postMsg.postUserInfo().userId(18);        	
       

        // encode post message
        encIter.clear();
        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        ret = postMsg.encodeInit(encIter, 0);
        if (ret != CodecReturnCodes.ENCODE_CONTAINER)
        {
            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        // get a buffer for the nested market price update
        postNestedMsgPayLoad = CodecFactory.createBuffer();
        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

        // Although we are encoding RWF message, this code
        // encodes nested message into a separate buffer.
        // this is because MarketPrice.encode message is shared by all
        // applications, and it expects to encode the message into a stand alone
        // buffer.
        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        postMsgEncIter.clear();
        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

    	ret = encodeDeleteRIC(postMsgEncIter);
 		if (ret != CodecReturnCodes.SUCCESS) {
 			System.out.println("encodeDeleteRIC() failed:  <" + CodecReturnCodes.toString(ret));
 			return CodecReturnCodes.FAILURE;
 		}
 		 shouldOffstreamPost = false; //set flag to send post message once
		
        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        // complete encode message
        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeMsgComplete() failed with return code: " + ret);
            return ret;
        }

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId());

        return CodecReturnCodes.SUCCESS;
    }
	// To create a payload of the post message 
    private int encodeDeleteRIC(EncodeIterator encodeIter) 
    {
    	fieldList.clear();
        fieldEntry.clear();

        tempUInt.clear();
        tempReal.clear();

        // set-up message
        Msg msg = encodeUpdateMsg();

        // encode message
        int ret = msg.encodeInit(encodeIter, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // encode field list
        fieldList.applyHasStandardData();
        ret = fieldList.encodeInit(encodeIter, null, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }
        // X_RIC_NAME
        fieldEntry.clear();
        fieldEntry.fieldId(-1);
        fieldEntry.dataType(DataTypes.ASCII_STRING);
        Buffer aRIC = CodecFactory.createBuffer();
        aRIC.data("NEW.RIC");
        ret = fieldEntry.encode(encodeIter, aRIC);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }
        
        // complete encode field list
        ret = fieldList.encodeComplete(encodeIter, true);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        // complete encode message
        return msg.encodeComplete(encodeIter, true);
    }
	// To create an update message which is in the payload of the post message
    public Msg encodeUpdateMsg()
    {
        updateMsg.clear();
        updateMsg.msgClass(MsgClasses.UPDATE);
        updateMsg.streamId(0);
        updateMsg.domainType(DomainTypes.MARKET_PRICE);
        updateMsg.containerType(DataTypes.FIELD_LIST);
        return updateMsg;
    }
    
  • ETA C:
/* rsslPostHandler.c in Consumer folder */
	static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)
	{
		RsslRet ret = 0;
		RsslPostMsg postMsg = RSSL_INIT_POST_MSG;
		RsslDataDictionary* dictionary = getDictionary();
		RsslBool isSolicited = RSSL_FALSE; // ??? for post
		RsslUInt16 serviceId = (RsslUInt16)getServiceId();
		RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;
		RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;
		RsslBuffer hostName = RSSL_INIT_BUFFER;
		char hostNameBuf[] = "10.42.61.200";

		/* set-up message */
		postMsg.msgBase.msgClass = RSSL_MC_POST;
		postMsg.msgBase.streamId = 1;
		postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;
		postMsg.msgBase.containerType = RSSL_DT_MSG;

		// Note: post message key not required for on-stream post
		postMsg.flags = RSSL_PSMF_POST_COMPLETE
			| RSSL_PSMF_ACK // request ACK
			| RSSL_PSMF_HAS_POST_ID
			| RSSL_PSMF_HAS_MSG_KEY;

		postMsg.postId = 5;
	
		/* populate post user info */
		hostName.data = hostNameBuf;
		hostName.length = (RsslUInt32)strlen(hostName.data);
		if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)
		{
			printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",
				rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));
			return ret;
		}
		postMsg.postUserInfo.postUserId = 18;

		postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

		postMsg.msgBase.msgKey.name.data = (char *)"ATS_DELETE_ALL";
		postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("ATS_DELETE_ALL");
	
		postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

		// encode message 
		if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
		{
			printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
			return ret;
		}
		rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

		if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
			return ret;
		}

		ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");
			return RSSL_RET_FAILURE;
		}

		ret = encodeDeleteRIC(chnl, &payloadMsgBuf, dictionary);

		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: encodeDeleteRIC() failed\n");
			return RSSL_RET_FAILURE;
		}

		shouldOffstreamPost = FALSE; //set flag to send a post message once

		ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);
		if (ret != RSSL_RET_SUCCESS)
		{
			printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeComplete() failed\n");
			return RSSL_RET_FAILURE;
		}


		/* complete encode message */
		if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
			return ret;
		}

		msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);



		printf("\n\nSENDING POST WITH MESSAGE:\n");
		printf("	streamId = %d\n", postMsg.msgBase.streamId);
		printf("	postId   = %d\n", postMsg.postId);
		

		return RSSL_RET_SUCCESS;

	}
	RsslRet encodeDeleteRIC(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)
	{
		RsslRet ret = 0;
		RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;
		RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;
		RsslMsgBase* msgBase;
		RsslMsg* msg;
		RsslFieldList fList = RSSL_INIT_FIELD_LIST;
		RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;
		char errTxt[256];
		RsslBuffer errorText = { 255, (char*)errTxt };
		RsslBuffer tempBuffer;
		RsslReal tempReal = RSSL_INIT_REAL;
		RsslDictionaryEntry* dictionaryEntry = NULL;
		RsslEncodeIterator encodeIter;

		/* clear encode iterator */
		rsslClearEncodeIterator(&encodeIter);

		/* set-up message */
		/* set message depending on whether refresh or update */

		msgBase = &updateMsg.msgBase;
		msgBase->msgClass = RSSL_MC_UPDATE;

		msg = (RsslMsg *)&updateMsg;

		msgBase->domainType = RSSL_DMT_MARKET_PRICE;
		msgBase->containerType = RSSL_DT_FIELD_LIST;

		/* encode message */
		if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
		{
			printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
			return ret;
		}
		rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
		if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
			return ret;
		}

		/* encode field list */
		fList.flags = RSSL_FLF_HAS_STANDARD_DATA;
		if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);
			return ret;
		}

		/* encode fields */
		/* X_RIC_NAME */
		rsslClearFieldEntry(&fEntry);

		fEntry.fieldId = -1;
		fEntry.dataType = RSSL_DT_ASCII_STRING;

		tempBuffer.data = (char *)"NEW.RIC";
		tempBuffer.length = (RsslUInt32)strlen("NEW.RIC");
		if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
			return ret;
		}

		/* complete encode field list */
		if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);
			return ret;
		}

		/* complete encode message */
		if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
		{
			printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
			return ret;
		}
		msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

		return RSSL_RET_SUCCESS;
	}
  • WebSocket API in Ruby:
   # market_price_posting.rb in ruby folder
	if message_type == 'Refresh' then
		message_domain = message_json['Domain']
		if message_domain != nil then
			if message_domain == 'Login' then
				mp_post_json_hash = {
					'ID' => 1,
					'Type' => 'Post',
					'Domain' => 'MarketPrice',
					'Ack' => true,
					'PostID' => 5,
					'PostUserInfo' =>  {
						'Address' => '10.42.61.200', 
						'UserID' => 18
					},
					'Key' => {
						'Name' => 'ATS_DELETE_ALL', 
						'Service' => 267
					},
					'Message' => {
						'ID' => 0,
						'Type' => 'Update',
						'Domain' => 'MarketPrice',
						'Fields' => {'X_RIC_NAME' => 'NEW.RIC'}
					}
				}
				ws.send mp_post_json_hash.to_json.to_s
			end
		end
	end
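
In the EMA snippets above, publisherId takes the user address as the integer 170540488, while the ETA and WebSocket snippets use the dotted string 10.42.61.200. The integer appears to be the same IPv4 address written as a 32-bit number. A minimal sketch of the conversion with Ruby's standard ipaddr library follows; the helper name ipv4_to_publisher_address is hypothetical.

```ruby
require 'ipaddr'

# Hypothetical helper: converts a dotted IPv4 address into the 32-bit
# integer form used as the second argument of publisherId in the EMA snippets.
def ipv4_to_publisher_address(dotted)
  IPAddr.new(dotted).to_i
end

puts ipv4_to_publisher_address('10.42.61.200')   # prints 170540488
```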

 

Troubleshooting

When an application fails to post a message to ATS, the Ack message received from the server contains a NAK code. Review the NAK code and the text message, which describe the error and help you find the root cause of the problem.

This section lists common errors that you may encounter when working with ATS, together with their solutions:

1) text="Unable to find service for post message.", NackCode: 2 or "DeniedBySrc" in WebSocketAPI.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="DENIED_BY_SRC" text="Unable to find service for post message." dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="2670" name="ATS_INSERT_S"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The specified service in the post message is incorrect; the server does not provide this service.
  • Solution: Double-check that the post message uses the correct service. You may want to contact your local Market Data support or administrator for the ATS service name. If the infrastructure is hosted by Thomson Reuters, please contact the account team for the service name. Then modify your source code to use the correct service in the post message.

2) text="A9: Service is unavailable.", NackCode: 3 or "SourceDown" in WebSocketAPI.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="SOURCE_DOWN" text="A9: Service is unavailable." dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The specified service in the post message is down.
  • Solution: Please contact your local MarketData support or administrator who can help you to recover the service. If the infrastructure is hosted by Thomson Reuters, please contact the account team.

3) text="[610]: Item Already Exists", NackCode: 2 or "DeniedBySrc" in WebSocketAPI.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="DENIED_BY_SRC" text="[610]: Item Already Exists" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: An application tries to insert a RIC/record which exists on ATS already.
  • Solution: Make sure that RIC/record in the post message is the one you would like to add to ATS.
    • If yes, you should do nothing because the RIC exists on ATS already.
    • If no, modify the source code to insert the correct RIC.

4) text="[500]: Unknown Item", NackCode: 2 or "DeniedBySrc" in WebSocketAPI

- Using an ATS command other than ATS_INSERT_S, ATS_ADDFIELD_S, ATS_DELETE or ATS_DELETE_ALL.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_s"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The application sets an incorrect ATS command in the post message. In the example message above, the ATS command specified in name is ATS_INSERT_s, which is incorrect; it should be ATS_INSERT_S. ATS commands are case sensitive.
  • Solution: Correct the ATS command in the post message, e.g. ATS_INSERT_S, ATS_ADDFIELD_S, ATS_DELETE or ATS_DELETE_ALL, according to your requirement.
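
Because ATS commands are case sensitive, an application could validate the command name before sending the post message. The following is a minimal sketch which assumes that only the four commands covered in this series are used; the constant ATS_COMMANDS and the helper check_ats_command are hypothetical names. The check applies to command posts only, because a post that updates data uses the RIC name as the key name.

```ruby
# The four ATS commands covered in this series (assumed to be the full set in use).
ATS_COMMANDS = %w[ATS_INSERT_S ATS_ADDFIELD_S ATS_DELETE ATS_DELETE_ALL].freeze

# Hypothetical helper: returns nil when the command is spelled exactly right,
# otherwise a hint describing the problem.
def check_ats_command(name)
  return nil if ATS_COMMANDS.include?(name)

  # Look for a command that differs only in letter case.
  match = ATS_COMMANDS.find { |cmd| cmd.casecmp?(name) }
  match ? "did you mean #{match}?" : 'not one of the ATS commands covered in this series'
end

puts check_ats_command('ATS_INSERT_s')   # prints did you mean ATS_INSERT_S?
```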

- Updating data.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="3" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEWs.RIC"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The application tries to update data of a RIC which does not exist on ATS. In the example message above, the RIC name is NEWs.RIC.
  • Solution: Make sure that the RIC/record in the post message is the one whose data you would like to update.
    • If yes, add the RIC to ATS first. Please refer to the Inserting a RIC/record section.
    • If no, modify the source code to update data of the correct RIC.

- Deleting field(s).

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="4" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The application tries to delete field(s) of a RIC which does not exist on ATS or has already been deleted.
  • Solution: Make sure that the field(s) and the RIC in the post message are correct according to your requirements.
    • If yes and you would like the RIC to exist on ATS, please refer to the Inserting a RIC/record section to add the RIC with your desired fields.
    • If no, modify the source code to delete the field(s) of the correct RIC according to your requirements.

- Deleting a RIC.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="5" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE_ALL"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The application tries to delete a RIC which has never existed on ATS or has already been deleted.
  • Solution: Make sure that the RIC in the post message is correct according to your requirements.
    • If yes, you should do nothing because the RIC does not exist on ATS any more.
    • If no, modify the source code to delete the correct RIC according to your requirements.

5) text="[502]: Unknown Fid", NackCode: 2 or "DeniedBySrc" in WebSocketAPI

- Adding field(s).

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="2" nakCode="DENIED_BY_SRC" text="[502]: Unknown Fid" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_ADDFIELD_S"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Causes:
    • An application tries to add field(s) to a RIC which contains these field(s) already.
    • An application tries to add field(s) which ATS does not support.
  • Solution: Make sure that the field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, you may want to contact your local Market Data support or administrator for the list of fields which you can add to a RIC on ATS. If ATS is hosted by Thomson Reuters, please contact the account team.
    • If no, modify the source code to add the correct fields to the correct RIC according to your requirements.

- Updating data.

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="3" nakCode="DENIED_BY_SRC" text="[502]: Unknown Fid" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEW.RIC"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The application tries to update field(s) which the RIC specified in name (NEW.RIC) does not contain.
  • Solution: Make sure that the field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, add the field(s) to the RIC first. Please refer to Adding Fields section.
    • If no, modify the source code to update data of the correct field(s) to the correct RIC according to your requirements.

- Deleting field(s).

An example of Ack message:

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="4" nakCode="DENIED_BY_SRC" text="[502]: Unknown Fid" dataSize="0">
     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>
     <dataBody>
     </dataBody>
</ACK>
  • Root Cause: The application tries to delete field(s) which the RIC has never contained or which have already been deleted from the RIC.
  • Solution: Make sure that the field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, you should do nothing because the RIC does not have these field(s) any more.
    • If no, modify the source code to delete the correct field(s) of the correct RIC according to your requirements.
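
The error texts listed in this section start with a bracketed ATS code when the rejection comes from ATS itself. As a minimal sketch, an application could extract that code from the Ack text and map it to a short hint. The constant ATS_HINTS, the helper ats_hint and the hint wording are hypothetical summaries of the cases above, not an official list of ATS codes.

```ruby
# Hypothetical hint table summarising the cases described in this section.
ATS_HINTS = {
  500 => 'Unknown Item: check the ATS command spelling (case sensitive) and that the RIC exists on ATS',
  502 => 'Unknown Fid: check that the RIC contains the field(s) and that ATS supports them',
  610 => 'Item Already Exists: the RIC is already on ATS'
}.freeze

# Hypothetical helper: maps the text of a NAK'ed Ack message to a hint.
def ats_hint(ack_text)
  # Capture the digits of a leading "[nnn]" prefix, if there is one.
  code = ack_text[/\A\[(\d+)\]/, 1]
  if code.nil?
    return 'no bracketed ATS code; check the NAK code and text (for example a service problem)'
  end
  ATS_HINTS.fetch(code.to_i, "ATS code #{code} is not covered in this article")
end

puts ats_hint('[502]: Unknown Fid')
puts ats_hint('Unable to find service for post message.')
```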

Summary



After finishing both articles in this series, you will understand more about ATS and Posting. By using the Elektron APIs, you can also carry out general day-to-day operations with ATS more easily and quickly, including solving common problems. The technical knowledge explained in the articles can help you implement more advanced use cases and make better use of ATS. If you would like to acquire ATS, please contact the Thomson Reuters Account team for the process and details. If you require any ATS assistance, you can contact the support team directly by submitting your query to My Account.

References

For further details, please check out the following resources: