Analytics Platform: An Evolution from Data Lake


Having built a Data Lake for your company’s analytical needs, new use cases will soon arise that cannot easily be covered with the Data Lake architecture I covered in previous posts, like Apache HAWQ™: Building an easily accessible Data Lake. You will need to adapt or enhance your architecture to become more flexible. One way to achieve this flexibility is to transform your Data Lake into an Analytics Platform.

Definition of an Analytics Platform

An Analytics Platform is a platform that provides all kinds of services needed for building data products. This often exceeds the functionality of a pure RDBMS or even a Data Lake based on Apache HAWQ™. There are data products that have more requirements than a SQL interface. Reporting and basic analysis are addressed by this setup, but products dealing with predictions or recommendations often have different needs. An Analytics Platform provides flexibility in the tools used. There can be, for example, an Apache HAWQ™ setup and at the same time an environment for running TensorFlow applications.

Using existing parts: Multi-colored YARN

When you are running an Apache Hadoop™ cluster, you are already familiar with a resource manager: YARN. With YARN you can already deploy Linux containers, and support for Docker containers has already progressed pretty far (YARN-3611). Building complex applications and managing them with YARN is called Multi-colored YARN by Hortonworks.
Following through on this idea, you will have a cluster with just some central services installed directly on bare metal. You will deploy the rest in containers, as shown in the image below.

Analytics Platform based on YARN and Docker

The example makes use of Kubernetes and Docker for virtualization and provides the following services on bare metal, since they are needed by most applications:

  • Ambari
  • Kubernetes
  • YARN
  • ZooKeeper
  • HDFS

HDFS in particular is important as a central service. It makes it possible for all applications to access the same data. The picture above shows that there can be several instances of a Hadoop distribution, even in different versions. So the platform allows for multi-tenancy, while all instances still process the same data.

Development changes

Having an Analytics Platform makes the development of data products easier. With separate development and staging systems, as described by me here, there was always the problem of developing a product on a sample of the data. In some cases these samples did not contain all possible combinations of data. This could result in errors after a deployment to the production environment, and even going through all development and staging stages could not change this. The new approach allows you to deploy all three systems on the same data, so you can account for all data-induced errors on the development and staging systems already.
You can even become more agile in your development process. The picture below shows an example deployment process that uses this system.

Analytics Platform: Deployment Process

Conclusion

Moving from a pure Data Lake to an Analytics Platform gives you more flexibility and helps in the development of data products, especially since you can develop on the same data as is available in production. Of course it brings more complexity to an already complex environment. But since it is possible to keep YARN as resource manager and move to a more agile way of development and deployment, it might be worth considering. Once Multi-colored YARN is finished, it will be easier to make this happen.

Building a Productive Data Lake: How to keep three systems in sync

Three Systems for safe Development

When you are building a productive Data Lake it is important to have at least three environments:

  • Development: for development, where “everything” is allowed.
  • Staging: for testing changes in a production-like environment.
  • Production: for running your tested and productive data applications.

With these different environments comes the need to keep data structures in sync. You need to find a way to change each system in a controlled way when it is time to deploy some changes. There are differences in how to manage data structures, depending on where in the data processing they occur. We split these differences into the parts “Data Entry to Staging” and “Core and Presentation”. I will dive deeper into these two in the following.

Productive Data Lake: Data Structure Management


Managing different data structures in the three systems of your Productive Data Lake

As all three systems can have different data structures, using tools to make this easier is important. Different Apache Avro™ schema versions are managed by publishing them to a tool called Apache Avro™ Schema Service, of which there are also three instances. By publishing changes first on the development, then the staging, then the production system, we make sure that only tested data structures are deployed to the Production Data Lake. Each change to a schema belongs to a version number, so rolling back to an earlier state in case of an error is possible.
For evolving data structures that are not contained in Apache Avro™ schemas, we use tables that manage all meta information.

Data Structures from Entry to Staging

Apache Avro™ is a good way to manage evolving data structures, as I explained here. Once the data is available in Apache Hive™, mapping the AVRO data types to those of your Productive Data Lake system is necessary. We use Apache HAWQ™ as the base of our Data Lake, so we map the AVRO data types to HAWQ data types. To keep this as effortless as possible, we reuse attribute names as often as possible. If an attribute like “user_id” occurs in different schemas and has the same meaning, we just reuse it. So the mapping table does not contain an entry for each column of each schema, but only one for each distinct attribute. Considering more than 100 schemas with on average 10 attributes each, this helps in keeping maintenance down. The table below shows an example of this mapping; a sketch of how it could be stored as a table in the database follows it.

Schema Fieldname  | AVRO datatype | HAWQ datatype
user_id           | string        | varchar(100)
transaction_id    | string        | varchar(100)
parameters        | map           | text
request_time      | long          | bigint
session_id        | string        | text
list_of_something | array         | text
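
Here is a minimal sketch of how this mapping could be kept as a regular table in the database. The schema and table names are illustrative, not the ones from our actual setup:

-- Illustrative mapping table; names are assumptions, not the original ones
CREATE TABLE meta.avro_hawq_type_mapping (
    schema_fieldname varchar(200),  -- attribute name, reused across schemas
    avro_datatype    varchar(50),   -- type as defined in the AVRO schema
    hawq_datatype    varchar(100)   -- target data type in Apache HAWQ
)
DISTRIBUTED BY (schema_fieldname);

-- Entries corresponding to the example mapping above
INSERT INTO meta.avro_hawq_type_mapping VALUES
    ('user_id',           'string', 'varchar(100)'),
    ('transaction_id',    'string', 'varchar(100)'),
    ('parameters',        'map',    'text'),
    ('request_time',      'long',   'bigint'),
    ('session_id',        'string', 'text'),
    ('list_of_something', 'array',  'text');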

Data Structures for Core and Presentation

A totally different challenge is managing the data structures after data entry. Transformations need to occur for the benefit of technical and business users. This can mean other attribute / column names, or splitting or joining data from one schema into different tables. These tables can again evolve. Evolving database tables is a pain, since in most cases you cannot just drop the table and recreate the new one. Here the processing power of Apache HAWQ™ comes into play, as we can now do exactly that. In most cases, at least for transactional data, we just reload the whole table during the daily batch process. So we manage all table definitions in tables. They contain all table and column attributes, as well as a version number. This makes rolling back possible.

Example of table definitions:

Schema Name | Table Name             | Partitioned | distributed by       | dropable | Version
core        | dim_user               |             | user_id              | true     | 3.0
core        | rel_user_purchase      |             | user_id, purchase_id | true     | 4.0
core        | fact_user_registration |             | user_id              | true     | 4.2
core        | fact_usage_page        | time_value  | request              | false    | 5.0

Example of column definitions:

Schema Name | Table Name        | Column Name | Data Type | Data Type Length | Version
core        | dim_user          | user_id     | varchar   | 100              | 3.0
core        | dim_user          | user_type   | varchar   | 50               | 3.0
core        | rel_user_purchase | user_id     | varchar   | 100              | 4.0
core        | rel_user_purchase | purchase_id | varchar   | 100              | 4.0
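
Taken together, the example definitions above translate into DDL roughly like the following. This is a hedged sketch of the statements the deployment program described in the next section would generate; details such as partitioning clauses are left out:

-- Sketch of the DDL derived from the metadata rows above (versions 3.0 and 4.0)
CREATE TABLE core.dim_user (
    user_id   varchar(100),
    user_type varchar(50)
)
DISTRIBUTED BY (user_id);

CREATE TABLE core.rel_user_purchase (
    user_id     varchar(100),
    purchase_id varchar(100)
)
DISTRIBUTED BY (user_id, purchase_id);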

Automation of data structure deployment

After defining all the metadata concerning the data structures, we need a way to automate the deployment. For this I created a Java program that uses all the information from the schema definitions and the tables for data type mappings and table definitions. This program runs on any of the three environments and also takes into consideration the version it should create. This only works if no one can create or change table information manually. So the program first deploys changes to the data structure on the development system. If everything works there and all changes are ready to be deployed on the staging system, you run the program in that environment. With the version numbers defined alongside each change, it is possible to run version 5.0 on production, version 5.1 on staging and version 5.2 on development if necessary.

Conclusion

This approach is one possibility to make data structure evolution easier and keep three different systems in sync. It is most important to make sure that there are no manual changes to the data structures. The approach works nicely if you can drop and recreate all tables without hesitation. If that is not the case, you can still define your tables this way; just make sure they are not dropped automatically when the program runs. Having all definitions in one place and being able to automatically deploy the tables and the changes, with version control, can be a great help. This way no one has to remember to create new tables manually in either system, so errors due to missing tables or columns are minimized. That is good if you create a Productive Data Lake, where errors affect data products needed by other products of your company.

Apache AVRO Schema evolution and Apache Hive

A perfect fit

Apache Avro™ and Apache Hive™ go hand in glove when it comes to schema evolution. Accessing data stored in Apache Avro™ with Apache Hive™ is pretty straightforward, since Hive comes with a built-in (de-)serializer for this format. You just need to specify the format of the table as shown below.

CREATE TABLE schema_evo
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES (
    'avro.schema.url'='http://url_to_schema_evo/schema_evo.avsc');

As you see, there is no need to even specify the column names, as they will be derived from the schema. The above example uses a URL for the schema definition, but you could just as easily define your schema explicitly in the CREATE TABLE statement. This is done by using the following options in the statement:

TBLPROPERTIES ('avro.schema.literal'='{
  "namespace": "com.example",
  "name": "schema_evo",
  "type": "record",
  "fields": [
    {
      "name":"id",
      "type":"int",
      "doc":"unique identifier of row"
    },
    {
      "name":"name",
      "type":"string",
      "doc":"name of field entry"
    },
    {
      "name":"description",
      "type":"string",
      "doc":"description of field entry"
    }
  ]
}');

For the purpose of using schema evolution in Apache AVRO™ I would discourage this, since you would then need to adapt your tables each time you change the schema.
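
To illustrate the difference: with avro.schema.literal every change means rewriting the whole table property, while with avro.schema.url you only publish a new schema file or point the table at it. A minimal sketch, assuming an illustrative URL for the new schema version:

-- With a literal schema, the complete new schema has to be pasted again:
-- ALTER TABLE schema_evo SET TBLPROPERTIES ('avro.schema.literal'='{ ...new schema... }');

-- With a URL, the table simply references the new schema file:
ALTER TABLE schema_evo SET TBLPROPERTIES (
    'avro.schema.url'='http://url_to_schema_evo/schema_evo_v2.avsc');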

Making use of Schema Evolution to handle Data

After you have set up the tables in Apache Hive™ and already stored some data, you may realize that some information is missing. Since changing the schema while still being able to process the files from before the change is a feature of Apache AVRO™, this is no major issue. Hive does support those changes. To make them compatible with your old data accessed by Hive, the new fields need to be optional. Optional fields are defined by declaring the type as ["null", "datatype of field"].

{
"name": "optional",
"type": ["null", "string"],
"doc": "optional field"
}

To make that field usable in Apache Hive™, you need to supply a default value. This changes the field definition.

{
"name": "optional",
"type": ["null", "string"],
"default": null,
"doc": "optional field"
}

I intentionally made the default value null. The new field then contains NULL values when queried in Apache Hive™. Different approaches for the optional field proved unsatisfying for me when handling the data and processing it further. A filled default value would also need further explanation for other people using the data. The complete schema then looks like this; a short query example follows it:

{
  "namespace": "com.example",
  "name": "schema_evo",
  "type": "record",
  "fields": [
    {
      "name":"id",
      "type":"int",
      "doc":"unique identifier of row"
    },
    {
      "name":"name",
      "type":"string",
      "doc":"name of field entry"
    },
    {
      "name":"description",
      "type":"string",
      "doc":"description of field entry"
    },
    {
      "name": "optional",
      "type": ["null", "string"],
      "default": null,
      "doc": "optional field"
    }
  ]
}
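
Once the schema file behind avro.schema.url is updated like this, the existing Hive table picks up the new field without any change to the table itself. A short sketch of querying it:

-- Files written before the schema change return NULL for "optional",
-- newer files return the value that was actually written
SELECT id, name, description, optional
FROM schema_evo;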

Apache HAWQ: Using HAWQ gpfdist for small tables

What about small data?

Once Apache HAWQ is installed and running on your HDFS, the question arises of how to effectively store small data. This is important, since the minimal proposed block size for Apache HAWQ is 128 MB. Saving small dimensional or mapping data of just a few kilobytes seems like a waste here, especially since you probably need a lot of these tables for your ELT process. This is where HAWQ gpfdist comes in.

HAWQ gpfdist: What is it?

HAWQ gpfdist is a file server for Apache HAWQ. It was originally used in Pivotal Greenplum to load data into the database in parallel, hence the name Greenplum Parallel File Server, gpfdist for short. Its main purpose is to write data to or read data from an external system. This comes with the drawback that data cannot be updated.

HAWQ gpfdist: How to use it?

Install gpfdist on a separate server. It is essentially a lightweight file server that can be used from within Apache HAWQ to access external data. Since it runs on a normal file system and not on HDFS, the block size does not matter as much here, and it is a good workaround for storing small files for use in HAWQ.
Since the original purpose was to read or write files from or to external systems, you need two tables for each meta- or dimension-data file: one to write into the file and one to read from the file. Here is an example:

Writable table

DROP EXTERNAL TABLE IF EXISTS schema.gpfdist_dimension_write;
CREATE WRITABLE EXTERNAL TABLE schema.gpfdist_dimension_write
(
  id int,
  value_name varchar(100),
  value_description varchar(500)
)
LOCATION (
  'gpfdist://gpfdist.server.com:8081/gpfdist_dimension.csv'
)
FORMAT 'CSV';

Readable table

DROP EXTERNAL TABLE IF EXISTS schema.gpfdist_dimension_read;
CREATE READABLE EXTERNAL TABLE schema.gpfdist_dimension_read
(
  id int,
  value_name varchar(100),
  value_description varchar(500)
)
LOCATION (
  'gpfdist://gpfdist.server.com:8081/gpfdist_dimension.csv'
)
FORMAT 'CSV';

Now you can access the file gpfdist_dimension.csv through SQL in your HAWQ instance and query it like a normal table. CSV is not the only supported file format. You can also use the following options (a full example follows the list):

  • plain text with any delimiter: FORMAT 'TEXT' (DELIMITER '~')
  • fixed-width files: FORMAT 'CUSTOM' (formatter=fixedwidth_in, id=10, value_name=100, value_description=200)
  • CSV with headers: FORMAT 'CSV' (HEADER)
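
For example, the readable table from above could be defined against a tilde-delimited text file instead of CSV. The file name is illustrative:

DROP EXTERNAL TABLE IF EXISTS schema.gpfdist_dimension_read;
CREATE READABLE EXTERNAL TABLE schema.gpfdist_dimension_read
(
  id int,
  value_name varchar(100),
  value_description varchar(500)
)
LOCATION (
  'gpfdist://gpfdist.server.com:8081/gpfdist_dimension.txt'
)
FORMAT 'TEXT' (DELIMITER '~');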

Conclusion

If you use gpfdist this way, you can access small data in your Apache HAWQ Data Lake. This comes with the drawback that tables are not updatable, but dimensional or mapping data should not change that often. To add rows to a table, you can use normal INSERT commands on the created writable table, so there is some flexibility in this solution. Dimensional data rarely changes, and if it does, it is best to keep a history of what was there before, so the need to delete rows should not arise. Just add validity dates to your rows and this drawback is no longer one.
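
As a sketch, new rows go in through the writable table and queries go through the readable one; the values are illustrative:

-- Append a new dimension row via the writable external table
INSERT INTO schema.gpfdist_dimension_write (id, value_name, value_description)
VALUES (42, 'new_value', 'added through a normal INSERT');

-- Query the same file through the readable external table
SELECT id, value_name, value_description
FROM schema.gpfdist_dimension_read
ORDER BY id;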

Apache AVRO: Data format for evolution of data

Flexible Data Format: Apache AVRO

Apache AVRO is a data serialization format. It comes with a data definition format that is easy to understand. With the possibility to add optional fields, it offers a solution for the evolution of data schemas.

Defining a Schema

Defining a schema in Apache AVRO is quite easy, since it is a JSON object of the form:

{ 
"type": "typename",
"name": "name of field",
"doc": "documentation of field"
}

A schema consists of:

  • a namespace
  • name
  • type = record
  • documentation
  • fields

A typical schema would look like this:

{
	"namespace": "info.datascientists.avro",
	"type": "record",
	"name": "RandomData",
	"doc": "data sets contain random data.",
	"fields": [
		{
			"name": "long_field",
			"type": "long",
			"doc": "a field containg a number"
		},
		{
			"name": "string_field",
			"type": "string",
			"doc": "a field containing a string"
		}
      ]
}

Changing a schema in a backward compatible way

If you now want to add a new field and stay compatible with your existing schema, you can just do the following:

{
	"namespace": "info.datascientists.avro",
	"type": "record",
	"name": "RandomData",
	"doc": "data sets contain random data.",
	"fields": [
		{
			"name": "long_field",
			"type": "long",
			"doc": "a field containg a number"
		},
		{
			"name": "string_field",
			"type": "string",
			"doc": "a field containing a string"
		},
		{
			"name": "optional_new_field",
			"type": ["string", "null"],
			"default": "New Field",
			"doc": "This is a new optional field"
		}
      ]
}

This change is still compatible with the version above. The field is marked as optional by the union type ["string", "null"]. The default attribute will fill the field with the value given here if the field does not exist in the data. Note that for a union, the default value must match the first type of the union, which is why "string" comes first here; a default of null requires "null" to come first in the union instead.

Serializing data using the schema

Once the schema is defined, it can be used to serialize the data. This serialization also compresses the data by roughly 30% compared to plain text. Serialization is possible in a wide range of programming languages, but it is best supported in Java. A tutorial on how to serialize data using a schema can be found here.

Reading the data with Apache Hive

Data stored in Apache AVRO is easily accessible when read through Hive external tables. The data is automatically deserialized and made human readable. Apache Hive even supports default values on schema changes: if new fields are added with a default value, the Hive table can read all versions of the schema in the same table.
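
As a sketch of how that looks for the RandomData schema above: the SerDe and container formats are the same ones shown earlier in this collection, while the table name, location and schema URL are illustrative:

CREATE EXTERNAL TABLE random_data
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/data/random_data'
TBLPROPERTIES (
    'avro.schema.url'='http://url_to_schema/RandomData.avsc');

-- Rows written with the old schema come back with the default for optional_new_field
SELECT long_field, string_field, optional_new_field
FROM random_data;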