This is a little text with all the stuff that helped me prepare for the Google Cloud Data Engineer Exam. It covers topics I had to struggle a little with.
When you use streaming to insert data into BigQuery, it gets saved into the streaming buffer and then will be loaded into the persistent table from there. It can take up to 90 minutes until the data is persistent. But all data is queryable together, buffer and persistent table. So your results get updated immediately.
During streaming inserts each row gets an insertID which Bigquery uses to try, find and remove duplicates, if these occur in a time window of one minute.
If this method is not secure enough, Google Cloud Datastore can be used, since it supports transactions.
You can also stream into partitioned tables, if you add a partition decorator $YYYYMMDD to the table. Streamed data gets saved to the UNPARTITIONED partition. It can be checked, because the system column _PARTITIONDATE IS NULL.
Typical use cases include
- high volume input, but not transactional data
- analysis on aggregated data and not for single row queries
- max row size: 1 MB
- HTTP limit: 10 MB
- max 10,000 rows / second per project
- max 10,000 rows / query; 500 rows recommended
- max 100 MB / s and table inserts in streaming
- API limit of 100 queries / s per user
- max 300 concurrent users
- max 500 tablesets / s
Partitioned tables are tables, that are separated into smaller units to optimize query performance. There are several ways to to partition tables in BigQuery:
- partitioned by loading time (automatically)
- partitioned by field with the data type TIMESTAMP or DATE
You can correct loading time by updating the suffix or pseudo columns. Partitioned tables contain a system column _PARTITION_TIME, you can use to filter. If you used an explizit, not automatic partitioning, the column _PARTITION_TIME is not exiting.
There are two columns for data, that do not belong to partitions:
- __NULL for rows with a NULL value in the date field
- __UNPARTITIONED for rows with a date outside of the valid timerange#
Another way to optimize tables, is sharding, where you use different tables to split data, instead of a partitioning column. These tables need then be joined using a UNION. In this case keep in mind, that only 1,000 tables can be processed in one query.
- viewer: right to read all tables in a dataset
- editor: viewer plus creating, deleting and changing of tables in a dataset
- owner: editor plus creating and deleting datasets
- admin: everything
There are some restrictions you need to keep in mind, when using BigQuery:
- max 100 concurrent interactive queries possible
- max 4 concurrent interactive queries on external sources
- max 6 UDFs in a single interactive query
- max 1,000 updates per table per day
- max query duration is 6 hours
- max 1,000 tables per query
- SQL command can be max 1 MB in size
- max 10,000 query parameters
- max 100 MB / row
- max 1,000 columns per table / query / view
- max 16 levels of nested fields
ETL from relational database to BigQuery
There are several ways to import data from a relational database into BigQuery. This can be done by using the WebUI, in which case the schema is upload able as as JSON file. You can export data to Google Cloud Storage in JSON format and import from there.
Another way is using Google Cloud Dataflow. This can be set up automatically, but you can customize the process and maybe join data already in Dataflow, to denormalize it by using lookup tables as side input.
Backup / Snapshots
BigQuery supports a feature for “Point in Time Snapshots”. If you you a query decorator @<time> in Legacy SQL you can get an image of the table during the specified timestamp. This decorator can contain relative or absolute values.
BigQuery supports machine learning in SQL for structured data. This is easy to learn, as it is based on SQL and not some other programming language. Right now it does provide linear regression, binary logistic regression for classification, multi class logistic regression and k-means clustering. The advantage of using this is, that you only have to use one tool, SQL, for everything. This means, data does not need to be moved from one place to another.
BigQuery does not need normalized data for fast query speed, but rather denormalized data. This means you should use nested and repeated fields in tables. Disk usage is not an issue here and denormalized data saves you the need to join.
You can reuse nested and repeated fields by using formats like AVRO, Parquet, ORC, JSON, Datastore or Firestore.
BigQuery Transfer Service
There is an option to repeatedly load data into BigQuery from Campaign Manager, GCS, S3, AdManager, Google Ads, Google Play and Youtube. You can also load Stackdriver logs into BigQuery.
BigTable is Googles solution for a real time key / value datastore. It handles sparse tables, meaning tables, where a lot of columns in rows are null. Empty columns do not use any space. This system is designed for billions of rows, thousands of columns and data the size of TB or PB.
BigTable only has one key per row, called rowkey. It has small latency and is HBase compatible. Also it scales linearly corresponding to the number of nodes in a cluster. There is an option for replication over two clusters, in which one is used as cold standby. It is possible to change cluster sizes while the system is running, but after that BigTable takes a few minutes to rebalance cluster load.
To make the most out of it, its use case is for high scalability, key / value data and streaming.
BigTable orders columns belonging together into groups or families. Empty columns do not use any space. To make the most of it, key values should be divided equally on all nodes. Data is not stored on the cluster nodes, but on Colossus. The nodes only contain references to the data. This makes it easy to recover data after a system crash and no data is lost if nodes fail.
Data is distributed by the master node to optimize performance. Data locality is important. It can be influenced by key design. Group data that is queried together by designing key in that way. But BigTable does not support SQL or joins and is only really recommended with over 1TB of data.
|SSD||read 10,000 rows in 6 ms|
|write 10,000 rows in 6 ms|
|HDD||read 5,000 rows in 200 ms|
|write 10,000 rows in 50 ms|
Reasons for underperformance
- incorrect schema design: key does not distribute data equally, only one key is possible
- columns are sorted by family and then alphabetical
- reads / writes are scewed
- all operations need to be atomic on a row
- information is distributed over several rows
- row is to big (only 10MB per cell and 100MB per row)
- timestamp is beginning of rowkey
- not using combined keys
- key consists of domains, hash or sequential ids
- too little data (< 300GB)
- test was to short (needs to run at least 10 minutes)
- cluster too small (>70% CPU; 70% space)
- HDD is used
- still is a development instance
- data is distributed automatically due to usage pattern, make sure to give system time to distribute to your needs
The Key Visualizer is a tool for supporting you in finding BigTable usage patterns. It provides graphic reports to help you recognize performance problems, like spotting hotspots (rows with a lot of actions) or rows with too much data. It helps you in getting evenly distributed accesses over all rows.
The reports are generated automatically (daily / hourly) if a table has more than 300GB or at least 10,000 row read or writes per second. The reports also recognize hierarchical keys and provides them as this hierarchy. Also connected keys are summarized.
You can scale BigTable programmatically. This is helpful if you, for example get monitoring data from Stackdriver and then react on this databy scaling up or down. After scaling a cluster it can take up to 20 minutes until you can see better performance, as data needs to be redistributed.
This automatic process does not help in cases where performance peaks for a short period of time. In this case, you need to scale manually before the event. If scaling does not help performance, then probably the key design leads to imbalanced load distribution.
You can scale a running cluster. It is possible to change the following things:
- nodes in cluster
- cluster per instance
- usage profiles
- label / names
- upgrade from development to production instance
Schema design in BigTable is crucial for performance. There is only one index per row, called rowkey. All rows are automatically sorted due to the rowkey, in alphabetical order. And as all operations are atomical per row, rows should not be dependent on one another.
Use key design to make sure data queried together is lying close to each other. And keep in mind, that one cell should not be bigger than 10MB and one row no larger than 100MB. The max values for this is 100MB per cell and 256MB per row.
There are up to 100 column families possible. If all dependent values are in one family, it is possible to only query this family. Less columns is better and column names should be short.
There are several best pratices for key design:
- use inverted domain names, like com.domain.shop
- string ids should not contain hashes, due to readability
- integrate timestamp if time is queried often, but not only timestamp
- use inverted timestamp if newest data is queried more often
- always use grouping field first in key
- try to avoid a lot of updates on a row, try to write several rows instead
Dataflow is a system to develop and run data transformation pipelines in. It has the following key concepts:
- Pipeline: complete process in a DAG
- PCollection: distributed dataset for a pipeline
- PTransform: data processing operation, that takes a PCollection and performs processing. It returns again a PCollection
- ParDo: Parallel processing runs a function on a PCollection
- SideInputs: additional inputs for ParDo / DoFns
It is a best practice to write invalid input to its own sink and not to throw it away.
Messaging queue provided by Google. It keeps data up to seven days. All logs are automatically pushed to Stackdriver for analysis. It has the following attributes:
- messages might not be delivered in the order they were received in, because the system is distributed.
- if message delivery is not acknowledged after a defined time, message will be redelivered. This timespan is configurable
Data Transfer Appliance
Data Transfer Appliance is an encrypted offline data transfer up to PB in size. You get the appliance and ship it back to Google where it will be uploaded into the cloud. This is faster than internet upload and can be handled like a NAS. Only applicable if the data is not changed in between.
Storage Transfer Service
Storage Transfer Service is a transfer service online to GCS or from bucket to bucket. It can be used to backup online data, or as a sync from source to sink, including deletion of source once it is copied. If you need to decide between gsutil and Storage Transfer Service it is:
- local data -> gsutil; this also supports rsync
- other cloud providers -> Storage Transfer
Dedicated Interconnect is a physical connection between on premise and Google Cloud. You provide your own routing equipment. It is possible at 10 or 100Gbit.
Dataproc is a managed Hadoop / Spark cluster in Google Cloud. It offers some specials:
- Cloud Storage Connector: Hadoop or Spark jobs access GCS directly
- data is not transferred to HDFS
- GCS is HDFS compatible, just use gs:// instead of hdfs://
- data still available if cluster is gone
- high availability is possible
- Preemptible workers:
- same machine type is others
- own instance group
- lost instances will be re-added once there are free resources
- save no data
- no clusters with just preemptible nodes
- local disk is not available in hdfs, it is only used for local caching
- Scale cluster:
- scaling is always possible, even with running jobs
Machine Learning Engine
Machine Learning Engine provides predefined scaling values, but there is also a custom tier. There you can change:
- masterType: machine type for the master
- workerCount: number of workers in cluster
- parameterServerCount: number of parameter servers
Data Loss Prevention API
This API finds sensible data (PPI) and can remove it, e.g. personal data, payment data, device identification, also custom and country specific. You can define word lists and regexes. It also provides data masking, removing and encryption. It can be used in streams, text, files, BigQuery, pictures
Natural Language API
Textanalysis, Sentiment Analysis, Topic Mining, People and Location identification
Picture analysis, object recognition, etc
Translation, Speech to Text, Text to Speech, Dialogflow as Chatbot, Video API
Datastore is a NoSQL DB, that is ACID compliant and has a SQL like querying language. There is an index for rows, combined indexes are possible too. It also provides a Firestore export to a GCS bucket.
Reporting solution provided by Google. It provides caching to speed up reports or reduce costs. There are two different kind of caches:
- responsive cache: remembers results of a query and if data is queried again, cache supplies it
- predictive cache:
- analyses dimensions, metrics and filters and tries to predict possible queries.
- queries are executed in background and response is stored in predictable cache
- if responsive cache has no data, predictive cache is asked
- if no data in predictive cache, data is queried from source
- works only with owner credentials
Cache is automatically refreshed, but it can be done manually. The default for refreshing is 15 minutes, but also 4 and 12 hours. You should turn of predictive cache in case of:
- data changes frequently and data freshness is more important than speed
- reduce costs by turning it off