Tuesday, March 29, 2016

AWS Kinesis Firehose

What is Kinesis Firehose?

Kinesis Firehose accepts streaming data and writes it to Amazon S3 and, optionally, to an Amazon Redshift table. Each data entry is a JSON object (name-value pairs), and the data stored in S3 is a series of these objects. When loading the data into Amazon Redshift, Firehose uses the names to map values to Redshift table columns (names must be all lowercase, since Redshift table column names are always lowercase).
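As a rough illustration of the record format described above, the sketch below serializes one data entry as a JSON object with lowercased names. The function name to_record and the sample fields are hypothetical, and the trailing newline is an assumption of this sketch that keeps the objects in S3 one per line.

```python
import json


def to_record(entry):
    """Serialize one data entry as a JSON object with lowercase names."""
    # Redshift column names are lowercase, so lowercase every name up front.
    lowered = {str(name).lower(): value for name, value in entry.items()}
    # Newline-terminate so consecutive objects in S3 stay on separate lines
    # (an assumption of this sketch; adjust if your consumers differ).
    return (json.dumps(lowered, sort_keys=True) + "\n").encode("utf-8")


# Hypothetical sample entry.
print(to_record({"UserId": 42, "Event": "login"}))
```

The resulting bytes are what a producer would hand to Firehose as the record's data blob.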

Kinesis Firehose is built on Kinesis Streams. It adds the following benefits.
  • Firehose manages the underlying Kinesis Streams and the scaling thereof.
  • With mere configuration, the data can be logged into S3 and into Redshift. That is, no code needs to be written to get the data off the Kinesis stream and put it in S3 and/or Redshift.
Kinesis Firehose has the following disadvantages.
  • The underlying Kinesis stream is hidden from us, so we can't write an additional client (with the Kinesis Client Library) to read the stream and possibly publish to another destination such as a dashboard.
  • Although the data entries added to a Kinesis Firehose delivery stream may consist of differing data fields, practically speaking each delivery stream is really useful only when dealing with a single type of data. Otherwise, how would you map the data to columns in an Amazon Redshift table? In other words, each delivery stream loads into a single Redshift cluster and table. However, over time additional fields can be added to this data type and automatically mapped to an expanded definition of the same Redshift table (because the data entries are stored as JSON objects).

Setup

1) Create a Kinesis Firehose delivery stream. This is where you specify where and how often you want to log data. This can be done from the AWS Console or through the Firehose management APIs.

Useful Redshift COPY options
json 'auto' gzip timeformat 'auto' dateformat 'auto' truncatecolumns trimblanks blanksasnull emptyasnull acceptinvchars
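To show where these options end up, here is a minimal sketch that composes a COPY statement of roughly the shape used when loading from an S3 manifest. The helper build_copy, the table name, the bucket and the IAM role are all hypothetical, and the exact statement Firehose issues may differ.

```python
# The COPY options listed above, kept as one string.
COPY_OPTIONS = (
    "json 'auto' gzip timeformat 'auto' dateformat 'auto' "
    "truncatecolumns trimblanks blanksasnull emptyasnull acceptinvchars"
)


def build_copy(table, manifest_url, credentials, options=COPY_OPTIONS):
    """Compose a Redshift COPY statement that loads from an S3 manifest."""
    return (
        f"COPY {table} FROM '{manifest_url}' "
        f"CREDENTIALS '{credentials}' MANIFEST {options};"
    )


# Hypothetical table, bucket and role, for illustration only.
print(build_copy(
    "events",
    "s3://my-bucket/manifests/example-manifest",
    "aws_iam_role=arn:aws:iam::123456789012:role/hypothetical-role",
))
```

Running the printed statement by hand in a SQL client is a quick way to test a set of options before putting them in the delivery stream configuration.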

2) Write a client or endpoint that places data into the Firehose delivery stream. Each data record (also called a data blob) can be at most 1,000 KB. API: PutRecord (one record, up to 1,000 KB) or PutRecordBatch (up to 500 records and 4 MB total per call).
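A client that sends many records has to split them into batches that respect the PutRecordBatch limits. The sketch below is one way to do that; batch_records and send are hypothetical helpers, and the limits are parameters whose defaults (500 records, 4 MB per call, 1,000 KB per record) should be checked against the current AWS documentation. The send function assumes the third-party boto3 package and configured AWS credentials.

```python
def batch_records(records, max_count=500, max_bytes=4 * 1024 * 1024,
                  max_record_bytes=1000 * 1024):
    """Group byte strings into batches within the given count and size limits."""
    batches, current, size = [], [], 0
    for data in records:
        if len(data) > max_record_bytes:
            raise ValueError("record exceeds the per-record size limit")
        # Start a new batch when adding this record would break a limit.
        if current and (len(current) == max_count or size + len(data) > max_bytes):
            batches.append(current)
            current, size = [], 0
        current.append(data)
        size += len(data)
    if current:
        batches.append(current)
    return batches


def send(records, stream_name):
    """Send records with PutRecordBatch; returns the number of failed puts."""
    import boto3  # third-party; imported lazily so batch_records works without it

    client = boto3.client("firehose")
    failed = 0
    for batch in batch_records(records):
        response = client.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=[{"Data": data} for data in batch],
        )
        failed += response["FailedPutCount"]
    return failed


print([len(batch) for batch in batch_records([b"x"] * 1200)])
```

A non-zero return value from send means some records were rejected and should be retried by the caller.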

Note: There is a special Java program called Kinesis Agent that can be deployed to a running instance. The agent reads data logged to a file and submits it to a Kinesis delivery stream per its configuration.

Data deposited in S3

  • Data is logged in the specified S3 bucket, organized by date and time. A top-level prefix can be prepended (for example, mylog or mylog/; only the second creates a folder).
  • The S3 object names contain a version after the stream name. The version is an increasing integer value (it is incremented every time the delivery stream config is updated).
  • Data can be gzipped (GZIP is the only Firehose compression option that can be used when the data is also copied to Redshift).
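The effect of the prefix is easiest to see by building an object key by hand. The sketch below assumes the layout described in the AWS documentation as I understand it (a UTC YYYY/MM/DD/HH folder, then an object named from the stream name, version, timestamp and a random string); object_key and all sample values are hypothetical.

```python
from datetime import datetime, timezone


def object_key(prefix, stream, version, when, suffix):
    """Build an S3 key shaped like the ones Firehose writes (assumed layout)."""
    folder = when.strftime("%Y/%m/%d/%H/")
    stamp = when.strftime("%Y-%m-%d-%H-%M-%S")
    # The prefix is prepended verbatim, so a trailing slash is what makes a folder.
    return f"{prefix}{folder}{stream}-{version}-{stamp}-{suffix}"


when = datetime(2016, 3, 29, 17, 5, 9, tzinfo=timezone.utc)
print(object_key("mylog/", "mystream", 1, when, "abc123"))
print(object_key("mylog", "mystream", 1, when, "abc123"))
```

The first call yields a key under a mylog folder, while the second fuses mylog onto the year, which is why the trailing slash matters.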

Data COPY to Redshift

  • Data is periodically copied from S3 to Redshift using the COPY command. Firehose uses manifest files to ensure a proper COPY; a manifests folder is created to hold them.
  • An errors folder is created for failed COPY commands (it contains manifest files listing the data files that failed to load).
  • Retries: Firehose retries every 5 minutes for 60 minutes. After 60 minutes, it skips the current batch and adds its manifests to the errors folder. This data must then be copied manually into the Redshift table.
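Before re-running a failed load by hand, it helps to see which data files a manifest covers. The sketch below parses a manifest in the standard Redshift manifest format (an "entries" list of "url" and "mandatory" pairs); failed_files, the bucket and the object names are hypothetical.

```python
import json


def failed_files(manifest_text):
    """Return the S3 URLs of the data files listed in a Redshift manifest."""
    manifest = json.loads(manifest_text)
    return [entry["url"] for entry in manifest.get("entries", [])]


# Hypothetical manifest content, as it might appear in the errors folder.
sample = """
{
  "entries": [
    {"url": "s3://my-bucket/mylog/2016/03/29/17/file-1.gz", "mandatory": true},
    {"url": "s3://my-bucket/mylog/2016/03/29/17/file-2.gz", "mandatory": true}
  ]
}
"""
for url in failed_files(sample):
    print(url)
```

Once the cause of the failure is fixed, the same manifest can be passed to a manual COPY with the MANIFEST option.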

Troubleshooting S3 put

Reference: http://docs.aws.amazon.com/firehose/latest/dev/troubleshooting.html

1. Check data delivery metrics (see below). Is the data flowing?

2. Check that the S3 bucket exists.

3. Check the Firehose IAM role. Does it have permission to write to S3?

Troubleshooting COPY to Redshift

When COPY fails, Firehose retries every 5 minutes for 1 hour. After that, it writes manifests to the errors folder in S3. While this is happening, all other traffic is blocked!

1. Check the Redshift configuration in Firehose.

2. Check tracking tables in Redshift.

select * from stl_connection_log where remotehost like '52%' order by recordtime desc;

select * from stl_query order by endtime desc limit 100;

select tablename,
   HAS_TABLE_PRIVILEGE(tablename, 'select') as can_select,
   HAS_TABLE_PRIVILEGE(tablename, 'insert') as can_insert,
   HAS_TABLE_PRIVILEGE(tablename, 'update') as can_update,
   HAS_TABLE_PRIVILEGE(tablename, 'delete') as can_delete,
   HAS_TABLE_PRIVILEGE(tablename, 'references') as can_reference
from pg_tables where schemaname='public' order by tablename;


select * from stl_load_errors  order by starttime desc;
select * from stl_error where userid!=0 order by recordtime desc;

Monitor Data Delivery Metrics

Kinesis Firehose publishes CloudWatch metrics. The following are particularly useful.

IncomingRecords: Number of records received by the delivery stream. Is the data arriving?

DeliveryToS3.Records: Records delivered to S3.

DeliveryToS3.DataFreshness: Age in seconds of the oldest undelivered record in Firehose. Any data older than this has been delivered to S3.

DeliveryToRedshift.Records: Number of records copied to Redshift.

DeliveryToRedshift.Success: Sum of successful Redshift COPY commands over the sum of all Redshift COPY commands.
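These metrics can also be read programmatically. The sketch below builds the parameters for a CloudWatch GetMetricStatistics request against the AWS/Firehose namespace; metric_query and fetch are hypothetical helpers, the stream name is made up, and fetch assumes the third-party boto3 package and configured AWS credentials.

```python
from datetime import datetime, timedelta, timezone


def metric_query(stream, metric, minutes=60, period=300, now=None):
    """Build GetMetricStatistics parameters for one Firehose metric."""
    end = now or datetime.now(timezone.utc)
    return {
        "Namespace": "AWS/Firehose",
        "MetricName": metric,
        "Dimensions": [{"Name": "DeliveryStreamName", "Value": stream}],
        "StartTime": end - timedelta(minutes=minutes),
        "EndTime": end,
        "Period": period,  # seconds per datapoint
        "Statistics": ["Sum"],
    }


def fetch(stream, metric):
    """Fetch the datapoints for one metric over the last hour."""
    import boto3  # third-party; imported lazily so metric_query works without it

    cloudwatch = boto3.client("cloudwatch")
    return cloudwatch.get_metric_statistics(**metric_query(stream, metric))["Datapoints"]


# Hypothetical stream name.
print(metric_query("mystream", "DeliveryToS3.Records")["Dimensions"])
```

An empty datapoint list for DeliveryToS3.Records over the last hour is a quick signal that nothing is being delivered.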