Google Professional Data Engineer – Dataflow ~ Apache Beam Part 3
Lab: Executing MapReduce In Dataflow In Python
At the end of this lecture, you should be able to answer this question comfortably: what Apache Beam library would you choose to find the top N elements in a list? What is a library that's tailor-made for this action? In this lecture, we'll look at a MapReduce operation in Python using the Apache Beam libraries, and we'll see how we can execute this locally and what parameters we need to specify to execute this on the cloud.
This MapReduce program is part of lab two in the Training Data Analyst repository, and the path to this file is as shown on screen. You should already have all the Python packages that you need to run this program installed on your cloud shell machine, but just in case, you can run the install packages script that’s present in this directory.
Ensure you specify sudo before you run it. We've looked at the contents of this script earlier: it installs the latest version of pip, the Google Cloud Dataflow libraries, and everything else that we need to run this MapReduce. Run pip -V to check that you have the latest version of pip. Everything looks good here. Let's get started. Open up the is_popular.py file in the lab2/python directory. This is the MapReduce program that we'll run. I'm calling this a MapReduce operation because you can clearly identify which transformations would be part of the map phase and which would be part of the reduce phase. In a Dataflow operation, though, we just specify the transformations using Apache Beam; we don't worry about map and reduce phases. The objective of this code is to find the most popular Java packages in any source directory.
Popularity of a package is determined by how often that package is used in other Java files, and a measure of this is the count of import statements for a particular package across Java files. The more a particular package is imported into other Java files, the more popular that package is. This builds on top of the grep operation that we saw earlier; we've spoken of grep as a map-only job, while this is a MapReduce. The ArgumentParser class in the argparse library is the recommended Python parser for command-line arguments. Specify the command-line arguments that you expect, along with default values, and it will parse the command line to see whether the arguments are present and valid. In our program, we expect the output_prefix command-line argument to specify where the result of the MapReduce operation will be stored, and the input argument indicates the source directory from which we read the contents of the Java files.
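The argparse setup just described can be sketched in a few lines. This is a minimal illustration, not the lab's exact code; the default values shown here are assumptions.

```python
import argparse

# Minimal sketch of the command-line parsing described above.
# Flag names mirror the lab's description; the defaults are assumptions.
parser = argparse.ArgumentParser(description='Find the most popular Java packages')
parser.add_argument('--output_prefix', default='/tmp/output',
                    help='Where the result of the MapReduce is written')
parser.add_argument('--input', default='.',
                    help='Source directory containing the Java files to read')

# Parse a sample command line; unknown or malformed flags would raise an error.
options = parser.parse_args(['--input', '/tmp/javasrc'])
print(options.input)
print(options.output_prefix)
```

If an expected argument is malformed, argparse prints a usage message and exits, which is exactly the validation behavior the lecture refers to.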
To determine the most popular package, parse the command-line arguments and instantiate your Beam pipeline. The input is a pattern which matches only the *.java files, that is, all files with a .java extension within the input directory specified. The output prefix is used in the same manner as before; no explanation is necessary here. And the keyword that we search for, or grep for, within our Java files is the import keyword. We know what packages are used based on what packages are imported into each Java file. If a package is imported into multiple files, it's a more popular package. In a traditional MapReduce operation, the next few transforms would all be part of the mapper phase.
The first step is to read lines of text from the input files. We then apply a FlatMap operation which takes every line and checks whether the keyword, which in this case is import, is present in that line. This is very similar to the grep operation. If the keyword is present, then the line is included in the output of this FlatMap. The one difference from grep is that we don't just look for the existence of the import search term anywhere in the line; we want the line to start with import.
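In plain Python, the filtering step just described can be sketched as a generator that only yields lines starting with the keyword (a FlatMap body can yield zero or one records per input). The sample lines are made up for illustration:

```python
# Plain-Python sketch of the FlatMap step described above: for each line,
# yield the line only if it starts with the keyword (here, 'import').
def starts_with(line, term):
    if line.startswith(term):
        yield line

lines = [
    'package com.example.app;',
    'import java.util.List;',
    '// import appears here but not at the start of the line',
    'import com.google.common.collect.Lists;',
]

matches = [m for line in lines for m in starts_with(line, 'import')]
print(matches)
```

Note that the comment line contains the word import but is not emitted, which is the difference from plain grep that the lecture points out.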
If you remember, the FlatMap operation, for one record at its input, can produce zero or more records, up to any number, at its output. For every line that is an import statement, we need to parse what package is used. For example, if your import statement was import java.util.List, the packages in use would be the java package, the java.util package, and so on. Each time a package is encountered, this step outputs a key-value pair: the name of the package and an associated count of one. In the total use step, we count the occurrences of each package across all Java files. The next few operations would be part of the reduce phase in traditional MapReduce. The key here at the input is the name of the package, and the value is a count of one. The combine-by-key operation simply sums up all the counts for each key. There is a nifty Apache Beam library that you can use to find the top N in any list: simply call beam.transforms.combiners.Top.Of and specify the value of N.
Here we want to find the five most popular, and we specify a custom function for comparison. Our comparison function is the by_value function. In traditional MapReduce, this step could occur in the reduce phase, or, in order to improve the efficiency of your MapReduce, you might set it up as a combiner at the end of the map phase on every machine that the mapper runs on. And finally, in the last step, we write out the output: the data sink. Let's take a quick look at some of the functions that we've referenced in our pipeline transformations. The starts_with function simply checks whether a specified line starts with a specified term.
The splitPackageName function takes as its input the package name specified in the import statement and splits it up into the list of packages that are actually in use. For example, com.example.appname.library.widgetname yields the packages com, com.example, com.example.appname, and so on. getPackages does some cleanup to find the exact name of the package that has been imported and then calls splitPackageName to split up an import statement into its corresponding list of packages. packageUse simply gets the list of all packages from a single import statement and then emits a key-value pair, a tuple, for every package with an associated count of one.
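These two helpers can be sketched in plain Python. The function names mirror the helpers the lecture describes, but this is an illustrative reconstruction, not the lab's exact code:

```python
# Sketch of the helpers described above. split_package_name turns
# 'com.example.appname' into every enclosing package; package_use then
# emits a (package, 1) pair for each of them.
def split_package_name(package_name):
    result = []
    end = package_name.find('.')
    while end > 0:
        result.append(package_name[:end])   # each dot adds one more prefix
        end = package_name.find('.', end + 1)
    result.append(package_name)             # finally, the full package name
    return result

def package_use(package_name):
    return [(p, 1) for p in split_package_name(package_name)]

print(split_package_name('com.example.appname'))
print(package_use('com.example.appname'))
```

One import statement thus yields multiple key-value pairs, one per enclosing package, which is what feeds the combine-by-key step.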
The by_value function is essentially a comparator between two key-value pairs. The comparison is performed on the basis of the value, where the key is the name of a package and the value is the count associated with that package: how many times that package has been referenced in Java files. This comparator is used with the Top.Of library to find the most popular packages. Switch over to your Cloud Shell, and you can simply run is_popular.py to run it locally on your Cloud Shell instance. Once again, if you see a warning about no handlers being found for this logger, we can ignore it; the logs will simply be output to standard error. The output of this MapReduce, by default, will go to the /tmp folder, and every file will have the prefix output in the directory that we've chosen. The top five most popular packages are org, org.apache, org.apache.beam, and so on. It's pretty clear here that the source code contains Dataflow code.
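The combine-by-key and Top steps just described can be sketched without Beam, in plain Python, to show the shape of the reduce side. The sample pairs are made up for illustration:

```python
import heapq

# Plain-Python sketch of the reduce side described above: sum the counts
# per package (what the combine-by-key does), then take the top 5 ranked
# by value (what Top.Of does with the by_value comparator).
pairs = [('com', 1), ('com.example', 1), ('org', 1),
         ('com', 1), ('org.apache', 1), ('org', 1), ('com', 1)]

totals = {}
for package, count in pairs:
    totals[package] = totals.get(package, 0) + count

# Rank by the value of each (key, value) pair, largest first.
top5 = heapq.nlargest(5, totals.items(), key=lambda kv: kv[1])
print(top5)
```

Since there are only four distinct packages here, fewer than five pairs come out, just as Top.Of returns at most N elements.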
Let's run the same MapReduce Python code once again, but this time we'll specify an output prefix: we want the output to go to /tmp/myoutput. The program runs through very quickly; it's a local execution after all. Listing /tmp/myoutput will show you that there is one file there, and if you cat the contents of this file, you'll see the same result that we got earlier. We've seen earlier, when we ran the grep Dataflow program on the cloud, that executing on the cloud only requires that we pass in the correct command-line arguments to instantiate the Beam pipeline.
On Google Cloud, the grepc.py file is in the same directory as the is_popular.py file. You can copy over the command-line arguments and try running is_popular on the cloud as well. Make sure that the paths for your source Java files and your output paths are on Cloud Storage, not on local disk. And remember that it's the pipeline runner which determines where the execution of this Dataflow pipeline is going to be: if you specify runner=DataflowRunner, it is on the Google Cloud. And as for the answer to this question: you've seen in the code that we use a combiner called Top in order to find the top N elements in any list. You can also specify a comparison function to compare those elements in any way that you want to.
At the end of this lecture, you should be able to answer this question comfortably: what Java class from Beam's SDK would you use to represent a key-value pair? In this lecture we'll execute the same MapReduce to find the most popular Java packages used by all the Java files in a particular directory, but this time we'll execute it in Java. I'm going to run through this much faster than I did the earlier demo, because all the logic should be very familiar to you already. To start off, we'll quickly run through the imports and see what the various classes are for. Pipeline represents an instance of a pipeline with a data source and sink. TextIO is used to read the input files, the source Java files. There are a bunch of classes which allow you to specify the configuration parameters to instantiate your pipeline; Default and Description are annotations that we use with custom configuration parameters. Amongst the transforms, the DoFn and the ParDo are units of execution which process one element at a time from PCollections. Sum and Top are helper libraries which help you perform useful tasks: Sum is an aggregation, and Top is to help you find the top N in a list. And finally, KV represents a key-value pair. The MyOptions interface, which extends PipelineOptions, specifies the custom config parameters used to instantiate your pipeline. Notice the interface methods which have the getters and setters for outputPrefix; they have a description and a default value. This directory will be where the result of the MapReduce is written out. These interface methods, along with the annotations, also represent the input path: the path to the Java files which we want to read in to track the most popular package.
Instantiating the pipeline with our custom options and setting up the variables that we need for this program are the same as before. This, in traditional MapReduce programming, would be both a map and a reduce job. The next few transformations that I'll talk about all correspond to the mapper phase. Read in the contents of the Java files in the source directory.
Filter these lines so that the output only contains the import statements; this is what we use to track the most popular package. If there are thousands of Java files, this operation can be performed in parallel, so use a ParDo class. From every import statement, get the list of packages. Remember that if a package is com.example.appname, then com is a package, com.example is a package, and com.example.appname will also be a package. So one import statement can yield multiple referenced packages. Emit each package name with an associated count of one. We are still in the map phase of the operation here. At this point we move on to operations that would typically be performed in the reduce phase.
We collect together all packages and sum up the counts associated with them to figure out which are the most popular packages. We get the top five packages using the Top class, again a helper class to quickly get you the top five, which allows you to pass in a comparator as well. Here there is a built-in comparator called KV.OrderByValue; unlike in Python, you do not have to write your own by-value comparator. You can then format the packages and the associated counts in any way that you want to. We format it as a string, and the data sink of our pipeline is to write this out to a CSV file.
This forms the main structure of our pipeline. There are a bunch of helper functions, of course, which go into it; I'll leave you to parse them on your own. What these helper functions do is very similar to what we saw in Python. We'll go through some familiar steps to execute this locally. We'll make sure that our PATH environment variable points to where our JDK lives, cd into the javahelp directory of our Training Data Analyst repo, and run the Maven compile command. Indicate that it's a Java file that you want to compile, and specify the full path to the main class, including the package name. Once the compile and run operation has gone through successfully, you can check out output.csv in your /tmp folder. Here are the same popular packages that we've seen earlier.
You can ask for the output to be directed to a different folder by specifying it as a command-line argument: outputPrefix equal to /tmp/myoutputjava. If you run a cat command on the myoutputjava CSV file in your /tmp folder, you'll notice that it has the top five most popular packages. In order to run this Java program on the cloud, you need to set up your command-line arguments a little differently, and you need to read and write from a Cloud Storage bucket rather than the local file system. The run_oncloud1.sh script file in the same directory, that is javahelp inside lab two, is what we used to run a Java program on the cloud earlier.
You should be able to use the script file directly to execute this Java program on the cloud. This is the same format that we followed for the grep Java program. Ensure that the command-line parameter specifies the pipeline runner as DataflowRunner; that's what indicates to Apache Beam that this has to be executed on the Google Cloud. You can try this out for practice to get a little more experience with Dataflow. And as for representing key-value pairs in Apache Beam: you would use, from Beam's SDK, the class named KV, which represents a key-value pair.
At the end of this lecture, you should have a very good understanding of when you would choose to use side inputs in your Dataflow program. In this lecture, we look at a Dataflow program which uses as its source a BigQuery table. The program also uses side inputs to feed the output of one pipeline in as an input to another. The objective of this Dataflow program is to find popular Java packages which need help. We'll be looking at the contents of Java files in a whole bunch of GitHub repositories, and we'll see which of these packages are popular, and which of them have a lot of annotations in their code asking for help. For example, a package needs help if a file within that package contains a TODO or a FIXME. The source data for this analysis will come from a public dataset that's available and can be accessed via the BigQuery web console. Let's first explore this dataset so that we understand what exactly our Dataflow program is working on.
In the fh-bigquery project, there is a dataset called github_extracts, and within that a table called contents_java_2016. This particular table contains the content and metadata of all Java files that were present on GitHub in the year 2016. If you run a query and extract the content column from this table, you'll find that the entire Java source code is present right there. This can be used to extract two pieces of information: the packages that are popular, and also which packages need help, based on the TODOs present in the content. Explore the content of some of these Java files so you understand what's available through this table. The JSON representation should show you that every line in the Java source code is separated by a \n, the newline character. We'll run a COUNT(*) query to see the number of rows in this table.
One interesting thing to notice here is that when you run COUNT(*), the amount of data processed from the table is exactly zero bytes. This seems surprising unless you stop to think: the number of rows in any BigQuery table is part of the metadata for that table, so the records in the table don't have to be processed to calculate this aggregation, which is why the data processed is zero bytes. If you change the query to a SELECT * rather than a SELECT COUNT(*), you'll immediately find that the query processes the entire table: 12.8 GB of data. Now that we've understood the source from which we are reading the data, let's take a look at the Dataflow pipeline. The file that contains this program is called JavaProjectsThatNeedHelp.java. This is in the Training Data Analyst repository under courses/data_analysis/lab2/javahelp; within that, find the source folder which contains the Java source files. The complete path to the file is right here on screen. In order to read data from BigQuery, we'll use the BigQuery connector that's available. The libraries that we have to reference for this are the BigQuery model TableRow, which represents one row in a BigQuery table, and BigQueryIO, which is the connection to BigQuery. Here are the transforms that we are going to use within this pipeline: the DoFn and the ParDo are par for the course, along with Sum, Top, and a new one called View. View is a singleton transform which is used to create an immutable collection from PCollection objects. These immutable collections are then used as side inputs to another pipeline.
We've used the KV class to represent a key-value pair output before, and we've also used PCollections. PCollections are essentially the abstraction representing lists of entities; they form the data that moves from one transform to another. The PCollectionView is new, though; we are looking at this class for the very first time in this course. This is an immutable view of a PCollection, and it's often used to represent the data which we are going to feed in as a side input to another pipeline. We can now look at the actual code. At the very top I initialize a static final variable called TOP_N. This indicates how many packages we want to write out: how many packages which need help we want to write to our output. You can tweak this and change it to the value that you want it to be. We have the MyOptions interface that is used to set up some custom config parameters for your Beam pipeline. Here, the only config parameter we specify is the output prefix: where this pipeline is going to write its output data.
After you've written the boilerplate code instantiating the pipeline from the options or arguments that you pass in from the command line, we are ready to start our transformations. The javaQuery variable holds the query that we want to run on BigQuery: SELECT content FROM, and the name of the table which has the Java content and its metadata from GitHub. Reading from BigQuery is very easy using the BigQueryIO connector: simply say read and specify the query that you want to run on this public dataset. The result is a PCollection. When you read from BigQuery, you get a collection of TableRows, where a TableRow represents one record in BigQuery. Every TableRow here contains the contents of one Java file; this is what we examined when we looked at the BigQuery output at the beginning of this lecture. We want to split this chunk of text data, the contents of a Java source file, into the individual lines in the Java file. This processing is implemented using a ParDo, where the input is a TableRow and the output is an array of strings, each string representing one line from the source Java code. Within the processElement method, access the content blob from every TableRow.
You simply access the content column and you get a blob of content of data type String. This content represents an entire Java source code file. Split this content on the newline character so you get individual lines of Java code, and simply write out these individual lines to the output. We filter out the empty lines, since they don't contain any information. Each Java source file is now an array of strings, and the PCollection which we get at the end of this transformation is one where every element is an array of strings.
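The splitting step just described is easy to sketch in plain Python (the lab does it in Java inside a ParDo). The sample row here is made up for illustration:

```python
# Plain-Python sketch of the ParDo described above: one BigQuery row holds a
# whole Java file in its 'content' column; split it on newlines and drop
# the empty lines.
row = {'content': 'package com.example;\n\nimport java.util.List;\n\npublic class Foo {}\n'}

lines = [line for line in row['content'].split('\n') if len(line) > 0]
print(lines)
```

Each row, a whole file, thus becomes an array of non-empty source lines, which is the element type flowing through the rest of the pipeline.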
We now have the javaContent PCollection, which contains the source code for all Java files from GitHub; of course, this is only the Java files from the year 2016, if you remember. This javaContent will serve as the source for two separate branches of processing. We look at the very first one right now, where we want to find those packages that need help. Each element that is input to this transformation is an array of strings. The output of this transformation is a key-value pair, a mapping from a string to an integer, where the string is a package and the integer specifies how many requests for help exist within that package. Calls for help are counts of TODOs or FIXMEs that developers have put into the source code. Let's look at the processing for this transformation.
For every Java file, represented by an array of lines, find the package where that file lives. If the Java file is in a package named com.example.appname, you get a resultant list of packages: com, com.example, and com.example.appname. We want all the packages for this particular file, from the most to the least specific. Once we have the list of packages, we use a helper method to count the number of calls for help in each Java source file; the lines represent the source code in that file. We check for TODOs, FIXMEs, and some other common ways that developers seek help within a particular file. If a particular source Java file does have a call for help, we want to include it in the output of this transform. The output of this transform is a key-value pair.
The key is the package name and the value is the number of calls for help. Pass this on to a function which performs an aggregation: we want to sum the number of help calls on a per-package basis, to see how many calls for help there are in, for example, com.example. The output of this transformation will be the package name and the total calls for help for that package across all Java files. This particular output of the pipeline is what we want to use as an input to another pipeline that we are going to set up; that means this output will be a side input to that other pipeline. In order to use this as a side input, we have to set up an immutable PCollection of the results of this pipeline. The resulting PCollection has to be converted to a PCollectionView, which we do by applying the transformation View.asMap.
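The help-counting branch just described can be sketched in plain Python. The files, lines, and helper names here are made-up illustrations of the logic, not the lab's Java code:

```python
# Plain-Python sketch of the help-counting branch described above: count the
# TODO / FIXME markers in each file, attribute them to every enclosing
# package, then sum per package (the aggregation step).
def count_calls_for_help(lines):
    return sum(1 for line in lines if 'TODO' in line or 'FIXME' in line)

files = {
    'com.example.appname': ['// TODO: refactor', 'int x = 0;', '// FIXME: leaks'],
    'com.example.other':   ['// TODO: write tests'],
}

help_per_package = {}
for package, lines in files.items():
    calls = count_calls_for_help(lines)
    if calls > 0:
        # Attribute the calls to every enclosing package prefix.
        parts = package.split('.')
        for i in range(1, len(parts) + 1):
            prefix = '.'.join(parts[:i])
            help_per_package[prefix] = help_per_package.get(prefix, 0) + calls

print(help_per_package)
```

The resulting dictionary plays the role of the View.asMap side input: a map from package name to total calls for help.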
The beauty of this immutable PCollection, which is used as a side input, is that its elements can be copied over to each of the machines where the other pipeline runs, wherever it requires this output to serve as an input. Now let's look at this other pipeline, which will use the Java packages that need help as a side input. Earlier, we had parsed all the Java source code files into arrays of strings. We want to use this Java content to find the packages that are the most popular. We've done this before: we output a count of one associated with every package that is imported into any of the Java source files that we've parsed, and then we sum up the counts for each package. That gives us a popularity measure for each package: a count of how many times it has been referenced by, or imported into, other Java files. I'll move through this bit quickly since we are already familiar with it.
We find the most popular packages by parsing the import statements and outputting a count of one for every package that we encounter. We then apply a sum aggregation to get the total number of times a particular package was referenced from other Java source code. We are now ready to calculate something called the composite score. We want one score to represent both how popular a package is and how much help it needs; we want to combine these together so that we ask for resources on the most popular packages which need the most help. The input to this is a key-value pair where the key is a string representing the name of a package and the value is a count of how many times that package has been referenced, a measure of how popular that package is.
The output of this transformation is a key-value pair of string and double. The key continues to be the package name; the double that is associated with every package name is the composite score, which is made up of how popular the package is and how much help it needs. We first get the package name and the total number of times it's referenced from other Java packages, and then, from the side input to this particular pipeline, we access how much help this particular package needed. c.sideInput, the ProcessContext's sideInput method, gives us this information from an alternate input that's passed into this pipeline. This alternate side input is specified using the withSideInputs method that you can see at the very bottom; this is where we pass in the Java packages that need help. The side input can take a number of different forms.
In our particular example, the side input is in the form of a map; we used View.asMap to specify the immutable PCollection. This is why we can use the get method on the map to access the number of calls for help for a particular package name. At this point, for every package we have two pieces of information: one count indicating how popular that package is, and another count indicating how many calls for help exist within that package. Now it's time to compute the composite score. The composite score is associated with a package name, and that forms the key of our output; the value is the composite score, which we calculate by taking logs of the two numbers and multiplying them. There is a reason why we use logs here: logs are used to avoid something called the tournament effect.
This is a winner-take-all phenomenon where results will be skewed towards the most popular packages irrespective of how many calls for help there are. In order to avoid this, we take logs of both numbers; this gives us a more balanced result. And there, we've successfully used the side input specifying the Java packages that need help in another pipeline. Now that we have the composite score, we simply apply a top-N operation, which gives us the most popular packages that need help.
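The composite-score calculation just described can be sketched in plain Python, with a dictionary standing in for the map-shaped side input. The counts are made-up illustrative values:

```python
import math

# Plain-Python sketch of the composite score described above. The side input
# behaves like a map from package name to calls for help; popularity is the
# import count. Taking logs of both damps the tournament effect.
num_uses = {'org.apache': 1000, 'com.example': 50}          # popularity counts
calls_for_help = {'org.apache': 80, 'com.example': 40}      # the side input, as a map

composite = {}
for package, uses in num_uses.items():
    helps = calls_for_help.get(package)   # map lookup, as with the side input's get
    if helps is not None and helps > 0 and uses > 0:
        composite[package] = math.log(uses) * math.log(helps)

print(composite)
```

Without the logs, org.apache's twenty-fold popularity advantage would dominate the score outright; with them, the gap narrows while the ordering that matters is preserved.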
When we get the final result, we want it to be sorted in descending order of composite score, so that we get the packages which are more popular and need more help at the very top. Format the result, and then finally write it out to a CSV file on Cloud Storage. Let's quickly look at some of the helper methods that we used in setting up this pipeline. parseImportStatement simply helps us find the most popular packages by figuring out how many times a package has been imported in other Java source files.
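The sort-and-format sink described at the top of this section can be sketched in plain Python; the scores here are made-up illustrative values:

```python
# Plain-Python sketch of the final steps described above: sort the composite
# scores in descending order and format each as a CSV line.
composite = {'org.apache': 30.3, 'com.example': 14.4, 'io.grpc': 9.9}

# Highest composite score first.
rows = sorted(composite.items(), key=lambda kv: kv[1], reverse=True)
csv_lines = ['%s,%.1f' % (package, score) for package, score in rows]
print(csv_lines)
```

In the actual pipeline these formatted strings are what get written to the output CSV file on Cloud Storage.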
It takes the help of getPackages and, finally, splitPackageName to get the final packages. countCallsForHelp is new: it iterates over all lines in the Java source code and finds whether a line contains FIXME or TODO. This is what we assume to be a call for help that a developer has put into the source code. parsePackageStatement helps us find the names of packages; the package name that we encounter is split exactly as in parseImportStatement.
And finally, here is the code to split a package name into all the packages that it represents. We've seen this code multiple times; no explanation is needed here. So what are side inputs? Side inputs are the result of one pipeline that can be fed into other pipelines. They are represented by an immutable PCollection and are generally copied across the multiple machines where the other pipeline may be processing data. Side inputs are a rather nifty Dataflow feature.
At the end of this lecture, you should know how autoscaling on the GCP deals with quota restrictions that you might have in your account. In the last lecture, we saw a Dataflow program which read its input from BigQuery, wrote the results out to Cloud Storage, and also used side inputs. In this lecture, let's run this Dataflow pipeline on the cloud and monitor it. Go to the Dataflow page on your dashboard by using the side navigation bar, where you'll see the list of jobs that you've executed on Google Cloud. We'll run our Dataflow pipeline on the cloud using the script run_oncloud3.sh. This is in the same javahelp folder, in the Training Data Analyst repository, where we've been working so far. On the command line, you need to specify the project ID, the name of the bucket where you want the results to be written, and the main class that you want to execute. The script accesses these variables from the command line, sets up the environment variables so that you can execute Java programs, and then finally runs a Maven compile to run this program on the cloud. You know it's running on the cloud when your runner is DataflowRunner. Switch to your Cloud Shell window and execute this script. Once the build is complete, the job will be submitted to Google Cloud and we can monitor its progress using the Dataflow web console. Once the job shows up on the Dataflow dashboard, you can click through on the job, view logs, and view the execution graph. The execution graph hasn't shown up yet, but the logs don't show any errors; it's just taking a while to compute, and it'll show up on your screen momentarily. Meanwhile, let's look at what the monitoring has to say. You can stop any running job by clicking on Stop Job in this console.
And if you look at the bottom on that same pane, you’ll find that auto scaling seems to be doing some work. It’s scaling up to six workers. When you look through the code, it must have been pretty obvious to you that this particular job requires some heavy-duty processing. Google Cloud has realized that and it’s using more workers to perform these operations.
And here is our execution graph. Because there were a whole bunch of transformations that we applied in this Dataflow pipeline, the execution graph is more complicated than ones we've seen before. Some transformations have more details: you can click on those nodes, and you'll notice that to get the Java source code, we read from BigQuery, which does some internal processing where it passes what it has read through a cleaning phase.
This is all part of BigQueryIO. This is an interesting execution graph because we have two pipelines which run in parallel. We take all the Java source code, in the form of arrays of strings, and we first find out what the most popular packages are; this forms the left part of our graph, highlighted on screen now. On the right part of the screen, we use the same Java source code contents to find out which packages need help, in the form of TODOs or FIXMEs specified in the code.
The last step in the second pipeline is the View transform, which results in an immutable PCollectionView that can be used as a side input to another pipeline. And here is the pipeline which uses the most popular Java packages as well as the packages which need help; it uses these two inputs to produce the final result, a composite score for the most popular packages that need help. You can observe the very first step and see how many rows are read from BigQuery. If you look at the size of the output collections on the right nav pane, you'll see that we've read 8.17 GB of data so far. Because a lot of data is flowing through this pipeline, you can see the numbers tick upward as more and more processing is done to the data that's input to every step. The first step should give you how much data is in the system; we are now up to 13.94 GB.
If you remember, when we spoke of a directed acyclic graph, a node can start processing data only if its inputs are available. The composite score step can be calculated only when elements come into its input. Elements have finally arrived here, which is why it's now green and running. As I watch my pipeline execute, I notice that there is a warning in my logs, and when I look at the log files, it says autoscaling resized the worker pool to seven, but the goal was nine workers, and that this could be a quota issue. This means that the Google Cloud Platform determined that you need nine workers to process this job efficiently, but my quota restrictions allowed just seven workers. So Google Cloud proceeds with seven workers, but it warns us about this.
And our execution graph seems to be nearing completion; in fact, it has completed. Let's take a look at the Top node. The size of the input data to this node was just 4.25 MB: many gigabytes of data flowed through this execution graph, but at the end we were left with roughly 4.25 MB. Now that the job is complete, autoscaling scales down from seven to zero workers. We can see whether the final results have been written out: check out Cloud Storage and go to the Loony Asia bucket.
That's where we wrote out the final results. The javahelp folder there should have output.csv; that's the output file that our program wrote. And there you see it: Apache has a very high composite score; it's very popular and needs a lot of help. You can see Elasticsearch in there, as well as Android, popular packages which need help. You saw in this lecture that autoscaling might determine that you need a certain number of workers; if the quota on your GCP account doesn't permit that many workers to be spun up, then autoscaling will make do with the workers that it has available. It will emit a warning message in the logs, though.