Skip to content

Consuming AWS SQS Events with Knative Eventing

Published on: 2024-11-12

Consuming AWS SQS Events with Knative Eventing

Author: Matthias Weßendorf, Senior Principal Software Engineer @ Red Hat

In a previous post we discussed the consumption of notifications from an AWS S3 bucket using Apache Camel K. While this is a good approach for getting data from cloud providers, like AWS, into Knative, the Knative Eventing team is aiming to integrate this at the core of its offering with a new CRD, the IntegrationSource. This post will describe howto receive SQS notifications and forward them to a regular Knative Broker for further processing.

Installation

The IntegrationSource will be part of Knative Eventing in a future release. Currently it is under development but already included in the main branch. For installing Knative Eventing from the sources you can follow the develpoment guide.

Note

Installing Knative Eventing from the source repository is not recommended for production cases. The purpose of this blog post is to give an early introduction to the new IntegrationSource CRD..

Creating a Knative Broker instance

Once the main branch of Knative Eventing is installed we are using a Knative Broker as the heart of our system, acting as an Event Mesh for both event producers and event consumers:

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  namespace: default
  name: my-broker

Now event producers can send events to it and event consumers can receive events.

Using IntegrationSource for AWS SQS

In order to send data from AWS SQS to a Knative component, like my-broker we created above, we are using the new IntegrationSource CRD. It basically allows to declaratively move data from a system, like AWS SQS, towards a Knative resource, like our above Broker:

apiVersion: sources.knative.dev/v1alpha1
kind: IntegrationSource
metadata:
  name: aws-sqs-source
spec:
  aws:
    sqs:
      queueNameOrArn: "my-queue"
      region: "my-queue"
      visibilityTimeout: 20
    auth:
      secret:
        ref:
          name: "my-secret"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: my-broker

The IntegrationSource has an aws field, for defining different Amazon Web Services, such as s3, ddb-streams or like in this case sqs. Underneath the aws property is also a reference to a Kubernetes Secret, which contains the credentials for connecting to AWS. All SQS notifications are processed by the source and being forwarded as CloudEvents to the provided sink

Note

If you compare the IntegrationSource to the Pipe from Apache Camel on the previous article you will notice the new resource is less verbose and is directly following established Knative development principles, like any other Knative Event Source.

Creating the Kubernetes Secret for the IntegrationSource

For connecting to any AWS service the IntegrationSource uses regular Kubernetes Secrets, present in the namespace of the resource. The Secret can be created like:

$ kubectl -n <namespace> create secret generic <secret-name> --from-literal=aws.accessKey=<...> --from-literal=aws.secretKey=<...> 

Setting up the Consumer application

Now that we have the Broker and the IntegrationSource connected to it, it is time to define an application that is receiving and processing the SQS notifications:

apiVersion: v1
kind: Pod
metadata:
  name: log-receiver
  labels:
    app: log-receiver
spec:
  containers:
  - name: log-receiver
    image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    imagePullPolicy: Always
    ports:
    - containerPort: 8080
      protocol: TCP
      name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
  name: log-receiver
spec:
  selector:
    app: log-receiver
  ports:
    - port: 80
      protocol: TCP
      targetPort: log-receiver
      name: http

Here we define a simple Pod and its Service, which points to an HTTP-Server, that receives the CloudEvents. As you can see, this is not a AWS SQS specific consumer. Basically any HTTP Webserver, in any given language, can be used for processing the CloudEvents coming from a Knative Broker.

Subscribe the Consumer application to AWS SQS events

In order to be able to receive the SQS event notifications, we need to create a Trigger for our Consumer application:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: aws-sqs-trigger
spec:
  broker: my-broker
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: log-receiver

For debugging purpose we create a Trigger without any filters so it will forward all CloudEvents to the log-receiver application. Once deployed, in the log of the log-receiver pod we should be seeing the following for any produced SQS notification on our queue:

☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.connector.event.aws-sqs
  source: dev.knative.eventing.aws-sqs-source
  subject: aws-sqs-source
  id: 9CC70D09569020C-0000000000000001
  time: 2024-11-08T07:34:16.413Z
  datacontenttype: application/json
Extensions,
  knativearrivaltime: 2024-11-08T07:34:16.487697262Z
Data,
  <test data notification>

Conclusion and Outlook

With the new IntegrationSource we will have a good way to integrate services from public cloud providers like AWS, by leveraging Apache Camel Kamelets behind the scenes. The initial set of services is on AWS, like s3, sqs or ddb-streams. However we are planning to add support for different services and providers.

Since Apache Camel Kamelets can also act as Sinks, the team is working on providing a IntegrationSink, following same principles.

We use analytics and cookies to understand site traffic. Information about your use of our site is shared with Google for that purpose. Learn more.

× OK