Running your data infrastructure in the cloud has become a realistic option for many companies, especially since the big cloud providers now offer many managed services for a modern data architecture beyond just a database management system.
In this article I will look at the tools the three big cloud providers offer to support a modern data architecture.
Modern Data Infrastructure Setup
A generic data infrastructure looks like the picture shown below. It contains one data entry point, where all data is collected, acting as the general API for all producers of data. This data is pushed as single events whenever possible and is defined using a schema. All of the schemas are defined and stored in a schema registry.
Following this entry point there is the possibility to do some ETL, like collecting several messages for one event into bigger chunks. This is useful for writing data out to storage or inserting into an analytical database, since these systems handle bulk operations much better than single-row ones. Storing raw event data on cloud storage and then accessing these files with SQL from the DWH system is also possible, which potentially saves more expensive storage in the DWH system.
After the initial ETL transformations on the events, it seems sensible to do all further transformations using the compute power of the database system. Since this can be done in SQL, people like analysts with SQL knowledge can participate in the process; no other programming skills are needed from this point on.
Next we will look into the tools offered by the three largest cloud vendors that support this setup.
Schema Registry: Google offers no dedicated schema registry, but it is possible to use the Confluent Schema Registry if needed. If you want to use AVRO schemas, this schema repo is an option, but it is no longer maintained.
PubSub is a messaging system. Messages are delivered at least once, data replay is not possible, and message ordering is not guaranteed.
Cloud Dataflow is a serverless data processing platform, which can be programmed using Apache Beam. The upside of using Apache Beam is that it supports several processing platforms, such as Spark and Flink, so you stay flexible to move away from Dataflow if you choose to.
AWS Glue provides a schema registry that can be used for Kinesis streams and transformations.
Kinesis Data Stream provides a platform for streaming events. Data replay is possible here and messages are stored in an ordered fashion.
Kinesis Firehose takes the data streams and offers some built-in transformations, like storing data aggregated in the Apache Parquet or Apache ORC format. Both formats are column oriented and fast to query from Amazon's data warehouse solution Redshift. Custom transformations are also supported using AWS Lambda, which provides serverless computing for transformation code.
S3 is Amazon's cheap object storage service.
Redshift is Amazon's data warehouse solution. It is a massively parallel processing (MPP) engine based on a PostgreSQL database. Here compute and storage are not separated, so you need to size your cluster appropriately. There is also some administrative effort necessary for data optimization, including vacuuming, index creation and table analysis. Sizing is a manual task as well.
Microsoft Azure offers other tools that support this data infrastructure in the cloud.
Data Catalog discovers data schemas and you can define them here as well.
Event Hubs is a serverless streaming solution for events. Events can be ordered by using the partitioning function that is provided.
Data Factory enables you to collect and transform all data coming into the data platform.
Data Lake Storage is object storage for data analytics that decouples compute from storage.
Synapse Analytics is a framework for storing and analysing all data, with Spark or an MPP engine for SQL workloads. It seamlessly integrates with long-established MS products like SSIS.
Data infrastructure in the cloud is possible with all three major cloud providers, and each of them offers tools to set up this solution. Which one fits your needs best probably depends most on the preferences and knowledge of your data engineers, scientists and analysts, or on the rest of your infrastructure setup.
Developing and bringing machine learning models into production is a task with a lot of challenges. These include model and attribute selection, dealing with missing values, normalization and others.
What I want to discuss here is a workflow that puts all the gears into motion: from data preprocessing and analysis, through building models and selecting the best performing one, to serving the model in a real-time API.
Life cycle of a machine learning model
The life cycle of machine learning is basically described by iterating over the following four steps.
Each of these steps is under constant evaluation, especially when model performance can be improved by adding different data attributes or preprocessing methods.
For the presented approach we split the process of modeling into two parts. Part one contains the four steps mentioned above; we call it Manual Run Modeling. Part two is automating the steps of part one.
Manual Run Modeling
In the manual part we start by analysing our new task. After that we formulate a hypothesis we want to test.
Development and Prototyping Environment
First we set up a development environment for working on the new task. For this we spin up a Jupyter notebook server. We deploy it on Google Cloud AI Platform. It provides ready to run containers for Jupyter. The notebook approach enables us to develop fast and share results with the team using a browser. With the ability to easily visualize data inline in a notebook, this approach is especially useful in the data extraction and preprocessing process.
Data preparation and visualization
Python provides some nice packages for generating graphics on data for faster insights. This speeds up our prototyping process in the notebook. We are especially fond of using Seaborn.
We load the data identified for this model into a dataframe in the notebook. After that we start looking at each attribute and its values, often in combination with the other attributes. For a first overview we use a pairplot provided by Seaborn.
We use a combination of visualizations, such as a correlation matrix. Then we decide which attributes to use and how to handle outliers and missing values. Finally we use one hot encoding for categorical attributes and normalize the continuous attributes to create the input for our models.
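As an illustration, the encoding and normalization step could look like the following sketch; the column names and value ranges are made up, and our real pipeline contains more attributes and checks.

```python
import pandas as pd

# toy dataset with one categorical and one continuous attribute
df = pd.DataFrame({
    "vehicle": ["bike", "car", "car"],   # categorical
    "distance_km": [2.0, 10.0, 6.0],     # continuous
})

# one hot encode the categorical attribute
encoded = pd.get_dummies(df, columns=["vehicle"])

# min-max normalize the continuous attribute into [0, 1]
col = encoded["distance_km"]
encoded["distance_km"] = (col - col.min()) / (col.max() - col.min())
```

The same transformations then have to be applied identically at prediction time, which is why we later extract them into a reusable package.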
Model Selection and Evaluation
When the data is ready we choose several models to find a solution for our problem. These models can range from multiple linear regression over random forests to deep neural networks with TensorFlow.
After splitting the data into training, evaluation and test sets, we decide on a measure each model has to optimize for, e.g. mean squared error or precision, depending on the kind of problem. Once we have identified the best performing model, we start transforming the code for Google Cloud AI Platform.
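The split-and-score idea can be sketched in plain Python; the 70/15/15 ratio and the toy data are assumptions, and in practice library helpers do this work.

```python
import random

def split(rows, seed=42):
    """Shuffle deterministically and return a 70/15/15 train/eval/test split."""
    rows = rows[:]
    random.Random(seed).shuffle(rows)
    n = len(rows)
    a, b = n * 70 // 100, n * 85 // 100
    return rows[:a], rows[a:b], rows[b:]

def mean_squared_error(y_true, y_pred):
    """The optimization measure for a regression problem."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

train_set, eval_set, test_set = split(list(range(100)))
mse = mean_squared_error([1.0, 2.0], [1.0, 4.0])  # (0 + 4) / 2 = 2.0
```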
RT Prediction Deployment
After manual evaluation of preprocessing and modeling, we start the task of automating training and deployment for bringing machine learning models into production. This can be split into three tasks:
Training the model with hyperparameter optimization on Google Cloud AI platform
Deploying the model on Google Cloud AI platform
Deploying an API to access the model for real time predictions
Training on Google Cloud AI Platform
After deciding on a model to take to production, we optimize our code for data extraction and preprocessing to make it reusable and compliant with Google Cloud AI Platform rules. Basically, this means we have to create a Python package out of the first three steps.
A project could be set up as shown in the picture below.
This Python package is then deployed to Google Cloud Platform and executed there. If you have custom packages, there is an option to supply those too. An example call for training in the cloud could then look like the following.
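A hedged sketch of such a call; the job name, bucket, region and package layout are placeholders, and the exact flags may differ for your setup.

```shell
gcloud ai-platform jobs submit training my_training_job_001 \
  --package-path trainer/ \
  --module-name trainer.task \
  --region europe-west1 \
  --staging-bucket gs://my-staging-bucket \
  --config hptuning_config.yaml
```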
One advantage of using Google Cloud AI Platform is the possibility of automated hyperparameter tuning for models. This enables us to train a model automatically with different configurations and then select the one performing best for the measure defined in hptuning_config.yaml.
In the AI platform dashboard you can then see, which hyperparameter combination of your defined values in params had the best results for the defined hyperparameterMetricTag and goal.
The identified model is then ready to be deployed to the platform, where Google provides a URL to access the model in real time.
Deploying the model on GCP
Deploying to production is done with a Jenkins job. We use a Jenkinsfile to define our jobs as part of our code. A model deployment consists of the following steps:
Copying the model to the correct GCP bucket (this differs between our three environments: development, staging and production)
Deploying the model to AI Platform using a gcloud command
Testing the model with a prepared test dataset
If all of these steps are successful, the model is ready for use in the specified environment via a URL endpoint.
Deploying the Real Time API
Since the model is deployed and accessible via a URL endpoint, we now have to build a transformation API that takes the input data, transforms it into the format required by the model endpoint, and then calls the model.
To make using the model easier for other services, our data entry format is JSON. This keeps the data human readable, and changes to any step concerning the model, except changing the number of attributes, can be made without dependencies on our client services.
As the framework for our REST API we chose Flask, since it is lightweight, flexible, easy to use and also written in Python. Since the API and the model are written in the same language, we can reuse the preprocessing from the training package described above. The main work here lies in adapting the code to run on a single event instead of the batch prediction used to validate results during training.
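A minimal sketch of such a transformation API in Flask; the endpoint name, the single feature and its value range are made up for illustration, and the call to the deployed model endpoint is left as a comment.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

# assumed min/max from training, needed to scale incoming values identically
MINMAX = {"distance_km": (0.0, 50.0)}

def preprocess(payload):
    """Transform one JSON event into the model's input vector."""
    lo, hi = MINMAX["distance_km"]
    return [(payload["distance_km"] - lo) / (hi - lo)]

@app.route("/predict", methods=["POST"])
def predict():
    payload = request.get_json(silent=True)
    if not payload or "distance_km" not in payload:
        return jsonify(success=False, message="missing field distance_km"), 400
    features = preprocess(payload)
    # here the deployed model endpoint would be called with `features`
    return jsonify(success=True, message="", features=features)
```

A client would POST `{"distance_km": 25.0}` to /predict and receive the transformed features together with the success flag and message fields.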
For stability and security reasons we added some additional checks:
checking values for validity (e.g. range or location checks)
We also created an extra package containing all transformation functions we use in several of our models, e.g. min-max normalization and distance calculation functions.
Speed is important in this component, so we store all data for enriching and transforming the incoming data inside a cache.
After receiving the prediction from the model, we qualify the results for regression models, by adding a confidence value. This helps our clients to better understand the results, especially if they are meant to be shown to end users.
Each of our responses has its own error code and message that is supplied in the result. The result is again in JSON format with the following fields:
success: true or false, indicating result of request
message: (error) message for response
Deployment of API
Deployment to our production system is then handled by a Jenkins job.
By using Cloud Run we do not need to worry about hardware configuration and can focus on optimizing the API and the model.
By following this process, we keep the time spent on everything besides building the model to a minimum when bringing machine learning models into production, and we do not have to manage any hardware resources or worry about availability.
Especially the part after the manual data and model selection process can be used as a template to speed up the deployment process, thanks to the tools provided by Google and to extracting reusable functions into their own Python package.
This is a little text with everything that helped me prepare for the Google Cloud Data Engineer Exam. There are a lot of courses and resources that can help; the following links are the ones I used in my preparation.
On Coursera there are several courses provided by Google to prepare for this exam, including a Specialization for the Google Cloud Data Engineer Exam.
And Google also offers a test exam for the Google Cloud Data Engineer.
And now to a slightly deeper description of the topics I struggled with a bit.
When you use streaming to insert data into BigQuery, it is saved into the streaming buffer and loaded into the persistent table from there. It can take up to 90 minutes until the data is persistent, but all data, buffer and persistent table, is queryable together, so your results update immediately.
During streaming inserts each row gets an insertId, which BigQuery uses to try to find and remove duplicates if they occur within a time window of one minute.
If this method is not secure enough, Google Cloud Datastore is an option, since it supports transactions.
You can also stream into partitioned tables if you add a partition decorator $YYYYMMDD to the table name. Streamed data is first kept in the __UNPARTITIONED__ partition; to check for it, query rows where the system column _PARTITIONTIME IS NULL.
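To illustrate the insertId mechanism, here is a sketch using the google-cloud-bigquery client; the table id is hypothetical, and the actual API call is left as a comment since it needs credentials.

```python
import uuid

def build_streaming_batch(events):
    """Attach an insertId to each event so BigQuery can deduplicate
    retries within its ~1 minute window; fall back to a random uuid."""
    rows = [dict(e) for e in events]
    row_ids = [e.get("event_id") or str(uuid.uuid4()) for e in events]
    return rows, row_ids

rows, row_ids = build_streaming_batch(
    [{"event_id": "a1", "value": 3}, {"value": 4}]
)

# from google.cloud import bigquery
# client = bigquery.Client()
# errors = client.insert_rows_json("project.dataset.table", rows, row_ids=row_ids)
# assert errors == []
```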
Typical use cases include
high volume input, but not transactional data
analysis on aggregated data and not for single row queries
max row size: 1 MB
HTTP limit: 10 MB
max 10,000 rows / second per project
max 10,000 rows / request; 500 rows recommended
max 100 MB / s and table inserts in streaming
API limit of 100 queries / s per user
max 300 concurrent users
max 500 tablesets / s
Partitioned tables are separated into smaller units to optimize query performance. There are several ways to partition tables in BigQuery:
partitioned by loading time (automatically)
partitioned by field with the data type TIMESTAMP or DATE
You can correct the loading time by updating the suffix or pseudo columns. Ingestion-time partitioned tables contain a system column _PARTITIONTIME you can use to filter. If you used explicit, not automatic, partitioning, the column _PARTITIONTIME does not exist.
There are two special partitions for data that does not belong to regular partitions:
__NULL__ for rows with a NULL value in the date field
__UNPARTITIONED__ for rows with a date outside of the valid time range
Another way to optimize tables is sharding, where you split data across several tables instead of using a partitioning column, and join these tables using a UNION. In this case keep in mind that there is a limit of 1,000 tables per query.
viewer: right to read all tables in a dataset
editor: viewer plus creating, deleting and changing of tables in a dataset
owner: editor plus creating and deleting datasets
There are some restrictions you need to keep in mind, when using BigQuery:
max 100 concurrent interactive queries possible
max 4 concurrent interactive queries on external sources
max 6 UDFs in a single interactive query
max 1,000 updates per table per day
max query duration is 6 hours
max 1,000 tables per query
SQL command can be max 1 MB in size
max 10,000 query parameters
max 100 MB / row
max 1,000 columns per table / query / view
max 16 levels of nested fields
ETL from relational database to BigQuery
There are several ways to import data from a relational database into BigQuery. One is the Web UI, in which case the schema can be uploaded as a JSON file. You can export data to Google Cloud Storage in JSON format and import it from there.
Another way is using Google Cloud Dataflow. An automatic setup is possible, but you can also customize the process and, for example, already join data in Dataflow to denormalize it, using lookup tables as side input.
Backup / Snapshots
BigQuery supports a feature for “Point in Time Snapshots”. If you use a query decorator @<time> in Legacy SQL, you get an image of the table at the specified timestamp. This decorator can contain relative or absolute values.
BigQuery supports machine learning in SQL for structured data. This is easy to learn, as it is based on SQL and not on another programming language. Right now it provides linear regression, binary logistic regression for classification, multi-class logistic regression and k-means clustering. The advantage is that you only have to use one tool, SQL, for everything, which means data does not need to be moved from one place to another.
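A hedged example of what training a linear regression in BigQuery ML looks like; the dataset, table and column names are made up, and the client call is commented out since it needs credentials.

```python
# BigQuery ML: the model is trained by a plain SQL statement
query = """
CREATE OR REPLACE MODEL `mydataset.taxi_fare_model`
OPTIONS (model_type = 'linear_reg', input_label_cols = ['fare']) AS
SELECT trip_km, passenger_count, fare
FROM `mydataset.taxi_trips`
"""

# from google.cloud import bigquery
# bigquery.Client().query(query).result()
```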
BigQuery does not need normalized data for fast query speed, but rather denormalized data. This means you should use nested and repeated fields in tables. Disk usage is not an issue here and denormalized data saves you the need to join.
You can load nested and repeated fields by using formats like AVRO, Parquet, ORC, JSON, Datastore or Firestore exports.
BigQuery Transfer Service
There is an option to repeatedly load data into BigQuery from Campaign Manager, GCS, S3, AdManager, Google Ads, Google Play and YouTube. You can also load Stackdriver logs into BigQuery.
BigTable is Google's solution for a real-time key / value datastore. It handles sparse tables, meaning tables where many columns in a row are null; empty columns do not use any space. The system is designed for billions of rows, thousands of columns and data in the TB or PB range.
BigTable only has one key per row, called the rowkey. It has low latency and is HBase compatible. It also scales linearly with the number of nodes in a cluster. There is an option for replication over two clusters, which sets one up as a cold standby. It is possible to change cluster sizes while the system is running, but afterwards BigTable takes a few minutes to rebalance the cluster load.
To make the most out of it, use it for highly scalable key / value data and streaming workloads.
BigTable orders columns that belong together into groups or families. To make the most of it, distribute key values equally across all nodes. Data is not stored on the cluster nodes but on Colossus; the nodes only contain references to the data. This makes it easy to recover data after a system crash, and no data is lost if nodes fail.
The master node redistributes data across nodes to optimize performance. Data locality is important and can be influenced by key design: group data that is queried together by designing the key that way. But BigTable does not support SQL or joins and is only really recommended for more than 1 TB of data.
read 10,000 rows in 6 ms
write 10,000 rows in 6 ms
read 5,000 rows in 200 ms
write 10,000 rows in 50 ms
Reasons for underperformance
incorrect schema design: key does not distribute data equally, only one key is possible
columns are sorted by family and then alphabetical
reads / writes are skewed
all operations need to be atomic on a row
information is distributed over several rows
row is too big (keep below 10 MB per cell and 100 MB per row)
timestamp at the beginning of the rowkey
not using combined keys
key consists of domains, hash or sequential ids
too little data (< 300GB)
test was too short (needs to run at least 10 minutes)
cluster too small (more than 70% CPU usage or 70% of storage)
HDD is used
it is still a development instance
data is distributed automatically based on usage patterns; make sure to give the system time to redistribute according to your needs
The Key Visualizer is a tool for supporting you in finding BigTable usage patterns. It provides graphic reports to help you recognize performance problems, like spotting hotspots (rows with a lot of actions) or rows with too much data. It helps you in getting evenly distributed accesses over all rows.
The reports are generated automatically (daily / hourly) if a table has more than 300 GB or at least 10,000 row reads or writes per second. The reports also recognize hierarchical keys and present them as a hierarchy, and related keys are summarized.
You can scale BigTable programmatically. This is helpful if you, for example, get monitoring data from Stackdriver and then react to it by scaling up or down. After scaling a cluster it can take up to 20 minutes until you see better performance, as data needs to be redistributed.
This automatic process does not help in cases where performance peaks for a short period of time; there you need to scale manually before the event. If scaling does not help performance, the key design probably leads to an imbalanced load distribution.
You can scale a running cluster. It is possible to change the following things:
nodes in cluster
cluster per instance
label / names
upgrade from development to production instance
Schema design in BigTable is crucial for performance. There is only one index per row, called the rowkey. All rows are automatically sorted by the rowkey in lexicographic order. And as all operations are atomic per row, rows should not be dependent on one another.
Use key design to make sure data queried together is stored close together. Keep in mind that one cell should not be bigger than 10 MB and one row no larger than 100 MB; the hard limits are 100 MB per cell and 256 MB per row.
Up to 100 column families are possible. If all dependent values are in one family, it is possible to query only this family. Fewer columns are better, and column names should be short.
There are several best practices for key design:
use inverted domain names, like com.domain.shop
string ids should not contain hashes, for readability
integrate a timestamp if time is queried often, but never only a timestamp
use an inverted timestamp if the newest data is queried most often
always put the grouping field first in the key
try to avoid many updates on a single row; write several rows instead
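The inverted-timestamp advice above can be sketched like this; the key layout with a device id as the grouping field is an assumption.

```python
import sys

def make_rowkey(device_id: str, ts_seconds: int) -> str:
    """Grouping field first, then an inverted timestamp, so that for a
    given device the newest rows sort first lexicographically."""
    reversed_ts = sys.maxsize - ts_seconds
    return f"{device_id}#{reversed_ts}"

k_new = make_rowkey("sensor-42", 2_000)  # newer event
k_old = make_rowkey("sensor-42", 1_000)  # older event
```

Because the inverted timestamps all have the same number of digits, the newer key sorts before the older one, so a prefix scan on the device id returns the latest data first.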
Dataflow is a system to develop and run data transformation pipelines in. It has the following key concepts:
Pipeline: complete process in a DAG
PCollection: distributed dataset for a pipeline
PTransform: a data processing operation that takes a PCollection, performs processing and returns a new PCollection
ParDo: parallel processing; runs a function on a PCollection
SideInputs: additional inputs for ParDo / DoFns
It is a best practice to write invalid input to its own sink and not to throw it away.
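This dead-letter best practice can be sketched in plain Python; in Beam it would typically be a ParDo with a tagged side output for the invalid records.

```python
import json

def route_records(lines):
    """Split raw input into parsed events and an invalid-record sink,
    instead of silently dropping records that fail to parse."""
    valid, invalid = [], []
    for line in lines:
        try:
            valid.append(json.loads(line))
        except json.JSONDecodeError:
            invalid.append(line)  # write these to their own sink
    return valid, invalid

valid, invalid = route_records(['{"id": 1}', "not json"])
```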
PubSub is the messaging queue provided by Google. It keeps data for up to seven days. All logs are automatically pushed to Stackdriver for analysis. It has the following attributes:
messages might not be delivered in the order they were received, because the system is distributed
if message delivery is not acknowledged within a defined timespan, the message is redelivered; this timespan is configurable
Data Transfer Appliance
The Data Transfer Appliance enables encrypted offline data transfer of up to petabytes. You get the appliance, load your data, and ship it back to Google, where the data is uploaded into the cloud. This is faster than uploading over the internet, and the appliance can be handled like a NAS. It is only applicable if the data does not change in the meantime.
Storage Transfer Service
Storage Transfer Service is an online transfer service to GCS or from bucket to bucket. It can be used to back up online data, or as a sync from source to sink, including deletion of the source once it is copied. If you need to decide between gsutil and Storage Transfer Service:
local data -> gsutil; this also supports rsync
other cloud providers -> Storage Transfer
Dedicated Interconnect is a physical connection between on premise and Google Cloud. You provide your own routing equipment. It is possible at 10 or 100Gbit.
Dataproc is a managed Hadoop / Spark cluster in Google Cloud. It offers some special features:
Cloud Storage Connector: Hadoop or Spark jobs access GCS directly
data is not transferred to HDFS
GCS is HDFS compatible, just use gs:// instead of hdfs://
data still available if cluster is gone
high availability is possible
preemptible workers use the same machine type as the other instances
they form their own instance group
lost instances are re-added once there are free resources
they store no data
clusters cannot consist of only preemptible nodes
local disk is not available in HDFS; it is only used for local caching
scaling is always possible, even with running jobs
Machine Learning Engine
The Machine Learning Engine provides predefined scaling values, but there is also a custom tier. There you can change:
masterType: machine type for the master
workerCount: number of workers in cluster
parameterServerCount: number of parameter servers
Data Loss Prevention API
This API finds sensitive data (PII) and can remove it, e.g. personal data, payment data, device identifiers, and also custom and country-specific types. You can define word lists and regexes. It also provides data masking, removal and encryption. Use it on streams, text, files, BigQuery tables and pictures.
Natural Language API
Text analysis, sentiment analysis, topic mining, people and location identification
Vision API
Picture analysis, object recognition, etc.
Translation, Speech to Text, Text to Speech, Dialogflow as Chatbot, Video API
Datastore is a NoSQL DB, that is ACID compliant and has a SQL like querying language. There is an index for rows, combined indexes are possible too. It also provides a Firestore export to a GCS bucket.
Data Studio is the reporting solution provided by Google. It provides caching to speed up reports and reduce costs. There are two different kinds of caches:
responsive cache: remembers results of a query and if data is queried again, cache supplies it
predictive cache: analyses dimensions, metrics and filters and tries to predict possible queries
queries are executed in background and response is stored in predictable cache
if responsive cache has no data, predictive cache is asked
if no data in predictive cache, data is queried from source
works only with owner credentials
The cache is refreshed automatically, but it can also be refreshed manually. The default refresh interval is 15 minutes; 4 and 12 hours are also possible. You should turn off the predictive cache in case:
data changes frequently and data freshness is more important than speed
you want to reduce costs
After the Google Cloud Data Engineer Exam
After taking the exam, you get a preliminary result on the spot. You can see it after submission or later in your Webassessor account. Once Google has checked the exam you get an email with a voucher and a link to their perk shop, where you can choose between a backpack and a hoodie. I chose the backpack.
There are several serialized file formats out there, so choosing the one most suited to your needs is crucial. This blog entry will not compare them, but it will just point out some advantages of AVRO and AVRO Schema for an Apache Hadoop ™ based system.
Avro schema can be written in JSON
Avro schema is always present with data, getting rid of the need to know the schema before accessing the data
small file size: since the schema is always present, less type information needs to be stored
schema evolution is possible by using a union field type with default values. This was explained here. Deleted fields also need to be defined with a default value.
Avro files are compressible and splittable by Hadoop MapReduce and other tools from the Hadoop universe.
files can be compressed with Snappy and Deflate.
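Such an evolvable field might look like the following; the field name is made up. A new optional field gets a union type with a default value so that older data stays compatible:

```json
{"name": "discount", "type": ["null", "double"], "default": null}
```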
AVRO Schema generation
Generating Apache AVRO ™ schemas is pretty straightforward. They can be written in JSON and are always stored with the data. There are field types for everything needed, even complex types such as maps and arrays. A schema can also contain a record, which is in itself an independent schema, as a field. This makes it possible to store data of almost unlimited complexity in AVRO. For very complex schema definitions, keep in mind that accessing deeply nested data structures can be expensive later on when transforming and working with such data. Here are some examples of AVRO-supported datatypes:
Primitive types such as null, boolean, int, long, float, double, bytes and string
Complex types such as records, enums, arrays, maps, unions and fixed-length fields. A record field is basically a complete schema in its own right.
Logical datatypes are something special and by using these you can define other fields you might need. As you can see in the list above there is no datatype for date or datetime. These are implemented using logical datatypes. Define a logical type like this:
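One possible definition, with a made-up field name, storing a timestamp as a long annotated with a logical type:

```json
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
```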
Supported logical datatypes are decimal, date, time, timestamp and duration.
Downsides in Schema Generation
There is one downside though: individual fields are not reusable. This topic was addressed by Treselle Systems in this entry. They introduce a way to make fields in an AVRO schema reusable by working with placeholders and then replacing them with previously defined subschemas. This comes in handy when you have fields that should be present in every AVRO schema, such as meta information for a message pushed into your system.
AVRO Schema Generator
To make AVRO Schema generation more comfortable, I worked on a project, inspired by Treselle Systems’ text and combined it with other tools I use daily:
AVRO-Doc: a JS based server reformatting AVRO schemas into an easily readable HTML format.
AVRO schema repo server: a simple REST based server to publish schemas to and provide them for all parties that generate and consume the data stored in AVRO format
This combination of several tools makes it possible to handle data more easily.
Schemas are written using a Jupyter notebook server. The project contains:
AVRO Schema Editor.ipynb: To create new schemas and adapt existing ones. You load the existing files into the notebook and then edit them before saving them to file again.
Avro Schema Generator.ipynb: This notebook checks schema syntax and replaces subschemas in a generated version of the schema. Subschemas need to be defined before generating a final version of a schema. This notebook also implements functions to upload the schemas to the repository server.
Docker file for setting up the schema repository server in docker_schema_repo. Make sure to set the correct URL before trying to upload the generated schemas.
Docker file for setting up the avrodoc server, with built in active directory plugin in Nginx. Find this file in docker_avrodoc
The project contains an example schema for reference.
The schema repository provides a generally available schema store. This store has a built-in version control. It helps sources to take their time adapting to a new version of the schema.
This asynchronicity is possible because all schemas are compatible with previous versions. With that constraint it is also possible to have different sources push one schema in different versions and still transform the data using one process. Values missing in one version of a schema are filled with the mandatory default value, and this default value can even be NULL.
This project aims to help manage data definitions in Hadoop-based systems. With the schema repository it provides a single source of truth for all data definitions, at least at data entry, and if you decide to use AVRO schemas throughout your system, even after transformation, you can manage all data definitions here.
There are several other schema repositories out there, that can be used, e.g. the one provided by Confluent or the one introduced by Hortonworks for Apache Nifi. The tools used here are just examples of how such a system can be set up and how to introduce reusable AVRO fields into your schemas.
Running R projects in production is a controversially discussed topic, as is everything concerning R vs. Python. Lately there have been some additions to the R ecosystem that made me look into this again. Researching R and its usage in production environments, I came across several packages / projects that can be used as a solution:
There are several more, but those I found the most interesting.
For reasons of ease of use, and because it is not a hosted version, I took a deeper look into Plumber. This felt quite natural, as it uses function decorators for defining endpoints and parameters. This is similar to Spring Boot, which I normally use for programming REST APIs.
So using Plumber is really straight forward, as the example below shows:
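A minimal Plumber endpoint of this kind might look like the following sketch (not the exact code from the example):

```r
#' @get /hello
function() {
  list(message = "hello world")
}
```

Saved as api.R, this could be served with plumber::plumb("api.R")$run(port = 8001).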
The #' @get decorator defines the endpoint for this request, in this case /hello, so the full URL on localhost is http://127.0.0.1:8001/hello. To pass in one or more parameters you can use the decorator #' @param parameter_name parameter_description. A more complicated example using Plumber is hosted on our Gitlab. This example was built with the help of Tidy Textmining.
Plumber comes with Swagger, so API documentation is automatically available from the webserver. As the R instance is already running, processing the R code does not take long. If your model is complicated then, of course, this is reflected in the processing time. But as R is single-threaded, Plumber can only process one request at a time.
There are ways to tweak this, of course. You can run several instances of the service using a Docker image, as described here. There is also the option of using a webserver to forward requests to several instances of R. Depending on the needs of the API, single-threaded processing can be fast enough. If the service has to be highly available, the Docker solution seems like a good choice, as it comes with a load balancer.
After testing Plumber I am surprised by its ease of use. This package makes deploying a REST API in R really easy. Depending on your business needs, it might even be enough for a production scenario, especially when used in combination with Docker.
Once you have built a Data Lake for your company’s analytical needs, new use cases will soon arise that cannot easily be covered with the Data Lake architecture I described in previous posts, like Apache HAWQ™: Building an easily accessible Data Lake. You will need to adapt or enhance your architecture to become more flexible. One way to make this flexibility happen is to transform your Data Lake into an Analytics Platform.
Definition of an Analytics Platform
An Analytics Platform is a platform that provides all kinds of services needed for building data products. This often exceeds the functionality of a pure RDBMS or even a Data Lake based on Apache HAWQ™. There are data products that have more requirements than a SQL interface. Reporting and basic analysis are addressed by this setup, but products dealing with predictions or recommendations often have different needs. An Analytics Platform provides flexibility in the tools used. There can be, for example, an Apache HAWQ™ setup and at the same time an environment for running Tensorflow applications.
Using existing parts: Multi-colored YARN
When you are running a Hadoop™ cluster, you are already familiar with a resource manager: YARN. With YARN you can already deploy Linux containers, and support for Docker containers has already progressed pretty far (YARN-3611). Building complex applications and managing them with YARN is what Hortonworks calls Multi-colored YARN.
Following through on this idea, you end up with a cluster that has just some central services installed directly on bare metal. The rest is deployed in containers, as shown in the images below.
The example makes use of Kubernetes and Docker for virtualization and provides the following services on bare metal, since they are needed by most applications:
The HDFS especially is important as a central service, as it makes it possible for all applications to access the same data. The picture above shows that there can be several instances of a Hadoop distribution, even in different versions. So the platform allows for multi-tenancy, while all instances still process the same data.
Having an Analytics Platform makes the development of data products easier. With separate development and staging systems, as described by me here, there was always the problem of developing a product on a sample of the data. In some cases these samples did not contain all possible combinations of data, which could result in errors after a deployment to the production environment. Even going through all of development and staging could not change this. The new approach allows you to deploy all three systems on the same data, so you can account for all data-induced errors on the development and staging systems already.
You can even become more agile in your development process. The picture below shows an example deployment process that uses this system.
Moving from a pure Data Lake to an Analytics Platform gives you flexibility and helps in the development of data products, especially since you can develop on the same data as is available in production. Of course it brings more complexity to an already complex environment. But since it is possible to keep YARN as resource manager and move to a more agile way of development and deployment, it might be worth considering. Once Multi-colored YARN is finished, it will be even easier to make this happen.
When you are building a productive Data Lake, it is important to have at least three environments:
Development: for development, where “everything” is allowed.
Staging: for testing changes in a production like environment.
Production: for running your tested and productive data applications.
With these different environments comes the need to keep data structures in sync. You need a way to change each system in a controlled manner when it is time to deploy changes. How data structures are managed differs depending on where in the data processing they occur. We split these differences into the parts “Data Entry to Staging” and “Core and Presentation” and look at both below.
Managing different data structures in the three systems of your Productive Data Lake
As all three systems can have different data structures, tools that make managing them easier are important. Different Apache Avro™ schema versions are managed by publishing them to a tool called Apache Avro™ Schema Service, of which there are also three instances. By publishing changes first on the development, then staging, then production system, we make sure only tested data structures are deployed to the Production Data Lake. Each change to a schema is tied to a version number, so rolling back to an earlier state in case of an error is possible.
For data structures not contained in Apache Avro™ schemas, we manage evolution with tables containing all meta information.
Data Structures from Entry to Staging
Apache Avro™ is a good way to manage evolving data structures, as I explained here. Once the data is available in Apache Hive™, the AVRO data types have to be mapped to those of your Productive Data Lake system. We use Apache HAWQ™ as the base of our Data Lake, so we map the AVRO data types to HAWQ data types. To keep this as effortless as possible, we reuse attribute names as often as possible. If an attribute like “user_id” occurs in different schemas and has the same meaning, we just reuse it. The mapping table then does not contain an entry for each column of each schema, but only for distinct ones. Considering more than 100 schemas with on average 10 attributes each, this helps keep maintenance down. The table below shows an example of this mapping.
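The idea behind the distinct mapping can be sketched in a few lines. This is plain Python; the attribute names and the type mapping below are made-up illustrations, not our actual tables:

```python
# Map AVRO types to HAWQ data types once, globally.
AVRO_TO_HAWQ = {"string": "text", "int": "integer", "long": "bigint"}

# One entry per distinct attribute name, shared by every schema using it,
# instead of one entry per column per schema.
ATTRIBUTE_TYPES = {
    "user_id": "long",
    "event_time": "long",
    "page": "string",
}

def hawq_column(attribute):
    """Return the HAWQ column definition for a known attribute name."""
    return "%s %s" % (attribute, AVRO_TO_HAWQ[ATTRIBUTE_TYPES[attribute]])

# Any schema containing "user_id" gets the same column definition:
print(hawq_column("user_id"))  # user_id bigint
```

With 100+ schemas of ~10 attributes each, the mapping shrinks from ~1000 entries to one per distinct attribute name.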
Data Structures for Core and Presentation
Managing the data structures after data entry is a totally different challenge. Transformations need to happen, for the benefit of technical and business users. This can mean other attribute/column names, or splitting or joining data from one schema into different tables. These tables can again evolve. Evolving database tables is a pain, since in most cases you cannot just drop a table and recreate the new one. Here the processing power of Apache HAWQ™ comes into play, as now we can do exactly that. In most cases, at least for transactional data, we just reload the whole table during the daily batch process. So we manage all table definitions in tables. They contain all table and column attributes, as well as a version number. This makes rolling back possible.
Example of table definitions:
Example of column definitions (columns: Data Type, Length):
Automation of data structure deployment
After defining all the meta data concerning the data structures, we need a way to automate deployment. For this I created a Java program that uses all the information from the schema definitions, the data type mappings and the table definitions. This program runs on any of the three environments and also takes into consideration the version it should create. This only works if no one can create or change table information manually. The program first deploys changes to the data structure on the development system. If everything works there and all changes are ready to be deployed to the staging system, you run the program in that environment. With version numbers defined alongside each change, it is possible to run version 5.0 on production, version 5.1 on staging and version 5.2 on development, if necessary.
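The actual implementation is a Java program; the sketch below is a simplified Python illustration of the same idea, with made-up metadata (table name, columns and versions are invented):

```python
# Sketch: generate CREATE TABLE statements from versioned table/column
# metadata, as the deployment program might do for one target version.
TABLE_DEFS = [
    {"table": "orders", "version": "5.1",
     "columns": [("order_id", "bigint"), ("user_id", "bigint"),
                 ("amount", "numeric(12,2)")]},
    {"table": "orders", "version": "5.2",   # 5.2 adds a currency column
     "columns": [("order_id", "bigint"), ("user_id", "bigint"),
                 ("amount", "numeric(12,2)"), ("currency", "text")]},
]

def ddl_for_version(defs, version):
    """Build the DDL for every table definition matching the requested version."""
    statements = []
    for d in defs:
        if d["version"] == version:
            cols = ", ".join("%s %s" % c for c in d["columns"])
            statements.append("CREATE TABLE %s (%s);" % (d["table"], cols))
    return statements

# Deploying version 5.2 on development, while production still runs 5.1:
for stmt in ddl_for_version(TABLE_DEFS, "5.2"):
    print(stmt)
```

Because every environment is generated from the same metadata at a chosen version, development, staging and production can diverge only by version number, never by manual edits.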
This approach is one possibility to make data structure evolution easier and keep three different systems in sync. It is most important to make sure that there are no manual changes to the data structures. The approach works nicely if you can drop and recreate all tables without hesitation. If that is not the case, you can still define your tables this way; just make sure they are not dropped automatically when the program runs. Having all definitions in one place, and being able to deploy tables and changes automatically with version control, can be a great help. No one has to remember to create new tables manually in either system, so errors due to missing tables or columns are minimized, which matters if you create a Productive Data Lake, where errors affect data products needed by other products of your company.
Apache AVRO is a data serialization format. It comes with a data definition format that is easy to understand. With the possibility to add optional fields, there is a solution for evolving the schemas of your data.
Defining a Schema
Defining a schema in Apache AVRO is quite easy, since it is a JSON object, with each field taking the form:
"name": "name of field",
"doc": "documentation of field"
A schema consists of:
type = record
A typical schema would look like this:
"doc": "data sets contain random data.",
"doc": "a field containg a number"
"doc": "a field containing a string"
Changing a schema in a backwards-compatible way
If you now want to add a new field and stay compatible with your existing schema, you can just do the following:
"doc": "data sets contain random data.",
"doc": "a field containg a number"
"doc": "a field containing a string"
"type": ["null", "string"],
"default": "New Field",
"doc": "This is a new options field"
This change is still compatible with the version above. The new field is marked as optional by [“null”, “string”] in the type field. The default attribute will fill the field with the value given here whenever the field does not exist in the data.
Serializing data using the schema
Once the schema is defined, it can be used to serialize the data. This serialization also acts as compression, saving about 30% compared to plain text. Serialization is possible in a wide range of programming languages, but it is best implemented in Java. A tutorial on how to serialize data using a schema can be found here.
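To see where the size saving comes from, here is a hedged sketch of the binary encoding AVRO uses for two primitives, following the AVRO specification (plain Python, for illustration only, not a full encoder):

```python
# AVRO's binary encoding writes no field names or markers: just the values,
# in schema order. Integers use zigzag + variable-length encoding, strings
# a length prefix plus UTF-8 bytes.

def encode_long(n):
    """Zigzag-encode n, then write it as a variable-length integer."""
    n = (n << 1) ^ (n >> 63)            # zigzag: small |n| -> small encoding
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)     # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_string(s):
    """AVRO string: length written as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data

# A record with an int field and a string field serializes to a few bytes:
record = encode_long(1) + encode_string("some string")
print(len(record))  # 13
```

A JSON representation of the same record would repeat the field names in every single message; AVRO stores them once, in the schema.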
Data stored in Apache AVRO is easily accessible when read via Hive external tables. The data is automatically deserialized and made human readable. Apache Hive even supports default values on schema changes: if new fields are added with a default value, the Hive table can read all versions of the schema in the same table.
The Data Lake architecture is an up-and-coming approach to making all data accessible through several methods, be that real-time or batch analysis. This includes unstructured as well as structured data. In this approach the data is stored on HDFS and made accessible by several tools, including:
All of these tools have advantages and disadvantages when used to process data, but combined they make your data accessible. This is the first step in building a Data Lake: you have to make your data, even schemaless data, accessible to your customers.
A classical data warehouse, in contrast, only contains structured data that is at least preprocessed and has a fixed schema. Data in a classical data warehouse is not the raw data entered into the system; you need a separate staging area for transformations. Usually this area is not accessible to all consumers of your data, but only to the data warehouse developers.
Data Lake Architecture using Apache HAWQ
Building a Data Lake with Apache HAWQ is a challenge, but one that can be overcome on the design side. One solution for such a system can be seen in the picture below.
To make utilization of Apache HAWQ possible, the starting point is a controlled data entry. This is a compromise between schemaless and schematized data, and Apache AVRO is a way to do it. Schema evolution is an integral part of AVRO, and it provides structures such as maps and arrays to save unstructured data. A separate article about AVRO, explaining schema evolution and how to make the most of it, will be one of the next topics here.
Data structured by a schema can then be pushed message-wise into a messaging queue. Choose a queue that fits your needs best: if you need secure transactions, RabbitMQ may be the right choice; another option is Apache Kafka.
Processing and storing single messages on HDFS is not an option, so another system is needed to aggregate messages before storing them. A good choice for this is Apache Nifi. It comes with processors that make tasks like this pretty easy, for example the MergeContent processor, which merges single AVRO messages and removes all headers but one before writing them to HDFS.
If the resulting files are still below the HDFS block size, there is the possibility to read them back from HDFS and merge them into even larger files.
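Conceptually, the aggregation step works like the following sketch, with plain Python standing in for a MergeContent-style processor (the class and its parameters are invented for illustration):

```python
# Sketch: buffer single event messages and flush them as one larger chunk,
# the way a MergeContent-style processor batches data before an HDFS write.
class MessageBuffer:
    def __init__(self, flush_size, sink):
        self.flush_size = flush_size    # flush once this many messages arrived
        self.sink = sink                # callable receiving one merged chunk
        self.messages = []

    def add(self, message):
        self.messages.append(message)
        if len(self.messages) >= self.flush_size:
            self.flush()

    def flush(self):
        if self.messages:
            self.sink(b"".join(self.messages))   # one write instead of many
            self.messages = []

chunks = []
buf = MessageBuffer(flush_size=3, sink=chunks.append)
for event in (b"a", b"b", b"c", b"d"):
    buf.add(event)
buf.flush()                  # flush the remainder at the end of the window
print(chunks)                # [b'abc', b'd']
```

A real processor would flush on size or age, whichever comes first, so that a quiet topic does not hold data back indefinitely.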
Making data available in the Data Lake
Use Apache Hive to make the data in AVRO format accessible. HAWQ could read the AVRO files directly, but Hive handles schema evolution in a more effective way. For example, if there is the need to add a new optional field to an existing schema, add a default value for that field and Hive will fill entries from earlier messages with this value. If HAWQ accesses this Hive table, it automatically reads the default value for fields added later; it could not do this by itself. Hive also has, right now, a more robust way of handling and extracting keys and values from map fields.
Data Lake with SQL Access
All data is now available in Apache HAWQ. This enables transformations using SQL, making all of your data accessible to a broad audience in your company; SQL skills are more common than, say, Spark programming in Java, Scala or PySpark. From here it is possible to give analysts access to all of the data, or to build data marts for single subjects of concern using SQL transformations. Connectivity to reporting tools like Tableau is possible with a PostgreSQL driver. Even advanced analytics are possible if you install Apache MADlib on your HAWQ cluster.
Using Data outside of HAWQ
It is even possible to use all the data outside of HAWQ if there is a need for it. Since all data is available in AVRO format, accessing it by means of Apache Spark with Apache Zeppelin is also possible. Hive queries are possible too, since all data is registered there using the external tables we used for integration into HAWQ.
Accessing the results of such processing in HAWQ is possible too: save the results in AVRO format for integration in the way described above, or use “hawq register” to access Parquet files directly from HDFS.
Using Apache HAWQ as the base of a Data Lake is possible; you just have to take some constraints into consideration. Entering semi-structured data in AVRO format also saves work later when you process the data. The main advantage is that you can use SQL as an interface to all of your data. This enables many people in your company to access it and will help you on your way to data-driven decisions.
Pivotal ported their massively parallel processing (MPP) database Greenplum to Hadoop and made it open source as an incubating project at Apache, called Apache HAWQ. This brings together full ANSI SQL with MPP capabilities and Hadoop integration.
Integration into an existing Hadoop installation is easy, as you can integrate all existing data via external tables. This is done using the pxf API to query external data. The API is customizable but already comes with the most used formats ready-made. These include:
To access and store small amounts of data, Apache HAWQ has an interface called gpfdist. It enables you to store data outside of your HDFS and still access it within HAWQ to join with the data stored in HDFS. This is especially handy when you need small tables for dimension or mapping data in Apache HAWQ; this data will then not use up a whole, mostly empty block of your HDFS.
Apache HAWQ even comes integrated with MADlib, also an Apache incubating project developed by Pivotal. MADlib is a machine learning framework based on SQL, so moving data between different tools for analysis is not needed anymore. If you have stored your data in Apache HAWQ, you can mine it in the database directly and don’t have to export it, e.g. to a Spark client or tools like Knime or RapidMiner.
MADLib comes with algorithms in the following categories:
Association Rule Mining
By using HAWQ you can even leverage tools like Tableau with real-time database connections, which was not satisfactory so far with Hive.