2017-02-16


Preface

I am sure we have all been in situations that involve extracting data from various data sources like these:

an SFTP server

an HTTP endpoint

a webhook or websocket

a constant stream of events like Twitter streams, Kafka, or IoT-based sensors

Then transforming the data from almost any format, including CSV, XML, JSON, and plain logs.

And last but not least, loading it into any of the following data stores for analysis and long-term storage/archival:

a big data store, like HDFS or HBase

a stream data store, like Kafka or Kinesis

an AWS service, like S3 or DynamoDB

relational or JDBC-backed databases, like AWS RDS/Redshift or MySQL

Here’s a visualization of the problem we’re talking about:



Source: www.linuxfoundation.org

Wow! I know that is a lot of requirements to start with, so let’s take a specific use case and see how Apache NiFi can help us solve it. But before that, let’s quickly try to understand what Apache NiFi actually is.

Introduction

NiFi (short for “Niagara Files”) is a powerful enterprise-grade dataflow tool that can collect, route, enrich, transform, and process data in a scalable and reliable manner.

NiFi was developed by the National Security Agency (NSA) over 8 years, and is now a top-level Apache project that is strongly backed by Hortonworks. NiFi is based on the concepts of flow-based programming (FBP).

Essentially, Apache NiFi is a comprehensive platform that provides:

Data acquisition, transportation, and guaranteed data delivery

Data-based event processing with buffering and prioritized queuing

Support for highly diverse and complicated dataflows

A visual interface for configuration and control

Take a look at the following conceptual diagram to get a feel for NiFi at a high level:



Source: kisstechdocs.wordpress.com

Now let’s get down to business, shall we?

Use Case

The problem in our case was to be able to generically ingest data from the following sources and transform/store it in a generic JSON format for further storage and analysis:

Incoming survey data in CSV and XML, plus data pulled from a REST API, to be transformed into a canonical, specific data format

Data to be parked on S3 for both long-term and intermittent needs

Data to be ingested to a NoSQL solution like DynamoDB for further analysis and application needs

We addressed these concerns by dividing the problem into three parts and solving them one at a time.

Data ingestion

We needed a tool that could help us fetch incoming CSV and XML data from our SFTP server as it arrived. Furthermore, other JSON data had to be pulled from a REST API endpoint, either via a pull mechanism that can be scheduled at a specific time interval or via a listener to which a webhook can send a payload at regular intervals.

Data Transformation and Storage

The data in CSV, XML, and JSON needs to be transformed into a generic data format for later data analysis needs. Also, the data needs to be transformed into DynamoDB JSON format to be stored on S3 for further processing.

Data Processing and Analysis

Data stored on S3 needs to be loaded into DynamoDB for further analysis. Also, the data needs to be processed at a controlled rate in order to stay within DynamoDB’s Write Capacity Units (WCU).

Now, before we dive into the thick of things, I’d like to take a quick moment to go over the core concepts of NiFi, since we will be using them in our implementation.

Flow File

Unit of data moving through the system

Content + Attributes (key/value pairs); see the example after this list

Flow File Processor

Performs the work, can access FlowFiles

Connections

Links between processors

Queues that can be dynamically prioritized

Flow Controller

Set of processors and their connections

Receive data via input ports, send data via output ports
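
To make the FlowFile idea concrete, here is a small sketch of the attributes a file picked up from SFTP might carry; the values are hypothetical, but filename, path, and uuid are standard core attributes that NiFi assigns:

uuid     = a unique identifier NiFi assigns to the FlowFile
filename = survey_2017_02_16.xml    (the name of the file as listed on the server)
path     = /data/incoming/surveys/  (the remote directory the file was listed from)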

Solution

I was in two minds as to whether a non-native tool like Apache NiFi could be easily leveraged in our environment, which is primarily on AWS; with the announcement of AWS Glue during the December 2016 re:Invent, things became even more exciting. But Apache NiFi is an already well-established top-level project at Apache, it is production grade, and it has solid open source community support from Hortonworks and BatchIQ. All of this made it a good choice to start with as a proof of concept.

Let’s start with a high level diagram to visualize the solution and then follow that with NiFi processors used for ingestion, transformation, and loading/storing data.

High-level diagram

Below is a simplified high level diagram to showcase the data extraction from the SFTP server and the REST API, the interim storage on S3, and finally the storage into DynamoDB.

NiFi Processors

Ingestion Processors: The ListSFTP and FetchSFTP processors were used to extract the CSV/XML data files from our SFTP server, which is an EC2 instance on AWS. The flow then routes each inbound file to the next processor based on its format.

Similarly, the InvokeHTTP processor can be used to extract data from the REST API, i.e. XML/JSON over HTTP.

ListSFTP: This will list incoming files at the specified “Remote Path” on the specified server, which in our case was an EC2 instance we used as our SFTP file server. The best part of this processor is that it will pick up each file only once, based on its timestamp.
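
A hedged sketch of the ListSFTP configuration we used (the hostname and path are placeholders, not our actual values):

Hostname    = ec2-xx-xx-xx-xx.compute-1.amazonaws.com
Port        = 22
Username    = nifi-ingest
Remote Path = /data/incoming/surveys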

FetchSFTP: This processor will receive the FlowFile from ListSFTP, with the mandatory “Remote File” property set to ${path}/${filename}. Furthermore, if needed, the processed file can be moved to an alternate directory as specified in the “Move Destination Directory” property.
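
The corresponding FetchSFTP settings might look like this (the move directory is a hypothetical example):

Remote File                = ${path}/${filename}
Completion Strategy        = Move File
Move Destination Directory = /data/processed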

InvokeHTTP: This is used to pull data from a REST API URL as specified in “Remote URL.” Make sure to set “Content-Type” to “application/json” if you are expecting JSON; otherwise it will default to “application/octet-stream.” You may also specify the values sent as headers in “Attributes to Send” and the authentication mechanism in “Use Digest Authentication.”
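
A rough sketch of an InvokeHTTP configuration along these lines (the URL and the header attribute name are placeholders):

HTTP Method               = GET
Remote URL                = https://api.example.com/v1/surveys
Content-Type              = application/json
Attributes to Send        = api-key
Use Digest Authentication = false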

Transformation Processors: We leveraged the TransformXML, SplitJSON, and JoltTransformJSON processors to convert the incoming files (CSV/XML) to the respective JSON format, and then transform that into our raw data schema and a DynamoDB-specific schema.

TransformXML: This processor helps convert incoming XML to the corresponding JSON structure, provided we specify an XSLT for the transformation in “XSLT file name.”
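
A hedged sketch of that configuration; the stylesheet path is hypothetical:

XSLT file name = /opt/nifi/xslt/survey-xml-to-json.xsl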

SplitJSON: This will split the incoming JSON from TransformXML based on the path specified in “JsonPath Expression.” In our case, the split was based on each element of the “Content” map, and the expression has to match the incoming JSON structure.
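
For illustration, assume the transformed survey JSON wraps the individual responses in a “Content” array (a hypothetical structure, not our exact schema). A JsonPath Expression of $.Content would then emit one FlowFile per element:

Incoming FlowFile content:
{
  "Content": [
    { "surveyId": "S-100", "answer": "yes" },
    { "surveyId": "S-101", "answer": "no" }
  ]
}

JsonPath Expression = $.Content
Result: two FlowFiles, one per element of the “Content” array.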

JoltTransformJSON: This is by far the most interesting of the processors available in NiFi. It helps transform an incoming JSON document into an altogether different structure/schema. It is based on the open source library/framework JOLT, and its transformation SPEC is itself written in a JSON-based DSL (domain-specific language).
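
As a minimal, hypothetical example of such a SPEC (the field names are placeholders, not our actual schema), a single shift operation that nests flat survey fields might look like this:

[
  {
    "operation": "shift",
    "spec": {
      "surveyId": "survey.id",
      "answer": "survey.response"
    }
  }
]

With “Jolt Transformation DSL” set to Chain, an input of { "surveyId": "S-100", "answer": "yes" } would come out as { "survey": { "id": "S-100", "response": "yes" } }.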

Load Processors: I leveraged the MergeContent, ControlRate, PutS3Object, and PutDynamoDB processors to aggregate, demarcate, and store the transformed JSON data on S3 for archival, and then load it into DynamoDB.

MergeContent: This processor aggregates the various split files from SplitJSON into one output file. The important attributes to set are “Min No. of Entries,” “Max No. of Entries,” and “Max No. of Bins.”
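
A hedged example of those settings (the numbers are illustrative, not the tuned values from our flow):

Minimum Number of Entries = 1000
Maximum Number of Entries = 10000
Maximum number of Bins    = 5
Merge Format              = Binary Concatenation

If the merged output needs to remain a single valid JSON document, the Delimiter Strategy, Header, Demarcator, and Footer properties can be used to wrap the merged entries in a JSON array.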

PutS3Object: This processor will load the merged file into S3 in the specified bucket, with the same filename as the original incoming file. Also, be sure to choose the Region where your bucket is located, along with the correct Access/Secret Keys. You may update the filename with the UpdateAttribute processor using ${filename:replace('xml', 'json')}.
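
A sketch of the relevant properties, with a hypothetical bucket name (credentials omitted):

UpdateAttribute (dynamic property):
  filename = ${filename:replace('xml', 'json')}

PutS3Object:
  Bucket     = survey-data-archive
  Object Key = ${filename}
  Region     = us-east-1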

ControlRate: This processor controls the rate at which MergeContent feeds data to PutDynamoDB, regulating the writes to DynamoDB to stay within its WCU.
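
For example, throttling on FlowFile count might look like this; the numbers are illustrative and would be tuned against the table's provisioned WCU:

Rate Control Criteria = flowfile count
Maximum Rate          = 100
Time Duration         = 1 sec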

PutDynamoDB: This processor will actually load the data into DynamoDB, based on the table name and the hash/range key names and values.
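
A rough sketch of the PutDynamoDB configuration, with hypothetical table and key names (credentials omitted):

Table Name              = survey_responses
Hash Key Name           = surveyId
Hash Key Value          = ${surveyId}
Range Key Name          = receivedAt
Range Key Value         = ${receivedAt}
Json Document attribute = document
Region                  = us-east-1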

Production deployment considerations for Apache NiFi

Performance

We should ideally start with a t2.medium instance for initial performance observations and benchmarking, since it is a good choice for low-cost experimentation and POC building with NiFi.

Choosing an instance type for NiFi is complicated because it involves all of the below:

AWS’s bewildering array of instance types, many of which are “optimized” for compute, memory, IO, etc. All of these attributes may be essential.

General requirements of Apache NiFi.

Unique requirements of our NiFi flow, which we will learn from experience.

Last but not least, a budget.

Apache NiFi is typically disk IO intensive, so we should prefer EBS-optimized instance types. NiFi also frequently performs network-intensive work that can benefit from AWS’s Enhanced Networking; instance types that include Enhanced Networking by default include the M4 general-purpose family.

There are also other performance considerations. Below are the NiFi properties that have an impact on performance:

nifi.bored.yield.duration=10 millis – This property is designed to help with CPU utilization by preventing processors that are using the timer driven scheduling strategy from using excessive CPU when there is no work to do.

nifi.queue.swap.threshold=20000 – If the number of total FlowFiles in any one-connection queue exceeds this value, swapping will occur and performance can be affected.

nifi.provenance.repository.index.threads=1 – For flows that operate on a very high number of FlowFiles, the indexing of Provenance events could become a bottleneck. If this happens, increasing the value of this property may increase the rate at which the Provenance Repository is able to process these records, resulting in better overall throughput.

nifi.provenance.repository.index.shard.size=500 MB – Large values for the shard size will result in more Java heap usage when searching the Provenance Repository, but it should provide better performance. The default value is 500 MB.

bootstrap.conf – The following properties are the most significant for performance as far as the JVM is concerned, since NiFi is Java-based:

java.arg.2=-Xms<size_in_MB/GB>

java.arg.3=-Xmx<size_in_MB/GB>
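
For instance, on a t2.medium (4 GB of RAM), a reasonable starting point, to be adjusted from observation, might be the following; setting the minimum equal to the maximum avoids heap resizing pauses:

java.arg.2=-Xms2g
java.arg.3=-Xmx2g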

NOTE: Apache NiFi 1.x and later requires more than 1 gigabyte of RAM to start up, and can easily use 2 gigabytes for a simple flow, so it’s not feasible to run NiFi 1.x on a micro instance. A t2.small is the least expensive instance type that can run an experimental NiFi, and a t2.medium is an economical starter instance type for a modest production flow.

Reliability

Starting small with a single-node deployment is a sensible choice; however, Apache NiFi 1.x introduces Zero-Master Clustering, which eliminates the single point of failure present in earlier versions and makes it simple to set up a cluster of three or more nodes, leveraging Apache ZooKeeper under the hood. Take a quick look at the following diagram to get an idea of this:

A few other tips for this:

Do regular backups, and enable termination protection and daily snapshots.

Enable instance status checks and CPU usage alarms (e.g., at 75%) for the nodes on which Apache NiFi is deployed.

There are two important databases used by NiFi:

User DB (keeps track of user logins when NiFi is secured) and

History DB (keeps track of all changes made on the graph). Both stay relatively small and require very little hard drive space.

The default installation path of <root-level-nifi-dir>/database_repository results in the directory being created at the root level of the NiFi installation (at the same level as the conf, bin, and lib directories). It is recommended to move all repositories to a location outside of the NiFi install directories, which simplifies upgrading and allows us to retain the user and component history information after an upgrade.

There are three other NiFi repositories – the FlowFile Repository, Content Repository, and Provenance Repository. All of these should ideally be placed outside of the install directory as well, for future scalability. Furthermore, they can be moved onto a separate disk (preferably high-performance RAID), such as provisioned-IOPS EBS volumes.
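
As a hedged example, the nifi.properties entries for relocating the repositories might look like this (the /var/nifi paths are hypothetical):

nifi.database.directory=/var/nifi/database_repository
nifi.flowfile.repository.directory=/var/nifi/flowfile_repository
nifi.content.repository.directory.default=/var/nifi/content_repository
nifi.provenance.repository.directory.default=/var/nifi/provenance_repository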

Security

The Apache NiFi instance should be deployed on an EC2 instance in an appropriate security group. Additionally, only the required ports should be opened, for specific IPs, for inbound and outbound traffic.

Furthermore, access to the Apache NiFi Web UI should be secured with SSL/TLS-based access, which involves the following steps:

Creating and installing a user certificate

Setting up the server’s KeyStore

Setting up the server’s TrustStore

Installing the user certificate into the TrustStore

Configuring authorization for the user
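
Once the KeyStore and TrustStore are in place, the corresponding nifi.properties entries look roughly like this (paths, port, and passwords are placeholders):

nifi.web.https.host=0.0.0.0
nifi.web.https.port=9443
nifi.security.keystore=/opt/nifi/conf/keystore.jks
nifi.security.keystoreType=JKS
nifi.security.keystorePasswd=<keystore-password>
nifi.security.keyPasswd=<key-password>
nifi.security.truststore=/opt/nifi/conf/truststore.jks
nifi.security.truststoreType=JKS
nifi.security.truststorePasswd=<truststore-password>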

NOTE: The exhaustive steps for security configuration are beyond the scope/need of this blog, but for the interested souls please refer to the official NiFi System Admin Guide: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#security-configuration

Real-Time Monitoring and Debugging

The best part about NiFi is Data Provenance, which is essentially “live” monitoring and also “debugging” of the production workflows we’ve set up. Trust me, there will be absolutely no need for us to even look at logs. Yes, you heard that right! Let’s quickly see how we can achieve this.

NiFi’s Data Provenance primarily helps us to:

1. Record, index, and make events available for display.

2. Visualize the lineage or flow of NiFi events to and from the processors in question.

3. View details, attributes, and content at given points in time (before and after each processor).

4. And last but not least, view/download the content and replay the last execution in cases where an error has occurred. An example in our case would be incoming XML that is not in a valid format, a schema that is itself invalid, or a misconfigured processor; we can correct the problem and then replay from that point on. Isn’t that cool?

Wrapping Up

For me personally, NiFi has solved the age-old problem of enterprise integration via an automated and managed data ingestion and propagation workflow, for both real-time and batch-based data processing. Apart from that, I see a fascinating analogy between NiFi processors and microservices, which are supposed to do one thing and do it well (remember SRP?). I also see NiFi’s processing/queuing mechanism as an amalgamation of a message broker (like Kafka, sans the pre-defined protocol/format) and micro-batching (like Spark, sans the RDD/Dataset). Yet NiFi is unlike any one of these and does not replace any of them; instead, it works alongside them seamlessly.

The overall experience of using NiFi has been fulfilling, especially with the collaboration and help of Bryan Bende and Matt Burgess from Hortonworks, James Wing from BatchIQ, and Milo Simpson, the creator of JOLT.

Finally, before you move onto other important things, don’t forget to take a look at this link, which showcases a creative and fun way to leverage Apache NiFi to process live tweets along with AWS’s IoT.

I hope this blog was informative and useful, and I appreciate your time and look forward to your feedback and queries alike.
