Google Professional Data Engineer – Dataproc ~ Managed Hadoop

  1. Dataproc

Let's now discuss Dataproc. This is the Google Cloud Platform's managed Hadoop offering, and this conversation brings our entire discussion of big data technologies full circle, because, after all, we started with a conversation about Hadoop. As usual, I have a question here which I'd like you to keep in mind while we discuss Dataproc. It's a question we have encountered before while discussing HDFS: when we use Hadoop on the Google Cloud Platform, we do not use HDFS, the Hadoop Distributed File System, as our underlying storage technology. Instead, we use Google Cloud Storage. Think about why that is.

Let's now plunge into a discussion of Dataproc, which will hopefully provide clarity on this and on some other issues. As I mentioned a moment ago, Dataproc can be thought of as a managed version of Hadoop plus Spark. This means that you can create a managed Hadoop cluster easily, deploy jobs on it, and delete the cluster once you're done with it. Every machine in the cluster will include Hadoop, Spark, Hive, and Pig. For folks who've wrestled with setting up and configuring big Hadoop clusters, the no-ops nature of Dataproc is a big attraction. You can create a cluster, use it, and turn it off, and this is where the use of Google Cloud Storage rather than HDFS makes a lot of sense, because HDFS is inherently server-driven.

We have a name node and then we have data nodes, and the coordination between the name node and the data nodes is essential for any MapReduce job to work successfully. But that isn't a great model for the cloud because, after all, billing is based on the utilization of virtual machine instances. So if you had to keep VM instances for your master and worker nodes up at all times, you would end up with a pretty hefty GCP bill. More generally, Dataproc is a great option for organizations which are now thinking of moving to the cloud, because it allows you to move your existing Hadoop ecosystem code to the cloud seamlessly. All you need to do is commission a cluster, set up as many machines as you think you'll require, move your jobs to your VM instances, and you are in business. What's more, you can also scale that cluster up and down pretty seamlessly. More on this in a moment. Correctly setting up a Dataproc cluster involves a number of choices related to the virtual machine instances that are going to run in the cluster.

And so let's discuss some of these considerations. Remember that a Dataproc cluster is built using Google Compute Engine VM instances. Each cluster will have at least one master and two workers (more on this in a moment, because there is actually something known as a single-node Dataproc cluster, which is in beta). You might recall that we discussed something known as preemptible instances: these are very cheap Compute Engine resources. The reason they are cheap is that they can be pulled away by Compute Engine at 30 seconds' notice, i.e. basically at any point. Preemptible VM instances make a great choice for Dataproc clusters if, and this is a big if, they are used with care. So exercise judgment while using preemptible VMs, and keep some basic guidelines in mind. Remember that preemptible instances can be pulled away at 30 seconds' notice, and they will definitely be terminated once they've been running for 24 hours.

So this means that it only makes sense to use preemptible VMs for processing. Do not use them for data storage. Clearly, it makes no sense for your master node to be on a preemptible instance, and indeed, it's not even possible to have a preemptible-only cluster. If you attempt something like this, Dataproc will automatically add at least two non-preemptible workers to your cluster. Dataproc also does one other thing automatically in order to save you from yourself: it assigns a minimum persistent disk size to each preemptible worker, which is the smaller of 100 GB and the primary worker's boot disk size. Note that this disk is for local caching; it's not part of HDFS.
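
To make the shape of such a cluster concrete, here is a rough sketch of the configuration expressed with the google-cloud-dataproc Python client library. This client isn't used in this course's labs, the project and region values are placeholders, and exact field names can differ between library versions, so treat it as illustrative only.

```python
from google.cloud import dataproc_v1

project_id = "your-project-id"   # placeholder
region = "us-central1"           # placeholder

client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

cluster = {
    "project_id": project_id,
    "cluster_name": "demo-cluster",
    "config": {
        # One dedicated master and two dedicated workers (the minimum).
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
        # Optional secondary workers; these default to preemptible VMs and
        # should be used for processing only, never for data storage.
        "secondary_worker_config": {"num_instances": 2},
    },
}

operation = client.create_cluster(
    request={"project_id": project_id, "region": region, "cluster": cluster}
)
print(operation.result().cluster_name)
```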

That minimum disk size gives preemptible VMs a decent amount of disk space should they need to use it, for instance while their shutdown script runs. Preemptible VMs need to be able to clean up all of their state in 30 seconds, because that is the length of time during which the shutdown script can execute before control reverts to Google Compute Engine. Specifying the correct shutdown actions in the form of a shutdown script is therefore an important bit if you are ever going to use preemptible VMs. Initialization actions can also be important for some Hadoop clusters. These are specified via scripts, and those scripts can live either on GitHub or on Cloud Storage.

You can specify these initialization actions or scripts via the GCP console, the gcloud command-line interface, or programmatically. You can use these initialization actions as the place where you perform all of your Hadoop configuration; remember that there is a rich set of configuration files which we keep tweaking when we work on Hadoop in a non-cloud world. These initialization actions run as root, so you do not need sudo. This is worth keeping in mind. Also, be careful to use absolute paths, not relative paths. And lastly, remember to indicate which interpreter your initialization script should run under by using a shebang line. A shebang line is the typical line used to start script files: it begins with a hash, followed by an exclamation point, followed by the absolute path to the interpreter.
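
To make that concrete, here is a minimal sketch of what an initialization action could look like. Real-world initialization actions are often plain bash scripts, but any executable script with a shebang line works; the paths and the package name below are purely illustrative.

```python
#!/usr/bin/env python3
# Illustrative initialization action. It runs as root on each node while the
# cluster is being created, so no sudo is needed. Note the shebang line above
# and the use of absolute paths throughout (the paths themselves are examples).
import subprocess

# Install an extra Python package on every node (package and pip path are examples).
subprocess.check_call(["/usr/bin/pip", "install", "requests"])

# Drop a marker file so jobs can later check that initialization ran.
with open("/etc/dataproc-init-done", "w") as marker:
    marker.write("initialization actions completed\n")
```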

We can see that all of these are mechanisms to achieve the same functionality in Dataproc on the cloud as we would have in a traditional Hadoop cluster. Maybe the biggest attraction of Dataproc is the ease with which we can scale clusters. In fact, it's possible to scale a cluster even while jobs are actually running. The scaling operations that can be carried out are the fairly typical ones: you can add workers, remove workers, or add HDFS storage. Let's quickly talk about a few features which are still in beta but are quite interesting. One of these is the high-availability Dataproc cluster. If you specify the high-availability configuration, your cluster will have three master nodes rather than one.

Dataproc will set them up for automatic failover, and it does so by adding them to an Apache ZooKeeper configuration. Features that are in beta come with the caveat that they might change in ways that break backward compatibility, so you should be careful about using them for really mission-critical or production-scale workloads. Another feature that's in beta is that of single-node clusters. In some sense, this corresponds to pseudo-distributed mode in traditional Hadoop: there's just one node, which acts as both master and worker.

And of course, it goes without saying that a single node cannot be a preemptible VM instance. This is something you'd use for prototyping, for teaching Dataproc, and so on. Another feature that's in beta, but that might be of interest to you, is automatically restarting jobs on Dataproc. By default, jobs on Dataproc will not restart on failure. This is something you can change if you make use of this beta feature: you can specify that long-running or streaming jobs, something like Spark Streaming, for instance, ought to restart even if they fail. This helps deal with issues like out-of-memory errors, unscheduled reboots, and so on. And finally, just as you would expect, Dataproc has connectors which can be used to interface with data in other GCP storage technologies such as BigQuery, Bigtable, and Cloud Storage. So you are not constrained to using HDFS by any means. In fact, as we've discussed, it does not make sense to use HDFS as the underlying storage technology with Dataproc. HDFS is inherently serverful: it has a name node, that name node would have to be on at all times in your cluster, and you would be hit with a pretty big compute bill. So when you're running Hadoop on the Google Cloud Platform via Dataproc, just use Google Cloud Storage.

  1. Lab: Creating And Managing A Dataproc Cluster

At the end of this lecture, you should be able to answer this question comfortably and confidently: if we wanted to set up a low-cost Dataproc cluster, could we make up the entire cluster of preemptible instances, with the master preemptible as well as the workers? Is this correct or wrong? In our very first lab on Dataproc, we'll see how we can create and manage Dataproc clusters. Now that we've been using the Google Cloud Platform a fair bit over this course, you'll find that your top-level project dashboard starts showing a little activity. This card here shows me all the resources that I've instantiated and am currently using. My billing still shows zero because I'm working on my free credit. And the Compute Engine card right here in the center of my screen shows a fair bit of activity. Let's navigate to the page for Dataproc using the side navigation bar and set up our very first Dataproc cluster. Dataproc, as you know, is managed Hadoop on the Google Cloud.

This means that using Dataproc you can provision a Hadoop cluster just like you would using real machines; in fact, it's far easier to do so using Dataproc. The web console very helpfully prompts you that you need to enable the Dataproc API before you can do anything with it, so go ahead and hit Enable. The form then very helpfully prompts you to create your first cluster, and once you click on this button, it will walk you through setting one up. Dataflow and BigQuery were both serverless, so you didn't have to worry about instances, regions, and zones. But once you come to Dataproc, managed Hadoop on the cloud, you need to start making these decisions once again.

We've seen earlier that Google has a bunch of machine types that it offers for you to set up your virtual machine instances. For this particular lab, we can set up the simplest and most lightweight cluster: we'll use the n1-standard-1 with one vCPU and 3.75 GB of RAM. This is the configuration for the master node in this cluster. Right next to that is the option to choose your cluster configuration. You can have a single-node cluster; this is your development environment if you want to quickly develop some Hadoop jobs. Or you can use a standard environment, one master with N workers, where you determine N based on how much data you want to store and so on. Or, if you're setting up a high-traffic, mission-critical production environment, you might want it to be highly available: a high-availability environment will have three masters and any number of workers. I'm going to choose the standard environment.

I don't really need anything fancy, and I'm going to just set up 10 GB of primary disk size; this is the minimum available. Again, this is a demo cluster, so I'm going to choose the bare minimum of resources. Now we'll move on to configuring the worker nodes in our cluster. Again, we choose the smallest and most lightweight machine for our worker nodes, and I specify that I want two worker nodes. This is the minimum number: one master, two workers. That's a standard configuration. Google is very helpful about warning you that there are constraints on the maximum number of workers you can have; it depends on your quota and the number of solid-state drives attached to each node. Once again, I'm going to choose the primary disk size to be 10 GB.

To improve processing and performance in your Dataproc cluster, you can also ask for local SSDs (solid-state drives) to be attached to each of your worker nodes. I have not chosen that here, but you can definitely set it up and configure it right on this page. Our YARN cluster manager will use two cores and 6 GB of memory; these are the default settings. There are some advanced settings that you can configure for your cluster: preemptible workers, bucket, network, and so on. Let's check these out. If you want lower-cost resources to increase the compute capacity of your cluster, using preemptible instances is a great idea. Think of a preemptible resource as a shared resource across all projects on the Google Cloud Platform: if Google finds that it has overcapacity somewhere, it might allocate you the preemptible resource that you have requested. This worker node will be lower cost than a dedicated worker node for your Dataproc cluster. However, Google can take away this preemptible worker node at any point in time without giving you a heads-up, and it will definitely shut down your preemptible instance at least once every 24 hours.

Because of these constraints, you can never have a Dataproc cluster that's fully made up of preemptible nodes; these can only be add-on worker nodes for your cluster. We won't use any of these advanced options for now. Close this dialog and then let's go ahead and create our first Dataproc cluster. This might take a while to spin up, but finally you'll see it on your Dataproc dashboard. Let's click on this instance and explore the settings. You'll find that the CPU graph shows some activity; this is the activity related to the provisioning of this cluster. Setting up this Dataproc cluster basically involved setting up three VM instances: one for the master and two for the two workers that we provisioned. Click on the VM instances tab and you'll be able to see a list of your master and worker machines. When administering a Hadoop cluster, there is one task that you end up doing really often: SSHing into the master node in order to administer the cluster.

Google Cloud makes this very easy for you. Simply click on the drop-down next to SSH and it gives you a variety of ways in which you can connect to that machine: you can SSH in a browser window, open a browser window on a custom port, view the gcloud command to SSH into that machine, or use a different SSH client, such as the terminal on your local machine. We've seen these options before for individual VM instances; here, the option is available for the master node. I SSH into it using my browser window on the default port. I can tell by the prompt that I'm logged into my master node instance. Dataproc machine instances have a whole bunch of software preinstalled for you. For example, Python is installed (Python 2.7.9) and Java 8 is preinstalled.

PySpark is installed as well, so your Dataproc cluster comes set up for you to run Spark. Other Hadoop technologies that you might find useful and might want to run, such as Pig for ETL and Hive for data analysis, also come preinstalled. Clicking on the configuration tab will show you the current configuration of this cluster, so you can see a whole bunch of details: what kind of master node you've set up, what machine type you've used, what the primary disk size is, how many worker nodes you have, what their configurations are, and so on. We can also see that this cluster does not have any preemptible worker nodes; all of the nodes are dedicated. Clicking on the Jobs tab will show you what jobs were executed on this cluster. This is a fresh cluster.

We haven't run anything on it yet, so this tab is completely empty. You should now be able to answer this question comfortably: in order to reduce cost, can we run a Dataproc cluster made up entirely of preemptible worker nodes? The answer is no, because preemptible worker nodes can be taken away by Google at any point in time, and they'll definitely be shut down at least once in a 24-hour period. They can't be used as the only nodes in a Dataproc cluster; we have to have dedicated nodes for the cluster to be set up.

  1. Lab: Creating A Firewall Rule To Access Dataproc

In this demo, you will learn a quick way to access your project ID when you are in Cloud Shell, using an environment variable. By the end of this lecture you should know what environment variable this is. In this lab, we'll set up a Dataproc cluster and we'll submit a PySpark job to run on this cluster. I set up a simple cluster with a standard configuration and choose to do it via the command line, but you can also do it using the web console. This is a cluster with one master node and two worker nodes; all the machine types are n1-standard-1, and the cluster is named my-cluster. Wait for a little bit as your cluster is provisioned. The Spark job that I want to run on this cluster is specified in a Python file, and I want to store that Python file, along with the data inputs for that Spark job, on Cloud Storage. To that end, I use gsutil to create a new bucket; this bucket will be in the us-central1 region. Let's say that I want the name of my bucket to be the same as my project ID. I can use a handy environment variable for this in my Cloud Shell: $DEVSHELL_PROJECT_ID. This will always have your current project ID stored in it.
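
Just to confirm that this really is an ordinary environment variable, here is a tiny sketch of reading it from Python inside the same Cloud Shell session; the fallback value is only there for illustration.

```python
import os

# DEVSHELL_PROJECT_ID is set automatically inside Cloud Shell; elsewhere it
# won't exist, so we fall back to a placeholder purely for illustration.
project_id = os.environ.get("DEVSHELL_PROJECT_ID", "your-project-id")
print("Bucket to create: gs://" + project_id)
```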

Go ahead, create this bucket with the same name as your project ID, and go to the web console to check that it's indeed there. There you see it: my test project, followed by 172707. The code and the data files for this particular lab are in the training-data-analyst repo; git clone this repo if you haven't done so already. If you've been following this course from the beginning, you already have it on your Cloud Shell instance. Within training-data-analyst, cd into courses/unstructured. There is a bash script called replace_and_upload.sh in there; use the nano text editor to see what the script does. It copies over files from your local Cloud Shell to your Cloud Storage bucket.

Within your bucket, the files will be copied over to an unstructured directory. This script copies over the Python files, IPython notebooks, and initialization scripts from your current working directory into one particular folder in the bucket, so make sure you are in the unstructured directory when you run the script. It also copies over some sample data and photographs into unstructured. And lastly, it sets the content type of all the uploaded files in the Cloud Storage bucket to text/plain. This basically treats the Python files that you've uploaded as plain text files.

When you click on one of them, you can therefore view the Python code. In your Cloud Shell command line, in the unstructured directory, run replace_and_upload.sh followed by the name of your bucket, which is the same as your current project ID. Once the upload is done, you can go to Cloud Storage in your web console and check out what files have been copied over within unstructured. Notice the Python file lab2.py; this is the Spark code that will run on Google Cloud. Open up this Python file in your browser and let's see what it does. At the very top is the shebang line indicating that it's Python code and should be interpreted as such. The very first line after that instantiates the SparkContext. The SparkContext, as you might already know, is the main entry point for accessing Spark functionality and running the Spark program on our Dataproc cluster. The 'local' passed in while instantiating the SparkContext indicates a local connection to Spark; 'local' is the cluster URL. The next line of code reads in a file from Cloud Storage. This is a text file, located in the bucket that we just created using our project ID, in the unstructured folder, as lab2-input.txt. We can switch over to our Cloud Storage bucket and see what this input file looks like. It's a text file where every line has two bits of information separated by a comma: a type of animal and the name of that animal. Let's see how this Spark program parses this text file.

For every line, we perform a map operation and split the line into two parts. The split is on the comma and gives us a tuple: the first field of the tuple is the type of animal (dog, pig, rat, et cetera) and the second field is the name of the animal. Once we have these tuples in an RDD, we perform a reduceByKey operation. This reduceByKey sets up a mapping from each type of animal to a list of names of that kind of animal: a list of names of all the dogs, all the cats, and so on. The next transformation is a map which outputs a different tuple for every kind of animal: the kind of animal and the number of animals of that type. The input to this map is a kind of animal and a list of names; the output is the kind of animal and the number of animals of that kind. Now that you've understood what the Spark program does, let's run it on our cluster.
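
Before we do, here is a rough sketch of what the job looks like, reconstructed from the description above. It is not the exact lab2.py from the training-data-analyst repo, and the bucket name and application name are placeholders.

```python
#!/usr/bin/env python
# Sketch of the PySpark job described above; details may differ from the
# real lab2.py in the repo.
from pyspark import SparkContext

sc = SparkContext("local", "lab2-sketch")  # "local" is the cluster URL used here

# Each input line looks like "dog,Fido": the kind of animal and its name.
lines = sc.textFile("gs://your-project-id/unstructured/lab2-input.txt")

# Split each line on the comma into an (animal type, animal name) tuple.
pairs = lines.map(lambda line: tuple(line.split(",")))

# Collapse the names for each type into one comma-separated string per type.
names_by_type = pairs.reduceByKey(lambda a, b: a + "," + b)

# Map each (type, names) pair to (type, number of animals of that type).
counts = names_by_type.map(lambda pair: (pair[0], len(pair[1].split(","))))

print(counts.collect())
```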

Go to Dataproc in the side navigation bar and click on Jobs. We are going to submit a Spark job using the web console UI. Click on Submit Job, and the web console will throw up a simple, easy-to-use UI which allows you to specify your job parameters step by step. Notice that the only cluster you can run your job on is my-cluster; that's the only cluster I have. For the job type, choose PySpark, since it's a Spark program in Python that we want to run. In the text box that says Main Python file, specify lab2.py in your Cloud Storage bucket, and make sure you specify the complete path.

Click on Submit Job, and under the Jobs tab in the left navigation pane of Cloud Dataproc, you should be able to see and monitor the progress of this job. Click on the job instance, and the log files for this job run will give you the outputs of your Spark job. Notice the output of the first transformation.

This is where we split up every line from the input file into a tuple: the type of animal and the name of the animal. In the next step, we did a reduceByKey, where we collected all the animals of the same type and set up a list of their names as the value; these are all the cat names in our input data file. And then the last transformation gives us the kind of animal and the number of animals of that type. In our input file, we have details of five cats, six dogs, two frogs, three pigs, and so on. And hopefully, at the end of the lab, you know that the environment variable in your Cloud Shell which holds your project ID is DEVSHELL_PROJECT_ID.
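
As an aside, the same submission we just did through the web console can also be done programmatically. Here is a rough sketch using the google-cloud-dataproc Python client; this client isn't used in the lab, the project, region, and bucket path are placeholders, and exact request shapes can differ between library versions.

```python
from google.cloud import dataproc_v1

project_id = "your-project-id"   # placeholder
region = "us-central1"           # placeholder

job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": "my-cluster"},
    # Equivalent of choosing the PySpark job type and the main Python file
    # in the Submit Job form.
    "pyspark_job": {
        "main_python_file_uri": f"gs://{project_id}/unstructured/lab2.py"
    },
}

submitted = job_client.submit_job(
    request={"project_id": project_id, "region": region, "job": job}
)
print("Submitted job:", submitted.reference.job_id)
```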

  1. Lab: Running A PySpark Job On Dataproc

At the end of this lecture, you should know the answer to this question: how would you browse the HDFS directory structure of your Hadoop cluster on Dataproc? In this lab, we learn how to use the PySpark read-evaluate-print-loop (REPL) interactive shell, and we'll see how we can run Pig scripts on Dataproc. Let's first ensure that a Dataproc cluster is up and running. Go to Dataproc, Clusters, and look at the dashboard to see whether your cluster is green. We still have the bucket on Cloud Storage with the same name as the project ID; I haven't deleted that yet. Ensure that you still have the firewall rule set up that whitelists the IP address of your local machine so it can connect to the Dataproc cluster. All right, there it is: the default allow Dataproc access rule. We are all set to access the cluster manager web console for Hadoop and also the name node console for HDFS.

We have TCP access to ports 8088, 50070, and 8080. Now let's SSH into the master node of our cluster. Go to Dataproc, click on the name of the cluster, and click on VM Instances; it will show you all the machines that are present in the cluster. Click on SSH to connect to the master node. And here we are in the master node, ready to launch the PySpark interactive shell. The interactive shell, or REPL environment, allows you to type in one Spark command at a time and see the result immediately.

Once we are in this PySpark shell, we can run our Spark commands in Python. We'll start off by initializing data, which is an array of integers. You can call sc.parallelize to convert this array of integers into a distributed RDD, distData. Next, we apply a map transformation to every element of this RDD; the result of the mapper is the square of each individual element. The next step is a reduce operation, which sums up all the squares. The map transformation is not carried out immediately, thanks to Spark's lazy evaluation of RDDs.

It's only when you perform an action, such as the reduce, that the RDDs are actually evaluated, and printing the result gives you the final answer of 55, which is the sum of the squares of the individual integers in data. Typing exit() gets you out of the PySpark interactive shell. This should give you a sense of how this Dataproc cluster can act as a Hadoop cluster or as a Spark cluster: on the master node, you can behave exactly as you would in a local cluster set up with real machines.
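
Here is roughly what that interactive session looks like, typed line by line into the pyspark shell. The SparkContext, sc, already exists in the shell, and the array [1, 2, 3, 4, 5] is an assumption that matches the final result of 55.

```python
# Typed into the pyspark shell on the master node; `sc` is already defined there.
data = [1, 2, 3, 4, 5]

distData = sc.parallelize(data)              # distribute the array as an RDD

squares = distData.map(lambda x: x * x)      # transformation: lazy, nothing runs yet

result = squares.reduce(lambda a, b: a + b)  # action: this triggers the evaluation

print(result)  # 55, i.e. 1 + 4 + 9 + 16 + 25
```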

Next, on the master node, create a directory called lab2 and cd into it. We are now going to run a Pig script on this cluster. Use the gsutil command to copy some files from our Cloud Storage bucket into this local directory; these are files that we copied over to Cloud Storage in the last lab. This gsutil cp command copies two files into our current working directory: a pet-details.pig file, which is our Pig script, and a pet-details.txt file, which will serve as the input to the Pig script. The ls command should show us that both files have been copied over to our current directory. Let's run a cat command to examine the text file that we are going to operate on. Every line in this text file represents a pet. We have the kind of pet (dog, cat, frog, etc.),

the name of the pet, the breed, the color, and, in the last column, the weight in pounds. We'll now see what transformations the Pig script applies to this input text file. We start off by removing a directory named groupedbytype on HDFS. It's a top-level directory, so we remove groupedbytype; this clears out results from any previous run of our script. We then load the data from pet-details.txt into a relation named x1. PigStorage is the function we use to parse the text file, and the comma that we pass into PigStorage indicates that the file is a CSV file where the fields are delimited by commas.

We also specify the schema for the file we just read in: all the fields are chararray, except for the very last field, weight, which is an integer. The next line of the Pig script filters out the header row if it exists in the file, that is, any row where the type field is equal to the string 'type'. We then use a FOREACH ... GENERATE statement to transform this data; the only change here is that we want to express the weight in kilograms, not pounds. We then apply a filter operation where we keep the information for only those pets that are black or white in color. And finally, we group the pet information by the kind of animal and store the result into the groupedbytype directory on HDFS.
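
The Pig script itself isn't reproduced in this transcript, so here is the same sequence of transformations sketched in PySpark purely for comparison. The column layout, the header test, and the pounds-to-kilograms conversion follow the description above, while the file path and the exact color strings are assumptions.

```python
# The Pig script's transformations, sketched in PySpark for comparison only.
lines = sc.textFile("hdfs:///pet-details/pet-details.txt")  # path is illustrative

# Split each CSV line into [type, name, breed, color, weight-in-pounds].
rows = lines.map(lambda line: line.split(","))

# Drop the header row, i.e. any row whose type field is the literal string "type".
rows = rows.filter(lambda r: r[0] != "type")

# Re-express the weight in kilograms instead of pounds.
rows = rows.map(lambda r: (r[0], r[1], r[2], r[3], int(r[4]) * 0.4536))

# Keep only the black or white pets (exact color strings are assumptions).
rows = rows.filter(lambda r: r[3].lower() in ("black", "white"))

# Group the remaining records by the kind of animal, as the Pig GROUP does.
grouped_by_type = rows.groupBy(lambda r: r[0])
```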

Next, we run the hadoop fs command to create a directory called pet-details on HDFS. Once this directory has been created, you can run a hadoop fs -ls command to see that it exists. Then use the hadoop fs -put command to copy from our local instance machine to HDFS: we want to copy pet-details.txt to the pet-details directory on HDFS. Run the ls command again to confirm that the copy has been successful. Yes, it has; this is the input file that our Pig script will work on. Let's explore HDFS using the name node web console to check that all these updates to HDFS have been made successfully. Click on the instance machine for your cluster in order to retrieve its IP address. This is the external IP address of the master node of our Dataproc cluster.

Copy this IP address, switch to a browser window, paste it in, and go to port 50070; this is the port for our HDFS web console. If you click on the Datanodes tab at the very top, you'll see that we have two data nodes, the workers in our cluster. You can browse HDFS on your Dataproc cluster by clicking on Utilities and then Browse the file system. Here you can see all the top-level directories that are present in HDFS; pet-details is the one that we created. Click on pet-details, and within that you'll find pet-details.txt. Everything looks fine. We are now ready to run our Pig script. Switch back to the command line that you have open on the master node, the one that you have SSHed into, and run the pig command, passing in pet-details.pig as the script that you want to run.

Wait for the script to run through; it should complete successfully. Then switch to the web console in order to browse HDFS once again. At the very top level in HDFS, notice that we now have a new directory, groupedbytype. Click on this and you'll find that there are two files within it. One is _SUCCESS. If you've run any Hadoop MapReduce job, you'll know that _SUCCESS is the file that MapReduce jobs write to indicate that they are done with that directory.

They've completely written out what they need to into that directory. Pig, as you know, is a technology that runs on Hadoop, and it essentially just runs MapReduce jobs under the hood, which is why _SUCCESS is present here. The second file, the one with the 'part' prefix, is what contains our final output. If you click on it here, you won't be able to view it.

Let's switch over to our command line, copy it over to our local machine using hadoop fs, and then view it. And there you see it: the end result of the Pig script that we just ran. So how would you browse the HDFS directory structure on your Dataproc cluster? You would find the external IP of the master node of your cluster and access port 50070 there. This, of course, assumes that you've set up a firewall rule allowing your local machine to connect to the HDFS web console.
