Building a Data Pipeline with Apache Camel: Processing Weather Data from AWS SQS

Learn how to build a scalable data pipeline with Apache Camel, AWS SQS, and PostgreSQL – Step by Step

Data pipelines are a critical part of modern software systems, ensuring seamless data flow between various components. In this tutorial, we’ll build an Apache Camel-based data pipeline that listens to an AWS SQS queue, retrieves weather information from the WeatherStack API, and persists the results into a PostgreSQL database.

This example is structured to highlight how you can split your Camel routes across multiple files, a best practice for organizing real-world applications. However, in production, you might further refine this structure based on project needs.

Architecture Overview

The pipeline consists of the following steps:

  1. AWS SQS Listener: Reads messages containing city names from an SQS queue.
  2. AWS Secrets Manager: Retrieves the WeatherStack API key securely.
  3. WeatherStack API Call: Fetches weather details based on the received city.
  4. PostgreSQL Persistence: Stores the weather data for future use.

Prerequisites

Before we dive into the code, ensure you have:

  • AWS SQS and Secrets Manager configured.
  • A PostgreSQL database set up.
  • Apache Camel with Spring Boot.
  • A valid WeatherStack API key (the free plan allows up to 100 requests per month).

Project Structure

The code is split across multiple packages for modularity:

weather-processor/
├── src/main/java/io/igventurelli/weather_processor/
│   ├── WeatherProcessorApplication.java  # Main entry point
│   ├── aws/
│   │   ├── AWSSQSRoute.java              # Listens to SQS
│   │   ├── AWSSecretsManagerRoute.java   # Fetches API key
│   ├── integration/
│   │   ├── WeatherStackRoute.java        # Calls WeatherStack API
│   ├── db/
│   │   ├── PostgresRoute.java            # Persists data to PostgreSQL
│   ├── model/
│   │   ├── WeatherData.java              # Entity for weather data
│   │   ├── WeatherRequest.java           # Entity for the request metadata
│   │   ├── CurrentWeather.java           # Entity for the current conditions
│   │   ├── Location.java                 # Entity for the location

Each component has a single responsibility, making the system easier to maintain and extend.

Step 1: Listening to AWS SQS

Apache Camel’s aws2-sqs component makes it easy to consume messages from SQS. The AWSSQSRoute class listens for incoming JSON messages in this format:

{ "city": "Sao Paulo" }

Implementation:

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class AWSSQSRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("aws2-sqs://weather-processor?useDefaultCredentialsProvider=true").routeId("sqs")
            .unmarshal().json().setVariable("city", simple("${body.get('city')}"))
            .to("direct:orchestrator");
    }
}

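The useDefaultCredentialsProvider=true option makes the component load AWS credentials through the AWS SDK’s default credentials provider chain (environment variables, the shared ~/.aws/credentials file, an instance or container role, and so on) instead of expecting static keys in the endpoint URI.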

This route:

  1. Listens to the SQS queue weather-processor;
  2. Unmarshals the JSON payload and stores its city attribute in the city variable;
  3. Sends the message to the orchestrator route (direct:orchestrator).

Step 2: Retrieving the API Key from AWS Secrets Manager

Instead of hardcoding the API key, we fetch it securely from AWS Secrets Manager.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws.secretsmanager.SecretsManagerConstants;
import org.springframework.stereotype.Component;

@Component
public class AWSSecretsManagerRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:aws-secrets-manager").routeId("secrets-manager")
            .setHeader(SecretsManagerConstants.SECRET_ID, constant("prod/weatherstack/apikey"))
            .to("aws-secrets-manager://weatherstack-secret?useDefaultCredentialsProvider=true&operation=getSecret")
            .unmarshal().json().setVariable("apiKey", simple("${body.get('key')}"));
    }
}

This route:

  1. Retrieves the prod/weatherstack/apikey secret from AWS Secrets Manager;
  2. Unmarshals the secret’s JSON value and stores its key attribute in the apiKey variable.

Step 3: Calling the WeatherStack API

Next, we use Apache Camel’s HTTP component to fetch weather data.

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.http.HttpMethods;
import org.springframework.stereotype.Component;

@Component
public class WeatherStackRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:weather-stack").routeId("weather-stack")
            .setHeader(Exchange.HTTP_METHOD, constant(HttpMethods.GET))
            .setHeader(Exchange.HTTP_QUERY, simple("access_key=${variable.apiKey}&query=${variable.city}"))
            .to("https://api.weatherstack.com/current");
    }
}

Here:

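  1. Sets the HTTP method to GET and builds the query string (access_key and query) from the apiKey and city variables;
  2. Calls the WeatherStack /current endpoint, leaving the JSON response in the message body.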
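To see what this request looks like outside Camel, here is a rough plain-JDK equivalent using java.net.http (Java 11+). It is only an illustration and not part of the application: the class and method names are hypothetical, and URL-encoding the values by hand is an assumption about how you would build the query string yourself, so that a city such as "Sao Paulo" still produces a valid URI.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical plain-JDK illustration of the weather-stack route's request (Java 11+).
final class WeatherStackRequestSketch {

    private static final String BASE_URL = "https://api.weatherstack.com/current";

    private WeatherStackRequestSketch() {
        // static helpers only
    }

    // Builds the request URI from the API key and city, URL-encoding both values.
    static URI buildUri(String apiKey, String city) {
        String query = "access_key=" + URLEncoder.encode(apiKey, StandardCharsets.UTF_8)
                + "&query=" + URLEncoder.encode(city, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "?" + query);
    }

    // Sends a GET request and returns the raw JSON body
    // (the Camel route leaves the equivalent response in the message body).
    static String fetchCurrent(HttpClient client, String apiKey, String city)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(buildUri(apiKey, city)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

For example, buildUri("abc123", "Sao Paulo") yields https://api.weatherstack.com/current?access_key=abc123&query=Sao+Paulo. In the Camel route, the same two values come from the apiKey and city variables rather than method arguments.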

Step 4: Persisting Data in PostgreSQL

Once we have the weather data, we store it in the database.

import io.igventurelli.weather_processor.model.WeatherData;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class PostgresRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:postgres")
            .unmarshal().json(WeatherData.class)
            .to("jpa:io.igventurelli.weather_processor.model.WeatherData");
    }
}

This route:

  1. Unmarshals the raw JSON response from the WeatherStack API into our WeatherData entity;
  2. Persists the entity to PostgreSQL through the Camel JPA component.

Under the hood, we use the Spring Data JPA starter together with the Camel JPA starter: the first configures the datasource and the JPA EntityManagerFactory, and the second provides the jpa: endpoint that persists the entities.
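The model classes themselves are not listed here, but the Hibernate output in the logs further down shows their tables and columns. The annotation-free sketch below uses Java records (Java 16+) to illustrate the nested shape the WeatherStack response is assumed to map to. Field names and types are guesses inferred from those columns; the real classes are JPA entities, with IDs and mapping annotations, and may differ.

```java
import java.util.List;

// Hypothetical shapes inferred from the Hibernate log; the real classes are JPA entities.

// weather_request table: language, query, type, unit
record WeatherRequest(String type, String query, String language, String unit) { }

// location table: country, lat, localtime_epoch, lon, name, region, timezone_id, utc_offset
record Location(String name, String country, String region, String lat, String lon,
                String timezoneId, long localtimeEpoch, String utcOffset) { }

// current_weather table; descriptions and icons are stored in separate collection tables
record CurrentWeather(String observationTime, int temperature, int weatherCode,
                      List<String> weatherIcons, List<String> weatherDescriptions,
                      int windSpeed, int windDegree, String windDir, int pressure,
                      double precip, int humidity, int cloudcover, int feelslike,
                      int uvIndex, int visibility, String isDay) { }

// weather_data table: references one row from each of the tables above
record WeatherData(WeatherRequest request, Location location, CurrentWeather current) { }
```

The separate current_weather_weather_descriptions and current_weather_weather_icons inserts in the log suggest that those two fields are collections, which is why they appear as lists here.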

Step 5: The Orchestrator

An orchestrator route manages the pipeline execution. For pipelines, I prefer the Orchestrator pattern, where a single route invokes each step in order, over the Choreography pattern, where each step triggers the next one.

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class WeatherProcessor extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:orchestrator").routeId("orchestrator")

            // load WeatherStack API Key
            .to("direct:aws-secrets-manager")

            // call WeatherStack API
            .to("direct:weather-stack")

            // send to Postgres
            .to("direct:postgres")

            .log("finished");
    }
}

Having a single orchestrator route makes the entire flow, and each of its steps, much easier to understand.
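The same idea can be sketched in plain Java, outside Camel: one coordinator runs named steps in a fixed order, and each step reads and writes a shared context, much like our direct: routes share a single exchange and its variables (city, apiKey). The OrchestratorSketch and Context names are hypothetical; this illustrates the pattern, not how Camel implements it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical plain-Java illustration of the Orchestrator pattern (not a Camel API).
final class OrchestratorSketch {

    // Stand-in for a Camel exchange: a message body plus named variables.
    static final class Context {
        Object body;
        final Map<String, Object> variables = new HashMap<>();
    }

    // LinkedHashMap keeps insertion order, which is also the execution order.
    private final Map<String, Consumer<Context>> steps = new LinkedHashMap<>();
    private final List<String> executed = new ArrayList<>();

    OrchestratorSketch step(String name, Consumer<Context> action) {
        steps.put(name, action);
        return this;
    }

    // Runs every step in order on the same context, recording each name after the step completes.
    Context run(Context context) {
        for (Map.Entry<String, Consumer<Context>> entry : steps.entrySet()) {
            entry.getValue().accept(context);
            executed.add(entry.getKey());
        }
        return context;
    }

    List<String> executed() {
        return List.copyOf(executed);
    }

    // Example wiring with stub steps named after the direct: routes.
    static OrchestratorSketch demoPipeline() {
        return new OrchestratorSketch()
                .step("aws-secrets-manager", c -> c.variables.put("apiKey", "dummy-key"))
                .step("weather-stack", c -> c.body = "weather for " + c.variables.get("city"))
                .step("postgres", c -> c.variables.put("persisted", Boolean.TRUE));
    }
}
```

Running demoPipeline() on a context whose city variable is "Sao Paulo" leaves "weather for Sao Paulo" in the body, and executed() returns [aws-secrets-manager, weather-stack, postgres]. Because the steps never call each other, the execution order lives in one place. In a choreographed design, each step would trigger the next one itself, for example by sending to another queue, so you would have to read every step to reconstruct the full flow.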

Running the Application

To start the application, simply run:

mvn spring-boot:run

You can check out the entire code on GitHub.

You should see logs indicating messages being processed from SQS, API calls being made, and data being persisted to PostgreSQL, like these:

2025-02-25T18:29:20.498-03:00  INFO 3206 --- [weather-processor] [           main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2025-02-25T18:29:22.102-03:00  INFO 3206 --- [weather-processor] [           main] o.a.camel.component.jpa.JpaComponent     : Using EntityManagerFactory found in registry with id [entityManagerFactory] org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@18ff753c
2025-02-25T18:29:22.102-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.c.jpa.DefaultTransactionStrategy   : Using TransactionManager found in registry with id [transactionManager] org.springframework.orm.jpa.JpaTransactionManager@48a21ea6
2025-02-25T18:29:22.132-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 4.10.0 (camel-1) is starting
2025-02-25T18:29:22.168-03:00  INFO 3206 --- [weather-processor] [           main] c.s.b.CamelSpringBootApplicationListener : Starting CamelMainRunController to ensure the main thread keeps running
2025-02-25T18:29:22.168-03:00  INFO 3206 --- [weather-processor] [inRunController] org.apache.camel.main.MainSupport        : Apache Camel (Main) 4.10.0 is starting
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   : Routes startup (total:5)
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started sqs (aws2-sqs://weather-processor)
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started secrets-manager (direct://aws-secrets-manager)
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started route1 (direct://postgres)
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started weather-stack (direct://weather-stack)
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started orchestrator (direct://orchestrator)
2025-02-25T18:29:22.172-03:00  INFO 3206 --- [weather-processor] [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 4.10.0 (camel-1) started in 39ms (build:0ms init:0ms start:39ms)
2025-02-25T18:29:22.174-03:00  INFO 3206 --- [weather-processor] [           main] i.i.w.WeatherProcessorApplication        : Started WeatherProcessorApplication in 3.423 seconds (process running for 3.685)
Hibernate: insert into current_weather (cloudcover,feelslike,humidity,is_day,observation_time,precip,pressure,temperature,uv_index,visibility,weather_code,wind_degree,wind_dir,wind_speed) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
Hibernate: insert into location (country,lat,localtime_epoch,lon,name,region,timezone_id,utc_offset) values (?,?,?,?,?,?,?,?)
Hibernate: insert into weather_request (language,query,type,unit) values (?,?,?,?)
Hibernate: insert into weather_data (current_id,location_id,request_id) values (?,?,?)
Hibernate: insert into current_weather_weather_descriptions (current_weather_id,weather_descriptions) values (?,?)
Hibernate: insert into current_weather_weather_icons (current_weather_id,weather_icons) values (?,?)
2025-02-25T18:29:25.405-03:00  INFO 3206 --- [weather-processor] [ather-processor] orchestrator                             : finished

Conclusion

With Apache Camel, we efficiently built a modular and extensible data pipeline:

  • Decoupled SQS message consumption from processing.
  • Retrieved API keys securely using AWS Secrets Manager.
  • Called an external API dynamically.
  • Persisted the results to a database.

This architecture is scalable, and each component can be enhanced independently. In production, you might consider:

  • Adding retry and error handling mechanisms.
  • Using a caching layer to reduce API calls.
  • Implementing structured logging and monitoring.
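For the caching idea, one minimal option is a time-based cache in front of the WeatherStack call, keyed by city, so repeated messages for the same city within the TTL reuse the previous response. The TtlCache class below is a hypothetical plain-Java sketch, not a Camel or AWS API. The TTL you choose, and whether slightly stale weather is acceptable, are assumptions you would need to tune. Inside a Camel route, a cache component such as camel-caffeine may be a better fit.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical time-based (TTL) cache; not a Camel or AWS API.
final class TtlCache<K, V> {

    // A cached value together with the instant after which it is considered stale.
    private record Entry<T>(T value, Instant expiresAt) { }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final Duration ttl;
    private final Supplier<Instant> clock;

    TtlCache(Duration ttl, Supplier<Instant> clock) {
        this.ttl = ttl;
        this.clock = clock;
    }

    // Returns the cached value while it is fresh; otherwise calls the loader and caches the result.
    // Two threads that miss at the same time may both call the loader; the last write wins.
    V get(K key, Function<? super K, ? extends V> loader) {
        Instant now = clock.get();
        Entry<V> cached = entries.get(key);
        if (cached != null && now.isBefore(cached.expiresAt())) {
            return cached.value();
        }
        V value = loader.apply(key);
        entries.put(key, new Entry<>(value, now.plus(ttl)));
        return value;
    }
}
```

A cache built as new TtlCache<String, String>(Duration.ofMinutes(10), Instant::now) would be queried with cache.get(city, this::fetchWeather), where fetchWeather is a hypothetical method that performs the HTTP request. Injecting the clock as a Supplier<Instant> keeps the expiry logic easy to test.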

By leveraging Apache Camel’s powerful routing capabilities and its wide range of integration components, you can build robust integration solutions efficiently.

Would you like to see enhancements such as retries, error handling, or monitoring? Let me know in the comments!
