🕵🏼 How to setup watermark, ObjectStore in Mule 4 application

Implementing Watermark, Object store in Mule 4




Watermark is used to poll for a new set of records instead of polling for the same resources repeatedly. Watermarking is a common use of the object-store.

Object stores are used to store an object. Mule 4 uses object stores to retrieve the objects later and to keep the data persistent. Object store has Contains, Dual store, Remove, Retrieve all keys, Retrieve and store, Store operations. By default, the object store connector not present in the Mule palette. So you can install it from Anypoint exchange.


1. Create a new flow  watermark_Flow.xml .

  • In the “ Mule palette”  Add Database module.

  • Now Drag and drop “On Table row ” component. The flow should appear as shown below:

  • Now Configure On Table row as shown below




2. Place logger after “On table Row” and configure it to log payload.

3. Add the File module to the Mule palette on the right side.

  • Drag the “Write” component after the logger.

  • Configure File Connector with some Working Directory.

  •  Give path as output/products.csv

  • Configure Write mode as “APPEND”

  • It Should appear as shown below




We want to convert the records from the database to XML or JSON.

4. Drag a “Transform Message” between the logger and the “Write” component.

Configure it as shown below:
%dw 2.0
output application/json
---
payload


  • Run the application and observe that all the rows in the employee table are logged and written to file as specified

  • Observe the same again by restarting the application

  • Now that you understood “On Table row ” component is able to keep in memory about the last processed records using watermarking. 

STEP2

Configuring Object store in Mule 4

you will be working on a 02-watermarking-using-objectstore-start project under the 05-watermarking-object stores  working set

We want a scheduler to kick off the flow after every 5 secs. After the scheduler triggers a Mule event, we want to check if there is a key latestempID in object store. If there is no key we need to create a variable latestempID with 0. If already present, latestempID variable should be initialized with the value from object-store.

1. So, First use a Scheduler and configure it as shown below

2. ObjectStore is not available in the studio by default. So, in the mule palette, click on search in exchange and add ObjectStore connector.

If Object Store is available already in your latest Anypoint studio, you can ignore adding it from the exchange.

  • Now, drag the ObjectStore connector to the left side and select it.

  • Now, drag and drop the “retrieve” component next to Scheduler.

  • Configure the “Retrieve” component as shown below:

  • Configure key as latestempID and default value as '0'.

  • We want the output of the “Retrieve” component to be stored in a variable with the name latestempID.




Click on the Advanced tab and give the variable name as shown below

3. Now use the “Select” component of the DB module and configure it with the query as below



4. If there is data from the database, we want to transform it to JSON and log it. After that, we want to store the max value of latestempID in the object store with key latestempID.

  • If there is no data from the database, use a logger to print something like “New employee records were not found”
5. Use a  choice router after selecting the component.

  • In the default block of the choice router, drag a logger and configure it to log “New employee records were not found”

  • Configure the first route of the choice router with condition #[not isEmpty(payload)]

6. Drag Transform Message component inside the first route to transform the payload to JSON.
  • Drag a logger after transform message to log the payload

7. Now, drag store component  from  Object store connector module

Configure “Store” component properties as shown below : (Don’t worry if it showing an error in the expression. It is a known bug in the studio. Actually the expression is correct and it will work)



8. Run the application and observe that all the records are logged in the console tab for the first time when Scheduler runs.

9. Now run the application and observe that “New employee records were not found” will be logged in Console.

Find the latest MuleSoft(Mule 4) Articles on DataWeave, Anypoint Studio, Anypoint Platform, and Runtime Manager. Our articles also include frequently asked Mule4 Interview questions and answers. Please comment below for any queries or free article submissions.

Post a Comment

0 Comments