Search This Blog

Monday, October 7, 2019

MQTT Cluster Retained topic issue and resolution

Problem Statement

A message published with the retained flag set is tied, inside the broker, to the node the publisher was connected to. The broker stores the retained message tagged with that publisher node's information, so when a subscriber later connects to the same topic, it only receives the retained message if it connects through that same node.


Let's assume a scenario where you have a sensor that publishes its status only when it changes, e.g. a door sensor. What happens when a new subscriber subscribes to this status?
Without retained messages, the subscriber would have to wait for the status to change before it received a message. With a retained message, however, the subscriber sees the current state of the sensor immediately. What is important to understand is that only one message is retained per topic: the next retained message published on that topic replaces the previous one.
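The one-retained-message-per-topic behaviour can be sketched with a tiny in-memory model. This is purely an illustration of the semantics, not real broker code; the class and method names are made up for the sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker retain semantics: the broker keeps at most one
// retained message per topic, and a new retained publish on the same
// topic replaces the previous one. Non-retained publishes are not stored.
public class RetainSketch {
    static Map<String, String> retained = new HashMap<>();

    static void publish(String topic, String payload, boolean retain) {
        if (retain) {
            retained.put(topic, payload); // replaces any earlier retained message
        }
    }

    // A late subscriber immediately receives the retained message, if any.
    static String subscribe(String topic) {
        return retained.get(topic);
    }

    public static void main(String[] args) {
        publish("home/door", "OPEN", true);
        publish("home/door", "CLOSED", true);   // replaces "OPEN"
        publish("home/door", "AJAR", false);    // not retained, not stored
        System.out.println(subscribe("home/door")); // prints CLOSED
    }
}
```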

Processing Details

We will now examine how retained messages behave in various cases.
The basic process is this:
  1. Publish a message on a topic, first with the retained flag unset and then with it set
  2. Subscribe to the same topic
  3. Monitor the messages received and analyse the results

First Case

The screen below shows that if a publisher publishes a message on a topic to which two subscribers (Subscriber 1 and Subscriber 2) are connected, both receive the same message at the same time, whether retained=true or retained=false (the default). Retained is needed when we want the last good value of a device.

Second Case

Suppose a publisher publishes a message while one of the subscribers is disconnected. We want the reconnecting client to get the last good value. Since Subscriber 2 was disconnected when the publisher published the message, it will receive the last good value on reconnecting if and only if it connects to the same node the publisher published on.
- Mirroring applies only to queues, not to retained messages.
- If Subscriber 2 connects to Node 2, it won't receive the updated value that was published on Node 1.
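The node-local behaviour described above can be sketched as a toy model. This is illustrative only; real RabbitMQ nodes are not implemented this way, but the retained store is similarly local to the node that handled the publish:

```java
import java.util.HashMap;
import java.util.Map;

// Each cluster "node" keeps its own retained-message store, and the
// stores are not replicated. A subscriber that reconnects to a different
// node therefore misses the retained message held by the first node.
public class ClusterRetainSketch {
    static class Node {
        final Map<String, String> retainedStore = new HashMap<>();

        void publishRetained(String topic, String payload) {
            retainedStore.put(topic, payload); // stored on this node only
        }

        String connectAndSubscribe(String topic) {
            return retainedStore.get(topic); // null if retained on another node
        }
    }

    public static void main(String[] args) {
        Node node1 = new Node();
        Node node2 = new Node();

        // Publisher sends the last good value through Node 1.
        node1.publishRetained("sensor/door", "CLOSED");

        // Subscriber 2 reconnects to Node 1: gets the last good value.
        System.out.println(node1.connectAndSubscribe("sensor/door")); // prints CLOSED
        // Subscriber 2 reconnects to Node 2: retained store is empty there.
        System.out.println(node2.connectAndSubscribe("sensor/door")); // prints null
    }
}
```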

When to Use Retained Messages

Generally you will publish a message with the retained flag set when the message contains persistent data.
For example, a sensor could publish information about itself, such as its firmware version, IP address, and current state.
This information is unlikely to change, so you only need to publish it once with the retain flag, and any new client can retrieve it.

Solutions Tried So Far

There are a few customized solutions, given below:
- Store the retained messages in a consistent database such as Cassandra or Riak, as suggested in the RabbitMQ documentation. (Not tried yet, since changing or configuring RabbitMQ to use Cassandra or Riak also requires Erlang knowledge.)
The built-in implementations are suitable for development but will sometimes fall short of production needs. The MQTT 3.1 specification does not define consistency or replication requirements for retained message stores, so RabbitMQ allows custom stores to be plugged in to meet the consistency and availability needs of a particular environment. For example, stores based on Riak and Cassandra would suit most production environments, as those data stores provide tunable consistency.
- Connect to all the nodes in the cluster and publish the same message to each of them (using the HTTP APIs). But per the broker's rules, the message is not treated as retained on every cluster node; the broker only recognizes the node through which the retained message was originally published.

We have also raised this issue on the RabbitMQ forum; they told us this feature is not supported as of now, but that future versions may support it.

Note: EMQ and HiveMQ already support this feature in cluster mode.


The retained message feature is useful for keeping the last state of an object, especially when that state doesn't change frequently.
Quality of service settings don't impact retained messages.

Thursday, July 11, 2019

Spring Boot-Actuators


Spring Boot ships with a module called Actuator, which enables things like metrics and statistics about your application. For example, we can collect logs, view metrics, perform thread dumps,
show environment variables, understand garbage collection, and show which beans are configured in the BeanFactory. You can expose this information via HTTP or JMX, or you can even log in directly to the process via SSH.


  • Endpoints: Actuator endpoints allow you to monitor and interact with your application. Spring Boot includes a number of built-in endpoints and you can also add your own. For example, the health endpoint provides basic application health information. Run up a basic application and look at /actuator/health.
  • Metrics: Spring Boot Actuator provides dimensional metrics by integrating with Micrometer.
  • Audit: Spring Boot Actuator has a flexible audit framework that publishes events to an AuditEventRepository. Once Spring Security is in play, it automatically publishes authentication events by default. This can be very useful for reporting, and also for implementing a lock-out policy based on authentication failures.
To add the actuator to a Maven based project, add the following ‘Starter’ dependency:
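```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
```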

For Gradle, use the following declaration:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
}
That's it? Yes! You have now added the actuator to your existing application. When you restart your application, the endpoints are enabled for user access.

Actuator Endpoints

Spring Boot includes a number of built-in endpoints and lets you add your own. For example, the health endpoint provides basic application health information.
Each individual endpoint can be enabled or disabled. This controls whether or not the endpoint is created and its bean exists in the application context. To be remotely accessible an endpoint also has to be exposed via JMX or HTTP. Most applications choose HTTP, where the ID of the endpoint along with a prefix of /actuator is mapped to a URL. For example, by default, the health endpoint is mapped to /actuator/health.
Here are some of the most common endpoints Boot provides out of the box:
/health – Shows application health information (a simple 'status' when accessed over an unauthenticated connection, or full message details when authenticated). It is not sensitive by default.
/info – Displays arbitrary application info. Not sensitive by default.
/metrics – Shows 'metrics' information for the current application. It is sensitive by default.
/trace – Displays trace information (by default the last few HTTP requests).
You can find the full list of existing endpoints in the official docs.

Enabling Endpoint

By default, all endpoints except for shutdown are enabled. To configure the enablement of an endpoint, use its management.endpoint.<id>.enabled property. The following example enables the shutdown endpoint:
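```properties
management.endpoint.shutdown.enabled=true
```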
If you prefer endpoint enablement to be opt-in rather than opt-out, set the management.endpoints.enabled-by-default property to false and use individual endpoint enabled properties to opt back in.
Since endpoints may contain sensitive information, we should be very careful when exposing them. To change which endpoints are exposed, use the following technology-specific include and exclude properties:
                Property                                    Default
management.endpoints.jmx.exposure.include                       *
management.endpoints.web.exposure.include                 info, health

Customizing Endpoint

Each endpoint can be customized with properties in the application.properties file, using the following format:
endpoints.[endpoint name].[property to customize]

Three properties are available:
id – the ID under which this endpoint is accessed over HTTP
enabled – if true, the endpoint can be accessed; otherwise not
sensitive – if true, authorization is needed to show crucial information over HTTP

For example, adding the following properties will customize the /beans endpoint:
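```properties
# illustrative values: rename the endpoint, mark it non-sensitive, and enable it
endpoints.beans.id=springbeans
endpoints.beans.sensitive=false
endpoints.beans.enabled=true
```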

Customizing Health Endpoint : 

The /health endpoint is used to check the health/status of the running application. It's usually used by basic monitoring software to alert you if production goes down.
By default, only the health status is shown to unauthorized access over HTTP:
{ "status" : "UP" }
This health information is collected from all beans implementing the HealthIndicator interface that are configured in your application context. Some information returned by a HealthIndicator is sensitive in nature, but you can configure it to expose other information such as disk space, data sources, etc.
You can also roll your own custom health indicator by implementing the HealthIndicator interface and providing your own health() method. When you create such a class and override health(), the default functionality is replaced by your custom logic. The HealthCheckEndpoint class below implements health(), which is declared in the HealthIndicator interface.
import org.springframework.boot.actuate.health.ApplicationHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator;

public class HealthCheckEndpoint implements HealthIndicator {

    @Override
    public Health health() {
        Health.Builder builder = new Health.Builder();
        if (isRemoteServiceUp()) {
            return builder.up().withDetail("MyApp-Services", "online").build();
        } else {
            return builder.down().withDetail("MyApp-Services", "offline").build();
        }
    }

    /**
     * Checks if the remote service is up.
     * @return true, if the remote service is up
     */
    private boolean isRemoteServiceUp() {
        // perform call out to the remote service to check if it's up
        ApplicationHealthIndicator applicationHealthIndicator = new ApplicationHealthIndicator();
        DataSourceHealthIndicator dataSourceHealthIndicator = new DataSourceHealthIndicator();
        return Status.UP.equals(applicationHealthIndicator.health().getStatus())
                && Status.UP.equals(dataSourceHealthIndicator.health().getStatus());
    }
}

Customizing Application Info Endpoint:

We can also expose information about the Spring Boot application being developed. It needs a few configurations:
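```properties
# illustrative values, chosen to match the sample output shown for the endpoint
info.app.name=MyApp
info.app.description=MyApp SERVICE
info.app.version=1.0.0
```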

We then call this endpoint at the /actuator/info URL, which produces output like the following (remaining fields omitted):
{
    "app": {
        "description": "MyApp SERVICE",
        "java": {
            ...
        }
    }
}

Creating Custom Endpoint:

Sometimes we need specific metrics that Spring Boot Actuator does not provide as default metrics. For this purpose we can develop a custom endpoint using the @Endpoint annotation.
It is really simple to create metrics for your application as your business needs. The detailed syntax to create your own metrics is:
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;

@Component
@Endpoint(id = "service.count")
public class ApplicationMetricsEndpoint {

    /** The counter backing the custom metric. */
    private final Counter counter;

    /** Instantiates a new application metrics endpoint with the given registry. */
    public ApplicationMetricsEndpoint(MeterRegistry registry) {
        this.counter = registry.counter("service.count");
    }

    /** Exposes the current service count as JSON via a read operation. */
    @ReadOperation
    public String countService() throws JSONException {
        JSONObject metricData = new JSONObject();
        metricData.put(Constants.TOTAL_COUNT, counter.count());
        return metricData.toString();
    }
}

Integration with Prometheus

With Spring Boot 2.0, adding Prometheus support became a lot easier thanks to the integration of Micrometer. Micrometer can be compared to what slf4j does for logging, but for monitoring instead. It provides a clean API and has bridges to many monitoring platforms, including Prometheus.
To be able to monitor our Spring Boot application with a third-party tool, we need to add the following dependencies:
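```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
```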

Now you can configure Actuator to expose the Prometheus endpoint by setting the following property within application.properties:
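```properties
# expose the prometheus endpoint; the rest of the list is up to your needs
management.endpoints.web.exposure.include=prometheus,health,info,metrics
```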


If you run your application now and visit http://<host>:<port no>/actuator, you will see a response like this (link targets omitted):
{
    "_links": {
        "self": { ... },
        "service.count": { ... },
        "service.count.clients": { ... },
        "service.count.queries": { ... },
        "service.count.grants": { ... },
        "service.count.requests": { ... },
        "service.count.requests.timebased": { ... },
        "health": { ... },
        "shutdown": { ... },
        "flyway": { ... },
        "info": { ... },
        "prometheus": { ... },
        "metrics-requiredMetricName": { ... },
        "metrics": { ... },
        "httptrace": { ... }
    }
}

Prometheus has to be configured separately, by creating a prometheus.yml file. In this case, I’m going to set up Prometheus so that it will scan the following locations:
● Our Spring boot application
● Prometheus itself
● The Grafana web application
We just need to add a prometheus.yml file to our Boot project's resources with the following configuration:
# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
      - targets: ['']
  - job_name: 'MyApp-actuator'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['']
Once we restart our Boot application after these changes, the metrics configured in the application are published to Prometheus, which pulls (scrapes) the data at the configured scrape interval.

Grafana dashboard

We can also display these metrics in Grafana; we just need to configure Prometheus as a data source and add the job name we configured for Prometheus.