Building a Snowpipe equivalent for Redshift

My last tutorial-type article (Moving to functionless Step Functions) seemed to pick up some traction, so here’s another!

At work (Inawisdom) we’ve been seeing increased use of Snowflake as an alternative to Redshift for data warehouse solutions. One really nice feature I came across in Snowflake is Snowpipe: it automatically loads data into the Snowflake data warehouse when an object is put into S3.

I’d love to have a native AWS way to do this where Redshift is the target data warehouse, for those use cases where extract-load-transform is the desired method (e.g. get the data into Redshift and then use something like dbt to build data models).

Let’s get started on setting this up!

Setting up a source bucket

Before looking at any Redshift-related infrastructure, an S3 bucket needs to be created for source files to be uploaded to. Creating the bucket first means its ARN can be referenced later in the IAM policy attached to the role on the Redshift cluster.

The bucket is configured to publish events to EventBridge via the NotificationConfiguration property on the CloudFormation resource. This allows us to create EventBridge rules later that respond to objects being uploaded.

# cloudformation/s3-bucket.yaml

AWSTemplateFormatVersion: "2010-09-09"
Description: "Creates an S3 bucket for source files to be uploaded to"

Resources:
  Bucket:
    Type: "AWS::S3::Bucket"
    Properties: 
      BucketName: !Sub "data-platform-source-bucket-${AWS::Region}"
      NotificationConfiguration:
        EventBridgeConfiguration:
          EventBridgeEnabled: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

Outputs:
  DataPlatformSourceBucketName:
    Export: 
      Name: "DataPlatformSourceBucketName"
    Description: "The name of the source bucket"
    Value: !Ref "Bucket"
  DataPlatformSourceBucketArn:
    Export: 
      Name: "DataPlatformSourceBucketArn"
    Description: "The ARN of the source bucket"
    Value: !GetAtt "Bucket.Arn"

The name and ARN of the bucket are exported from the created CloudFormation stack for use in later stacks.

Provisioning a Redshift cluster

In order to get a Redshift cluster up and running, we first need to create a VPC for it to be deployed into. There’s nothing special about the VPC and subnet created here - however, it’s worth pointing out that in a production setup you would want multiple subnets across multiple availability zones for high availability. The ID of the subnet where the Redshift cluster is to be deployed is exported from the CloudFormation stack to make it easily accessible in later stacks.

If you’ve got a VPC and subnets set up already, you can skip this step.

# cloudformation/vpc.yaml

AWSTemplateFormatVersion: "2010-09-09"
Description: "Creates a simple VPC for a Redshift cluster to be deployed into"

Resources:
  Vpc:
    Type: "AWS::EC2::VPC"
    Properties: 
      CidrBlock: "10.0.0.0/16"
      Tags:
        - Key: "Name"
          Value: "data-platform-vpc"
  
  Subnet:
    Type: "AWS::EC2::Subnet"
    Properties:
      CidrBlock: "10.0.0.0/16"
      VpcId: !Ref "Vpc"
      Tags:
        - Key: "Name"
          Value: "data-platform-subnet"

Outputs:
  DataPlatformSubnetId:
    Export: 
      Name: "DataPlatformSubnetId"
    Description: "The ID of the subnet to create the Redshift cluster in"
    Value: !Ref "Subnet"

With a VPC in place, the Redshift cluster can be provisioned. There’s quite a lot going on in this next code snippet, so I’ll work through it piece by piece.

If you already have a VPC set up, you’ll want to tweak the RedshiftClusterSubnetGroup resource to reference your existing subnet ID. I’d recommend using a CloudFormation parameter, an SSM parameter or a CloudFormation exported value for this.
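
As a rough sketch, that could look something like the following if you go down the CloudFormation parameter route - the ExistingSubnetId parameter name is purely illustrative and isn’t part of the templates in this article:

# Hypothetical variant: pass in an existing subnet rather than importing DataPlatformSubnetId

Parameters:
  ExistingSubnetId:
    Type: "AWS::EC2::Subnet::Id"
    Description: "The ID of an existing subnet to deploy the Redshift cluster into"

Resources:
  RedshiftClusterSubnetGroup:
    Type: "AWS::Redshift::ClusterSubnetGroup"
    Properties:
      Description: "The cluster subnet group for the main Redshift cluster"
      SubnetIds:
        # Use the parameter instead of !ImportValue "DataPlatformSubnetId"
        - !Ref "ExistingSubnetId"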

# cloudformation/redshift-cluster.yaml

AWSTemplateFormatVersion: "2010-09-09"
Description: "Creates a single node Redshift cluster, compatible with the Redshift Data API"

Parameters:
  ClusterDatabaseName:
    Type: "String"
    Description: "The name of the database to create on the Redshift cluster"
    Default: "demo_db"

Resources:
  RedshiftIamRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: "data-platform-redshift-role"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "redshift.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "s3:GetObject"
                  - "s3:ListBucket"
                Resource:
                  - !ImportValue "DataPlatformSourceBucketArn"
                  - !Join ["", [!ImportValue "DataPlatformSourceBucketArn", "/*"]]

  RedshiftAdminUserSecret:
    Type: "AWS::SecretsManager::Secret"
    Properties:
      Description: "Administrator credentials for the Redshift cluster"
      GenerateSecretString:
        SecretStringTemplate: '{"username": "db_admin"}'
        GenerateStringKey: "password"
        PasswordLength: 32
        ExcludeCharacters: '"@/\'
  
  RedshiftClusterSubnetGroup:
    Type: "AWS::Redshift::ClusterSubnetGroup"
    Properties:
      Description: "The cluster subnet group for the main Redshift cluster"
      SubnetIds:
        - !ImportValue "DataPlatformSubnetId"
      Tags:
        - Key: "Name"
          Value: !Sub "dataplatform-redshift-cluster-subnet-group-${AWS::Region}"
          
  RedshiftCluster:
    Type: "AWS::Redshift::Cluster"
    Properties:
      ClusterIdentifier: !Sub "dataplatform-redshift-cluster-${AWS::Region}"
      DBName: !Ref "ClusterDatabaseName"
      MasterUsername: !Join ['', ['{{resolve:secretsmanager:', !Ref RedshiftAdminUserSecret, ':SecretString:username}}' ]]
      MasterUserPassword: !Join ['', ['{{resolve:secretsmanager:', !Ref RedshiftAdminUserSecret, ':SecretString:password}}' ]]
      NodeType: "dc2.large"
      ClusterType: "single-node"
      ClusterSubnetGroupName: !Ref "RedshiftClusterSubnetGroup"
      PubliclyAccessible: false
      IamRoles:
        - !GetAtt "RedshiftIamRole.Arn"

The result of creating CloudFormation stacks from these two templates is a Redshift cluster ready to have data loaded into it.

Bootstrapping the cluster

Now that there’s a Redshift cluster to load data into, a table needs to be created within it. This can be done using the Redshift query editor (v1 or v2) in the AWS management console. Make sure you’re connected to the demo_db database, or whichever name you chose if you passed a different value for the ClusterDatabaseName CloudFormation parameter.

-- sql/table.sql

CREATE TABLE IF NOT EXISTS public.data_loading_demo (
  email_address VARCHAR(255),
  first_name VARCHAR(255),
  last_name VARCHAR(255)
) DISTSTYLE AUTO SORTKEY AUTO;

This SQL script will create a table that matches the format of the data files we’ll be uploading to our S3 bucket later.
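
For reference, a source file for this table would look something like the following - the header row is skipped by the COPY command later on, and the values here are purely made-up examples:

email_address,first_name,last_name
jane.doe@example.com,Jane,Doe
john.smith@example.com,John,Smith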

Automating loading of data into Redshift

Now that all of the groundwork is laid, we can get on with building the mechanism that’ll deal with loading data in response to objects being uploaded. First of all, let’s talk about how the solution will work. At a high level:

  1. An object is uploaded to the source S3 bucket.
  2. S3 publishes an "Object Created" event to the default EventBridge event bus.
  3. An EventBridge rule matches the event and starts an execution of a Step Functions state machine, passing the bucket name and object key as input.
  4. The state machine runs a COPY command against Redshift using the Redshift Data API, then polls until the query completes.

It’s worth pointing out at this stage that the Redshift Data API is asynchronous and therefore we’ll have to implement some polling logic within the state machine to check that the query completed successfully.

The first step of this is to create the IAM role that the state machine will execute as. This role needs access to use the Redshift Data API. There are three distinct actions that you’ll see granted here: redshift:GetClusterCredentials (to fetch temporary credentials for the db_admin user on our database), redshift-data:ExecuteStatement (to submit queries to the cluster) and redshift-data:DescribeStatement (to check the status of a submitted query).

# cloudformation/state-machine.yaml

Parameters:
  ClusterDatabaseName:
    Type: "String"
    Description: "The name of the database to create on the Redshift cluster"
    Default: "demo_db"

Resources:
  StateMachineIamRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "states.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "redshift:GetClusterCredentials"
                Resource:
                  - !Sub "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:dataplatform-redshift-cluster-${AWS::Region}/${ClusterDatabaseName}"
                  - !Sub "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:dataplatform-redshift-cluster-${AWS::Region}/db_admin"
              - Effect: "Allow"
                Action:
                  - "redshift-data:ExecuteStatement"
                Resource:
                  - !Sub "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:dataplatform-redshift-cluster-${AWS::Region}"
              - Effect: "Allow"
                Action:
                  - "redshift-data:DescribeStatement"
                Resource:
                  - "*"

Now that we’ve looked at the IAM role required for the state machine, we can build the state machine itself. The state machine makes heavy use of intrinsic functions and the new-ish AWS SDK integration - no Lambda functions in sight here!

The first step executes a COPY query against Redshift. Although we haven’t set up automatic execution of the state machine yet, we’ll assume that the input to an execution consists of bucket and key. For more information about the COPY command, the AWS documentation is a good place to start.
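
To make the States.Format expression in the state machine below easier to read, this is roughly what the rendered COPY statement looks like - the bucket name, object key and account ID here are placeholders:

-- Illustrative only: bucket, key and account ID are placeholders
COPY public.data_loading_demo
FROM 's3://data-platform-source-bucket-eu-west-1/customers.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/data-platform-redshift-role'
IGNOREHEADER 1
CSV;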

We then move into how to handle the asynchronous nature of the Redshift Data API. To do this, the following steps are taken:

  1. Initialise a retry counter at 0
  2. Check that retry counter < 10. If it’s >= 10, fail the overall execution.
  3. Wait 30 seconds
  4. Using DescribeStatement, check the result of the query
  5. Increment the retry counter by 1
  6. If the query has failed or succeeded, mark the overall execution as failed or succeeded accordingly. If it’s still ongoing, return to step 2.

# cloudformation/state-machine.yaml

Resources:
  DataPipelineStateMachine:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      RoleArn: !GetAtt "StateMachineIamRole.Arn"
      Definition:
        StartAt: "Execute Redshift query"
        States:
          Execute Redshift query:
            Type: "Task"
            Resource: "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
            Parameters:
              ClusterIdentifier: !Sub "dataplatform-redshift-cluster-${AWS::Region}"
              Database: !Ref "ClusterDatabaseName"
              DbUser: "db_admin"
              Sql.$: !Sub States.Format('COPY public.data_loading_demo FROM \'s3://{}/{}\' IAM_ROLE \'arn:aws:iam::${AWS::AccountId}:role/data-platform-redshift-role\' IGNOREHEADER 1 CSV', $.bucket, $.key)
            ResultSelector:
              Id.$: "$.Id"
            ResultPath: "$.Query"
            Next: "Initialise number of attempts"
          
          Initialise number of attempts:
            Type: "Pass"
            Parameters:
              Id.$: "$.Query.Id"
              Attempts: 0
            ResultPath: "$.Query"
            Next: "Wait before checking result"

          Check number of attempts:
            Type: "Choice"
            Choices:
              - Variable: "$.Query.Attempts"
                NumericGreaterThanEquals: 10
                Next: "Fail"
            Default: "Wait before checking result"

          Wait before checking result:
            Type: Wait
            Seconds: 30
            Next: "Get result of Redshift query"
          
          Get result of Redshift query:
            Type: "Task"
            Resource: "arn:aws:states:::aws-sdk:redshiftdata:describeStatement"
            Parameters:
              Id.$: "$.Query.Id"
            ResultSelector:
              Status.$: "$.Status"
            ResultPath: "$.Query.Result"
            Next: "Increment number of attempts"

          Increment number of attempts:
            Type: "Pass"
            Parameters:
              Id.$: "$.Query.Id"
              Attempts.$: "States.MathAdd($.Query.Attempts, 1)"
              Result.$: "$.Query.Result"
            ResultPath: "$.Query"
            Next: "Check if query succeeded"
            
          Check if query succeeded:
            Type: "Choice"
            Choices:
              - Variable: "$.Query.Result.Status"
                StringEquals: "FINISHED"
                Next: "Succeed"
              - Or:
                  - Variable: "$.Query.Result.Status"
                    StringEquals: "FAILED"
                  - Variable: "$.Query.Result.Status"
                    StringEquals: "ABORTED"
                Next: "Fail"
            Default: "Check number of attempts"
          
          Fail:
            Type: "Fail"

          Succeed:
            Type: "Succeed"

The final step of building a solution to automate data ingestion is triggering an execution of the state machine when an object is uploaded to S3. To do this, we’ll use an EventBridge rule. You’ll remember we enabled EventBridge notifications on the S3 bucket at the very start - that means that any event that occurs on the bucket will be published to the default event bus and can be reacted to with a rule.

As usual, we need an IAM role for the EventBridge rule to assume. The only permission it needs in this case is to start an execution of our state machine. The EventBridge rule itself is then relatively simple. We match on Object Created events from our bucket only, with an additional condition to ensure the event is coming from our account. The sole target of the rule is the state machine, where we specify the input as a JSON object containing the bucket and key.

# cloudformation/state-machine.yaml

Resources:
  EventBridgeRuleRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "events.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "states:StartExecution"
                Resource:
                  - !GetAtt "DataPipelineStateMachine.Arn"

  EventBridgeRule:
    Type: "AWS::Events::Rule"
    Properties:
      State: ENABLED
      EventPattern:
        source: 
          - aws.s3
        detail-type:
          - "Object Created"
        account:
          - !Ref "AWS::AccountId"
        detail:
          bucket:
            name:
              - !ImportValue "DataPlatformSourceBucketName"
      Targets:
        - Id: "RedshiftQueryStateMachine"
          RoleArn: !GetAtt "EventBridgeRuleRole.Arn"
          Arn: !GetAtt "DataPipelineStateMachine.Arn"
          InputTransformer:
            InputPathsMap:
              bucket: "$.detail.bucket.name"
              key: "$.detail.object.key"
            InputTemplate: >-
              {"bucket": <bucket>, "key": <key>}

Once that’s all deployed, you should be able to upload a CSV file (with a header row and the same columns as the table in Redshift) to the S3 bucket and, a short while later, see the data appear in your table. Exciting!

Taking it to the next level

What we’ve been through here works - but if you’re looking to run a solution like this in production, there are a few areas to look at to make it battle-ready.

Conclusion

We’ve covered how to build a minimum viable solution that replicates the functionality Snowpipe provides for Snowflake-based data platforms. There are some next steps that can be taken to make it production-ready, but at a foundational level our goal of being able to upload a file to S3 and have it automatically loaded into Redshift has been achieved.

All the source code for this article can be found in this repository on GitHub.