Building a Productive Data Lake: How to keep three systems in sync

Three Systems for Safe Development

When you are building a productive Data Lake, it is important to have at least three environments:

  • Development: for development, where “everything” is allowed.
  • Staging: for testing changes in a production-like environment.
  • Production: for running your tested and productive data applications.

With these different environments comes the need to keep data structures in sync. When it is time to deploy changes, you need a controlled way to apply them to each system. How data structures are managed depends on where in the data processing they occur, so I split the topic into two parts: “Data Entry to Staging” and “Core and Presentation”. I will dive deeper into both below.

Managing different data structures in the three systems of your Productive Data Lake

As all three systems can have different data structures, tooling that makes this manageable is important. Different Apache Avro™ schema versions are managed by publishing them to a tool called Apache Avro™ Schema Service, of which there are also three instances. By publishing changes first to the development, then to the staging, and finally to the production system, we make sure that only tested data structures are deployed to the production Data Lake. Each change to a schema gets a version number, so rolling back to an earlier state in case of an error is possible.
For data structures that are not described by Apache Avro™ schemas, we use tables that hold all the meta information.
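The promotion order described above can be illustrated with a small Python sketch. It is not the schema service itself: the class `SchemaRegistry`, its methods and the schema name `user_event` are hypothetical, and the in-memory dictionary only stands in for the three service instances. The point is the rule that a version may only reach an environment once the previous environment already has it, and that older versions stay available for a rollback.

```python
# Hypothetical in-memory stand-in for the three schema service instances.
ENVIRONMENTS = ["development", "staging", "production"]


class SchemaRegistry:
    """Tracks which schema versions have been published to which environment."""

    def __init__(self):
        # environment -> schema name -> list of published versions (oldest first)
        self.published = {env: {} for env in ENVIRONMENTS}

    def publish(self, env, name, version):
        """Publish a version, but only if the previous environment already has it."""
        position = ENVIRONMENTS.index(env)
        if position > 0:
            previous = ENVIRONMENTS[position - 1]
            if version not in self.published[previous].get(name, []):
                raise ValueError(
                    f"{name} v{version} must be published to {previous} before {env}"
                )
        self.published[env].setdefault(name, []).append(version)

    def current(self, env, name):
        """Return the active (most recently published) version."""
        return self.published[env][name][-1]

    def rollback(self, env, name):
        """Drop the latest version and return the one that is active afterwards."""
        versions = self.published[env][name]
        if len(versions) < 2:
            raise ValueError(f"no earlier version of {name} in {env}")
        versions.pop()
        return versions[-1]
```

Publishing version 2 of a schema directly to production raises an error until it has passed through development and staging.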

Data Structures from Entry to Staging

Apache Avro™ is a good way to manage evolving data structures, as I explained here. Once the data is available in Apache Hive™, the AVRO data types have to be mapped to those of your Productive Data Lake system. We use Apache HAWQ™ as the base of our Data Lake, so we map the AVRO data types to HAWQ data types. To keep this as effortless as possible, we reuse attribute names wherever we can. If an attribute like “user_id” occurs in different schemas and has the same meaning, we simply reuse it. The mapping table therefore does not contain an entry for each column of each schema, but only one per distinct attribute. With more than 100 schemas of 10 attributes each on average, this keeps maintenance down. The table below shows an example of this mapping.

Schema Fieldname    AVRO datatype   HAWQ datatype
user_id             string          varchar(100)
transaction_id      string          varchar(100)
parameters          map             text
request_time        long            bigint
session_id          string          text
list_of_something   array           text
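To show how such a mapping can be applied, here is a minimal Python sketch. The dictionary mirrors the table above; the function name `hawq_columns` and the idea of failing on unmapped attributes are assumptions for illustration, not the actual implementation.

```python
# One entry per distinct attribute name, mirroring the mapping table above.
FIELD_TYPE_MAPPING = {
    "user_id": "varchar(100)",
    "transaction_id": "varchar(100)",
    "parameters": "text",
    "request_time": "bigint",
    "session_id": "text",
    "list_of_something": "text",
}


def hawq_columns(avro_schema):
    """Return (column name, HAWQ type) pairs for the fields of an Avro record schema."""
    columns = []
    for field in avro_schema["fields"]:
        name = field["name"]
        if name not in FIELD_TYPE_MAPPING:
            # Hypothetical policy: an unmapped attribute stops the deployment.
            raise KeyError(f"no HAWQ type mapped for attribute {name!r}")
        columns.append((name, FIELD_TYPE_MAPPING[name]))
    return columns
```

Because the lookup is by attribute name only, two schemas that both contain `user_id` share a single mapping entry.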

Data Structures for Core and Presentation

Managing the data structures after data entry is a totally different challenge. Transformations need to happen, for the benefit of both technical and business users. This can mean different attribute or column names, or splitting and joining data from one schema into different tables. These tables can evolve as well. Evolving database tables is usually a pain, since in most cases you cannot just drop a table and recreate it. Here the processing power of Apache HAWQ™ comes into play, because now we can do exactly that: in most cases, at least for transactional data, we simply reload the whole table during the daily batch process. We therefore manage all table definitions in tables. They contain all table and column attributes as well as a version number, which makes rolling back possible.

Example of table definitions:

Schema Name   Table Name               Partitioned   distributed by          dropable   Version
core          dim_user                 -             user_id                 true       3.0
core          rel_user_purchase        -             user_id, purchase_id    true       4.0
core          fact_user_registration   -             user_id                 true       4.2
core          fact_usage_page          -             time_value_request      false      5.0

Example of column definitions:

Schema Name   Table Name          Column Name   Data Type   Data Type Length   Version
core          dim_user            user_id       varchar     100                3.0
core          dim_user            user_type     varchar     50                 3.0
core          rel_user_purchase   user_id       varchar     100                4.0
core          rel_user_purchase   purchase_id   varchar     100                4.0
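As a rough illustration of how table and column definitions like these can be turned into DDL, consider the following Python sketch. The tuple layout, the function `render_ddl` and the rule that only tables flagged as dropable get a DROP statement are assumptions made for this example.

```python
# Rows taken from the example definitions above.
TABLES = [
    # (schema, table, distributed_by, dropable, version)
    ("core", "dim_user", ["user_id"], True, "3.0"),
    ("core", "rel_user_purchase", ["user_id", "purchase_id"], True, "4.0"),
]
COLUMNS = [
    # (schema, table, column, data type, length, version)
    ("core", "dim_user", "user_id", "varchar", 100, "3.0"),
    ("core", "dim_user", "user_type", "varchar", 50, "3.0"),
    ("core", "rel_user_purchase", "user_id", "varchar", 100, "4.0"),
    ("core", "rel_user_purchase", "purchase_id", "varchar", 100, "4.0"),
]


def render_ddl(schema, table):
    """Build the DROP/CREATE statements for one table from the metadata rows."""
    _, _, distributed_by, dropable, _ = next(
        row for row in TABLES if row[0] == schema and row[1] == table
    )
    column_sql = ",\n".join(
        f"  {name} {data_type}({length})"
        for s, t, name, data_type, length, _ in COLUMNS
        if s == schema and t == table
    )
    statements = []
    if dropable:
        # Assumed policy: only tables flagged as dropable are dropped first.
        statements.append(f"DROP TABLE IF EXISTS {schema}.{table};")
    statements.append(
        f"CREATE TABLE {schema}.{table} (\n{column_sql}\n)\n"
        f"DISTRIBUTED BY ({', '.join(distributed_by)});"
    )
    return "\n".join(statements)
```

Calling `render_ddl("core", "dim_user")` yields a DROP statement followed by a CREATE TABLE statement with both columns and the distribution key.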

Automation of Data Structure Deployment

After defining all the metadata for the data structures, we need a way to automate their deployment. For this I wrote a Java program that uses the information from the schema definitions, the data type mapping table and the table definitions. The program runs in any of the three environments and takes into account which version it should create. This only works if no one can create or change tables manually. The program first deploys changes to the data structure on the development system. If they work there and all changes are ready to be deployed to the staging system, you run the program in that environment. Because a version number is stored with each change, it is possible to run version 5.0 on production, version 5.1 on staging and version 5.2 on development if necessary.
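One way to picture the version handling is the following Python sketch. It is not the Java program described above: the selection rule (take the newest definition of each table that does not exceed the target version of the environment) is an assumption, and `core.dim_campaign` as well as the 5.2 revision of `core.fact_usage_page` are hypothetical entries added for illustration.

```python
# Target version per environment, as in the example above.
TARGET_VERSIONS = {"production": "5.0", "staging": "5.1", "development": "5.2"}

# (table, version of a stored definition)
DEFINITIONS = [
    ("core.dim_user", "3.0"),
    ("core.fact_usage_page", "5.0"),
    ("core.fact_usage_page", "5.2"),  # hypothetical newer revision
    ("core.dim_campaign", "5.1"),     # hypothetical new table
]


def parse_version(text):
    """Turn '5.1' into (5, 1) so that versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


def definitions_for(target, definitions):
    """For each table keep the newest definition whose version does not exceed target."""
    limit = parse_version(target)
    selected = {}
    for table, version in definitions:
        parsed = parse_version(version)
        if parsed > limit:
            continue
        if table not in selected or parsed > parse_version(selected[table]):
            selected[table] = version
    return selected
```

With these rows, production only sees the 5.0 definition of `core.fact_usage_page`, while development already gets the 5.2 revision.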

Conclusion

This approach is one way to make data structure evolution easier and keep three different systems in sync. Most importantly, make sure that there are no manual changes to the data structures. The approach works nicely if you can drop and recreate all tables without hesitation. If that is not the case, you can still define your tables this way; just make sure they are not dropped automatically when the program runs. Having all definitions in one place and being able to deploy tables and changes automatically, under version control, can be a great help. No one has to remember to create new tables manually in any of the systems. Errors due to missing tables or columns are therefore minimized, which matters when you build a Productive Data Lake, where errors affect data products that other products of your company depend on.


Apache AVRO Schema evolution and Apache Hive

A perfect fit

Apache Avro™ and Apache Hive™ go hand in glove when it comes to schema evolution. Accessing data stored in Apache Avro™ with Apache Hive™ is pretty straightforward, since Hive comes with a built-in (de-)serializer for this format. You just need to specify the format of the table as shown below.

CREATE TABLE schema_evo
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES (
    'avro.schema.url'='http://url_to_schema_evo/schema_evo.avsc');

As you can see, there is no need to even specify the column names, as they are derived from the schema. The above example uses a URL for the schema definition, but you could just as easily define your schema explicitly in the CREATE TABLE statement. This is done by using the following option in the statement:

TBLPROPERTIES ('avro.schema.literal'='{
  "namespace": "com.example",
  "name": "schema_evo",
  "type": "record",
  "fields": [
    {
      "name":"id",
      "type":"int",
      "doc":"unique identifier of row"
    },
    {
      "name":"name",
      "type":"string",
      "doc":"name of field entry"
    },
    {
      "name":"description",
      "type":"string",
      "doc":"description of field entry"
    }
  ]
}');

If you want to use schema evolution in Apache AVRO™, I would discourage this, since you would then need to alter your tables each time you change the schema.

Making use of Schema Evolution to handle Data

After you have set up the tables in Apache Hive™ and stored some data, you may realize that some information is missing. Since changing the schema while still being able to process files written before the change is a feature of Apache AVRO™, this is no major issue, and Hive supports such changes. To keep the new schema compatible with the old data accessed by Hive, the new fields need to be optional. Optional fields are defined by setting the type to ["null", "datatype of field"].

{
"name": "optional",
"type": ["null", "string"],
"doc": "optional field"
}

To make that field usable in Apache Hive™, you need to supply a default value. This changes the field definition.

{
"name": "optional",
"type": ["null", "string"],
"default": null,
"doc": "optional field"
}

I intentionally made the default value null, so the new field contains NULL values for old data when queried in Apache Hive™. Other approaches for the optional field proved unsatisfying for me when handling the data and processing it further. A non-null default value would also need further explanation for other people using the data. The complete schema then looks like this:

{
  "namespace": "com.example",
  "name": "schema_evo",
  "type": "record",
  "fields": [
    {
      "name":"id",
      "type":"int",
      "doc":"unique identifier of row"
    },
    {
      "name":"name",
      "type":"string",
      "doc":"name of field entry"
    },
    {
      "name":"description",
      "type":"string",
      "doc":"description of field entry"
    },
    {
      "name": "optional",
      "type": ["null", "string"],
      "default": null,
      "doc": "optional field"
    }
  ]
}
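To make the effect of the default value more tangible, here is a simplified Python sketch of what a reader does when it resolves an old record against the new schema. It does not use the Avro library or Hive; the function `resolve` is a hypothetical, stripped-down illustration that only looks at field names and defaults.

```python
# The evolved schema from above, reduced to the parts needed here.
# Python's None plays the role of JSON null.
READER_SCHEMA = {
    "name": "schema_evo",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "description", "type": "string"},
        {"name": "optional", "type": ["null", "string"], "default": None},
    ],
}


def resolve(record, reader_schema):
    """Project a decoded record onto the reader schema, filling in defaults."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            # Field was added after the record was written: use the default.
            resolved[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved
```

A record written before the change, such as `{"id": 1, "name": "a", "description": "b"}`, comes back with `optional` set to `None`, which corresponds to the NULL seen in Hive.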

Apache HAWQ: Using HAWQ gpfdist for small tables

What about small data?

Once Apache HAWQ is installed and running on your HDFS, the question arises: “How do we store small data effectively?” This matters because the minimum recommended block size for Apache HAWQ is 128 MB. Storing small dimensional or mapping data of just a few kilobytes seems like a waste here, especially since you probably need a lot of such tables for your ELT process. This is where HAWQ gpfdist comes in.

HAWQ gpfdist: What is it?

HAWQ gpfdist is a file server for Apache HAWQ. It was originally used in Pivotal Greenplum to load data into the database in parallel, hence the name: gpfdist is short for Greenplum parallel file distribution. Its main purpose is to write data to or read data from an external system. The drawback is that rows in such data cannot be updated.

HAWQ gpfdist: How to use it?

Install gpfdist on a separate server. It is essentially a lightweight file server that Apache HAWQ can use to access external data. Since it runs on a normal filesystem and not on HDFS, block size does not matter as much, which makes it a good workaround for storing small files for use in HAWQ.
Since the original purpose was to read files from or write files to external systems, you need two tables for each metadata or dimension data file: one to write into the file and one to read from it. Here is an example:

Writable table

drop external table if EXISTS schema.gpfdist_dimension_write;
CREATE writable EXTERNAL TABLE schema.gpfdist_dimension_write
(
  id int,
  value_name varchar(100),
  value_description varchar(500)
)
 LOCATION (
'gpfdist://gpfdist.server.com:8081/gpfdist_dimension.csv'
)
FORMAT 'CSV' ;

Readable table

drop external table if exists schema.gpfdist_dimension_read;
CREATE readable EXTERNAL TABLE schema.gpfdist_dimension_read
(
  id int,
  value_name varchar(100),
  value_description varchar(500)
)
 LOCATION (
'gpfdist://gpfdist.server.com:8081/gpfdist_dimension.csv'
)
FORMAT 'CSV' ;

Now you can access the file gpfdist_dimension.csv through SQL in your HAWQ instance and query it like a normal table. CSV is not the only file format supported. You can also use:

  • plain text with any delimiter: FORMAT 'TEXT' (DELIMITER '~')
  • fixed-width files: FORMAT 'CUSTOM' (formatter=fixedwidth_in, id=10, value_name=100, value_description=200)
  • CSV with headers: FORMAT 'CSV' (HEADER)
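Because the writable and the readable definition differ only in one keyword and in the table name suffix, both statements can be rendered from a single column list. The following Python sketch illustrates this; the function `external_table_pair`, the `_write`/`_read` suffix convention taken from the example tables, and the schema name `meta` in the usage note are assumptions, not part of HAWQ.

```python
def external_table_pair(schema, name, columns, location, fmt="CSV"):
    """Render DDL for the writable and the readable external table of one file."""
    column_sql = ",\n".join(f"  {col} {typ}" for col, typ in columns)
    statements = {}
    for mode, suffix in (("WRITABLE", "write"), ("READABLE", "read")):
        table = f"{schema}.{name}_{suffix}"
        statements[suffix] = (
            f"DROP EXTERNAL TABLE IF EXISTS {table};\n"
            f"CREATE {mode} EXTERNAL TABLE {table}\n(\n{column_sql}\n)\n"
            f"LOCATION ('{location}')\n"
            f"FORMAT '{fmt}';"
        )
    return statements
```

For example, `external_table_pair("meta", "gpfdist_dimension", [("id", "int"), ("value_name", "varchar(100)")], "gpfdist://gpfdist.server.com:8081/gpfdist_dimension.csv")` returns a dictionary with the statements under the keys `"write"` and `"read"`.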

Conclusion

If you use gpfdist this way, you can access small data in your Apache HAWQ Data Lake. This comes with the drawback that tables are not updatable, but dimensional or mapping data should not change that often. To add rows, you can run normal INSERT commands against the writable table, so there is some flexibility in this solution. Dimensional data rarely changes, and if it does, it is best to keep a history of what was there before, so the need to delete rows should not arise. Just add validity dates to your rows and this drawback is no longer one.
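The idea of validity dates can be sketched in a few lines of Python. The column name `valid_from` and the function `current_rows` are hypothetical; in practice the same selection would more likely be written as a SQL query on the readable table.

```python
from datetime import date


def current_rows(rows, as_of):
    """Return, per id, the newest row whose valid_from is not after as_of."""
    latest = {}
    for row in rows:
        if row["valid_from"] > as_of:
            continue  # this row only becomes valid later
        known = latest.get(row["id"])
        if known is None or row["valid_from"] > known["valid_from"]:
            latest[row["id"]] = row
    return latest


# Rows are only ever appended; a change is a new row with a later valid_from.
ROWS = [
    {"id": 1, "value_name": "basic", "valid_from": date(2017, 1, 1)},
    {"id": 1, "value_name": "premium", "valid_from": date(2017, 6, 1)},
]
```

Asking for the state as of a date between the two entries returns the older row, so the history stays queryable without any update or delete.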


Apache AVRO: Data format for evolution of data

Flexible Data Format: Apache AVRO

Apache AVRO is a data serialization format. It comes with a data definition format that is easy to understand. Because optional fields can be added, it offers a way to evolve the schemas of your data.

Defining a Schema

Defining a schema in Apache AVRO is quite easy, since it is a JSON object in the form of:

{ 
"type": "typename",
"name": "name of field",
"doc": "documentation of field"
}

A schema consists of:

  • a namespace
  • name
  • type = record
  • documentation
  • fields

A typical schema would look like this:

{
	"namespace": "info.datascientists.avro",
	"type": "record",
	"name": "RandomData",
	"doc": "data sets contain random data.",
	"fields": [
		{
			"name": "long_field",
			"type": "long",
			"doc": "a field containing a number"
		},
		{
			"name": "string_field",
			"type": "string",
			"doc": "a field containing a string"
		}
      ]
}

Changing a schema in a backward-compatible way

If you now want to add a new field and stay compatible to your existing schema, you can just do the following:

{
	"namespace": "info.datascientists.avro",
	"type": "record",
	"name": "RandomData",
	"doc": "data sets contain random data.",
	"fields": [
		{
			"name": "long_field",
			"type": "long",
			"doc": "a field containing a number"
		},
		{
			"name": "string_field",
			"type": "string",
			"doc": "a field containing a string"
		},
		{
			"name": "optional_new_field",
			"type": ["string", "null"],
			"default": "New Field",
			"doc": "This is a new optional field"
		}
      ]
}

This change is still compatible with the version above. The new field is marked as optional by the union ["string", "null"] in the type field. Because Avro validates the default of a union against the first type listed, "string" comes first here; with ["null", "string"] the default would have to be null. The default attribute fills the field with the value given there if the field does not exist in the data.
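A check along the following lines can catch incompatible additions before a schema is published. It is a simplified sketch, not the validation done by the Avro library: the function `check_added_fields` is hypothetical, it only handles primitive types, and it assumes the Avro rule that the default of a union field has to match the first type listed in the union.

```python
# Python types accepted as defaults for Avro primitive types (simplified).
PYTHON_TYPES = {
    "null": type(None),
    "boolean": bool,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "string": str,
}


def check_added_fields(old_schema, new_schema):
    """Return a list of problems with fields that exist only in new_schema."""
    old_names = {field["name"] for field in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        name = field["name"]
        if name in old_names:
            continue
        if "default" not in field:
            problems.append(f"{name}: new field has no default")
            continue
        declared = field["type"]
        # For a union, only the first branch decides which default is valid.
        first = declared[0] if isinstance(declared, list) else declared
        # Complex types (records, arrays, maps) are skipped in this sketch.
        expected = PYTHON_TYPES.get(first) if isinstance(first, str) else None
        if expected is not None and not isinstance(field["default"], expected):
            problems.append(f"{name}: default does not match type {first!r}")
    return problems
```

An empty list means that every added field has a default that old data can fall back on.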

Serializing data using the schema

Once the schema is defined, it can be used to serialize the data. This serialization also acts as a form of compression, of about 30% compared to plain text. Serialization is possible in a wide range of programming languages, but it is best supported in Java. A tutorial on how to serialize data using a schema can be found here.

Reading the data with Apache Hive

Data stored in Apache AVRO is easily accessible when read through Hive external tables. The data is automatically deserialized and made human-readable. Apache Hive even supports default values on schema changes: if new fields are added with a default value, the Hive table can read all versions of the schema in the same table.


Apache HAWQ: Building an easily accessible Data Lake

Data Lake vs Data Warehouse

The Data Lake Architecture is an up-and-coming approach to making all data accessible through several methods, be that real-time or batch analysis. This includes unstructured data as well as structured data. In this approach the data is stored on HDFS and made accessible through several tools.

All of these tools have advantages and disadvantages when used to process data, but combined they make your data accessible. This is the first step in building a Data Lake: your data, even schemaless data, has to be accessible to your customers.
A classical data warehouse, in contrast, only contains structured data that is at least preprocessed and has a fixed schema. Data in a classical data warehouse is not the raw data entered into the system. You need a separate staging area for transformations, which is usually not accessible to all consumers of your data, but only to the data warehouse developers.

Data Lake Architecture using Apache HAWQ

Building a Data Lake with Apache HAWQ is a challenge, but one that can be overcome in the design. One solution for building such a system is shown in the picture below.
[Figure: data_lake_with_hawq]

Data Entry

To make it possible to use Apache HAWQ, the starting point is a controlled data entry. This is a compromise between schemaless and schematized data, and Apache AVRO is one way to achieve it. Schema evolution is an integral part of AVRO, and it provides structures such as maps and arrays for storing unstructured data. A separate article about AVRO will be one of the next topics here, explaining schema evolution and how to make the most of it.
Data structured by a schema can then be pushed message by message into a messaging queue. Choose the queue that best fits your needs. If you need secure transactions, RabbitMQ may be the right choice. Another option is Apache Kafka.

Pre-aggregating Data

Processing and storing single messages on HDFS is not an option, so another system is needed to aggregate messages before storing them on HDFS. Apache NiFi is a good choice for this. It comes with processors that make things like this pretty easy: a processor called MergeContent merges single AVRO messages and removes all headers but one before they are written to HDFS.
[Figure: nifi_merge_messages]
If the merged files are still smaller than the HDFS block size, it is possible to read them back from HDFS and merge them into larger files still.
[Figure: nifi_merge_files]
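The principle behind this pre-aggregation can be shown with a short Python sketch. It is not how NiFi's MergeContent processor is configured or implemented, and it ignores the handling of AVRO headers; the function `batch_by_size` is a hypothetical illustration of collecting messages until a minimum size is reached.

```python
def batch_by_size(messages, min_batch_bytes):
    """Group messages (bytes objects) into batches of at least min_batch_bytes."""
    batches, current, size = [], [], 0
    for message in messages:
        current.append(message)
        size += len(message)
        if size >= min_batch_bytes:
            batches.append(current)
            current, size = [], 0
    if current:
        # The last batch may stay below the threshold; it could be merged
        # again later, as described for files that are still too small.
        batches.append(current)
    return batches
```

Five messages of 40 bytes with a threshold of 100 bytes end up in one batch of three messages and a smaller remainder of two.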

Making data available in the Data Lake

Use Apache Hive to make the data in AVRO format accessible. HAWQ could read the AVRO files directly, but Hive handles schema evolution more effectively. For example, if you need to add a new optional field to an existing schema, add a default value for that field and Hive will fill entries from earlier messages with this value. When HAWQ then accesses this Hive table, it automatically reads the default value for fields that were added later. It could not do this by itself. Hive also currently has a more robust way of handling and extracting keys and values from map fields.

Data Lake with SQL Access

All data is available in Apache HAWQ now. This enables transformations using SQL and makes all of your data accessible to a broad audience in your company. SQL skills are more common than, say, Spark programming in Java, Scala or PySpark. From here it is possible to give analysts access to all of the data or to build data marts for individual subject areas using SQL transformations. Connectivity to reporting tools like Tableau is possible with a PostgreSQL driver. Even advanced analytics are possible if you install Apache MADlib on your HAWQ cluster.

Using Data outside of HAWQ

It is even possible to use all data outside of HAWQ, if there is a need for it. Since all data is available in AVRO format, accessing it by means of Apache Spark with Apache Zeppelin is also possible. Hive queries are possible too, since all data is registered there using external tables, which we used for integration into HAWQ.
Accessing the results of such processing in HAWQ is possible too. Save the results in AVRO format for integration in the way described above, or use “hawq register” to access Parquet files directly from HDFS.

Conclusion

Using Apache HAWQ as the base of a Data Lake is possible; you just have to take some constraints into consideration. Entering semi-structured data in AVRO format also saves work later when you process the data. The main advantage is that you can use SQL as an interface to all of your data. This enables many people in your company to access your data and will help you on your way to data-driven decisions.
