Google Professional Data Engineer – Dataflow ~ Apache Beam Part 2
Lab: Running A Java Data flow Program In this lecture, you’ll understand why you would choose to use the Pair Do class while running transforms in a Java pipeline, we’ll implement the same Grep program that we saw earlier, but this time we are going to do it in Java and we’ll run it on the Cloud and see how that works. The code for this Java implementation is also part of Google’s Code Labs for the Data Engineer certification. This code is available in training data analyst courses. Data…
In this lecture, you’ll understand why you would choose to use the Pair Do class while running transforms in a Java pipeline, we’ll implement the same Grep program that we saw earlier, but this time we are going to do it in Java and we’ll run it on the Cloud and see how that works. The code for this Java implementation is also part of Google’s Code Labs for the Data Engineer certification. This code is available in training data analyst courses. Data analysis, lab two. This, of course, means that you have to get clone the Training Data Analyst repository to your Cloud Shell machine or your local machine if that’s where you’re running the code. Under this lab two folder are demos for both Java and Python. We’ve already executed and are done with the Python code. Let’s now look at the Java code. In order to install all the packages that I need to run this code in Java, I’m going to use the Apache Maven tool. The Maven build tool is already available to us in Cloud Shell.
We don’t have to do anything special to install it. You can simply run MVN on your Cloud Shell command line to confirm that this is the case. Here on screen is the Maven command that I plan to run. I’m going to generate these packages. I’m going to download it from the Maven repository. Google Cloud. Dataflow Java archetypes Starter. That’s the name of my artifact. ID. The Archetype group ID is. com Google Cloud Dataflow and the group ID is. com Google Cloud Training Data Analyst. Java health that’s also the name of the package which contains the Grep Java code. The remaining arguments are the Artifact version, interactive mode and so on that you need to specify. When you install these libraries that you need run this Maven command on your Cloud Shell command line. It will run for a bit because it’s downloading a whole bunch of information. Wait for this to complete. This is when all your dependencies have been downloaded and installed. Change directory to the Java Help directory under here and let’s take a look at the Grep Java file. The Java file is present under the Source main directory at this long path that you can see on screen.
We’ll view the contents of this Grep Java file in the Sublime text on my local machine. These are all the Java libraries that we use to set up a data flow pipeline in Apache Beam. Let’s look at the individual libraries that we import. Pipeline Options is an interface that’s used to configure Beam pipelines. All the arguments that you pass in through the command line will be passed to the pipeline via Pipeline Options. The Pipeline Options factory is used to create an instance of Pipeline Options from the command line. Arguments that you pass in the pipeline is the Directed Acyclic graph which has P transforms as nodes and P collections as edges. Text IO is used to read data in text format from files. The do function is the class which holds the code that applies transformations on individual elements from the Collection that’s passed into it. Do functions are passed in as arguments to pair do. Paradox is the class which performs the core element wise transform in Apache Beam.
The individual elements in the Collection that is passed in can be processed independently and may be in parallel across distributed computing resources. You can imagine that the processing that occurs in Paradox is the equivalent of the mapper phase. In traditional MapReduce, it occurs in parallel across multiple machines. The code for our data flow will be written within this Grep class. If you think carefully about how Grep works, it will become pretty obvious to you that Grep is a map only job. You can think of Grep occurring parallel with all the mapper faces running which checks for the presence of a particular word across all lines in the input text. There is no reduce phase. A line is present in the output if it has the search term. It’s absent from the output if it doesn’t. The command line arguments that are passed into our Grep function are parsed by Pipeline Options Factory and also validated. Create an instance of the pipeline P with the configuration options.
Input text files that we want to parse are present in the Java help folder. We are interested only in the Java source code files. There, the output prefix for our output files are forward slash tempoutput, and the search term is input. You can choose to change the search term if you want to run a slightly different prep. Now come the transformations which are applied on P the pipeline. The Get Java transformation reads in all the input text files. The result of this transformation is a P collection of lines where each line is a string from the input source code. The next transform is the actual Grep. The logic of this transformation processes each line to see whether the search term is present or not. The equivalent of this in Python was the flat map. In Java, though, there’s a whole bunch of boilerplate code that you need to specify. Let’s see some of this boilerplate.
We set up a pair do class which is the core element of processing. The input to pair do is a do function which specifies what the actual processing should be. The do function is a generic class with templateized, input and output parameters. Here, each element in the input P collection is a string, and the output of this transformation is also a string. Only those input text lines which contain the search term. The process context that’s passed in to the process element within a do function. Think of that as a bridge to the external environment. Any information that you need from the environment or that you need to pass to the environment, you’ll do via the process context. After all, this boilerplate set up here is the actual code to perform the Grep operation. This should be familiar to you. And finally, we want to write out the results to a file on our local machine. We have the pipeline set up. We are now ready to execute it. Switch over to your Cloud shell. I’m going to go ahead and change my prompt so you can read the very long commands that I’ll paste in. Here. We are going to run this Java grab code on our local machine. Local machine here. Being our Cloud Shell instance, ensure that the path to your JDK is available in the path environment variable. Ensure that this is your current working directory. Under Java help. We’ll use Apache Maven to execute our Java grep program locally on Cloud shell. Remember, this is a local execution because our command line arguments do not specify a data flow run off. Wait for this to run through successfully, and then you can check within your forward slashemp folder for output text. You’ll get a list of all the lines from our Java code which have the term import within it. We’ll now set things up so that we can run this Java code on the cloud. The first thing, if you remember, is to copy over all the source files on which we want to perform this Grep operation to our cloud storage. It should already be present from your last demo.
You can use gsutil CP to copy it over if you need to. You can confirm that the files have been copied over successfully by going to cloud storage on your web console. Okay, within looney us bucket Java Help here are my source Java files. Switch over to the Grep Java code and let’s update it so that it can run on the cloud. First is we want to read from our bucket rather than the local file system. The input variable is set to where our source files live on our bucket. Similarly, we will update the output prefix to point to our bucket as well. This is in a folder named Output under Java Help in loony us bucket. Once again, if you’re working with Cloud Shell, use the Nano editor in order to edit Grep Java. Right there. The mevin command to run this Java program on the cloud comes in a script file that’s present along with the rest of this lab. Under lab two. Java help open up the run on cloud one.
Sh. At the very top are instructions on how you can execute this script you’ll call runoncloud sh. Specify the name of your project, the name of the bucket, and the main class that you want to execute. Once you pass these in on the command line, assign them to individual variables. Project Bucket and Main it’s assumed here that your main class lives in the Java Help package. The script ensures that the Java JDK installed is present in your Path Environment variable. Here is the maven compile. The actual compilation will happen on your local machine. And once compilation is complete, here are the remaining parameters which we need to specify in order to run this jar on the cloud. The project ID, the staging location, a temporary location, and finally the Data Flow Runner.
This is what indicates to Apache Beam that this has to be executed on the Google cloud. Execute the run on cloud one Script pass in your project, your Bucket and Greg, that’s the main class that you want to execute as it’s building this project. Let’s go ahead and switch to Dataflow to monitor the progress of our job. Our Dataflow console has our previous runs of example job too, and right above that should be the new job that you just submitted to Dataflow r Java Grip. Click on the Job ID and you’ll see the execution graph for this Grep operation. The execution graph should be exactly the same as in the Python implementation get Java, Grep and Textron Write.
Let’s look at the monitoring statistics in a little more detail this time around. There is a Job summary, job name and Job ID, and the current status running for this job. If you click on an individual vertex of this execution graph, you’ll see statistics for that particular transformation. This will give you insights into that particular operation, and it will help you see where your pipeline is lagging. What operations are slow? Can they be sped up? These are decisions that you can make based on these statistics. We are in very early stages of our data flow execution for this program, which is why Output collections has nothing. Get Java has not produced any elements at its output. The same thing holds true for the Grep transformation. The input to Grep is nothing, we don’t have it yet, and the output collections is also empty. This obviously means that nothing has been fed into text IO, right, which is our sync, our data sync. Input collections has no information.
To let the program run for a little bit, our pipeline will slowly fill up. Notice now that the number of elements passed into Grep is 213 and the number of elements that we’ve received from the output of Grep is 29. The change in these number of elements from input to output and the corresponding data sizes represent the transformation that Grep applied to the input data. It basically filtered the input data and output only those lines of text which contain the input search term. Let’s now systematically look at each P transform node. Get Java is the data source. It reads in input source files. It has read in 508 lines of Java code. These 508 lines have been passed on to the input of CReP.
CReP operates on these lines and finds those lines which has the search term within it, and the resultant output is just 64 lines. At this point in time, only 64 lines contain the word input. Notice that these input and output numbers are different from those we saw just a little bit earlier. That’s because as the program has been running, more collection items have been processed, and at this point in time, those 64 lines are being written out using Textio write. Wait for a bit.
Let the program run through till your execution graph says that every node or every transform has succeeded. If you look at the auto scaling parameters, once the program is complete, you’ll notice that the number of workers will move from one to zero. Wait a little while and you’ll find that it will turn to zero at some point. It should be trivial for you to answer this question now. The pair do class is the core element of transformation in the Java Apache bean pipeline, and it will parallely process your data elements across multiple machines.
At the end of this lecture, you should be able to answer this question how do you specify custom pipeline configuration parameters when you set up a data flow? In this lecture, we’ll focus on implementing and executing the simplest of all map reducers the Hello World of MapReduce. The word count operation in dataflow. You guys already know this, but this is just a reminder. If you want to run dataflow programs on the cloud, make sure that your data flow APIs are enabled. Go to API Manager and check what APIs are enabled for your project. At the very top, you’ll see APIs which are in use, and below that, the ones that are enabled but not being used yet. Dataflow is not present on my list currently. I think I disabled it while I was playing around on this cloud platform web console. So I’m going to go ahead and re enable it.
Click on Enable API, find the data flow API, and click Enable on this page. The code for this word count program is available in Google’s Dataflow Starter Kit. I’m going to go ahead and download the code using this maven command. The Artifact ID is Google Cloud dataflow java archetype examples and the Archetype group ID is. com Google Cloud dataflow. And these are the other maven parameters that you need to download this Starter Kit. Once this download is complete, you can look in your current working directory and you’ll find a directory called First Data Flow. Let’s set up some environment variables that our program will use. Export the project underscore ID environment variable and point to your current project. Also specify the bucket which we’ll use to store staging and temporary data. This bucket can also hold your source text files, but the input source text files are not specified using this environment variable. We’ll use a command line parameter to our data flow job for that the Word Count of Java file which contains the data flow that we are going to execute in the cloud.
Our Hello World MapReduce operation lives at this path that you see on screen. Let’s examine the transformations in wordcount java. Open up this file in your favorite editor and let’s walk through it. Notice that we have a bunch of new imports here that we haven’t seen earlier. Dataflow pipeline options, default value, factory default, and so on. This is because when we run this code, we want to specify some custom configuration options for our pipeline. There are a bunch of new transformations that we haven’t seen before as well. The aggregator Count, Ptransform and Sum in addition to do, function and parado. Their names are descriptive by themselves. You can kind of get an idea of what they do. We’ll see them in action in just a minute.
Anyway, the GCs Path class implements Java’s Path API for Google cloud storage. That’s what GCs stands for. This allows you to work with and manipulate files on cloud storage. The KV class represents an Immutable key value pair and it’s useful when we have P collections where every entity in the collection is a key value pair. Let’s run through the overall structure of this program by looking at the main method. We’ll then dive into the details of every transformation. The pipeline Options Factory passes the arguments that are passed into the command line and stores it in a class which implements the Word Count options interface.
This is a custom interface that we have defined which extends the Pipeline Options interface. That’s because we have custom parameters to pass in when we instantiate our pipeline. Once the pipeline is instantiated, we use Textio read to read data line by line where every line is a string from the input file. The input file we get from our options which can be passed in as a command line parameter. The output of this transformation will be a peak collection of lines. To that we apply a transformation that will split these lines into words and count the number of words in traditional map reduce. This would be both the map and the reduce operation.
We’ve kind of combined them into one transformation. Once we’ve got the words and the associated count, we want to format it nicely as a string. This we do in the function format as text function which is run as a parado operation. And finally we write it out to our output file once again specified as a configuration parameter to our pipeline. We’ll first look at word count options in some detail. This is the custom configuration for our pipeline. This specific pipeline requires an input file as well as an output file where word counts are stored. Word count options is an interface which extends the pipeline options. Interface? The interface method specifies your custom configuration parameters.
Note the annotations on each interface method. This is what allows Beep to generate a class corresponding to this interface. The interface has getter and setter methods for this input file. The default value specified by the at default string annotation points to a file stored on a bucket that is publicly accessible. Similarly, the other custom configuration parameter is the output file. The interface specifies the getter and setter methods for this as well. The default value for the output file is not a simple string. That’s because we can’t directly just access a public bucket. If a whole bunch of students ran this program, we would fill up the public bucket and it would run out of storage. Instead, the default is set up by a class called Output factory. An output factory is a class that we define. Let’s take a look at this output Factory. This implements the default value factory interface. Template parameter for this generic interface indicates the data type of the default value that we are going to return. The data type for our case is string. The output file is a string path to that file. Here is how we generate the default path to the output file.
We first check the existing data flow options to see if a staging location has been specified. The staging location is something you might remember from our previous runs of a data flow program on Google Cloud. This is generally a folder in a bucket that you have write access to. The default location of the output file which contains the final word counts will be in this staging location in a file called Counts Text. If no staging location is specified, we’ll throw an illegal argument exception which says we must specify an output or a staging location for this particular data flow program to run. Now let’s look at the count words transformation.
Our count words class extends P transform. All transforms in Apache beam are essentially P transforms. The Ptransform is a generic class which takes in two template parameters. The first template parameter, which in this case is p collection string, is the input data type to this transform. The input to it is a collection of strings where each string represents one line from the files that we read. The output of this transformation is also a P collection and that forms the second template parameter of this p transform generic class. The output data type is a p collection of key value pairs where the key is a string and the value is a long. The apply method within this Ptransform is where the logic of the transformation lives. The input argument to that is a P collection of strings.
The lines which we want to split into words. We want to convert this input lines of text into individual words. To every line we apply the extract words function. The extract words function is a do function and it’s applied using a pair do applied in parallel. The result is a words P collection, a p collection of strings where every string is an individual word. We now want to count the number of occurrences of each word, and this we can do in a single line. We simply apply the count aggregation and we say per element. We want the counts to be applied on a per word basis. You can think of this operation as the reduce fees in the word count Map Reduce. The result of this transformation is a peak collection of key value pairs, every word associated with its count.
We’ll now quickly look at extract words function. This extends the do function class. It’s a generic class with input and output data types that you have to specify the input to. This do function is a string. The output is also a string. Within this function we can instantiate an aggregator. An aggregator is essentially an abstraction which allows you to monitor some value across all your parallel processes. This aggregator will count the number of empty lines across your entire word count operation. An aggregator completely abstracts this fact away from you.
All of these operations, remember, under the hood are running in parallel across multiple machines. The aggregator simply collects all the results in one place. Now we come to the actual processing. To extract words from lines, we’ll first check whether it’s an empty line. If it’s an empty line, we’ll add one to our count of empty lines for every element from the process context, this element is the input element. To this two function, simply split it into words. If the word is not empty, simply output it. C output will add it to an output p collection. And finally, the only function left to be explained is the format as text function. Another do function which is implemented in parallel using pair do to format the word count results in the form of text. Every input element is a key value pair, where the key is a string and the value is of type long. Simply format it in the form of text. At the end of this lecture, you now know how to set up custom pipeline configuration parameters in your data flow. Simply set up your own options interface which extends from pipeline options set up the correct getters and setters and the annotations for these getters incentives.
We’ve run a bunch of dataflow programs on the Cloud and seen how to monitor them. In this lecture, you’ll learn what the wall time represents for the transformations in your execution graph. We’ve looked at the Java code for our workout example. Now we’ll see how we can run it on the Cloud. Here is the Maven command that will run on our cloud shell. To compile this program and run it on the Cloud, use Maven compile. You want to compile it as a Java program. The complete path to your class, including the package, is. com example Wordcount. That’s what you specify next. Next comes arguments to configure your Apache beam pipeline. It needs the project ID. We specify both the staging location as well as the output. Both of these are buckets on our cloud storage. The output is where our final word counts will be written.
The runner that we’ll use to execute this example on the cloud is the blocking dataflow pipeline runner. This is exactly the same as the dataflow pipeline runner, except that it’s a blocking call. When you launch this job, you’ll wait for the job to complete before returning. While this command builds and runs this Java program, let’s switch over to our dataflow web console to look at logs and monitoring stats. Our job instance will show up on the Data Flow dashboard, and we can click on it to get more details. Click through and you’ll see the execution graph.
This execution graph has four transformations read, count, words, format, as text, and finally write counts. You can click on the drop down arrow associated with each node or vertex to see more detail about each transform wordcount. Count Words has a parallel operation which extracts the words and then does a count per element where it sums up the word count for each unique word. So you can see the transforms that make up the logic of count words as our data flow directed. Asylick graph slowly starts executing its individual nodes, you can see the auto scaling. There we go from zero workers to one.
As the nodes in your execution graph are running their transforms, notice that there is a number associated with each of these. Read line says 1 second, count word says 1 second, and so on. This number displayed in every execution node of this graph is the wall time. The wall time is the total amount of time that this particular transformation took to process elements.
The wall time is the approximate time spent in this particular step on initializing processing, data shuffling data, and finally terminating processing. If you notice the wall time for right counts is the highest, the wall time will give you a very good idea of how long a particular transformation takes, and it will help you identify bottlenecks in your execution graph. Once your job has completed successfully, google Cloud provides you with a very nice graph of how scaling took place in the cluster when your job was run. You can see a graphical history of how workers were allocated to your job.
Our job just had one worker allocated to it, which is why this graph is less interesting than it would be if there were multiple workers processing this pipeline. If you remember our data flow program, we used an aggregator to count the number of empty lines in the input text files. In addition to finding word counts, the total number of empty lines in our input text is available here under custom counters. There were 1663. The word count program seems to have run through fine.
To see the final result, go to your cloud storage bucket. Loony Asia bucket was where we stored the data and look at the output files. You can open up any of these files and see the word counts. For Shakespeare’s King Lear, that was the default input file specified. You now know that the wall time for a particular transform is the approximate time spent on that particular step. This includes initialization of data processing, shuffling, and everything that goes on under there. It’s very helpful to identify bottlenecks in your dataflow pipeline.