Google Professional Data Engineer – Dataflow ~ Apache Beam

  1. Data Flow Intro

Next up, let’s talk about Dataflow. Dataflow is a way to carry out transformations on your data. It is loosely similar to some amalgam of Pig and Spark in the Hadoop ecosystem world. As we discuss Dataflow, here is a question that I’d like you to keep in mind and ponder over. It’s a question we’ve seen before while talking about Pig. In a typical ETL use case, is Hive or BigQuery going to be a source or a sink? Recall that ETL stands for extract, transform, and load. We had this conversation in the context of Pig, which is a procedural, data-flow-oriented language that sits squarely in the transform step of an ETL pipeline. In any case, let’s now turn our attention to Dataflow. Dataflow, as its name would suggest, models the flow of data through a set of transformations. If you have worked with other technologies, maybe something like Apache Flink, you will have encountered the idea of data flowing through a graph, with different nodes in the graph expressing different operations. This is an idea which occurs in Flink, it occurs in TensorFlow, and indeed it is squarely leveraged by Dataflow.

Dataflow is very powerful, and it is loosely semantically similar to Apache Spark. We’ll have a little more to say on a comparison between Spark and Dataflow in a bit. The most recent versions of Dataflow are based on something known as Apache Beam, which, once again, we’ll talk about in a moment. An older version of Dataflow, 1.x, was not based on Beam; it used Google proprietary technologies. But the basic ideas remain the same, and they would be familiar to you if you have ever worked with a technology like Flink.

Flink is a stream-first application; it’s a stream-first architecture. We did not discuss Flink in detail, but the basic idea should be familiar to us once we understand what stream processing is all about. There is a data source, data comes out of that source, it’s subjected to a set of transformations, and then it’s loaded into a sink. In a streaming application, the data source might be, for instance, a sensor network which is outputting data. The bit that really matters to us here has to do with the transformations. Those transformations represent different actions, and the outputs of some of these actions are in turn cascaded into the inputs of successive actions.

What this gives us is a nice data structure known as a directed acyclic graph, or a DAG. This idea of representing the transformations or operations which we would like to perform on our data using a DAG is actually a pretty common one. It is used in various technologies: Flink, TensorFlow, and also Apache Beam. There are excellent reasons for the popularity of DAG-based representations of data flow operations. Given a directed acyclic graph, it’s possible to perform something known as a topological sort. This is an ordering of the computation nodes which satisfies all of the precedence relationships.
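To make the idea of a topological sort concrete, here is a minimal sketch in plain Python (this is not Beam code; the toy graph and node names are invented purely for illustration) using Kahn’s algorithm:

```python
from collections import deque

def topological_sort(edges):
    """Order the nodes of a DAG so every edge points from earlier to later.

    edges: dict mapping each node to the list of nodes it feeds into.
    """
    # Count incoming edges for every node.
    indegree = {node: 0 for node in edges}
    for targets in edges.values():
        for t in targets:
            indegree[t] = indegree.get(t, 0) + 1

    # Start with the nodes that depend on nothing.
    ready = deque(n for n, d in indegree.items() if d == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for t in edges.get(node, []):
            indegree[t] -= 1
            if indegree[t] == 0:
                ready.append(t)
    return order

# A toy transformation DAG: a read fans out to two transforms that merge again.
dag = {"read": ["clean", "filter"], "clean": ["join"], "filter": ["join"], "join": []}
print(topological_sort(dag))  # ['read', 'clean', 'filter', 'join']
```

Any ordering this produces respects the precedence relationships, which is exactly what a backend needs before it can schedule the transforms.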

This also lends itself very nicely to parallelization, because it is possible to compute only subsets of a DAG in such a way that data processing is minimized. We saw a little of this while talking about lazy evaluation in Spark, and notice that this is a different and more sophisticated approach than that taken by Pig. Pig was a procedural language; it had a rather linear flow of commands. Now, Apache Beam, which is the underlying basis of Dataflow, models all of these transformations as a pipeline. A pipeline is this directed acyclic graph as a whole. So once again, do remember this term: the term pipeline, in the context of Apache Beam or an application like Dataflow, represents the entire set of calculations, this entire directed acyclic graph. Personally, I find the term pipeline ever so slightly misleading because it sounds rather linear. An oil pipeline might be linear and flow from point A to point B, but a pipeline in Dataflow is not linear.

It’s a directed acyclic graph, as with other DAG-based representations of data transforms. The edges in this graph are data, and in the Dataflow world there is a specific term for these data items: these are PCollections. PCollections can be thought of as special containers which can represent either batch or streaming data. So the input into the pipeline is a PCollection, and the output is a PCollection as well. All of the edges are PCollections, and they are transformed using, you guessed it, transforms.

Each data processing step in the pipeline is called a transform, and the transforms are really the operations which are going to be defined in Dataflow. At the two ends of this pipeline are the data source and the sink. These are fairly typical of any ETL application. So the data sources might include logs, or sources of streaming data like Twitter feeds or sensors emitting temperature information. The sink could be something like BigQuery, or Bigtable for that matter.

BigQuery is a fairly typical, prototypical in fact, example of a sink. It’s an OLAP data warehouse where a whole bunch of data is cleaned up a little, that is, using transformations in Dataflow, and then loaded into a form which is easy to query. This Apache Beam architecture which we just described can actually run on a bunch of backends. Potential backends include Spark, Dataflow, and so on. So let’s make sure we really understand each of these terms again. A pipeline is a single, potentially repeatable job, defined from start to finish, either in Dataflow or in an alternative backend such as Spark. This pipeline represents a directed acyclic graph. In that directed acyclic graph, the edges correspond to PCollections. These are specialized container classes which can represent data which is either bounded or unbounded in size. In other words, it’s possible to use PCollections for both batch and streaming applications. PCollections serve as inputs into transforms.

Those transforms actually output PCollections as well. Transforms represent the nodes in our directed acyclic graph: they take in PCollections, perform processing that we specify, and then output another PCollection. The PCollections are immutable, exactly as you would expect them to be, and exactly as RDDs are in Spark. The sources and the sinks represent different data storage formats. As discussed, these could be cloud storage files, BigQuery, Bigtable, and so on. Let’s double-click ever so slightly on each of these terms, starting with the pipeline. The pipeline is the entire set of computations, i.e. it’s the DAG, as we’ve already discussed.

You can think of a pipeline as an entire end-to-end job. You can think of a pipeline as encapsulating a certain meaningful set of computations. Once again, as discussed, the pipeline deals entirely with PCollections. PCollections go in, and PCollections come out. A pipeline needs to be defined by a driver program which holds things together. The computations run on a backend. That backend could be something like Spark, or something like Dataflow. There are also other backends available. The interactions between the pipeline and the backend are abstracted by something known as the runner. It’s worthwhile to understand these terms, driver and runner, in a little more detail, so let’s do exactly that. The driver is a program which defines the computation DAG, that is, the set of steps.

The runner can be thought of as executing that DAG on a backend. Now, the backends supported by Apache Beam are multiple: Spark, Flink, Dataflow, and something known as the Beam model. Any one of these could be interfaced to a driver using a runner. We’ll have a lot more to do with creating Beam driver programs in the demos. But even so, let’s understand the structure of a typical Beam driver program. The first step is usually to create a pipeline object. Next, an initial PCollection is going to be created. This will be fed into the pipeline, and the data in this initial PCollection might come either from a source API, something that reads data from an external source, or possibly from building a PCollection from some in-memory data. This serves as the input PCollection into the pipeline. The next step is for the driver program to actually wire up the computation DAG. This will define the pipeline, with all of the transforms to change, filter, group, or analyze PCollections in whatever manner is required. Keep in mind that the transforms do not change the input PCollections.

The PCollections are immutable. This pipeline will then have an output PCollection. That output PCollection represents the final transformed PCollection, singular or plural, which can then be written to some kind of sink using something known as a sink API. This will cause the data to be written to an external source. But all that we’ve done so far is to define the pipeline. We have not evaluated it; we have not run it. To actually run the pipeline, we are going to make use of something known as the designated pipeline runner. This will have the effect of actually carrying out all of the steps in the program so far on a backend.

And that backend could be Spark or Flink or Dataflow, or any of the other alternatives that Apache Beam supports. This is an important distinction, the difference between defining the set of steps and executing that set of steps. This is something which we encountered in Spark; it is something which we will encounter when we discuss TensorFlow; and it is also very much a part of the Apache Beam architecture. The pipeline runner has this role to play.

It interfaces with the backend. And the great advantage of this is that we have an abstraction over that backend. The Beam architecture allows us to work with any one of a number of different backends. Coming back now to the question that we posed at the start of this video, it’s pretty apparent that Dataflow is a transformation tool. It’s very similar to Pig, although it’s way more powerful. And like any transformation tool which sits in the middle of an extract, transform, and load pipeline, the output is going to go into a sink like Hive or BigQuery. This is an important point, and that’s why we have repeated this question; this is why this point is worth belaboring. Data warehouses like Hive or BigQuery are typically the sinks at the end of an ETL pipeline. Transformation tools like Pig or Spark or Dataflow are responsible for getting the data in there in the first place.

  1. Apache Beam

Now that we’ve got a decent sense of what the driver program does and what the backend runner is, let’s turn our attention to the remaining elements of the pipeline, starting with the edges, which are, of course, PCollections. A PCollection is an abstraction for a data set. You can think of it as a specialized container class which can contain data that is either bounded or unbounded. Fixed-size PCollections can include text files or BigQuery tables. Unbounded PCollections can include Pub/Sub subscriptions. Pub/Sub is a messaging service from the Google Cloud Platform which we are going to discuss in just a little bit. PCollections represent the edges in the directed acyclic graph. There are also something known as side inputs. These are ways to inject additional data as an input into some transform. We’ll have a little more to do with side inputs in the demos which are coming up. But the basic idea is that side inputs are, in a sense, directly injected into specific nodes, specific transforms, of the computation DAG. PCollections are the edges in the pipeline; the nodes in that pipeline DAG are, of course, the transforms.

And the transforms really are at the heart of processing in the Beam architecture. These take in PCollections as inputs, perform processing functions on the elements, and then produce output PCollections. The Apache Beam docs tend to explain this using something known as the what/where/when/how model, so let’s plunge right into that. Let’s take a look at how the Apache Beam docs describe transforms. The “what” in terms of transforms refers to what is being computed. You can see that the columns in these tables represent the different backends: for instance, the Beam model, Dataflow, Flink, Spark, Apache Apex, and Apache Gearpump. And the rows in this table represent capabilities or operations that can be performed in your transform. The first of these is ParDo, which stands for “parallel do”. The second is GroupByKey. The third is Flatten. The fourth is Combine. You can see that these operations have a very functional flavor to them, and that’s because all of these operations tend to take in PCollections and then do stuff with their elements.
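To get an intuitive feel for those four operations, here is a plain-Python analogue of each (this is not Beam code; the function names and sample data are mine, just mimicking what each primitive does to the elements of a collection):

```python
from functools import reduce
from itertools import chain

# ParDo: each input element yields zero or more output elements.
def par_do(fn, elements):
    return list(chain.from_iterable(fn(e) for e in elements))

# GroupByKey: gather (key, value) pairs into key -> [values].
def group_by_key(pairs):
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return grouped

# Flatten: merge several collections into one.
def flatten(*collections):
    return list(chain(*collections))

# Combine: reduce all elements down to a single summary value.
def combine(fn, elements):
    return reduce(fn, elements)

lines = ["import re", "x = 1"]
print(par_do(lambda l: l.split(), lines))            # ['import', 're', 'x', '=', '1']
print(group_by_key([("a", 1), ("b", 2), ("a", 3)]))  # {'a': [1, 3], 'b': [2]}
print(flatten([1, 2], [3]))                          # [1, 2, 3]
print(combine(lambda a, b: a + b, [1, 2, 3]))        # 6
```

The functional flavor mentioned above is easy to see here: each primitive is just a pure function over the elements of its input collection.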

And that’s why this table is used in the Beam documentation to represent what can be computed in a transform. There is a similar table for the “where”: this answers where in event time the transform is going to do its thing. Now, if the rows in the “what” table vaguely reminded you of Spark, the rows in the “where” table ought to vaguely remind you of stream processing and windows: for instance, global windows, fixed windows, sliding windows, session windows, and so on. These all refer to those subsets of the input PCollection, in terms of event time, that the transform will operate over. A similar table describes when the transforms will occur in processing time.
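To see what fixed and sliding windows mean in terms of event time, here is a small plain-Python sketch (not Beam code; the timestamps and window sizes are arbitrary example values) of how a single event timestamp gets assigned to windows:

```python
def fixed_windows(timestamp, size):
    """Assign an event-time timestamp to its single fixed window.

    Fixed windows tile the timeline, so every event falls in exactly
    one size-second window, returned as a (start, end) pair.
    """
    start = timestamp - (timestamp % size)
    return (start, start + size)

def sliding_windows(timestamp, size, period):
    """Assign a timestamp to every sliding window that contains it.

    Windows of length `size` start every `period` seconds, so a single
    event can belong to several overlapping windows.
    """
    windows = []
    start = timestamp - (timestamp % period)
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows

print(fixed_windows(67, 60))        # (60, 120)
print(sliding_windows(67, 60, 30))  # [(60, 120), (30, 90)]
```

Session windows, by contrast, are data-driven: they grow around bursts of activity per key and cannot be computed from a single timestamp alone, which is why they are not sketched here.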

And you can see that the rows in this table correspond to types of triggers: configurable triggers, event-time triggers, processing-time triggers, and so on. And lastly, there is also a similar table for the “how”. This has to do with concepts such as accumulation, retracting, and discarding. Collectively, this what/where/when/how matrix gives us a good sense of the capabilities of the different transforms in a computation graph in the Apache Beam architecture. Let’s also very quickly talk about the source and the sink. Neither of these ought to come as a big surprise to us. We’ve already spoken about how the sources are likely to be raw sources of data. This could include either bounded or unbounded data, and the sinks typically would include something like BigQuery tables.

  1. Lab: Running A Python Dataflow Program

In this lecture we’ll run a Dataflow program in Python and look at a bunch of transformations that we can implement in Dataflow. You should be able to answer the question: what is the transformation which produces zero or more outputs for every corresponding record at its input? We’ll look at how we can implement grep functionality in Dataflow using Python. What I’m going to show you in this lecture is a part of Google’s codelabs. All the code for this is available in the training-data-analyst GitHub repository that you should be very familiar with by now. We are going to run one of the labs in this repo. This is lab 2, and we’ll run our very first Dataflow program in Python.

We’ll then see the same data flow implemented in Java in the next lecture. Let’s look at some of the scripts that we’ll run in this lab. install_packages.sh installs all those Python packages that we need to run this Dataflow program. The list of packages is right here. We want to make sure that we have Python pip; that’s our apt-get install. We then pip install google-cloud-dataflow, that is, the Dataflow library, along with oauth2client 3.0. We also want to upgrade our pip version to the very latest one. Run this script with administrative privileges and you’ll see that it runs through and installs everything you need to run your Dataflow program. At the end, the script asks you to confirm that you have the very latest version. Running pip -V shows us that we have pip version 9.1 for Python 2.7. All looks good in that same lab 2 Python directory.

Open up grep.py to see how Dataflow operations are written using the Apache Beam libraries. I’m showing you this on my local machine using Sublime Text as an editor, so you get a nice large font size, syntax highlighting, and so on. If you’re going to use Cloud Shell to run it and you view it there, you will have to use the nano editor. Here are the packages that we’ll be using: apache_beam as beam, regular expressions, and the sys package. As you’ve studied earlier in this course, Dataflow is all about instantiating and executing Beam pipelines, which contain a series of transformations set up in the form of a directed acyclic graph. In order to set up those transformations, you first need to set up your pipeline, and that’s what beam.Pipeline does. The Apache Beam pipeline is the top-level Dataflow object that you have to instantiate at the start of any Dataflow program. The input to this data flow is all the files located in a particular directory.

That directory name is specified in the input variable. The output prefix, for where we’ll store the results, is /tmp/output, and the search term that we use to perform our grep operation will be “import”. So we want to see all the packages that are used within Java files in this particular directory. The transformations that we perform within this pipeline are PTransform objects that operate on PCollections. So you start off with the pipeline p, and you use the pipe operator to add all the transformations that you’re going to perform on this pipeline. The first transformation is to read the contents of all the Java files from the input directory.

We know these are Java files because the input directory has a wildcard which says *.java. These transformations, which form nodes in our directed acyclic graph, can be given names. Here the name is GetJava. It will be useful when we visualize this data flow using our monitoring console. The actual grep operation is done in this single transformation. This transformation is a FlatMap. A FlatMap operates on every single input record that’s passed to it, and for every record it may produce zero records at the output, one record, or any number of records. The FlatMap transformation takes in a lambda function which determines what action is going to be performed on the input record. The output of the previous transformation is a collection of lines which form the contents of our Java files.

And this transformation takes in as input one line and produces an output which can have zero or more records for each line. We’ll apply the my_grep function on every line that’s passed into the input of the FlatMap. Take a look at the my_grep code in this file. It takes in a line and a term to grep for within that line. It performs a regular expression match to check whether the term exists in that line. If it does, it returns the line; otherwise it outputs nothing. And finally, the third transformation that we perform is a write operation, where we write out to file all lines which contain the search term. The search term here is “import”. The pipeline has been set up; the last line simply executes it. We’ll first run this grep code locally on our Cloud Shell instance. This is a local execution; it’s not on the cloud. To run it on the cloud, we need to specify some additional parameters, which we’ll look at in just a bit. If you get a warning when you run this code, you can go ahead and ignore it. It just means that the logs will be written to standard error because log handlers could not be found.
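The heart of the grep logic just described can be sketched in plain Python. This is a reconstruction of the behavior, not necessarily the exact code in the lab’s grep.py; the sample lines are made up:

```python
import re

def my_grep(line, term):
    """Yield the line if it begins with the search term, else yield nothing.

    Used as the FlatMap callable: a non-matching line produces zero
    output records, a matching line produces exactly one.
    """
    if re.match(r'^' + re.escape(term), line):
        yield line

lines = ["import java.util.List;", "public class Foo {", "import java.io.File;"]
matches = [m for line in lines for m in my_grep(line, "import")]
print(matches)  # only the two lines beginning with "import" survive
```

Because my_grep is a generator that can yield zero or one records per input line, it fits the FlatMap contract exactly, which is the point of the quiz question this lecture opened with.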

The output will be located under /tmp, and all files will have the prefix “output”. You can simply cat this and see what the result looks like: every line in the Java files located in our input directory which contains the “import” search term. The job ran on your local machine just fine. That means it’s ready to be deployed and run on the cloud. In order to monitor your job when it runs on the cloud, go to Dataflow on your side navigation bar. When we run our data flow on the cloud, it cannot access files on our local machine, and it wouldn’t make sense for it to. The natural place to store these input files is in cloud storage. We’ll use gsutil to copy files from our local machine to our bucket in cloud storage. We’ll have the input Java files live in our loony-us bucket, in a directory called javahelp. The Python file in this lab which has the correct parameters set up to run on the cloud is the grepc.py file.

You’ll need to make some changes within this file to run it within your project and have it access your bucket. Update the project and bucket variables specified within this file to map to your project ID and your bucket. If you’re working on Cloud Shell, ensure that you’ve opened up this file on your Cloud Shell instance using nano and made the edits there. I’ll go back to examining this in the Sublime Text editor that I have open on my local machine. What makes this program run on the cloud is the arguments that you specify to its pipeline. These arguments indicate to the Google Cloud Dataflow pipeline that you want this particular Python program, which is written in Apache Beam, to run within this project. This is the name of the Dataflow job: examplejob2. Dataflow jobs require a staging location where intermediate files might be stored.

Specify a folder on your bucket where staging data can be stored. And finally, this is the argument that tells Beam that this pipeline has to be executed on the Google Cloud. The runner that you specify tells Beam where the execution will be: it can run on Spark, it can run on Flink, it can run locally. DataflowRunner indicates that it has to be run on Google’s cloud. And these arguments, passed into the instance of your pipeline set up here, are the only difference between running it locally and running it on the cloud. If you look at the rest of the code, you’ll find that it’s exactly the same as what we saw earlier when we ran this Python program locally. Execute this program on the command line exactly like you did before: python grepc.py. The job has been submitted to the cloud, and on your Dataflow page in the Google Cloud web console, you’ll find examplejob2 running.
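Putting those arguments together, the cloud-specific part of grepc.py looks roughly like this sketch. The project ID and bucket name are placeholders you would replace with your own, and the exact variable names may differ from the lab’s file:

```python
PROJECT = "my-project-id"   # placeholder: your GCP project ID
BUCKET = "my-bucket"        # placeholder: your Cloud Storage bucket name

# The arguments that switch a Beam program from local execution to Dataflow.
argv = [
    "--project={0}".format(PROJECT),
    "--job_name=examplejob2",
    "--staging_location=gs://{0}/staging/".format(BUCKET),  # intermediate files
    "--temp_location=gs://{0}/staging/".format(BUCKET),
    "--runner=DataflowRunner",  # DirectRunner here would run locally instead
]

# In the driver these are handed to the pipeline at construction time, e.g.:
#   p = beam.Pipeline(argv=argv)
print(argv[-1])  # --runner=DataflowRunner
```

Everything else in the program, the transforms themselves, stays untouched; only this argument list decides where the DAG executes.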

Click on examplejob2, and some of you might find that no execution graph shows up on this page. Normally, you should find an execution graph showing what transformations will be applied to your data. I click the exclamation point and the logs on the top right, and I find that there are some errors in my logs. Let’s see if these logs can help us figure out what’s going on. There was a failure to retrieve the job graph; please see the job logs. Okay, I’m looking at the logs. It says the workflow failed: there was a problem refreshing your credentials. I clicked on this log message, and the detailed logs give me more information. It asks me to check for two specific things: one, whether the Dataflow API is enabled for my project. I don’t think I did that. And two, whether there is a service account associated with my project.

I don’t think I did that either. But a service account is generally set up by default, and that’s what you would use to run your programs. I’m pretty sure I didn’t enable the Dataflow API for my project; it has to be explicitly done. On the side navigation bar, go to API Manager and Dashboard. On the page that you land on, click on Enable API at the top. This will give you a list of APIs that are enabled for your project. A quick glance should show you that the Dataflow API is not present in this list. Click on Enable API at the top, and then in the text box that shows up on the next screen, type in “Dataflow”. This will give you some autocomplete to show you what APIs are available. There you see it: the Google Dataflow API. Click through to go to that page, and click on Enable API to set it up. All right, now that the Dataflow API is enabled, we can run our Apache Beam program on the cloud. Switch to the Dataflow page on your dashboard in order to monitor the job that you’re going to submit, go to the Cloud Shell command line, and type in python grepc.py. Click through to examplejob2, and there you see it: our Dataflow graph.

Click on Logs on the top right and you’ll find that they look fine; no warnings there. On the right, Stackdriver monitoring gives you a whole bunch of information on the job that you are running: the name of the job, the job ID which has been generated for you, and the fact that the job is running. At this point, you can click Stop Job if you want to stop the execution on the cloud. You can see other details as well: what SDK version this particular job is using, how long it has been running, and so on. Below that, you have information about how many resources this particular job consumes. So far, no virtual CPUs have been assigned to this job; that’s why vCPUs shows zero.

As you keep watching, you’ll find that the number of vCPUs assigned to this job will scale up to one. This is autoscaling. Just click anywhere on the page and you’ll see a different set of information. Notice that Autoscaling shows you that the number of workers assigned to your job is one. Notice that the execution graph is a directed acyclic graph. The nodes, or vertices, in this graph are the transformations that you perform on the data, and the edges are PCollections. The visuals of this execution graph will change as different transformations complete. GetJava and Grep have now completed; Write is still running, and at some point Write will complete as well. Your execution is now complete; your pipeline has run through successfully on the cloud. Now, can you tell me what transformation you would apply which takes in a single record as its input and produces zero or more records at the output? That’s right, it’s the FlatMap.