Thursday, 17 January 2019

C# - RTD - Redis - Real Time Data on the worksheet

In this article I share C# code for a Real Time Data worksheet function callable using RTD(). It will call into a local instance of Redis and will tick in real time. This article assumes you have installed Redis.

Click here for separate Youtube window

So I am very much pleased with Redis as outlined in the two articles (here and here) I wrote earlier this month. I hope you have read those previous two articles , and/or watched the videos. Redis is exactly what I have been looking for, I want a machine wide data cache, not too heavyweight but accessible to multiple instances of Excel because it would be inefficient for multiple Excel sessions to draw down duplicate data from network servers. In my humble opinion, much better to draw the data down to one executable, Redis, and then have the Excel sessions call into Redis locally.

RTD Server

There are already examples online for how to write an RTD server in C# (including the superb Learning Tree blog post here) and they share a similar pattern in that they have a timer and the single data item that they return is typically a timestamp so they can show the timestamp moving. I started from those examples and then expanded the functionality. I want multiple data items. I also want to make use of the multiple arguments, for example supplying a JSON path to drill into a JSON document.

Introducing ExcelCoin

I do have a stockbroker but I'm not going to advertise their data. Instead, I invent a fake financial security called ExcelCoin. I write the ExcelCoin price to Redis as a scalar number. I also write a JSON meta document to Redis, which gives the timestamp; for financial securities it could also give trading volume etc. The VBA code for this is given below but it does rely on the C# component given even further below. So just hang on.

I've invented ExcelCoin just to fake a real-time data feed. If you already have a real-time data feed then you can use that to write prices, headlines or whatever to Redis by re-writing the code below.

[If running the code in Word then you need the Python class from this blog post to generate a random Normal observation. If running in Excel then the code uses Excel's WorksheetFunction].

Option Explicit

Private vExcelCoinPrice As Variant

Private moPythonNormsInv As Object
Private moRedisSocket As Object

Public Property Get RedisSocket() As Object
    If moRedisSocket Is Nothing Then
        
        '* see next blog post for codeo this server
        Set moRedisSocket = VBA.CreateObject("RedisRTDServerLib.RedisSocket")
        moRedisSocket.Initialize
    End If
    Set RedisSocket = moRedisSocket
End Property

Public Sub DisposeRedisSocket()
    If Not moRedisSocket Is Nothing Then
        moRedisSocket.Dispose
        Set moRedisSocket = Nothing
    
    End If
End Sub


Public Property Get PythonNormsInv2() As Object
    If moPythonNormsInv Is Nothing Then
        Set moPythonNormsInv = VBA.CreateObject("SciPyInVBA.PythonNormsInv")
    End If
    Set PythonNormsInv2 = moPythonNormsInv
End Property

Sub Test_VBAPyNormStdInv()
    Debug.Print PythonNormsInv2.PyNormStdInv(0.95)
End Sub

Sub ResetCoin()
    vExcelCoinPrice = 1.2
End Sub

Sub TestOnTime()
    
    '*
    '* initialize price of ExcelCoin
    '*
    If IsEmpty(vExcelCoinPrice) Then
        vExcelCoinPrice = 1.2
    End If

    Const dDrift As Double = 1.0001
    
    Dim dRectangular As Double
    dRectangular = Rnd(1)
    
    Dim dNormal2 As Double
    If WinWordVBA() Then
        dNormal2 = PythonNormsInv2.PyNormInv(dRectangular, 0, 0.001)
    ElseIf ExcelVBA() Then
        '=NORM.INV(0.95,0,0.001)
        Dim objApp As Object
        Set objApp = Application
        dNormal2 = objApp.WorksheetFunction.NormInv(dRectangular, 0, 0.001)
    
    End If
    
    Dim dLogNormal As Double
    dLogNormal = Exp(dNormal2)
    
    vExcelCoinPrice = vExcelCoinPrice * dLogNormal * dDrift
    Debug.Print vExcelCoinPrice

    '*
    '* call Redis to update price of Excel Coin, this is just a number (double)
    '*
    Dim oRedisSocket As Object
    Set oRedisSocket = RedisSocket
    Dim sResponse As String
    sResponse = oRedisSocket.SendAndReadReponse("SET USD/ExcelCoin " & CStr(vExcelCoinPrice) & vbCrLf)
    Dim vParsed As Variant
    vParsed = oRedisSocket.Parse(sResponse)
    
    '*
    '* also write the fuller meta document that carries timestamps, source URI etc.
    '*
    Dim sJsonDoc As String
    sJsonDoc = "{ ""timestamp2"": " & CDbl(Now()) & ", ""redisId"": ""USD/ExcelCoin/meta"" ,  ""point"": " & vExcelCoinPrice & " }"
    sJsonDoc = VBA.Replace(sJsonDoc, """", "\""")
    sResponse = oRedisSocket.SendAndReadReponse("SET USD/ExcelCoin/meta """ & sJsonDoc & """" & vbCrLf)
    vParsed = oRedisSocket.Parse(sResponse)
    
    '*
    '* set up next call
    '*
    DoEvents
    Call Application.OnTime(Format(Now() + CDate("00:00:02"), "dd/mmm/yyyy hh:mm:ss"), "TestOnTime")
End Sub

Function ExcelVBA() As Boolean
    On Error GoTo QuickExit
    Dim objApp As Object
    Set objApp = Application
    
    Dim objWbs As Object
    Set objWbs = objApp.Workbooks
    
    ExcelVBA = True
QuickExit:
End Function

Function WinWordVBA() As Boolean
    On Error GoTo QuickExit
    Dim objApp As Object
    Set objApp = Application
    
    Dim objActiveDocument As Object
    Set objActiveDocument = objApp.ActiveDocument
    
    WinWordVBA = True
QuickExit:
End Function

About the C# RTD Server Code

The listing is quite large and worth talking through. The project type is a C# assembly with the 'Register for COM interop' checkbox checked (found on the Build tab of the project properties). Registering COM components requires administrator rights so run Visual Studio with admin rights. In AssemblyInfo.cs I have set [assembly: ComVisible(true)]

The RTD interfaces are defined in the Excel type library. So one must add a reference to Microsoft.Office.Interop.Excel primary interop assembly (PIA).

Also, I do some JSON processing so I have added the Newtonsoft.Json package by using the Package Manager.

Code Reuse

I do try to write object orientated code. In the listing below, there are three major classes plus some minor helper classes. The code split is intended to promote code re-use. Firstly, there is the RTDServer class which houses the RTD interface implementation. Secondly, there is the RedisSocket class which houses code for interacting with Redis via Sockets. Hopefully those two will be quite reusable.

Thirdly, the RedisRTDServer class bridges the RTDServer and RedisSocket classes. This is less likely to be reusable as it is very much idiosyncratic to this application.

DebugView and Error Handling

One potential impediment to re-use is error handling. The RTD function will smother any errors, if I want to communicate an error I need to write it to a log or call the Win32 Api OutputDebugString function and view the message using the DebugView application from SysInternals. Also, I can pass back the error message so it is written to calling cell. In the code I do both.

To facilitate this I subclass the C# Exception class to give a DebugMonitorableException class.

Error handling can be quite a personal choice so feel free to take a different view on error handling to me.

COM Interfaces

Two of the three major classes, RedisSocket and RedisRTDServer also have COM interfaces which allows them to be callable from VBA. This promotes re-use. I like to develop classes in C# whilst writing unit tests in Excel VBA. This is frequently how my classes mature and evolve.

Singleton substitute for global variable

We all know that global variables are naughty and in fact impossible in C#. However, I needed a singleton instance of the RedisSocket class to be initialised and be available thereafter. I found that creating and disposing the class every time I wanted to read a price caused the RTD server code to freeze. Moving to a singleton instance solved this.

RTDServer implements IRTDServer

There is a standard interface IRTDServer that any RTD server class must implement. One can read the Microsoft official documentation here. Actually better to read this superb Learning Tree blog post here.

In fact that Learning Tree blog post is probably a better place to begin and once you have that running correctly you can return to this blog post to see how to implement multiple topics as served by Redis.

RedisSocket encapsulates Redis Sockets interface

The RedisSocket class encapsulates a minimalistic interface to Redis. There are more comprehensive C# Redis interface libraries if you want them but for this blog this code will serve. The code uses the .NET TcpClient class to send and receive bytes via Sockets to/from Redis. The code parses the response using a simple split function. It is based on some equally simple VBA code I wrote in previous blog posts. Nevertheless it serves. I won't comment further on it here. See the previous two Redis blog posts for more details.

RedisRTDServer houses application specific code

As mentioned above, in order to make the RedisSockets and RTDServer classes as re-usable as possible I felt it necessary to offload as much as possible into a third class, RedisRTDServer, which houses all the code idiosyncratic to this application. It is called by the RTDServer class and is handed an array of strings as parameters.

The RedisRTDServer class interprets this array of strings. This is the role of the Router() method. The first string is the key; for the ExcelCoin the key is 'USD/ExcelCoin'. The second string specifies a handler which allows different processing logic, so if I want a single price then I supply "scalar" as the handler or indeed just an empty string ("").

I have added one other handler option, "json" and in this use case the key identifies a JSON document the third string is interpreted as a JSON path which allows a drill down into the JSON document.

Conceivably, in the future I could add another option "xml" and allow an XPath expression to be supplied to drill down into an Xml document.

I think you'll agree, there is plenty of scope to define a detailed interface.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Timers;
using System.Text;
using System.Reflection;

// Added Reference to Microsoft Excel 15
using Microsoft.Office.Interop.Excel;
using Newtonsoft.Json.Linq;  // I used Package Manager to add Newtonsoft.Json

namespace RedisRTDServerLib
{
    [ComVisible(false)]
    static class Win32Api
    {   // this is so I can use DebugView from SysInternals
        [DllImport("kernel32.dll")]
        public static extern void OutputDebugString(string lpOutputString);
    }

    [ComVisible(false)]
    static class Singletons
    {   // this Singleton class is in lieu of a global variable
        static RedisSocket _redisSocket = null;

        static public RedisSocket RedisSocketSingleton
        {
            get
            {
                if (_redisSocket == null)
                {
                    _redisSocket = new RedisSocket();
                    _redisSocket.Initialize();
                }
                return _redisSocket;
            }
            set
            {
                // only interested in this for tidy up purposes
                if (value == null)
                {
                    _redisSocket = value;
                }
            }
        }
    }

    [ComVisible(false)]
    public class DebugMonitorableException : Exception
    {   // this class does the same as an Exception but simply prints the error message to the debug monitor
        public DebugMonitorableException()
        {
        }

        public DebugMonitorableException(string message) : base(message)
        {
            Win32Api.OutputDebugString(message);
        }

        public DebugMonitorableException(string message, Exception inner) : base(message, inner)
        {
            Win32Api.OutputDebugString(message);
        }
    }

    [ComVisible(true)]
    public interface IRedisSocket
    {
        void Initialize();
        void Dispose();
        string SendAndReadReponse(string sCommand);
        object Parse(string sRaw);
    }
    [ClassInterface(ClassInterfaceType.None), ComVisible(true)]
    [ComDefaultInterface(typeof(IRedisSocket))]
    public class RedisSocket : IRedisSocket, IDisposable
    {
        TcpClient _client;

        public void Initialize()
        {
            try
            {
                _client = new TcpClient();
                _client.Connect("127.0.0.1", 6379);
            }
            catch (Exception ex)
            {
                throw new DebugMonitorableException("Trapped error: " + ex.Message);
            }
        }

        public void Dispose()
        {
            _client.Close();

            _client.Dispose();
            _client = null;
        }

        public string SendAndReadReponse(string sCommand)
        {
            try
            {
                //TcpClient client = new TcpClient();
                NetworkStream stream;

                stream = _client.GetStream();

                Byte[] sendBytes = Encoding.UTF8.GetBytes(sCommand);
                stream.Write(sendBytes, 0, sendBytes.Length);
                stream.Flush();

                Byte[] recvBytes = new byte[_client.ReceiveBufferSize];
                stream.Read(recvBytes, 0, recvBytes.Length);

                string result = Encoding.UTF8.GetString(recvBytes);


                string result2 = result.Substring(0, result.LastIndexOf("\r\n"));
                return result2;
            }
            catch (Exception ex)
            {
                throw new DebugMonitorableException("SendAndReadReponse error: " + ex.Message);
            }
        }

        public object Parse(string sResponse)
        {
            try
            {
                Int32 lTotalLength = sResponse.Length;
                Debug.Assert(lTotalLength > 0);

                string[] vSplitResponse;
                {
                    string[] splitStrings = { "\r\n" };
                    vSplitResponse = sResponse.Split(splitStrings, StringSplitOptions.None);
                }

                Int32 lReponseLineCount = vSplitResponse.Length;

                switch (sResponse.Substring(0, 1))
                {
                    case "$":
                        if (sResponse == "$-1")
                        {
                            throw new Exception("Not found!");
                        }
                        else
                        {
                            return vSplitResponse[1];
                        }
                    case "+":
                        return vSplitResponse[0].Substring(1);
                    case ":":

                        //'* response is a numeric
                        return Double.Parse(vSplitResponse[0].Substring(1));
                    case "-":
                        //'* response is an error
                        throw new Exception(vSplitResponse[0].Substring(1));
                    case "*":
                        //'* multiple responses, build an array to return
                        Int32 lResponseCount = Int32.Parse(vSplitResponse[0].Substring(1));
                        if (lResponseCount > 0)
                        {
                            Debug.Assert(lResponseCount == (lReponseLineCount - 1) / 2);
                            List<Object> returnList = new List<Object>();
                            for (Int32 lLoop = 0; lLoop < lResponseCount; lLoop++)
                            {
                                returnList.Add(vSplitResponse[(lLoop + 1) * 2]);
                            }
                            return returnList.ToArray();
                        }
                        else
                        {
                            return false;
                        }
                    default:
                        // '* this should not happen
                        throw new Exception("Unrecognised return type '" + sResponse.Substring(0, 1) + "'");
                }
            }
            catch (Exception ex)
            {
                throw new DebugMonitorableException("Parse error: " + ex.Message);
            }
        }
    }

    [ComVisible(true)]
    public interface IRedisRTDServer
    {
        object Router(Array Strings);
        string GetJsonDocOrError(Array Strings);
        Boolean GetJSON(string sKey, string path, out string json, out string errorMsg);
        string GetKey(Array Strings);
        string GetPath(Array Strings);
        string GetHandler(Array Strings);
        object GetScalarOrError(Array Strings);
        Boolean GetScalar(string sKey, out double scalar, out string errorMsg);
    }

    [ClassInterface(ClassInterfaceType.None), ComVisible(true)]
    [ComDefaultInterface(typeof(IRedisRTDServer))]
    public class RedisRTDServer : IRedisRTDServer
    {
        public object Router(Array Strings)
        {
            try
            {
                string handler = GetHandler(Strings);
                string path = GetPath(Strings);

                switch (handler)
                {
                    case "":
                    case "scalar":
                        return GetScalarOrError(Strings);
                        
                    case "json":
                        return GetJsonDocOrError(Strings);
                        
                    default:
                        return "Failed to request with handler '" + handler + "'";
                }
            }
            catch (Exception ex)
            {
                string errMsg = this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + " error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                return errMsg;
            }
        }

        public string GetJsonDocOrError(Array Strings)
        {
            try
            {
                string sKey = GetKey(Strings);
                string path = GetPath(Strings);
                if (GetJSON(sKey, path, out string json, out string errMsg))
                {
                    return json;
                }
                else
                {
                    return errMsg;
                }
            }
            catch (Exception ex)
            {
                string errMsg = this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + " error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                return errMsg;
            }
        }

        public Boolean GetJSON(string sKey, string path, out string json, out string errorMsg)
        {

            json = "";
            errorMsg = "";
            try
            {

                object parsed = null;

                try
                {
                    RedisSocket redisSocket = null;
                    redisSocket = Singletons.RedisSocketSingleton;
                    string rawResponse = redisSocket.SendAndReadReponse("GET " + sKey + "\r\n");
                    parsed = redisSocket.Parse(rawResponse);
                }
                catch (Exception ex)
                {
                    // for the time being I'm not rethrowing this
                    errorMsg = ex.Message;
                    return false;
                }
                if (parsed is String)
                {
                    json = (string)parsed;

                    if (path.Length == 0)
                    {
                        return true;
                    }
                    else
                    {   // NewtonSoft.JSON code to drill into a JSON document with a JSON path
                        JObject o = JObject.Parse(json);
                        JToken acme = o.SelectToken(path);
                        json = acme.ToString();
                        return true;
                    }
                }
                else
                {
                    errorMsg = "Not a JSON document!";
                    return false;
                }

            }
            catch (Exception ex)
            {
                throw new DebugMonitorableException(this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + ": error\r\n" + ex.Message);
            }
        }

        public string GetKey(Array Strings)
        {
            try
            {
                string sKey;
                sKey = (string)Strings.GetValue(0);
                if (sKey.Contains(" "))
                {
                    sKey = "'" + sKey + "'";
                }
                return sKey;
            }
            catch (Exception ex)
            {
                string errMsg = this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + " error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                throw new Exception(errMsg);
            }
        }

        public string GetPath(Array Strings)
        {
            try
            {
                string path = "";
                {
                    if (Strings.Length >= 3)
                    {
                        path = (string)Strings.GetValue(2);
                    }
                }
                return path;
            }
            catch (Exception ex)
            {
                string errMsg = this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + " error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                throw new Exception(errMsg);
            }
        }

        public string GetHandler(Array Strings)
        {
            try
            {
                string handler = "";
                {
                    if (Strings.Length >= 2)
                    {
                        handler = (string)Strings.GetValue(1);
                    }
                }
                return handler;
            }
            catch (Exception ex)
            {
                string errMsg = this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + " error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                throw new Exception(errMsg);
            }
        }

        public object GetScalarOrError(Array Strings)
        {
            try
            {
                string sKey = GetKey(Strings);

                if (GetScalar(sKey, out double price, out string errMsg))
                {
                    return price;
                }
                else
                {
                    return errMsg;
                }
            }
            catch (Exception ex)
            {
                string errMsg = this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + " error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                return errMsg;
            }
        }

        public Boolean GetScalar(string sKey, out double scalar, out string errorMsg)
        {

            scalar = 0;
            errorMsg = "";
            try
            {

                object parsed = null;

                try
                {
                    RedisSocket redisSocket = null;
                    
                    redisSocket = Singletons.RedisSocketSingleton;
                    string rawResponse = redisSocket.SendAndReadReponse("GET " + sKey + "\r\n");
                    parsed = redisSocket.Parse(rawResponse);
                }
                catch (Exception ex)
                {
                    // for the time being I'm not rethrowing this
                    errorMsg = ex.Message;
                    return false;
                }
                if (parsed is String)
                {
                    string parsedString = (string)parsed;
                    if (!Double.TryParse(parsedString, out scalar))
                    {   
                        throw new DebugMonitorableException(this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + ": could not parse '" + parsedString + "' to double");
                    }
                }

                return true;
            }
            catch (Exception ex)
            {
                throw new DebugMonitorableException(this.GetType().Name + '.' + MethodBase.GetCurrentMethod() + ": error\r\n" + ex.Message);
            }
        }
    }


    public class RTDServer : IRtdServer
    {
        private Dictionary<int, Array> _topicsAndStrings = new Dictionary<int, Array>();
        private IRTDUpdateEvent m_callback;
        private Timer m_timer;
        private int m_topicId;

        int IRtdServer.ServerStart(IRTDUpdateEvent CallbackObject)
        {
            try
            {
                m_callback = CallbackObject;
                m_timer = new Timer();
                m_timer.Elapsed += new ElapsedEventHandler(TimerEventHandler);
                m_timer.Interval = 3000; // in milliseconds
                return 1;
            }
            catch (Exception ex)
            {
                Win32Api.OutputDebugString("IRtdServer.ServerStart:" + ex.Message);
                return 0;
            }
        }

        dynamic IRtdServer.ConnectData(int TopicID, ref Array Strings, ref bool GetNewValues)
        {
            try
            {
                _topicsAndStrings.Add(TopicID, Strings);

                m_topicId = TopicID;
                m_timer.Start();
                return GetTime();
            }
            catch (Exception ex)
            {
                string errMsg = "IRtdServer.ConnectData error:" + ex.Message;
                Win32Api.OutputDebugString(errMsg);
                return errMsg;
            }
        }

        private string GetTime()
        {
            return "RTD Server GetTime method: " + DateTime.Now.ToString("hh: mm:ss");
        }

        private void TimerEventHandler(object sender, EventArgs args)
        {
            try
            {
                // UpdateNotify is called to inform Excel that new data are available
                // the timer is turned off so that if Excel is busy, the TimerEventHandler is not called repeatedly

                m_timer.Stop();
                m_callback.UpdateNotify();
            }
            catch (Exception ex)
            {
                Win32Api.OutputDebugString("TimerEventHandler:" + ex.Message);
            }
        }

        Array IRtdServer.RefreshData(ref int TopicCount)
        {
            object[,] data = null;
            try
            {
                TopicCount = _topicsAndStrings.Count;
                data = new object[2, TopicCount];

                RedisRTDServer redisRtdServer = new RedisRTDServer();
                int idx = 0;
                foreach (var kvp in _topicsAndStrings)
                {
                    Array Strings = kvp.Value;
                    data[0, idx] = kvp.Key;
                    data[1, idx] = redisRtdServer.Router(kvp.Value);
                    idx++;
                }

                m_timer.Start();
                return data;
            }
            catch (Exception ex)
            {
                Win32Api.OutputDebugString("IRtdServer.RefreshData:" + ex.Message);
                return data;
            }
        }

        void IRtdServer.DisconnectData(int TopicID)
        {
            try
            {
                if (_topicsAndStrings.ContainsKey(TopicID))
                {
                    _topicsAndStrings.Remove(TopicID);
                }

                if (_topicsAndStrings.Count == 0)
                {
                    m_timer.Stop();
                    Singletons.RedisSocketSingleton = null;
                }
            }
            catch (Exception ex)
            {
                Win32Api.OutputDebugString("IRtdServer.DisconnectData:" + ex.Message);
            }
        }

        int IRtdServer.Heartbeat()
        {
            return 1;
        }

        void IRtdServer.ServerTerminate()
        {
            try
            {
                if (null != m_timer)
                {
                    m_timer.Dispose();
                    m_timer = null;
                }
                if (null != Singletons.RedisSocketSingleton)
                {
                    Singletons.RedisSocketSingleton = null;
                }
            }
            catch (Exception ex)
            {
                Win32Api.OutputDebugString("IRtdServer.ServerTerminate:" + ex.Message);
            }
        }
    }
}

Examples of calling from the worksheet

So finally, to call from the worksheet you need some cell formulas

=RTD("redisrtdserverlib.rtdserver",,"USD/ExcelCoin","","")
=RTD("redisrtdserverlib.rtdserver",,"USD/ExcelCoin/meta","json","$.timestamp2")+0

No comments:

Post a Comment