Data pipelines are a critical part of modern software systems, ensuring seamless data flow between various components. In this tutorial, we’ll build an Apache Camel-based data pipeline that listens to an AWS SQS queue, retrieves weather information from the WeatherStack API, and persists the results into a PostgreSQL database.
This example is structured to highlight how you can split your Camel routes across multiple files, a best practice for organizing real-world applications. However, in production, you might further refine this structure based on project needs.
Architecture Overview
The pipeline consists of the following steps:
- AWS SQS Listener: Reads messages containing city names from an SQS queue.
- AWS Secrets Manager: Retrieves the WeatherStack API key securely.
- WeatherStack API Call: Fetches weather details based on the received city.
- PostgreSQL Persistence: Stores the weather data for future use.
Prerequisites
Before we dive into the code, ensure you have:
- AWS SQS and Secrets Manager configured.
- A PostgreSQL database set up.
- Apache Camel with Spring Boot.
- A valid WeatherStack API key (the free plan allows up to 100 requests per month).
Project Structure
The code is split across multiple packages for modularity:
weather-processor/
├── src/main/java/io/igventurelli/weather_processor/
│ ├── WeatherProcessorApplication.java # Main entry point
│ ├── aws/
│ │ ├── AWSSQSRoute.java # Listens to SQS
│ │ ├── AWSSecretsManagerRoute.java # Fetches API key
│ ├── integration/
│ │ ├── WeatherStackRoute.java # Calls WeatherStack API
│ ├── db/
│ │ ├── PostgresRoute.java # Persists data to PostgreSQL
│ ├── model/
│ │ ├── WeatherData.java # Entity for weather data
│ │ ├── WeatherRequest.java # Entity for the request metadata
│ │ ├── CurrentWeather.java # Entity for the current conditions
│ │ ├── Location.java # Entity for the location details
Each component has a single responsibility, making the system easier to maintain and extend.
Step 1: Listening to AWS SQS
Apache Camel provides an easy way to consume messages from SQS. The route in AWSSQSRoute.java listens for incoming JSON messages in the format:
{ "city": "Sao Paulo" }
Implementation:
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class AWSSQSRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // useDefaultCredentialsProvider=true loads AWS credentials from the environment
        // through the AWS SDK default credentials provider chain
        from("aws2-sqs://weather-processor?useDefaultCredentialsProvider=true").routeId("sqs")
            // parse the JSON payload and keep the city for the later steps
            .unmarshal().json()
            .setVariable("city", simple("${body.get('city')}"))
            .to("direct:orchestrator");
    }
}
This route:
- Listens to the SQS queue weather-processor;
- Parses the JSON payload and stores its city attribute in the city variable;
- Routes the message to the orchestrator route (direct:orchestrator).
Step 2: Retrieving the API Key from AWS Secrets Manager
Instead of hardcoding the API key, we fetch it securely from AWS Secrets Manager.
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws.secretsmanager.SecretsManagerConstants;
import org.springframework.stereotype.Component;
@Component
public class AWSSecretsManagerRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:aws-secrets-manager").routeId("secrets-manager")
            // name of the secret that holds the WeatherStack API key
            .setHeader(SecretsManagerConstants.SECRET_ID, constant("prod/weatherstack/apikey"))
            .to("aws-secrets-manager://weatherstack-secret?useDefaultCredentialsProvider=true&operation=getSecret")
            // the secret value is a JSON object; keep its "key" field as the API key
            .unmarshal().json().setVariable("apiKey", simple("${body.get('key')}"));
    }
}
This route:
- Retrieves the secret from AWS Secrets Manager;
- Extracts the API key from the JSON secret (its key field) and stores it in the apiKey variable.
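As written, the orchestrator calls Secrets Manager once for every SQS message. If that becomes a latency or cost concern, one option is to cache the key for a short time. The sketch below is a minimal, framework-free illustration under that assumption: CachedSecret is a hypothetical helper (not part of the project), and the loader you pass in would wrap the actual secret lookup.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical helper: caches a secret value (e.g. the WeatherStack API key)
 * for a fixed time-to-live so the secret store is not queried on every message.
 */
final class CachedSecret {
    private final Supplier<String> loader;  // wraps the real lookup (assumption)
    private final Duration ttl;
    private final Supplier<Instant> clock;  // injectable so expiry can be tested

    private String value;
    private Instant loadedAt;

    CachedSecret(Supplier<String> loader, Duration ttl, Supplier<Instant> clock) {
        this.loader = Objects.requireNonNull(loader);
        this.ttl = Objects.requireNonNull(ttl);
        this.clock = Objects.requireNonNull(clock);
    }

    CachedSecret(Supplier<String> loader, Duration ttl) {
        this(loader, ttl, Instant::now);
    }

    /** Returns the cached value, reloading it once the TTL has elapsed. */
    synchronized String get() {
        Instant now = clock.get();
        if (value == null || !now.isBefore(loadedAt.plus(ttl))) {
            value = loader.get();
            loadedAt = now;
        }
        return value;
    }
}
```

A time-to-live, rather than caching forever, keeps key rotation working: once the TTL elapses, the next call fetches the current value again.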
Step 3: Calling the WeatherStack API
Next, we use Apache Camel’s HTTP component to fetch weather data.
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.http.HttpMethods;
import org.springframework.stereotype.Component;
@Component
public class WeatherStackRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:weather-stack").routeId("weather-stack")
            .setHeader(Exchange.HTTP_METHOD, constant(HttpMethods.GET))
            // query string built from the variables set by the previous routes
            .setHeader(Exchange.HTTP_QUERY, simple("access_key=${variable.apiKey}&query=${variable.city}"))
            .to("https://api.weatherstack.com/current");
    }
}
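This route:
- Sets the HTTP method to GET and builds the query string from the apiKey and city variables;
- Calls the WeatherStack current endpoint, which returns the weather data as JSON in the message body.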
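The city comes straight from the SQS message and may contain spaces or accents (for example "Sao Paulo"), so it has to be URL-encoded in the query string. It is worth checking how your Camel version treats the CamelHttpQuery header in that respect; the plain-JDK sketch below only shows what a correctly encoded request URI looks like. WeatherStackUri is a hypothetical helper used for illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds the WeatherStack "current" request URI. */
final class WeatherStackUri {
    private static final String BASE = "https://api.weatherstack.com/current";

    private WeatherStackUri() {}

    static URI build(String apiKey, String city) {
        // URLEncoder applies form encoding: a space becomes '+',
        // non-ASCII characters become UTF-8 %XX escapes
        String query = "access_key=" + URLEncoder.encode(apiKey, StandardCharsets.UTF_8)
                + "&query=" + URLEncoder.encode(city, StandardCharsets.UTF_8);
        return URI.create(BASE + "?" + query);
    }
}
```

For example, WeatherStackUri.build("YOUR_KEY", "Sao Paulo") yields https://api.weatherstack.com/current?access_key=YOUR_KEY&query=Sao+Paulo.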
Step 4: Persisting Data in PostgreSQL
Once we have the weather data, we store it in the database.
import io.igventurelli.weather_processor.model.WeatherData;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class PostgresRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:postgres")
            // map the WeatherStack JSON response onto the WeatherData entity
            .unmarshal().json(WeatherData.class)
            // persist the entity through the Camel JPA component
            .to("jpa:io.igventurelli.weather_processor.model.WeatherData");
    }
}
This route:
- Parses the raw JSON returned by the WeatherStack API into our domain class, WeatherData.java;
- Inserts the processed data into PostgreSQL.
Under the hood, we use the Spring Data JPA starter together with the Camel JPA starter: the former configures the database connection and the JPA EntityManagerFactory, and the latter provides the jpa: endpoint that persists the entity.
Step 5: The Orchestrator
An orchestrator route manages the pipeline execution. For pipelines, I prefer the Orchestration pattern over Choreography.
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class WeatherProcessor extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:orchestrator").routeId("orchestrator")
            // load WeatherStack API Key
            .to("direct:aws-secrets-manager")
            // call WeatherStack API
            .to("direct:weather-stack")
            // send to Postgres
            .to("direct:postgres")
            .log("finished");
    }
}
With an orchestrator, the whole flow and all of its steps are visible in a single route, which makes the pipeline much easier to understand.
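Stripped of Camel, the orchestration idea fits in a few lines of plain Java. Orchestrator and Step are hypothetical types for illustration only; the context map plays the role of the exchange variables (city, apiKey) that the real routes share.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical, framework-free illustration of the orchestration pattern:
 * one component owns the ordered list of steps and hands a shared context
 * from one step to the next.
 */
final class Orchestrator {
    /** A named step that reads from and writes to the shared context. */
    record Step(String name, Consumer<Map<String, Object>> action) {}

    private final List<Step> steps;

    Orchestrator(List<Step> steps) {
        this.steps = List.copyOf(steps);
    }

    /** The execution order, readable in one place. */
    List<String> stepNames() {
        return steps.stream().map(Step::name).toList();
    }

    /** Runs every step in order; an exception thrown by any step stops the run. */
    Map<String, Object> run(Map<String, Object> input) {
        Map<String, Object> context = new HashMap<>(input); // mutable copy, like exchange variables
        for (Step step : steps) {
            step.action().accept(context);
        }
        return context;
    }
}
```

In a choreography-based design, each step would instead trigger the next one itself (for example by publishing to another queue), so no single component would list the full sequence.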
Running the Application
To start the application, simply run:
mvn spring-boot:run
You can check out the entire code on GitHub.
You should see logs indicating messages being processed from SQS, API calls being made, and data being persisted to PostgreSQL, like these:
2025-02-25T18:29:20.498-03:00 INFO 3206 --- [weather-processor] [ main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2025-02-25T18:29:22.102-03:00 INFO 3206 --- [weather-processor] [ main] o.a.camel.component.jpa.JpaComponent : Using EntityManagerFactory found in registry with id [entityManagerFactory] org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@18ff753c
2025-02-25T18:29:22.102-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.c.jpa.DefaultTransactionStrategy : Using TransactionManager found in registry with id [transactionManager] org.springframework.orm.jpa.JpaTransactionManager@48a21ea6
2025-02-25T18:29:22.132-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.10.0 (camel-1) is starting
2025-02-25T18:29:22.168-03:00 INFO 3206 --- [weather-processor] [ main] c.s.b.CamelSpringBootApplicationListener : Starting CamelMainRunController to ensure the main thread keeps running
2025-02-25T18:29:22.168-03:00 INFO 3206 --- [weather-processor] [inRunController] org.apache.camel.main.MainSupport : Apache Camel (Main) 4.10.0 is starting
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Routes startup (total:5)
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Started sqs (aws2-sqs://weather-processor)
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Started secrets-manager (direct://aws-secrets-manager)
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Started route1 (direct://postgres)
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Started weather-stack (direct://weather-stack)
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Started orchestrator (direct://orchestrator)
2025-02-25T18:29:22.172-03:00 INFO 3206 --- [weather-processor] [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.10.0 (camel-1) started in 39ms (build:0ms init:0ms start:39ms)
2025-02-25T18:29:22.174-03:00 INFO 3206 --- [weather-processor] [ main] i.i.w.WeatherProcessorApplication : Started WeatherProcessorApplication in 3.423 seconds (process running for 3.685)
Hibernate: insert into current_weather (cloudcover,feelslike,humidity,is_day,observation_time,precip,pressure,temperature,uv_index,visibility,weather_code,wind_degree,wind_dir,wind_speed) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
Hibernate: insert into location (country,lat,localtime_epoch,lon,name,region,timezone_id,utc_offset) values (?,?,?,?,?,?,?,?)
Hibernate: insert into weather_request (language,query,type,unit) values (?,?,?,?)
Hibernate: insert into weather_data (current_id,location_id,request_id) values (?,?,?)
Hibernate: insert into current_weather_weather_descriptions (current_weather_id,weather_descriptions) values (?,?)
Hibernate: insert into current_weather_weather_icons (current_weather_id,weather_icons) values (?,?)
2025-02-25T18:29:25.405-03:00 INFO 3206 --- [weather-processor] [ather-processor] orchestrator : finished
Conclusion
With Apache Camel, we efficiently built a modular and extensible data pipeline:
- Decoupled SQS message consumption from processing.
- Retrieved API keys securely using AWS Secrets Manager.
- Called an external API dynamically.
- Persisted the results to a database.
This architecture is scalable, and each component can be enhanced independently. In production, you might consider:
- Adding retry and error handling mechanisms.
- Using a caching layer to reduce API calls.
- Implementing structured logging and monitoring.
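For the first item, Camel's own error handler supports redelivery with exponential backoff, which is likely the more idiomatic choice inside the routes. To show the mechanics independently of any framework, here is a minimal plain-Java sketch; Retry.withBackoff is a hypothetical helper, and the pluggable sleeper exists only so the delays can be observed without actually waiting.

```java
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical retry helper: runs an action up to maxAttempts times and
 * doubles the wait after each failure (exponential backoff).
 */
final class Retry {
    private Retry() {}

    static <T> T withBackoff(Supplier<T> action, int maxAttempts, Duration initialDelay,
                             Consumer<Duration> sleeper) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Duration delay = initialDelay;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleeper.accept(delay);         // wait before the next attempt
                    delay = delay.multipliedBy(2); // exponential backoff
                }
            }
        }
        throw last; // every attempt failed: surface the most recent error
    }

    /** Convenience overload that really sleeps between attempts. */
    static <T> T withBackoff(Supplier<T> action, int maxAttempts, Duration initialDelay) {
        return withBackoff(action, maxAttempts, initialDelay, Retry::sleep);
    }

    private static void sleep(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

With an initial delay of 100 ms, an action that fails twice and then succeeds waits 100 ms and then 200 ms before returning its result.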
By leveraging Apache Camel’s routing capabilities and its wide range of ready-made components, you can build robust integration solutions efficiently.
Would you like to see enhancements such as retries, error handling, or monitoring? Let me know in the comments!