Implementing the Wire Tap pattern using Camel K

Sadhana Nandakumar
4 min read · Sep 9, 2021

Camel K is a lightweight integration framework built from Apache Camel that runs natively on Kubernetes. It is designed specifically for serverless and microservices-based architectures. In this article, we will create a microservice and push metrics using the Wire Tap enterprise integration pattern (EIP). A wire tap lets you route a copy of each message to a separate location while the original is forwarded to its ultimate destination.

Let us assume that we have a microservice and, every time we invoke this service, we would like to push metrics data to an Elastic cluster. We will use the Wire Tap EIP to do this. As a transaction request comes in, we will push the incoming message to a Kafka topic called investigation and, at the same time, send a copy of it to a reporting topic. We will then push the messages from the reporting topic to the Elastic cluster.
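
To make the idea concrete, here is a minimal plain-Java sketch of the pattern, without Camel or Kafka. The class name WireTapSketch and the two in-memory lists standing in for the investigation and reporting topics are assumptions made purely for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java illustration of the Wire Tap idea (this is not Camel code).
// The two lists are in-memory stand-ins for the Kafka topics.
class WireTapSketch {
    final List<String> investigation = new CopyOnWriteArrayList<>();
    final List<String> reporting = new CopyOnWriteArrayList<>();

    // Main flow: deliver to the primary destination, then hand a copy
    // to the tap on another thread so the caller is not held up by it.
    CompletableFuture<Void> handle(String message) {
        investigation.add(message);
        return CompletableFuture.runAsync(() -> reporting.add(message));
    }

    public static void main(String[] args) {
        WireTapSketch sketch = new WireTapSketch();
        // join() is only here so the demo can print both lists reliably.
        sketch.handle("{\"id\": 1, \"amount\": 250}").join();
        System.out.println("investigation=" + sketch.investigation);
        System.out.println("reporting=" + sketch.reporting);
    }
}
```

The point to notice is that the main flow does not depend on the tap: the copy sent to reporting is handled separately from the delivery to investigation.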

Assuming we have an OpenShift cluster, let us first create a project.

oc new-project transaction-processing

Step 1: Install an AMQ Streams cluster

The Operator Hub is a collection of Operators from the Kubernetes community and Red Hat partners, curated by Red Hat. Let us first install the AMQ Streams operator.

We will install it in a namespace we have created (transaction-processing in the example).

Now that the operator is installed, we can create a simple three-node Kafka cluster. To do this, click on the Kafka tab and then on the Create Kafka button. We will accept the defaults and create the cluster.

Step 2: Set up an Elastic Cluster

We will create an Elasticsearch cluster and an instance of Kibana. Let us accept the defaults to complete the setup.

Step 3: Create the Integration microservice

We will now define the integration microservice using Camel K. We define two routes. The first route exposes the microservice and pushes the transaction to the investigation topic. Notice that we also send the same message to the reporting topic. The Camel wireTap EIP lets us send this copy without interrupting the main flow.

rest("/transaction")
    .post()
    .route()
    .to("kafka:investigation?brokers=" + kafkaBootstrap)
    .wireTap("kafka:reporting?brokers=" + kafkaBootstrap);
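
The route refers to a kafkaBootstrap variable that is not shown in this article. As a small illustration, the sketch below prints the endpoint URIs that the concatenation produces. The producerUri helper is hypothetical, and the bootstrap address my-cluster-kafka-bootstrap:9092 is an assumption based on a Kafka cluster named my-cluster; substitute the address of your own cluster.

```java
// Shows the endpoint URIs produced by the string concatenation in the route.
class KafkaUriSketch {
    // Producer endpoint in the form kafka:<topic>?brokers=<bootstrap>
    static String producerUri(String topic, String brokers) {
        return "kafka:" + topic + "?brokers=" + brokers;
    }

    public static void main(String[] args) {
        // Assumed value; use the bootstrap address of your own cluster.
        String kafkaBootstrap = "my-cluster-kafka-bootstrap:9092";
        System.out.println(producerUri("investigation", kafkaBootstrap));
        System.out.println(producerUri("reporting", kafkaBootstrap));
    }
}
```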

We then push the message from the reporting topic to the elastic cluster.

from("kafka:reporting?brokers=" + kafkaBootstrap
        + "&maxPollRecords=" + consumerMaxPollRecords
        + "&seekTo=beginning&groupId=task")
    .setHeader(Exchange.HTTP_METHOD, constant("POST"))
    .setHeader("Authorization", constant("Basic XXXXXXX"))
    .setHeader("Content-Type", constant("application/json"))
    .to("https://elasticsearch-sample-es-http:9200/audit/audit")
    .log("${body}");
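
The Authorization header in this route carries a redacted value. Assuming the cluster uses HTTP Basic authentication, the value is the word Basic followed by the Base64 encoding of username:password. Here is a minimal sketch of how such a value could be computed; the basicAuth helper and the placeholder credentials are hypothetical and not taken from the original setup.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthSketch {
    // Builds an HTTP Basic Authorization header value:
    // "Basic " + base64(username + ":" + password)
    static String basicAuth(String username, String password) {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Placeholder credentials; replace them with those of your cluster.
        System.out.println(basicAuth("elastic", "<password>"));
    }
}
```

In a real deployment you would probably read the credentials from a secret rather than hard-coding the encoded value in the route.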

The Elastic cluster exposes a REST API over HTTPS. We need to create a secret containing a trust store with the cluster's CA certificate, so that the integration can trust the cluster's certificate when it connects.

oc extract secret/elasticsearch-sample-es-http-certs-internal --keys=ca.crt --to=- > ca.crt
keytool -trustcacerts -keystore "trust.jks" -storepass changeit -importcert -alias elastic -file "ca.crt"
oc create secret generic trust --from-file trust.jks
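
As an optional sanity check, which is not part of the original setup, we could confirm that the certificate landed in the trust store under the elastic alias. The class and method names below are made up for illustration; the file name, password, and alias are the ones used in the keytool command.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

class TrustStoreCheck {
    // Loads the store (the type is detected from the file, Java 9+)
    // and reports whether the given alias is present.
    static boolean hasAlias(Path storePath, char[] password, String alias) {
        try {
            KeyStore store = KeyStore.getInstance(storePath.toFile(), password);
            return store.containsAlias(alias);
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Could not read " + storePath, e);
        }
    }

    public static void main(String[] args) {
        Path store = Path.of("trust.jks");
        if (!Files.exists(store)) {
            System.out.println("trust.jks not found in the current directory");
            return;
        }
        boolean present = hasAlias(store, "changeit".toCharArray(), "elastic");
        System.out.println("alias 'elastic' present: " + present);
    }
}
```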

We will add the SSL properties using a modeline comment at the top of the integration file.

// camel-k: language=java property-file=tls.properties secret=trust

We will now install the Camel K operator and deploy the integration microservice.

kamel install
kamel run TransactionService.java

Step 4: Invoke the REST service and check for metrics in the Elastic cluster

Now we can look up the route that is automatically generated for the REST DSL endpoint and send in a transaction payload.
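
As an illustration, the request could be built with the HTTP client that ships with Java 11 and later. The host name and the payload fields below are placeholders, since the actual route host depends on your cluster and this article does not define a payload schema.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class TransactionRequestSketch {
    // Builds a POST request against the /transaction path of the REST route.
    static HttpRequest build(String routeHost, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + routeHost + "/transaction"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder host and payload; replace them with your own values.
        HttpRequest request = build("transaction-service.example.com",
                "{\"id\": 1, \"amount\": 250}");
        System.out.println(request.method() + " " + request.uri());
        // To send it for real:
        // HttpClient.newHttpClient()
        //     .send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```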

The data should now be available on the Elastic cluster. Look up the route for Elastic and make a REST call to verify the contents of the audit data.

One of the main strengths of this pattern is that multiple wire taps across one or more services can feed a central reporting engine for analysis. As you can see, we can now send audit messages from across a microservice ecosystem and gain a complete understanding of the end-to-end process.

Sadhana Nandakumar

Sadhana is a Sr Technical Product Marketing Manager specializing in Salesforce Platform