Complex Event Processing (CEP) solution with Business Rule Engine

I have implemented CEP solution with BizTalk and focused on real time passenger flow through the airport scenario. Imagine that information of passengers are flowing through the pipeline and you want to define some criteria to filter passenger data over pipeline. In CEP concept, collection of criteria or rules is assumed as a event. You may want to collect many different events data from multiple sources simultaneously. As a result, the aim of the complex event processing is to identify meaningful opportunities, threads etc.

solution
Let’s take a look at CEP implementation. The solution has two orchestration which Detects event and process event orchestration. The detect event orchestration takes passenger info and checks whether any event occurs or not. Every detected event is stored to event table in database. Also the event which reaches the max occurrence criterion is published to BizTalk MessageBox. The second orchestration catches the published event(thread or opportunity) and tries to process.

But where is the Business rule engine in this solution? Business rule engine has responsibility to execute event rules in detect orchestration. Can we define new event(s) without deploying the solution again? Yes, the events can be defined using with Business Rule Composer.

First, we need to define our schema to represent passenger and event information.

Schema_PassengerInfo Schema_Event

Then I have created two events that are called Turkish and German Passenger Events with Business Rule Composer.
Event_German

Event_Turkish

Business Rule Engine takes passenger info and executes defined rules of event over passenger info. Then If passenger info complaint with rules, BRE sets the values that are event name, max occurrence of event and description. This is also tricky point BRE determines events dynamically depends on the passenger info.

After defining rules, we can construct our orchestration. Passenger info comes from Port_PI receive port. And the event info is initialized in expression shape.

Orch1

We send passenger and event info to Business Rule Engine. Instead of using call rules shape in orchestration, there is a rule executer helper in the solution.
Orch2

namespace BT.POC.RuleExecuter
{
    [Serializable]
    public class PolicyUtility
    {
        public string Execute(XLANGMessage msgPassenger, XLANGMessage msgEvent)
        {
            string output = String.Empty;
            XmlDocument xmlDocPassenger = (XmlDocument)msgPassenger[0].RetrieveAs(typeof(XmlDocument));
            XmlDocument xmlDocEvent = (XmlDocument)msgEvent[0].RetrieveAs(typeof(XmlDocument));

            TypedXmlDocument txdPassenger = new TypedXmlDocument("ibrahimoguz.BiztalkPOC.CEP.Schemas.PassengerInfo", xmlDocPassenger);
            TypedXmlDocument txdEvent = new TypedXmlDocument("ibrahimoguz.BiztalkPOC.CEP.Schemas.EventSchema", xmlDocEvent);

            Microsoft.BizTalk.RuleEngineExtensions.RuleSetDeploymentDriver breDriver = new Microsoft.BizTalk.RuleEngineExtensions.RuleSetDeploymentDriver();
            Microsoft.RuleEngine.RuleStore breStore = breDriver.GetRuleStore();

            Microsoft.RuleEngine.RuleSetInfoCollection colPolInfo = null;
            colPolInfo = breStore.GetRuleSets(RuleStore.Filter.All);

            string nameXPath = "/*[local-name()='EventInfo' and namespace-uri()='http://ibrahimoguz.BiztalkPOC.CEP.EventSchema']/*[local-name()='Name' and namespace-uri()='']";
            foreach (RuleSetInfo pInfo in colPolInfo)
            {
                Policy p = new Policy(pInfo.Name);
                object[] facts = new object[2];
                facts[0] = txdPassenger;
                facts[1] = txdEvent;
                p.Execute(facts);

                XmlNode nameNode = txdEvent.Document.SelectSingleNode(nameXPath);
                if (nameNode != null && !String.IsNullOrWhiteSpace(nameNode.InnerText))
                {
                    output = txdEvent.Document.OuterXml;
                    break;
                }
            }

            return output;
        }
    }
}

The PolicyUtility class has execute method that takes two orchestration passenger and event messages. This utility fetches all policies from business rule engine rules store and executes sequentially. If any event occurrence returns event information to orchestration.

eventIf

If the event is occurred then the number of occurrence of this event checked from database.
eventcheck

public int GetCountEventOccurrence(string eventName)
{
   SqlConnection con = new SqlConnection() { ConnectionString = conString };
   SqlCommand command = new SqlCommand()
   {
      Connection = con,
      CommandType = System.Data.CommandType.Text,
      CommandText = String.Format("SELECT COUNT(*) FROM Event WHERE EventName='{0}' AND IsProcessed = 0", eventName)
    };

    con.Open();
    int result = (int)command.ExecuteScalar();
    con.Dispose();
    return result;
}

The passenger info mapped to event into before persisting event info to database.
Map

persist

maxOccurrence

OrchDetectLast

When max occurrence of event is reached then isProcessed field of event is set to true.
setEventProcessed

Then message is published into messageBox to trigger EventProcessing work flow.
EventProcessing

This is just testing purpose therefore event processing orchestration not have business logic. Let’s make some tests. I have used following passenger message. I have sent same message to reach max event occurrence number.

<ns0:PassengerInfo>
  <Passenger>
   <Name>Name_0</Name>
   <Surname>Surname_0</Surname>
   <FlightNo>FlightNo_0</FlightNo>
   <Departure>Frankfurt Airport</Departure>
   <Arrival>Vienna Airport</Arrival>
   <Date>2015-03-14</Date>
   <Airline>Lufthansa Airlines</Airline>
   <ScheduledTime>1999-05-31T13:20:00.000-05:00</ScheduledTime>
   <EstimatedTime>1999-05-31T13:20:00.000-05:00</EstimatedTime>
   <Age>10</Age>
   <Genre>F</Genre>
   <Nationality>DEUTCH</Nationality>
  </Passenger>
</ns0:PassengerInfo>

Eventlogs are as follows;
t1

t2

t3

t4

t45PNG

t6

t7

t8

Advertisements