Complex Event Processing (CEP) solution with Business Rule Engine

I have implemented CEP solution with BizTalk and focused on real time passenger flow through the airport scenario. Imagine that information of passengers are flowing through the pipeline and you want to define some criteria to filter passenger data over pipeline. In CEP concept, collection of criteria or rules is assumed as a event. You may want to collect many different events data from multiple sources simultaneously. As a result, the aim of the complex event processing is to identify meaningful opportunities, threads etc.

solution
Let’s take a look at CEP implementation. The solution has two orchestration which Detects event and process event orchestration. The detect event orchestration takes passenger info and checks whether any event occurs or not. Every detected event is stored to event table in database. Also the event which reaches the max occurrence criterion is published to BizTalk MessageBox. The second orchestration catches the published event(thread or opportunity) and tries to process.

But where is the Business rule engine in this solution? Business rule engine has responsibility to execute event rules in detect orchestration. Can we define new event(s) without deploying the solution again? Yes, the events can be defined using with Business Rule Composer.

First, we need to define our schema to represent passenger and event information.

Schema_PassengerInfo Schema_Event

Then I have created two events that are called Turkish and German Passenger Events with Business Rule Composer.
Event_German

Event_Turkish

Business Rule Engine takes passenger info and executes defined rules of event over passenger info. Then If passenger info complaint with rules, BRE sets the values that are event name, max occurrence of event and description. This is also tricky point BRE determines events dynamically depends on the passenger info.

After defining rules, we can construct our orchestration. Passenger info comes from Port_PI receive port. And the event info is initialized in expression shape.

Orch1

We send passenger and event info to Business Rule Engine. Instead of using call rules shape in orchestration, there is a rule executer helper in the solution.
Orch2

namespace BT.POC.RuleExecuter
{
    [Serializable]
    public class PolicyUtility
    {
        public string Execute(XLANGMessage msgPassenger, XLANGMessage msgEvent)
        {
            string output = String.Empty;
            XmlDocument xmlDocPassenger = (XmlDocument)msgPassenger[0].RetrieveAs(typeof(XmlDocument));
            XmlDocument xmlDocEvent = (XmlDocument)msgEvent[0].RetrieveAs(typeof(XmlDocument));

            TypedXmlDocument txdPassenger = new TypedXmlDocument("ibrahimoguz.BiztalkPOC.CEP.Schemas.PassengerInfo", xmlDocPassenger);
            TypedXmlDocument txdEvent = new TypedXmlDocument("ibrahimoguz.BiztalkPOC.CEP.Schemas.EventSchema", xmlDocEvent);

            Microsoft.BizTalk.RuleEngineExtensions.RuleSetDeploymentDriver breDriver = new Microsoft.BizTalk.RuleEngineExtensions.RuleSetDeploymentDriver();
            Microsoft.RuleEngine.RuleStore breStore = breDriver.GetRuleStore();

            Microsoft.RuleEngine.RuleSetInfoCollection colPolInfo = null;
            colPolInfo = breStore.GetRuleSets(RuleStore.Filter.All);

            string nameXPath = "/*[local-name()='EventInfo' and namespace-uri()='http://ibrahimoguz.BiztalkPOC.CEP.EventSchema']/*[local-name()='Name' and namespace-uri()='']";
            foreach (RuleSetInfo pInfo in colPolInfo)
            {
                Policy p = new Policy(pInfo.Name);
                object[] facts = new object[2];
                facts[0] = txdPassenger;
                facts[1] = txdEvent;
                p.Execute(facts);

                XmlNode nameNode = txdEvent.Document.SelectSingleNode(nameXPath);
                if (nameNode != null && !String.IsNullOrWhiteSpace(nameNode.InnerText))
                {
                    output = txdEvent.Document.OuterXml;
                    break;
                }
            }

            return output;
        }
    }
}

The PolicyUtility class has execute method that takes two orchestration passenger and event messages. This utility fetches all policies from business rule engine rules store and executes sequentially. If any event occurrence returns event information to orchestration.

eventIf

If the event is occurred then the number of occurrence of this event checked from database.
eventcheck

public int GetCountEventOccurrence(string eventName)
{
   SqlConnection con = new SqlConnection() { ConnectionString = conString };
   SqlCommand command = new SqlCommand()
   {
      Connection = con,
      CommandType = System.Data.CommandType.Text,
      CommandText = String.Format("SELECT COUNT(*) FROM Event WHERE EventName='{0}' AND IsProcessed = 0", eventName)
    };

    con.Open();
    int result = (int)command.ExecuteScalar();
    con.Dispose();
    return result;
}

The passenger info mapped to event into before persisting event info to database.
Map

persist

maxOccurrence

OrchDetectLast

When max occurrence of event is reached then isProcessed field of event is set to true.
setEventProcessed

Then message is published into messageBox to trigger EventProcessing work flow.
EventProcessing

This is just testing purpose therefore event processing orchestration not have business logic. Let’s make some tests. I have used following passenger message. I have sent same message to reach max event occurrence number.

<ns0:PassengerInfo>
  <Passenger>
   <Name>Name_0</Name>
   <Surname>Surname_0</Surname>
   <FlightNo>FlightNo_0</FlightNo>
   <Departure>Frankfurt Airport</Departure>
   <Arrival>Vienna Airport</Arrival>
   <Date>2015-03-14</Date>
   <Airline>Lufthansa Airlines</Airline>
   <ScheduledTime>1999-05-31T13:20:00.000-05:00</ScheduledTime>
   <EstimatedTime>1999-05-31T13:20:00.000-05:00</EstimatedTime>
   <Age>10</Age>
   <Genre>F</Genre>
   <Nationality>DEUTCH</Nationality>
  </Passenger>
</ns0:PassengerInfo>

Eventlogs are as follows;
t1

t2

t3

t4

t45PNG

t6

t7

t8

Advertisements

Mapping BizTalk Xsd Types to .NET Types

The following table shows how Wcf svcUtils.exe transforms the schema data types to .NET types. The detailed explanation can be found in the SOA Patterns with BizTalk Server 2009 book of Ricard Seroter.

BizTalk XSD Type Description .NET Type
anyURI Can be any absolute or relative Uniform Resource Identifier Reference System.String
base64Binary Holds Base64-encoded arbitrary binary data System.Byte[]
boolean Logical value(0/1 or true/false) System.Boolean
byte Holds an 8-bit value System.SByte
date Object with year, month, and day properties System.DateTime
dateTime Object with year, month, day, hour, minute, second, and timezone properties System.DateTime
decimal Contains a subset of real numbers with support for at a minimum of 18 decimal digits System.Decimal
double A double precision 64-bit floating point type System.Double
duration Represents a duration of time consisting of year, month, day, hour, minute and second System.String
ENTITIES Separated list of ENTITY references System.String
ENTITY Unparsed entity that may include non-XML content System.String
float A single precision 32-bit floating point type System.Single
gDay Equal to a day recurring each month System.String
gMonth Equal to a month recurring each year System.String
gMonthDay A calendar date (month + day) recurring each year System.String
gYear A period of a single year System.String
gYearMonth A particular calendar month in a specific year System.String
hexBinary Represents arbitrary hex-encoded binary data System.Byte[]
ID Definition of document-global unique identifiers System.String
IDREF A reference to a unique identifier (ID) System.String
IDREFS Separated list of IDREF references System.String
int A 32-bit signed integer System.Int32
integer A signed integer of arbitrary length System.String
language Set of language codes called out in RFC 3066 (e.g. en-US) System.String
long 64-bit signed integer System.Int64
Name XML string with no whitespace System.String
NCName Name that conforms to namespace standard (e.g. no colons) System.String
negativeInteger Encompasses all strictly negative integers System.String
NMTOKEN Set of XML “name tokens” excluding spaces or commas System.String
NMTOKENS Separated list of NMTOKENS System.String nonNegativeInteger Encompasses all positive integers (including zero) System.String
nonPositiveInteger Encompasses all negative integers (including 0) System.String
normalizedString Contains whitespace-replaced strings (meaning all carriage returns, tabs, etc have been replaced) System.String
positiveInteger Encompasses all strictly positive integers System.String
QName Qualified name as a combination of namespace name and part name XmlQualifiedName
short Set of 16-bit integers System.Int16

Publish Orchestration as WCF Service

Depending on our SOA solution architectuıre,   sometimes we have decided to communicate with clients using with http or https protocol. In order to client is able to send requests to BizTalk orchestration, we need to expose BizTalk Orchestrations as service (wcf or .asmx) in some SOA scenarios. BizTalk WCF Service Publishing Wizard can help us on this issue.

There is one important point which the assembly of Biztalk project should be placed in GAC. You can find the steps of wizard as follows:

1

2

3

 

“Add a Service Bus endpoint” should be selected if you want to deploy Biztalk project to Cloud.

 

 

4

5

 

BizTalk assembly should also be placed in GAC in order to process BizTalk orchestration correctly.

6

7

8

9

10

12

13

14

15

16

Use of unconstructed message in BizTalk orchestration

There is common issue that is “use of unconstructed message” when developing BizTalk project. Visual studio gives that error while compiling BizTalk project because we are trying to use message which has not yet initialized.

UnconstructedMsg

In fact BizTalk message is a XML document. The first way to construct message follows;

1.Creating a message with the help of a System.XML.XMLDocument variable. We have to first declare an orchestration variable of type XmlDocument.Then in a message assignment shape, need to do something like this:


xmlDoc = new XmlDocument();

xmlDoc.LoadXML(“”)

outputMsg= xmlDoc

If we use outputMsg directly in the construct shape, it gives compilation error.
UM_6
We create instance of XmlDocument, the issue is resolved.

UM_7

2.Create new message from an existing message using BizTalk map

3.Assign one message to other(i.e creating a copy of an existing message)

How BizTalk Host Threads configured

To modify the number of threads available in the.NET thread pool associated with each instance of a BizTalk host, follow these steps:

1. Stop the BizTalk host.

2. Click Start, click Run, type regedit.exe, and then click OK to start Registry Editor.

3. Navigate to HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\BTSSvc$hostname] where hostname is the name of the host associated with the host instance.

DWORD entry

Default value

Recommended value

MaxIOThreads

20

100

MaxWorkerThreads     

25

100

MinIOThreads          

1

25

MinWorkerThreads      

1

25

4. Restart the BizTalk host.

BizTalk – Genel Bakıs

Çocukluğunda, muzur amcalar tarafından “1 kilo demir mi ağır, 1 kilo pamuk mu?”, “5 elma, 3 armut daha kaç eder?” gibi sorular ile uyanıklık seviyesi ölçülmemiş çocuk yoktur sanırım. Asıl sarsıcı an ise soruların cevaplarının öğrenildiği andır. Biz işin matematiğine dalıp toplamaya çalışırken sayıları, cevap tokat gibi gelir; “Elma ile armut toplanmaz”. Mantıksal olarak toplanamasa da yazılımsal olarak toplanması yani elma ile armudun konuşması, haberleşebilmesi mümkün.

BizTalk Server(BTS)‘ın yeteneklerinden ilk akla gelen bu elma ile armudu birbiri ile konuşturması, haberleştirmesidir. Burdaki meyvelerden kasıt tabi ki birbirinden farklı sistemlerdir. Aşağıdaki resimde gösterildiği üzere; ERP sistemi ile Finans departmanının haberleşmesini, aynı zamanda tedarikçiler ile Logistics ekibinin entegrasyonunu gerçekler.

Image

BTS bir entegrasyon sunucusudur. Bu blogunda baş kahramanıdır 🙂 BTS şu 3 alanda sorunları çözmeyi hedefler:
– Kurumsal uygulama entegrasyonu (EAI)
– Business to business (B2B) or Business to Customer(B2C)
– Süreç otomasyonu (BPA)

BTS sahip olduğu adaptörler yardımı ile birbirinden farklı ortamlara bağlanabilmektedir. Örneğin FILE adaptörü ile dosya sistemini dinleyebilir yada SOAP adaptörü ile web servislere request gönderebilir. Genel olarak BTS organizasyon bünyesinde veya organizasyonun partnerleriyle sahip olduğu ortak süreçleri otomatize etmekte kullanılır. Sahip olduğu adaptörler ile line of business (LOB) uygulamalarına (Siebel, SAP, IFS Applications, JD Edwards, Oracle,Microsoft Dynamics CRM) entegre çalışabilir.

Bu noktada akla şu soru gelebilir: “Peki tamam adaptörlerle veriyi aldı şimdi napacak yada yapabilir?” yada “Veri nerede duruyor şuan memory,disk?”. BTS’in göbekten bağlı olduğu ürün MS SQL Server’dır. BTS tarafından işlenmek üzere alınan her mesaj veritabanında saklanır.Verinin saklandığı yer MessageBox veritabanıdır. Her mesaj ilk iş olarak DB’ye atıldıktan sonra, iş sürecine dahil edilir.

Image

Yukardaki iş akış diagramındaki iş sürecini, BTS üzerinde implemente etmek mümkün.Tanımlamış olduğumuz süreç çalıştırıldıktan sonra süreç bitimine kadar alınan mesaj saklanır. Süreç tamamlandıktan sonra mesaj BTS veritabanından silinir. (Tracking özelliğini aktif hale getirdiysek, mesaj silinip MessageDTA veritabanına alınır.)