Setting up “some-what streaming” with AWS Kinesis Firehose, S3 and Snowflake

Stefan Frost
10 min readApr 2, 2021

--

So we want to extend our data ingestion to Snowflake, with some-what streaming data within Amazon AWS.

Some-what streaming? What is that?

We basically can’t achieve true streaming with this setup. I imply, that since we will not be able to achieve streaming data into Snowflake. Given the current state of Snowflake architecture, where there exist no true streaming interface and since AWS Kinesis Firehose isn’t really streaming either…

Ok so what am I going for then?

Snowflake offers a piece of technology named Snowpipe which can be triggered to load data into Snowflake tables based on file arrivals in AWS S3 buckets ASAP. My tests conclude that in reality ASAP seems to be between 20 to 200 seconds. So feeding data into Snowflake in small time slots makes the pattern some-what streaming-like. That is, after our data has made an S3 stopover.

Look at the architectural image provided above, It clearly shows that there are too many data points that touchdown on disk, in order for us to even consider it to be streaming. So this pattern is more or less trickle-feeding.

Snowpipe only credits your account for each processed file and for files queued (At this point in time Snowpipe charges 0.06 credits per 1000 files queued). So based on fact that we will have a trickle-feed pattern and on the way Snowpipe is billed; Means that I do not want it to read and write data all the time and certainly not every message as a single file. If I can’t achieve real streaming, why pay for it. So let’s build as small chunks we can for our trickle-feed pattern instead.

That’s why my choice for message transport became AWS Kinesis Firehose. Firehose is a reliable and easy-to-setup solution for recieving streaming data and microbulking it to something else. Setting up AWS Kinesis Firehose is a simple step by step process that can be automated into infrastructure as code in optional tool. There are however a few things to parameterize within this setup.

Setting up AWS Kinesis Firehose

Pick your data input. Choose and setup AWS Kinesis Data Stream as input if you have high volume throughput events. I choose PUT as input, since my scenario does not have high throughput.

Next step is to add some transformations (if needed). In my case this is not required since I really want untampered data in raw format transported into Snowflake. So I’ll bypass that page.

Third step is to define output/target/destination of data. In my case S3 buckets. In this guide I assume that S3 bucket is already setup in advance. If you haven't done that, please follow any of the +100K guides provided on the net :)

Add some kind of prefix to your bucket in order to keep data organized and clean. Best practice is to have a single delivery stream per source data type and name your data delivery stream and your S3 bucket prefix using the same naming convention. Furthermore, I like to organize data by (System, Source,) Data Stream, Version, Time of arrival. S3 adds time of arrival as /YYYY/MM/DD/HH by default using UTC-time which for me is exactly as I want it.

Next and final step is to setup the interval of data deliveries written from AWS Kinesis Firehose to S3. Here you can play around with buffering options. Whenever either of your buffering option hits the ceiling, a file will be written with all buffered content to your S3 destination as organized above.

So how should we configure buffering for our scenario?

According to Snowflake documentation the optimum file size for loading is 100–250MB per file (uncompressed). Snowflake documentation also states :

If it takes longer than one minute to accumulate MBs of data in your source application, consider creating a new (potentially smaller) data file once per minute. This approach typically leads to a good balance between cost (i.e. resources spent on Snowpipe queue management and the actual load) and performance (i.e. load latency).

In another section in the Snowflake documention it is stated that

The VARIANT data type imposes a 16 MB (compressed) size limit on individual rows.

In my case I want to receive Json or XML documents through this ingestion pattern and store it raw within a column using the Variant data type. So it seems to be a good idea to put a “Buffer size” limit of 16 MB. And since we want to “streaming like” but not continuously, a good option is to set “Buffer interval” to 60 seconds.

This setup gives us optimal loading into snowflake with new data refreshed every 1 minute with a total latency of maximum 2–3 minutes from source to Snowflake!

Optionally add encryption, and compression on your S3 bucket files. I prefer both.

That’s it. Firehose is ready. You can test it using some of the provided test tools within Amazon AWS. I however decided to write my own in python-script using boto3 and templates found in AWS documentation.

Generating and pushing data for test purpose

Consider my sample python program.

"""
Purpose

Shows how to use the AWS SDK for Python (Boto3) with the Amazon Kinesis Firehose API to generate a data stream.
"""

import datetime
import time
import json
import random
import boto3

# FIXED arguments
NUMBER_OF_ITERATIONS = 100
NUMBER_OF_MESSAGES_PER_ITERATION = 300
SLEEP_TIME_IN_SECONDS = 1
AWS_ACCESS_KEY_ID = "YOUR_AWS_ACCESS_KEY"
AWS_SECRET_ACCESS_KEY = "YOUR_AWS_ACCESS_SECRET"
AWS_REGION = "eu-west-1"
STREAM_NAME = "mydatastreamname"
#function creates fictional data, as Dict()
def get_data():
return {
'EVENT_TIME': datetime.datetime.now().isoformat(),
'TICKER': random.choice(
['AAA', 'BBB', 'CCC', 'DDD', 'EEE']
),
'PRICE': round(random.random() * 100, 2)
}

#function puts data on firehose delivery stream
def generate(
stream_name, kinesis_client
, iterations, messages_per_iteration, sleep
):
i = 0
m = 0
tic = time.perf_counter()
while i < iterations:
m = m + 1
data = get_data() #generate data
# puts data on queue
response = kinesis_client.put_record(
DeliveryStreamName=stream_name,
Record={
'Data': json.dumps(data)
}
)
if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
print('FAILURE: '+str(response))
if m == messages_per_iteration:
m = 0
toc = time.perf_counter()
i = i + 1
print(
f"{iterations-i} iterations to go. "
f"Wrote {messages_per_iteration} messages "
f"in {toc - tic:0.4f} seconds "
f"(velocity: "
f"{(messages_per_iteration/(toc - tic)):0.4f} m/s)"
)
time.sleep(sleep)
tic = time.perf_counter()

if __name__ == '__main__':

#BOTO3 connect using specified access credentials
# Swap named credentials to a credentials file.
firehoseclient = boto3.client(
'firehose',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)

#generate data and put messages on firehose
generate(
STREAM_NAME, firehoseclient
, NUMBER_OF_ITERATIONS, NUMBER_OF_MESSAGES_PER_ITERATION
, SLEEP_TIME_IN_SECONDS
)

This script connects to Amazon AWS and Kinesis Firehose, then pushes messages using a generator function to my selected delivery stream. I extended my sample script with sampling of data in iterations. Specify a selected amount of iterations with a selected amount of messages within each iteration. Start the program and let it run, files will be created every minute in your S3-bucket (since my generated messages will not hit the 16 MB buffer size limit within 1 minute).

So, We now have a pattern for receiving data and writing it in chunks of messages batched into files either 1 minute apart or with maximum 16 MB in size. Encrypted and compressed at rest on our selected S3 bucket. Awesome…

Repeat for all your data flows!

Ok, so data can be “chunked” into S3 using AWS Kinesis Firehose, so what? How do we continue the chain of events to Snowflake?

I’ve followed a this guide on Snowflake documentation, but we can narrow it down to 3 steps. Access, Copy Into and Notification.

Accessing S3 from Snowflake

Step 1 : Configure Snowflake to access S3 buckets.

So first, create an IAM User with access rights to the S3 bucket (or more) using this guide. Basically note down the ACCESS_KEY and ACCESS_SECRET and proceed. And yes, this step could and should be more complicated in order to achieve higher grade of security, but we’ll save that for another post.

I’ll proceed with the simplest approach by defining a Stage in my own database. Enter your database of choice using the Snowflake Web UI and enter the following SQL.

CREATE OR REPLACE stage mydatastreamnameingestion 
url = 's3://mys3bucket/mydatastreamname/v1/'
CREDENTIALS = (
AWS_KEY_ID = '**************'
AWS_SECRET_KEY = '**************'
);

Define a file format so that we can access the file instantly using SQL

CREATE OR REPLACE FILE FORMAT myjsonformat 
TYPE = 'json'
COMPRESSION = auto ;

Test

SELECT metadata$filename AS fileName,
$1 AS fileContent
FROM @mydatastreamnameingestion (file_format => myjsonformat) ;

If this works, your Snowflake installation is now setup to work with your Amazon AWS S3 Bucket !

Enabling Snowpipe

Step 2 : Configure Snowpipe

Snowpipe is a piece of techonolgy within Snowflake that keeps track of new data within a specific S3-bucket and loads it ASAP into Snowflake tables. It does other things as well to be honest, but this is what I want to utilize. The way it works is that S3 notifies Snowflake using a SQS queue which in turn trigger a COPY INTO statement. Please note that there is only one notification channel (SQS queue) per S3 bucket. This is an important fact since all notifications sent to the designated SQS-queue will initiate all Snowpipes associated regardless of defintion. There is more on that topic in Snowflake documentation.

One queue per bucket, works for me, so no show stopper yet…

Lets create a destination table and a Snowpipe definition

--Table
CREATE OR
REPLACE TABLE MYDB.MYSCHEMA.MYDATASTREAMTABLE
(
fileContent variant
);


--Snowpipe
CREATE OR
REPLACE PIPE MYDB.MYSCHEMA.MYDATASTREAMSNOWPIPE AUTO_INGEST = TRUE AS
COPY INTO MYDB.MYSCHEMA.MYDATASTREAMTABLE
FROM @mydatastreamnameingestion
FILE_FORMAT = (TYPE = JSON)
;

Then verify its running

SELECT system$pipe_status('MYDB.MYSCHEMA.MYDATASTREAMSNOWPIPE');

Copy the Json returned and notice “executionState” and “notificationChannelName”. Save the value for “notificationChannelName” somewhere, you will need it later. And check the value of “executionState”, the value should be “RUNNING”.

{
"executionState":"RUNNING"
"pendingFileCount":0,
"notificationChannelName":"arn:aws:sqs:eu-west-1:12345678901011:something-snowpipe-XSWDA4F4SDS2345SFSR3R-5b-09xAwWsalGASWcUj4xW",
"numOutstandingMessagesOnChannel":0,
"lastReceivedMessageTimestamp":"2021-03-18T12:50:31.91Z",
"lastForwardedMessageTimestamp":"2021-03-12T10:01:37.407Z"
}

Great, our pipe is listening, and we have data in our S3 bucket, yet we are missing Notifications on the notification channel (SQS queue), So in fact Snowpipe has nothing to do. Let’s address that.

Setup notifications

Step 3: Setup S3 to Snowflake notification.

Jump back to Amazon AWS Web Console and bring the value from “notificationChannelName” copied earlier. Browse to your S3 bucket and enter properties. Scroll down until you find “Event notifications”. Press “Create event notification” and follow the guide.

Give your event a name and type in the listening prefix. I choose to setup one event notification per source using prefix as specified below. However skipping the v1 used in the AWS Kinesis Firehose delivery stream setup above, meaning i can change the versioning in the delivery stream without having to change any notification event.

NOTE! Snowflake only has one SQS notification queue per S3 bucket and since S3 bucket event notifications cannot be overlapping and we can have no more than 100 events, you should really think about the overall structure before preceding.

Next step is to select the appropriate events. It is sufficient to listen to all “create events” in this scenario.

Last step is to select the event notification destination. In our case the provided Snowflake SQS queue, noted down earlier. Paste the “notificationChannelName”-value into SQS queue text input in the bottom of the page.

Great. Notifications are set! And our trickle-feed or streaming-like data pattern is completed.

Before finishing we should improve a few things…

Let’s address the over-simplified loading into Snowflake using a single column. A table loaded from each S3 Bucket file potencially splitted by row with no metadata what so ever. Let’s improve our solution a bit. Start by redefining the table within Snowflake, adding a table row id, the S3 bucket file name and the loading timestamp. Hence the pipe must also be redefined in order to add the requested metadata, and later reloaded.

CREATE OR REPLACE TABLE MYDB.MYSCHEMA.MYDATASTREAMTABLE 
(
rowId integer autoincrement start 1 increment 1,
fileName string,
fileContent variant,
loadedTs timestamp_tz default current_timestamp
);


CREATE OR REPLACE PIPE MYDB.MYSCHEMA.MYDATASTREAMSNOWPIPE AUTO_INGEST = TRUE AS
COPY INTO MYDB.MYSCHEMA.MYDATASTREAMTABLE (fileName, fileContent)
FROM (
SELECT metadata$filename, $1
FROM @mydatastreamnameingestion ( FILE_FORMAT => myjsonformat )
)
;

Note that there are a few differences in the Snowpipe definition. Actually within the COPY INTO statement. First and formost we now use named columns to load.

COPY INTO MYDB.MYSCHEMA.MYDATASTREAMTABLE (fileName, fileContent)

Which in turn requires a SELECT specified in the FROM clause, that has a ripple effect on the syntax in total.

  FROM (
SELECT metadata$filename, $1
FROM @mydatastreamnameingestion ( FILE_FORMAT => myjsonformat )
)
;

With a SELECT -statement we can access a few metadata fields and load them together with the data. I only find filename useful for now.

Please note that the SELECT enforces a different notation in the FILE_FORMAT, from “=” to “=>” followed by a predefined file format (which we created in the beginning of this guide , and which can be re-used now).

Done, let’s test it!

Try it out with the python script (or some other tool for that matter) and wait a few minutes before selecting from the table.

SELECT *
FROM MYDB.MYSCHEMA.MYDATASTREAMTABLE;

Although this guide took while to read and complete, it is still a very short time for building and end to end streaming-like solution, right?

--

--

Stefan Frost
Stefan Frost

Written by Stefan Frost

Data engineer, data architect and data platform automation entusiast. Building cloud based data platforms and tools, for improved productivity.

Responses (2)