AVRO schema generation with reusable fields

Why use AVRO and AVRO Schema?

There are several serialization file formats out there, so choosing the one best suited to your needs is crucial. This blog entry will not compare them all, but point out some advantages of AVRO and AVRO Schema for an Apache Hadoop ™ based system.

  • AVRO schemas can be written in JSON
  • the schema is always stored with the data, so there is no need to know the schema before accessing the data
  • small file size: since the schema is always present, less type information has to be stored with each record
  • schema evolution is possible by using a union field type with default values. This was explained here. Deleted fields also need to be defined with a default value.
  • AVRO files are compressible and splittable by Hadoop MapReduce and other tools from the Hadoop universe
  • files can be compressed with Snappy and Deflate

AVRO Schema generation

Generating Apache AVRO ™ schemas is pretty straightforward. They can be written in JSON and are always stored with the data. There are field types for everything needed, even complex types such as maps and arrays. A schema can also contain a record, which is in itself an independent schema, as a field. This makes it possible to store data of almost unlimited complexity in AVRO. In the case of very complex schema definitions, keep in mind that accessing deeply nested data structures can become very expensive later on, when transforming and working with such data. Here are some examples of the datatypes AVRO supports.

AVRO Datatypes

  • Primitive types: null, boolean, int, long, float, double, bytes and string
  • Complex types such as records. These fields are basically complete schemas in their own right. They consist of:
    • name
    • namespace
    • fields
  • Enums
  • Arrays
  • Maps
  • Fixed length fields
  • Logical datatypes

Logical datatypes are something special: by using them you can define further field types you might need. As you can see in the list above, there is no datatype for date or datetime; these are implemented using logical datatypes. A logical type is defined like this:

{
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 4,
  "scale": 2
}

Supported logical datatypes are decimal, date, time, timestamp and duration.

Downsides in Schema Generation

There is one downside though: individual fields are not reusable. This topic was addressed by Treselle Systems in this entry. They introduce a way to make fields in an AVRO schema reusable by working with placeholders and replacing them with previously defined subschemas. This comes in handy when you have fields that should be available in each AVRO schema, such as meta information for a message pushed into your system.
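
To illustrate the placeholder approach, here is a minimal Python sketch. The metadata subschema, the placeholder string __METADATA__ and the helper function are my own assumptions for illustration, not Treselle Systems' implementation:

import json

# A subschema that should be reused in every message schema (hypothetical example).
metadata_subschema = {
    "type": "record",
    "name": "MessageMetadata",
    "fields": [
        {"name": "source_system", "type": "string", "doc": "system that produced the message"},
        {"name": "created_at", "type": "long", "doc": "epoch millis when the message was created"}
    ]
}

# The main schema references the subschema via a placeholder string.
main_schema_template = {
    "namespace": "info.datascientists.avro",
    "type": "record",
    "name": "RandomData",
    "fields": [
        {"name": "metadata", "type": "__METADATA__", "doc": "shared meta information"},
        {"name": "payload", "type": "string", "doc": "actual content"}
    ]
}

def resolve_placeholders(template, subschemas):
    """Replace placeholder strings with the full subschema definitions."""
    text = json.dumps(template)
    for placeholder, subschema in subschemas.items():
        text = text.replace(json.dumps(placeholder), json.dumps(subschema))
    return json.loads(text)

final_schema = resolve_placeholders(main_schema_template, {"__METADATA__": metadata_subschema})
print(json.dumps(final_schema, indent=2))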

AVRO Schema Generator

To make AVRO schema generation more comfortable, I worked on a project inspired by Treselle Systems’ text and combined it with other tools I use daily:

  • Jupyter Notebook
  • AVRO-Doc: a JS-based server that reformats AVRO schemas into an easily readable HTML format.
  • AVRO schema repo server: a simple REST-based server to publish schemas to and to provide them to all parties that generate and consume the data stored in AVRO format.

This combination of several tools makes it possible to handle data more easily.

Schema generator

Schemas are written using a Jupyter notebook server. The project contains:

  • AVRO Schema Editor.ipynb: to create new schemas and adapt existing ones. You load the existing files into the notebook and edit them before saving them to file again.
  • Avro Schema Generator.ipynb: this notebook checks the schema syntax and replaces the subschemas in a generated version of the schema. Subschemas need to be defined before a final version of a schema is generated. This notebook also implements functions to upload the schemas to the repository server.
  • Docker file for setting up the schema repository server in docker_schema_repo. Make sure to set the correct URL before trying to upload the generated schemas.
  • Docker file for setting up the avrodoc server, with a built-in Active Directory plugin in Nginx, in docker_avrodoc.

The project contains an example schema for reference.

Schema repository

The schema repository provides a generally available schema store with built-in version control. It allows sources to take their time adapting to a new version of a schema.

This asynchronicity is possible because all schemas are compatible with previous versions. With that constraint it is also possible to have different sources push one schema in different versions and still be able to transform the data using one process. Values missing in older versions of a schema are filled with the mandatory default value, and this default value can even be NULL.
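
Purely as an illustration of the publishing step, here is a hedged Python sketch using the requests package. The repository URL, port, subject name and the /register path are assumptions on my part; check the API documentation of the schema repo server you deploy and use the same URL you configured for the generator notebook.

import requests

# Hypothetical repository URL and subject name -- adjust both to your setup.
REPO_URL = "http://schema-repo.example.com:2876/schema-repo"
SUBJECT = "RandomData"

with open("RandomData.avsc") as schema_file:
    schema_text = schema_file.read()

# Create the subject if it does not exist yet, then register the new schema version.
requests.put(f"{REPO_URL}/{SUBJECT}")
response = requests.put(f"{REPO_URL}/{SUBJECT}/register",
                        data=schema_text,
                        headers={"Content-Type": "text/plain"})
response.raise_for_status()
print("registered schema version:", response.text)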

Conclusion

This project aims to help manage data definitions in Hadoop based systems. With the schema repository it provides a single source of truth for all data definitions, at least at data entry; and if you decide to use AVRO schemas throughout your system, even after transformations, you can manage all data definitions here.

There are several other schema repositories out there that can be used, e.g. the one provided by Confluent or the one introduced by Hortonworks for Apache Nifi. The tools used here are just examples of how such a system can be set up and how to introduce reusable AVRO fields into your schemas.

The code can be found in our repository.

Plumber: Getting R ready for production environments?

R Project and Production

Running R Project in production is a controversially discussed topic, as is everything concerning R vs. Python. Lately there have been some additions to the R Project that made me look into this again. Researching R and its usage in production environments, I came across several packages / projects that can be used as a solution for this:

There are several more, but those are the ones I found most interesting.

Plumber


For reasons of ease of use, and because it is not a hosted solution, I took a deeper look into Plumber. This felt quite natural, as it uses function decorators for defining endpoints and parameters, similar to Spring Boot, which I normally use for programming REST APIs.
Using Plumber is really straightforward, as the example below shows:

#' return text "Hello"
#' @get /hello
function() {
  list(msg = "hello")
}

The #' @get decorator defines the endpoint for this request, in this case /hello, so the full URL on localhost is http://127.0.0.1:8001/hello. To pass in one or more parameters you can use the decorator #' @param parameter_name parameter_description. A more complicated example using Plumber is hosted on our Gitlab. This example was built with the help of Tidy Textmining.

Production ready?

Plumber comes with Swagger, and the web server is automatically available. As the R instance is already running, processing the R code does not take long. If your model is complicated, this is of course reflected in the processing time. But as R is a single-threaded language, Plumber can only process one request at a time.
There are ways to tweak this, of course. You can run several instances of the service using a Docker image, as described here. There is also the option of using a web server to fork requests to several instances of R. Depending on the needs of the API, single-threaded processing can be fast enough. If the service has to be highly available, the Docker solution seems like a good choice, as it comes with a load balancer.

Conclusion

After testing Plumber I am surprised by the ease of use. This package makes deploying a REST API in R really easy. Depending on your business needs, it might even be enough for a production scenario, especially when used in combination with Docker.

Analytics Platform: An Evolution from Data Lake

Analytics Platform

Having built a Data Lake for your company’s analytical needs, new use cases will soon arise that cannot easily be covered with the Data Lake architecture I described in previous posts, like Apache HAWQ™: Building an easily accessible Data Lake. You will need to adapt or enhance your architecture to become more flexible. One way to make this flexibility happen is to transform your Data Lake into an Analytics Platform.

Definition of an Analytics Platform

An Analytics Platform is a platform that provides all kinds of services needed for building data products. This often exceeds the functionality of a pure RDBMS or even a Data Lake based on Apache HAWQ™. There are data products that have more requirements than a SQL interface: reporting and basic analysis are addressed by this setup, but products dealing with predictions or recommendations often have different needs. An Analytics Platform provides flexibility in the tools used. There can be, for example, an Apache HAWQ™ setup and at the same time an environment for running Tensorflow applications.

Using existing parts: Multi-colored YARN

When you are running a Hadoop™ cluster, you are already familiar with a resource manager: YARN. With YARN you can already deploy Linux containers, and support for Docker containers has progressed pretty far (YARN-3611). Building complex applications and managing them with YARN is called Multi-colored YARN by Hortonworks.
Following through on this idea, you end up with a cluster where only a few central services are installed directly on bare metal. Everything else is deployed in containers, as shown in the images below.

Analytics Platform based on YARN and Docker

The example makes use of Kubernetes and Docker for virtualization and provides the following services on bare metal, since they are needed by most applications:

  • Ambari
  • Kubernetes
  • YARN
  • ZooKeeper
  • HDFS

Especially HDFS is important as a central service, since it makes it possible for all applications to access the same data. The picture above shows that there can be several instances of a Hadoop distribution, even in different versions. So the platform allows for multi-tenancy, while all instances still process the same data.

Development changes

Having an Analytics Platform makes the development of data products easier. With separate development and staging systems, as described by me here, there was always the problem of developing a product on a sample of the data. In some cases these samples did not contain all possible combinations of data. This could result in errors after a deployment to the production environment, even if the product went through all development and staging stages. The new approach allows you to deploy all three systems on the same data, so you can account for all data-induced errors already on the development and staging systems.
You can even become more agile in your development process. The picture below shows an example deployment process that uses this system.

Analytics Platform: Deployment Process

Conclusion

Moving from a pure Data Lake to an Analytics Platform gives you more flexibility and helps in the development of data products, especially since you can develop on the same data as is available in production. Of course it brings more complexity to an already complex environment. But since it is possible to keep YARN as the resource manager and move to a more agile way of development and deployment, it might be worth considering. Once Multi-Colored YARN is finished, it will be easier to make this happen.

Building a Productive Data Lake: How to keep three systems in sync

Three Systems for Safe Development

When you are building a productive Data Lake it is important to have at least three environments:

  • Development: for development, where “everything” is allowed.
  • Staging: for testing changes in a production like environment.
  • Production: Running your tested and productive data applications

With these different environments comes the need to keep data structures in sync. You need to find a way to change each system in a controlled way when it is time to deploy some changes. There are differences in how to manage data structures, depending on where in the data processing they occur. These differences we split into the parts “Data Entry to Staging” and “Core and Presentation”. I will describe both below.

Managing different data structures in the three systems of your Productive Data Lake

As all three systems can have different data structures, using tools to make this easier is important. Different Apache Avro™ schema versions are managed by publishing them to a tool called Apache Avro™ Schema Service, of which there are also three instances. By publishing changes first on the development, then the staging, then the production system, we make sure only tested data structures are deployed to the Production Data Lake. Each change to a schema gets a version number, so rolling back to an earlier state in case of an error is possible.
For evolving data structures that are not contained in Apache Avro™ schemas, we use tables managing all meta information.

Data Structures from Entry to Staging

Apache Avro™ is a good way to manage evolving data structures, as I explained here. Once the data is available in Apache Hive™, mapping the AVRO data types to those of your Productive Data Lake system is necessary. We use Apache HAWQ™ as the base of our Data Lake, so we map the AVRO data types to HAWQ data types. To keep this as effortless as possible, we reuse attribute names as often as possible. If an attribute like “user_id” occurs in different schemas and has the same meaning, we just reuse it. So the mapping table does not contain an entry for each column of each schema, but only for distinct ones. Considering more than 100 schemas with on average 10 attributes, this helps keep maintenance down. The table below shows an example of this mapping; a small sketch of how it can be applied follows the table.

Schema Fieldname | AVRO datatype | HAWQ datatype
user_id | string | varchar(100)
transaction_id | string | varchar(100)
parameters | map | text
request_time | long | bigint
session_id | string | text
list_of_something | array | text
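
To illustrate how such a mapping table can be applied, here is a small Python sketch. The mapping dictionary mirrors the table above; the example schema and the function are my own invention:

# Distinct attribute names are mapped exactly once and reused across all schemas.
FIELD_TYPE_MAPPING = {
    "user_id": "varchar(100)",
    "transaction_id": "varchar(100)",
    "parameters": "text",
    "request_time": "bigint",
    "session_id": "text",
    "list_of_something": "text",
}

def hawq_columns(avro_schema):
    """Return HAWQ column definitions for all fields of an AVRO record schema."""
    columns = []
    for field in avro_schema["fields"]:
        # Attribute names are distinct across schemas, so a plain lookup is enough.
        hawq_type = FIELD_TYPE_MAPPING[field["name"]]
        columns.append(f'{field["name"]} {hawq_type}')
    return columns

example_schema = {
    "type": "record",
    "name": "ClickEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "session_id", "type": "string"},
        {"name": "request_time", "type": "long"},
    ],
}
print(",\n".join(hawq_columns(example_schema)))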

Data Structures for Core and Presentation

Managing the data structures after data entry is a totally different challenge. Transformations need to happen for the benefit of technical and business users. This can mean other attribute / column names, or splitting or joining data from one schema into different tables. These tables, again, can evolve. Evolving database tables is a pain, since in most cases you cannot just drop a table and recreate the new one. Here the processing power of Apache HAWQ™ comes into play, as now we can do exactly that. In most cases, at least for transactional data, we just reload the whole table during the daily batch process. So we manage all table definitions in metadata tables. They contain all table and column attributes, as well as a version number. This makes rolling back possible.

Example of table definitions:

Schema Name | Table Name | Partitioned | Distributed by | Dropable | Version
core | dim_user | - | user_id | true | 3.0
core | rel_user_purchase | - | user_id, purchase_id | true | 4.0
core | fact_user_registration | - | user_id | true | 4.2
core | fact_usage_page | time_value | request | false | 5.0

Example of column definitions:

Schema Name | Table Name | Column Name | Data Type | Data Type Length | Version
core | dim_user | user_id | varchar | 100 | 3.0
core | dim_user | user_type | varchar | 50 | 3.0
core | rel_user_purchase | user_id | varchar | 100 | 4.0
core | rel_user_purchase | purchase_id | varchar | 100 | 4.0

Automation of data structure deployment

After defining all the metadata concerning the data structures, we need a way to automate deployment. For this I created a Java program that uses all the information from the schema definitions and the tables for data type mappings and table definitions. The program runs on any of the three environments and also takes into consideration the version it should create. This only works if no one can create or change table information manually. The program first deploys changes to the data structure on the development system. If everything works there and all changes are ready to be deployed to the staging system, you run the program in that environment. With the version numbers defined alongside each change, it is possible to run version 5.0 on production, version 5.1 on staging and version 5.2 on development if necessary.
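
The program I wrote for this is in Java and is not reproduced here; purely as an illustration of the idea, a hedged Python sketch that builds the DDL for one versioned table from such metadata rows could look like this (the rows mirror the example tables above, everything else is an assumption):

# Hypothetical metadata rows, matching the example tables above.
table_def = {"schema": "core", "table": "dim_user", "distributed_by": "user_id",
             "dropable": True, "version": "3.0"}
column_defs = [
    {"column": "user_id", "data_type": "varchar", "length": 100},
    {"column": "user_type", "data_type": "varchar", "length": 50},
]

def build_ddl(table, columns):
    """Build the DROP/CREATE statements for one versioned table definition."""
    cols = ", ".join(
        f'{c["column"]} {c["data_type"]}({c["length"]})' if c.get("length")
        else f'{c["column"]} {c["data_type"]}'
        for c in columns
    )
    statements = []
    if table["dropable"]:
        statements.append(f'DROP TABLE IF EXISTS {table["schema"]}.{table["table"]};')
    statements.append(
        f'CREATE TABLE {table["schema"]}.{table["table"]} ({cols}) '
        f'DISTRIBUTED BY ({table["distributed_by"]});'
    )
    return statements

print("\n".join(build_ddl(table_def, column_defs)))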

Conclusion

This approach is one possibility to make data structure evolution easier and keep three different systems in sync. It is most important to make sure that there are no manual changes to the data structures. The approach works nicely if you can drop and recreate all tables without hesitation. If that is not the case, you can still define your tables this way; just make sure they are not dropped automatically when the program runs. Having all definitions in one place and being able to deploy the tables and their changes automatically, with version control, can be a great help. This way no one has to remember to create new tables manually in either system, so errors due to missing tables or columns are minimized, which is good if you create a Productive Data Lake, where errors affect data products needed by other products of your company.

Apache AVRO: Data format for evolution of data

Flexible Data Format: Apache AVRO

Apache AVRO is a data serialization format. It comes with a data definition format that is easy to understand. With the possibility to add optional fields, it offers a solution for the evolution of the schemas of your data.

Defining a Schema

Defining a schema in Apache AVRO is quite easy, since it is a JSON object in the form of:

{
  "type": "typename",
  "name": "name of field",
  "doc": "documentation of field"
}

A schema consists of:

  • a namespace
  • name
  • type = record
  • documentation
  • fields

A typical schema would look like this:

{
	"namespace": "info.datascientists.avro",
	"type": "record",
	"name": "RandomData",
	"doc": "data sets contain random data.",
	"fields": [
		{
			"name": "long_field",
			"type": "long",
			"doc": "a field containg a number"
		},
		{
			"name": "string_field",
			"type": "string",
			"doc": "a field containing a string"
		}
      ]
}

Changing a schema in a backward compatible way

If you now want to add a new field and stay compatible with your existing schema, you can just do the following:

{
	"namespace": "info.datascientists.avro",
	"type": "record",
	"name": "RandomData",
	"doc": "data sets contain random data.",
	"fields": [
		{
			"name": "long_field",
			"type": "long",
			"doc": "a field containg a number"
		},
		{
			"name": "string_field",
			"type": "string",
			"doc": "a field containing a string"
		},
		{
			"name": "optional_new_field",
			"type": ["string", "null"],
			"default": "New Field",
			"doc": "This is a new optional field"
		}
      ]
}

This change is still compatible with the version above. The field is marked as optional by the union type [“string”, “null”]; note that the default value must match the first type of the union, which is why “string” is listed first here. The default attribute fills the field with the value documented here if the field does not exist in the data.

Serializing data using the schema

Once the schema is defined, it can be used to serialize the data. This serialization also compresses the data, by roughly 30% compared to plain text. Serialization is possible in a wide range of programming languages, but it is best implemented in Java. A tutorial on how to serialize data using a schema can be found here.
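
The linked tutorial uses Java; as a hedged illustration, a roughly equivalent round trip in Python with the fastavro package could look like the sketch below (file name and records are made up):

from fastavro import parse_schema, reader, writer

schema = parse_schema({
    "namespace": "info.datascientists.avro",
    "type": "record",
    "name": "RandomData",
    "fields": [
        {"name": "long_field", "type": "long"},
        {"name": "string_field", "type": "string"},
    ],
})

records = [{"long_field": 42, "string_field": "hello"}]

# Write the records together with the schema; deflate keeps the file small.
with open("random_data.avro", "wb") as out:
    writer(out, schema, records, codec="deflate")

# The schema travels with the file, so reading needs no external definition.
with open("random_data.avro", "rb") as avro_file:
    for record in reader(avro_file):
        print(record)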

Reading the data with Apache Hive

Data stored in Apache AVRO is easily accessible if read through Hive external tables. The data is automatically deserialized and made human readable. Apache Hive even supports default values on schema changes: if new fields are added with a default value, the Hive table can read all versions of the schema in the same table.
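
As a hedged sketch of such an external table (all paths and names are placeholders, and the same DDL can just as well be run in the Hive shell instead of through Spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# The AVRO files and the .avsc schema live on HDFS; both locations are made up.
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS random_data
    STORED AS AVRO
    LOCATION '/data/avro/random_data'
    TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/RandomData.avsc')
""")

# Older files without the optional field are filled with its default value.
spark.sql("SELECT * FROM random_data LIMIT 10").show()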

Apache HAWQ: Building an easily accessible Data Lake

Data Lake vs. Data Warehouse

The Data Lake architecture is an up and coming approach to making all data, unstructured as well as structured, accessible through several methods, be that real-time or batch analysis. In this approach the data is stored on HDFS and made accessible by several tools, such as Apache Hive™, Apache Spark™ and Apache HAWQ™, which are covered below.

All of these tools have advantages and disadvantages when used to process data, but combined they make your data accessible. This is the first step in building a Data Lake: you have to make your data, even schemaless data, accessible to your customers.
A classical Data Warehouse, in contrast, only contains structured data that is at least preprocessed and has a fixed schema. Data in a classical Data Warehouse is not the raw data entered into the system; you need a separate staging area for transformations. Usually this staging area is not accessible to all consumers of your data, but only to the Data Warehouse developers.

Data Lake Architecture using Apache HAWQ

It is a challenge to build a Data Lake with Apache HAWQ, but this can be overcome in the design. One solution for building such a system can be seen in the picture below.
Data Lake architecture with Apache HAWQ

Data Entry

To make utilization of Apache HAWQ possible, the starting point is a controlled data entry. This is a compromise between schemaless and schematized data. Apache AVRO is a way to do this: schema evolution is an integral part of AVRO, and it provides structures to store unstructured data, like maps and arrays. A separate article about AVRO, explaining schema evolution and how to make the most of it, will be one of the next topics here.
Data structured in schemas can then be pushed message by message into a messaging queue. Choose a queue that fits your needs best: if you need secure transactions, RabbitMQ may be the right choice; another option is Apache Kafka.
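
As a hedged sketch of this data entry step (broker address, topic name and schema are placeholders), pushing one AVRO-encoded message into Apache Kafka from Python could look like this:

import io

from fastavro import parse_schema, writer
from kafka import KafkaProducer  # kafka-python package

schema = parse_schema({
    "namespace": "info.datascientists.avro",
    "type": "record",
    "name": "ClickEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "request_time", "type": "long"},
    ],
})

def encode(record):
    """Serialize one record as a complete AVRO file in memory, schema included."""
    buffer = io.BytesIO()
    writer(buffer, schema, [record])
    return buffer.getvalue()

producer = KafkaProducer(bootstrap_servers="broker:9092")
producer.send("data-entry", encode({"user_id": "abc123", "request_time": 1514764800000}))
producer.flush()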

Pre-aggregating Data

Processing and storing single messages on HDFS is not an option, so another system is needed to aggregate messages before storing them on HDFS. For this, a software project called Apache Nifi is a good choice. It comes with processors that make tasks like this pretty easy: the MergeContent processor merges single AVRO messages and removes all headers but one before writing them to HDFS.
Apache Nifi flow: merging single AVRO messages
If those merged files are still below the HDFS block size, there is the possibility to read them back from HDFS and merge them into even larger files.
Apache Nifi flow: merging files into larger files

Making data available in the Data Lake

Use Apache Hive to make the data in AVRO format accessible. HAWQ could read the AVRO files directly, but Hive handles schema evolution in a more effective way. For example, if there is the need to add a new optional field to an existing schema, add a default value for that field and Hive will fill entries from earlier messages with this value. So if HAWQ accesses this Hive table, it automatically reads the default value for fields added later; it could not do this by itself. Hive also has a more robust way of handling and extracting keys and values from map fields right now.

Data Lake with SQL Access

All data is now available in Apache HAWQ. This enables transformations using SQL, making all of your data accessible to a broad audience in your company; SQL skills are more common than, say, Spark programming in Java, Scala or PySpark. From here it is possible to give analysts access to all of the data or to build data marts for single subjects of concern using SQL transformations. Connectivity to reporting tools like Tableau is possible with a driver for PostgreSQL. Even advanced analytics are possible if you install Apache MADlib on your HAWQ cluster.

Using Data outside of HAWQ

It is even possible to use all the data outside of HAWQ if there is a need for it. Since all data is available in AVRO format, accessing it by means of Apache Spark with Apache Zeppelin is also possible. Hive queries are possible too, since all data is registered there using the external tables we used for the integration into HAWQ.
Accessing the results of such processing in HAWQ is possible as well: save the results in AVRO format for integration in the way described above, or use “hawq register” to access Parquet files directly from HDFS.

Conclusion

Using Apache HAWQ as the base of a Data Lake is possible; just take some constraints into consideration. Entering semi-structured data in AVRO format also saves work later when you process the data. The main advantage is that you can use SQL as an interface to all of your data. This enables many people in your company to access your data and will help you on your way to data-driven decisions.

Apache HAWQ: Full SQL and MPP support on HDFS

Pivotal ported their massively parallel processing (MPP) database Greenplum to Hadoop and made it open source as an incubating project at Apache, called Apache HAWQ. This brings together full ANSI SQL with MPP capabilities and Hadoop integration.

The integration into an existing Hadoop installation is easy, as you can integrate all existing data via external tables. This is done using the PXF API to query external data. This API is customizable, but already brings the most used formats ready made, such as plain text, AVRO and JSON files as well as Hive and HBase tables.

To access and store small amounts of data, Apache HAWQ has an interface called gpfdist. This enables you to store data outside of your HDFS and still access it within HAWQ to join it with the data stored in HDFS. This is especially handy when you need small tables for dimension or mapping data in Apache HAWQ. Such a table then does not use up a whole HDFS block that would be mostly empty.

Apache HAWQ even comes integrated with MADlib, also an Apache incubating project developed by Pivotal. MADlib is a machine learning framework based on SQL, so moving data between different tools for analysing it is not needed anymore. If you have stored your data in Apache HAWQ, you can mine it in the database directly and don’t have to export it, e.g. to a Spark client or tools like Knime or RapidMiner.
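
Since HAWQ speaks the PostgreSQL protocol, such an in-database training run can be triggered from any client. Below is a hedged sketch using psycopg2; connection details, table and column names are placeholders:

import psycopg2

# Placeholder connection details for the HAWQ master.
conn = psycopg2.connect(host="hawq-master", port=5432, dbname="analytics", user="gpadmin")
conn.autocommit = True

with conn.cursor() as cur:
    # Train a linear regression model directly in the database with MADlib.
    cur.execute("""
        SELECT madlib.linregr_train(
            'sales_history',             -- source table
            'sales_linregr_model',       -- output table for the model
            'revenue',                   -- dependent variable
            'ARRAY[1, ad_spend, visits]' -- independent variables, 1 is the intercept
        )
    """)
    cur.execute("SELECT coef, r2 FROM sales_linregr_model")
    print(cur.fetchone())

conn.close()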

MADlib algorithms

MADLib comes with algorithms in the following categories:

  • Classification
  • Regression
  • Clustering
  • Topic Modelling
  • Association Rule Mining
  • Descriptive Statistics

By using HAWQ you can even leverage tools like Tableau with real-time database connections, which so far was not satisfactory when you used Hive.

Apache Zeppelin: Use with remote Spark cluster and Yarn

Apache Zeppelin is pretty useful for interactive programming using the web browser. It even comes with its own installation of Apache Spark. For further information you can check my earlier post.
But the real power of using Spark with Zeppelin lies in how easily it connects to your existing Spark cluster using YARN. The following steps are necessary:

  • Copy your Hadoop configuration files to your Zeppelin installation under $ZEPPELIN_HOME/conf
  • Restart your Zeppelin Notebook
  • Insert the value “yarn-client” into the field master in the spark interpreter, as shown in the picture below.

Spark interpreter settings for YARN

After these steps you can use your notebooks with Spark running on a YARN cluster, so you can make use of all the resources in the queue you assigned to Spark on your cluster.

Apache Zeppelin: Visualization and Spark data processing

Apache Zeppelin

Apache Zeppelin is a web-based notebook for interactive data analytics. It comes with features for all the steps of data analysis:

  • Data Ingestion
  • Data Discovery
  • Data Analytics
  • Data Visualization & Collaboration

Besides that feature set it also supports multiple languages and backends, currently for example Scala, Python and SQL, and there is also the possibility to add your own interpreter to Zeppelin. This makes the tool really flexible.
Another feature it has is the built-in integration of Apache Spark, as the short example after the list below shows. It ships with the following features and more:

  • Automatic SparkContext and SQLContext injection
  • Runtime jar dependency loading from local filesystem or maven repository.
  • Canceling job and displaying its progress
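
For illustration, a notebook paragraph using the PySpark interpreter could look like the hedged sketch below (the file path and column names are made up); sqlContext is already injected, and z.show() hands the result to Zeppelin’s built-in charts:

%pyspark
# sqlContext is injected by Zeppelin, no setup code is needed.
df = sqlContext.read.json("/data/events.json")
df.registerTempTable("events")

# z is Zeppelin's context object; z.show() renders the result with the built-in charts.
z.show(sqlContext.sql("SELECT event_type, count(*) AS cnt FROM events GROUP BY event_type"))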

It also has built-in visualization, which I think is an improvement over using IPython notebooks. The visualization covers the most basic graph types, like:

  • Tables
  • BarCharts
  • Pies
  • Scatterplot
  • Lines

These visualizations can be used with all interpreters and always look the same, so you can show data from Postgres and Spark in the same notebook using the same functions; there is no need to handle different data sources differently.
You can also use dynamic forms in your notebooks, e.g. to provide filter options to the user. This comes in handy if you embed a notebook in your own website.

Apache Spark 2.0

Apache Spark has released version 2.0, which is a major step forward in usability for Spark users and especially for people who refrained from using it due to the cost of learning a new programming language or tool. This is in the past now, as Spark 2.0 comes with improved SQL functionality and SQL2003 support. It can now run all 99 TPC-DS queries, and the new SQL parser supports ANSI SQL as well as HiveQL and subqueries.
Another new feature is native CSV data source support, based on the already existing Databricks spark-csv module. I personally used this module as well as the spark-avro module before, and they make working with data in those formats really easy.
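
As a hedged sketch of the new built-in CSV source and the improved SQL support in PySpark 2.0 (file path, view and column names are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-example").getOrCreate()

# Native CSV support, no external spark-csv package needed anymore.
df = spark.read.csv("/data/sales.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("sales")

# The improved SQL parser also handles subqueries.
spark.sql("""
    SELECT region, SUM(revenue) AS total
    FROM sales
    WHERE region IN (SELECT region FROM sales WHERE revenue > 1000)
    GROUP BY region
""").show()
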
There were also some new features added to MLlib:

  • PySpark includes new algorithms like LDA, Gaussian Mixture Model, Generalized Linear Regression
  • SparkR now includes generalized linear models, naive Bayes, k-means clustering, and survival regression.

Spark increased its performance with the release of 2.0. The goal was to make Spark 2.0 10x faster and Databricks shows this performance tuning in a notebook.

All of these improvements make Spark a more complete tool for data processing and analysing. The added SQL2003 support even makes it available for a larger user base and more importantly makes it easier to migrate existing applications from databases to Spark.