Files
lab/ds/25-1/2/1-09_dask-cudf.ipynb
2025-10-31 12:17:30 +03:00

713 lines
41 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"./images/DLI_Header.png\" width=400/>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fundamentals of Accelerated Data Science # "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Transition Path: cuDF provides a way for users to scale their pandas workflows as data sizes grow, offering a middle ground between single-threaded pandas and distributed computing solutions like Dask or Apache Spark ."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 09 - Introduction to Dask cuDF ##\n",
"\n",
"**Table of Contents**\n",
"<br>\n",
"[Dask](https://dask.org/) cuDF can be used to distribute dataframe operations to multiple GPUs. In this notebook we will introduce some key Dask concepts, learn how to setup a Dask cluster for utilizing multiple GPUs, and see how to perform simple dataframe operations on distributed Dask dataframes. This notebook covers the below sections: \n",
"1. [An Introduction to Dask](#An-Introduction-to-Dask)\n",
"2. [Setting up a Dask Scheduler](#Setting-up-a-Dask-Scheduler)\n",
" * [Obtaining the Local IP Address](#Obtaining-the-Local-IP-Address)\n",
" * [Starting a `LocalCUDACluster`](#Starting-a-LocalCUDACluster)\n",
" * [Instantiating a Client Connection](#Instantiating-a-Client-Connection)\n",
" * [The Dask Dashboard](#The-Dask-Dashboard)\n",
"3. [Reading Data with Dask cuDF](#Reading-Data-with-Dask-cuDF)\n",
"4. [Computational Graph](#Computational-Graph)\n",
" * [Visualizing the Computational Graph](#Visualizing-the-Computational-Graph)\n",
" * [Extending the Computational Graph](#Extending-the-Computational-Graph)\n",
" * [Computing with the Computational Graph](#Computing-with-the-Computational-Graph)\n",
" * [Persisting Data in the Cluster](#Persisting-Data-in-the-Cluster)\n",
"6. [Initial Data Exploration with Dask cuDF](#Initial-Data-Exploration-with-Dask-cuDF)\n",
" * [Exercise #1 - Counties North of Sunderland with Dask](#Exercise-#1---Counties-North-of-Sunderland-with-Dask)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## An Introduction to Dask ##\n",
"[Dask](https://dask.org/) is a Python library for parallel computing. In Dask programming, we create computational graphs that define code we **would like** to execute, and then, give these computational graphs to a Dask scheduler which evaluates them lazily, and efficiently, in parallel. \n",
"\n",
"In addition to using multiple CPU cores or threads to execute computational graphs in parallel, Dask schedulers can also be configured to execute computational graphs on multiple CPUs, or, as we will do in this workshop, multiple GPUs. As a result, Dask programming facilitates operating on data sets that are larger than the memory of a single compute resource.\n",
"\n",
"Because Dask computational graphs can consist of arbitrary Python code, they provide [a level of control and flexibility superior to many other systems](https://docs.dask.org/en/latest/spark.html) that can operate on massive data sets. However, we will focus for this workshop primarily on the Dask DataFrame, one of several data structures whose operations and methods natively utilize Dask's parallel scheduling:\n",
"* Dask DataFrame, which closely resembles the Pandas DataFrame\n",
"* Dask Array, which closely resembles the NumPy ndarray\n",
"* Dask Bag, a set which allows duplicates and can hold heterogeneously-typed data\n",
"\n",
"In particular, we will use a Dask-cuDF dataframe, which combines the interface of Dask with the GPU power of cuDF for distributed dataframe operations on multiple GPUs. We will now turn our attention to utilizing all 4 NVIDIA V100 GPUs in this environment for operations on an 18GB UK population data set that would not fit into the memory of a single 16GB GPU."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting up a Dask Scheduler ##\n",
"We begin by starting a Dask scheduler which will take care to distribute our work across the 4 available GPUs. In order to do this we need to start a `LocalCUDACluster` instance, using our host machine's IP, and then instantiate a client that can communicate with the cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Obtaining the Local IP Address ###"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"import subprocess # we will use this to obtain our local IP using the following command\n",
"cmd = \"hostname --all-ip-addresses\"\n",
"\n",
"process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n",
"output, error = process.communicate()\n",
"IPADDR = str(output.decode()).split()[0]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Starting a `LocalCUDACluster` ###\n",
"`dask_cuda` provides utilities for Dask and CUDA (the \"cu\" in cuDF) interactions."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-10-25 08:07:00,100 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8f17edf062cd44677ef15826b6901cf7 initialized by task ('shuffle-transfer-8f17edf062cd44677ef15826b6901cf7', 40) executed on worker tcp://172.18.0.2:40069\n",
"2025-10-25 08:07:01,138 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8f17edf062cd44677ef15826b6901cf7 deactivated due to stimulus 'task-finished-1761379621.1352813'\n"
]
}
],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"cluster = LocalCUDACluster(ip=IPADDR)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Instantiating a Client Connection ###\n",
"The `dask.distributed` library gives us distributed functionality, including the ability to connect to the CUDA Cluster we just created. The `progress` import will give us a handy progress bar we can utilize below."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, progress\n",
"\n",
"client = Client(cluster)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### The Dask Dashboard"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask ships with a very helpful dashboard that in our case runs on port `8787`. Open a new browser tab now and copy this lab's URL into it, replacing `/lab/lab` with `:8787` (so it ends with `.com:8787`). This should open the Dask dashboard, currently idle."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading Data with Dask cuDF ##\n",
"With `dask_cudf` we can create a dataframe from several file formats (including from multiple files and directly from cloud storage like S3), from cuDF dataframes, from Pandas dataframes, and even from vanilla CPU Dask dataframes. Here we will create a Dask cuDF dataframe from the local csv file `pop5x_1-07.csv`, which has similar features to the `pop.csv` files you have already been using, except scaled up to 5 times larger (18GB), representing a population of almost 300 million, nearly the size of the entire United States."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"18G data/uk_pop5x.csv\n"
]
}
],
"source": [
"# get the file size of `pop5x_1-07.csv` in GB\n",
"!ls -sh data/uk_pop5x.csv"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We import dask_cudf (and other RAPIDS components when necessary) after setting up the cluster to ensure that they establish correctly inside the CUDA context it creates."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import dask_cudf"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"ddf = dask_cudf.read_csv('./data/uk_pop5x.csv', dtype=['float32', 'str', 'str', 'float32', 'float32', 'str'])"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"age float32\n",
"sex object\n",
"county object\n",
"lat float32\n",
"long float32\n",
"name object\n",
"dtype: object"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.dtypes"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Computational Graph ##\n",
"As mentioned above, when programming with Dask, we create computational graphs that we **would eventually like** to be executed. We can already observe this behavior in action: in calling `dask_cudf.read_csv` we have indicated that **would eventually like** to read the entire contents of `pop5x_1-07.csv`. However, Dask will not ask the scheduler execute this work until we explicitly indicate that we would like it do so.\n",
"\n",
"Observe the memory usage for each of the 4 GPUs by executing the following cell, and notice that the GPU memory usage is not nearly large enough to indicate that the entire 18GB file has been read into memory:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sat Oct 25 08:04:38 2025 \n",
"+-----------------------------------------------------------------------------+\n",
"| NVIDIA-SMI 525.85.12 Driver Version: 525.85.12 CUDA Version: 12.0 |\n",
"|-------------------------------+----------------------+----------------------+\n",
"| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |\n",
"| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |\n",
"| | | MIG M. |\n",
"|===============================+======================+======================|\n",
"| 0 Tesla T4 On | 00000000:00:1B.0 Off | 0 |\n",
"| N/A 31C P0 26W / 70W | 14956MiB / 15360MiB | 0% Default |\n",
"| | | N/A |\n",
"+-------------------------------+----------------------+----------------------+\n",
"| 1 Tesla T4 On | 00000000:00:1C.0 Off | 0 |\n",
"| N/A 31C P0 26W / 70W | 168MiB / 15360MiB | 0% Default |\n",
"| | | N/A |\n",
"+-------------------------------+----------------------+----------------------+\n",
"| 2 Tesla T4 On | 00000000:00:1D.0 Off | 0 |\n",
"| N/A 31C P0 26W / 70W | 168MiB / 15360MiB | 0% Default |\n",
"| | | N/A |\n",
"+-------------------------------+----------------------+----------------------+\n",
"| 3 Tesla T4 On | 00000000:00:1E.0 Off | 0 |\n",
"| N/A 30C P0 25W / 70W | 168MiB / 15360MiB | 0% Default |\n",
"| | | N/A |\n",
"+-------------------------------+----------------------+----------------------+\n",
" \n",
"+-----------------------------------------------------------------------------+\n",
"| Processes: |\n",
"| GPU GI CI PID Type Process name GPU Memory |\n",
"| ID ID Usage |\n",
"|=============================================================================|\n",
"+-----------------------------------------------------------------------------+\n"
]
}
],
"source": [
"!nvidia-smi"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Visualizing the Computational Graph ###\n",
"Computational graphs that have not yet been executed provide the `.visualize` method that, when used in a Jupyter environment such as this one, will display the computational graph, including how Dask intends to go about distributing the work. Thus, we can visualize how the `read_csv` operation will be distributed by Dask by executing the following cell:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"image/svg+xml": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.43.0 (0)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"115pt\" height=\"44pt\"\n",
" viewBox=\"0.00 0.00 115.00 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-40 111,-40 111,4 -4,4\"/>\n",
"<!-- &#45;3452522387815099107 -->\n",
"<g id=\"node1\" class=\"node\">\n",
"<title>&#45;3452522387815099107</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"107,-36 0,-36 0,0 107,0 107,-36\"/>\n",
"<text text-anchor=\"middle\" x=\"53.5\" y=\"-13\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">ReadCSV</text>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
],
"text/plain": [
"<graphviz.graphs.Digraph at 0x7effad6a7ac0>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.visualize(format='svg') # This visualization is very large, and using `format='svg'` will make it easier to view."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see, when we indicate for Dask to actually execute this operation, it will parallelize the work across the 4 GPUs in something like 69 parallel partitions. We can see the exact number of partitions with the `npartitions` property:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"69"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.npartitions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Extending the Computational Graph ###\n",
"The concept of constructing computational graphs with arbitrary operations before executing them is a core part of Dask. Let's add some operations to the existing computational graph and visualize it again.\n",
"\n",
"After running the next cell, although it will take some scrolling to get a clear sense of it (the challenges of distributed data analytics!), you can see that the graph already constructed for `read_csv` now continues upward. It selects the `age` column across all partitions (visualized as `getitem`) and eventually performs the `.mean()` reduction (visualized as `series-sum-chunk`, `series-sum-agg`, `count-chunk`, `sum-agg` and `true-div`)."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.43.0 (0)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"276pt\" height=\"188pt\"\n",
" viewBox=\"0.00 0.00 276.00 188.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 184)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-184 272,-184 272,4 -4,4\"/>\n",
"<!-- 1212456804757430049 -->\n",
"<g id=\"node1\" class=\"node\">\n",
"<title>1212456804757430049</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"221,-180 47,-180 47,-144 221,-144 221,-180\"/>\n",
"<text text-anchor=\"middle\" x=\"134\" y=\"-157\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">Sum(Projection)</text>\n",
"</g>\n",
"<!-- &#45;8961516369453771125 -->\n",
"<g id=\"node2\" class=\"node\">\n",
"<title>&#45;8961516369453771125</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"268,-108 0,-108 0,-72 268,-72 268,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"134\" y=\"-85\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">Projection(ReadCSV, age)</text>\n",
"</g>\n",
"<!-- &#45;8961516369453771125&#45;&gt;1212456804757430049 -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
"<title>&#45;8961516369453771125&#45;&gt;1212456804757430049</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M134,-108.3C134,-116.02 134,-125.29 134,-133.89\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"130.5,-133.9 134,-143.9 137.5,-133.9 130.5,-133.9\"/>\n",
"</g>\n",
"<!-- &#45;3452522387815099107 -->\n",
"<g id=\"node3\" class=\"node\">\n",
"<title>&#45;3452522387815099107</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"187.5,-36 80.5,-36 80.5,0 187.5,0 187.5,-36\"/>\n",
"<text text-anchor=\"middle\" x=\"134\" y=\"-13\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">ReadCSV</text>\n",
"</g>\n",
"<!-- &#45;3452522387815099107&#45;&gt;&#45;8961516369453771125 -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
"<title>&#45;3452522387815099107&#45;&gt;&#45;8961516369453771125</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M134,-36.3C134,-44.02 134,-53.29 134,-61.89\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"130.5,-61.9 134,-71.9 137.5,-61.9 130.5,-61.9\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
],
"text/plain": [
"<graphviz.graphs.Digraph at 0x7effad7539a0>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mean_age = ddf['age'].sum()\n",
"mean_age.visualize(format='svg')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Computing with the Computational Graph ###\n",
"There are several ways to indicate to Dask that we would like to perform the computations described in the computational graphs we have constructed. The first we will show is the `.compute` method, which will return the output of the computation as an object in one GPU's memory - no longer distributed across GPUs.\n",
"\n",
"**NOTE**: This value is actually a [*future*](https://docs.python.org/3/library/concurrent.futures.html) that it can be immediately used in code, even before it completes evaluating. While this can be tremendously useful in many scenarios, we will not need in this workshop to do anything fancy with the futures we generate except to wait for them to evaluate so we can visualize their values.\n",
"\n",
"Below we send the computational graph we have created to the Dask scheduler to be executed in parallel on our 4 GPUs. If you have the Dask Dashboard open on another tab from before, you can watch it while the operation completes. Because our graph involves reading the entire 18GB data set (as we declared when adding `read_csv` to the call graph), you can expect the operation to take a little time. If you closely watch the dashboard, you will see that Dask begins follow-on calculations for `mean` even while data is still being read into memory."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"ename": "MemoryError",
"evalue": "std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/include/rmm/mr/device/cuda_memory_resource.hpp",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mMemoryError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[8], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mmean_age\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/dask_expr/_collection.py:481\u001b[0m, in \u001b[0;36mFrameBase.compute\u001b[0;34m(self, fuse, concatenate, **kwargs)\u001b[0m\n\u001b[1;32m 479\u001b[0m out \u001b[38;5;241m=\u001b[39m out\u001b[38;5;241m.\u001b[39mrepartition(npartitions\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m1\u001b[39m)\n\u001b[1;32m 480\u001b[0m out \u001b[38;5;241m=\u001b[39m out\u001b[38;5;241m.\u001b[39moptimize(fuse\u001b[38;5;241m=\u001b[39mfuse)\n\u001b[0;32m--> 481\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mDaskMethodsMixin\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43mout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/dask/base.py:372\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 348\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 349\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 350\u001b[0m \n\u001b[1;32m 351\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 370\u001b[0m \u001b[38;5;124;03m dask.compute\u001b[39;00m\n\u001b[1;32m 371\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 372\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 373\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/dask/base.py:660\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 657\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[1;32m 659\u001b[0m \u001b[38;5;28;01mwith\u001b[39;00m shorten_traceback():\n\u001b[0;32m--> 660\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 662\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py:3799\u001b[0m, in \u001b[0;36mFused._execute_internal_graph\u001b[0;34m(internal_tasks, dependencies, outkey)\u001b[0m\n\u001b[1;32m 3796\u001b[0m \u001b[38;5;129m@staticmethod\u001b[39m\n\u001b[1;32m 3797\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_execute_internal_graph\u001b[39m(internal_tasks, dependencies, outkey):\n\u001b[1;32m 3798\u001b[0m cache \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mdict\u001b[39m(dependencies)\n\u001b[0;32m-> 3799\u001b[0m res \u001b[38;5;241m=\u001b[39m \u001b[43mexecute_graph\u001b[49m\u001b[43m(\u001b[49m\u001b[43minternal_tasks\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcache\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcache\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m[\u001b[49m\u001b[43moutkey\u001b[49m\u001b[43m]\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3800\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m res[outkey]\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/dask_cudf/_legacy/io/csv.py:184\u001b[0m, in \u001b[0;36m_read_csv\u001b[0;34m(fn, dtypes, **kwargs)\u001b[0m\n\u001b[1;32m 183\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_read_csv\u001b[39m(fn, dtypes\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[0;32m--> 184\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mcudf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mread_csv\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/cudf/utils/performance_tracking.py:51\u001b[0m, in \u001b[0;36m_performance_tracking.<locals>.wrapper\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 43\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m nvtx\u001b[38;5;241m.\u001b[39menabled():\n\u001b[1;32m 44\u001b[0m stack\u001b[38;5;241m.\u001b[39menter_context(\n\u001b[1;32m 45\u001b[0m nvtx\u001b[38;5;241m.\u001b[39mannotate(\n\u001b[1;32m 46\u001b[0m message\u001b[38;5;241m=\u001b[39mfunc\u001b[38;5;241m.\u001b[39m\u001b[38;5;18m__qualname__\u001b[39m,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 49\u001b[0m )\n\u001b[1;32m 50\u001b[0m )\n\u001b[0;32m---> 51\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mfunc\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/opt/conda/lib/python3.10/site-packages/cudf/io/csv.py:80\u001b[0m, in \u001b[0;36mread_csv\u001b[0;34m(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, prefix, mangle_dupe_cols, dtype, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, skip_blank_lines, parse_dates, dayfirst, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, comment, delim_whitespace, byte_range, storage_options, bytes_per_thread)\u001b[0m\n\u001b[1;32m 77\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m na_values \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mand\u001b[39;00m is_scalar(na_values):\n\u001b[1;32m 78\u001b[0m na_values \u001b[38;5;241m=\u001b[39m [na_values]\n\u001b[0;32m---> 80\u001b[0m df \u001b[38;5;241m=\u001b[39m \u001b[43mlibcudf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcsv\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mread_csv\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 81\u001b[0m \u001b[43m \u001b[49m\u001b[43mfilepath_or_buffer\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 82\u001b[0m \u001b[43m \u001b[49m\u001b[43mlineterminator\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlineterminator\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 83\u001b[0m \u001b[43m \u001b[49m\u001b[43mquotechar\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mquotechar\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 84\u001b[0m \u001b[43m \u001b[49m\u001b[43mquoting\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mquoting\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 85\u001b[0m \u001b[43m \u001b[49m\u001b[43mdoublequote\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdoublequote\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 86\u001b[0m \u001b[43m \u001b[49m\u001b[43mheader\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mheader\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 87\u001b[0m \u001b[43m \u001b[49m\u001b[43mmangle_dupe_cols\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mmangle_dupe_cols\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 88\u001b[0m \u001b[43m \u001b[49m\u001b[43musecols\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43musecols\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 89\u001b[0m \u001b[43m \u001b[49m\u001b[43msep\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43msep\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 90\u001b[0m \u001b[43m \u001b[49m\u001b[43mdelimiter\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdelimiter\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 91\u001b[0m \u001b[43m \u001b[49m\u001b[43mdelim_whitespace\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdelim_whitespace\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 92\u001b[0m \u001b[43m \u001b[49m\u001b[43mskipinitialspace\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mskipinitialspace\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 93\u001b[0m \u001b[43m \u001b[49m\u001b[43mnames\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mnames\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 94\u001b[0m \u001b[43m \u001b[49m\u001b[43mdtype\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdtype\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 95\u001b[0m \u001b[43m \u001b[49m\u001b[43mskipfooter\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mskipfooter\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 96\u001b[0m \u001b[43m \u001b[49m\u001b[43mskiprows\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mskiprows\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 97\u001b[0m \u001b[43m \u001b[49m\u001b[43mdayfirst\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdayfirst\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 98\u001b[0m \u001b[43m \u001b[49m\u001b[43mcompression\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcompression\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 99\u001b[0m \u001b[43m \u001b[49m\u001b[43mthousands\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mthousands\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 100\u001b[0m \u001b[43m \u001b[49m\u001b[43mdecimal\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdecimal\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 101\u001b[0m \u001b[43m \u001b[49m\u001b[43mtrue_values\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mtrue_values\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 102\u001b[0m \u001b[43m \u001b[49m\u001b[43mfalse_values\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mfalse_values\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 103\u001b[0m \u001b[43m \u001b[49m\u001b[43mnrows\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mnrows\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 104\u001b[0m \u001b[43m \u001b[49m\u001b[43mbyte_range\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mbyte_range\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 105\u001b[0m \u001b[43m \u001b[49m\u001b[43mskip_blank_lines\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mskip_blank_lines\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 106\u001b[0m \u001b[43m \u001b[49m\u001b[43mparse_dates\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mparse_dates\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 107\u001b[0m \u001b[43m \u001b[49m\u001b[43mcomment\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcomment\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 108\u001b[0m \u001b[43m \u001b[49m\u001b[43mna_values\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mna_values\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 109\u001b[0m \u001b[43m \u001b[49m\u001b[43mkeep_default_na\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mkeep_default_na\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 110\u001b[0m \u001b[43m \u001b[49m\u001b[43mna_filter\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mna_filter\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 111\u001b[0m \u001b[43m \u001b[49m\u001b[43mprefix\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mprefix\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 112\u001b[0m \u001b[43m \u001b[49m\u001b[43mindex_col\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mindex_col\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 113\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 115\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m dtype \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mor\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(dtype, abc\u001b[38;5;241m.\u001b[39mMapping):\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# There exists some dtypes in the result columns that is inferred.\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# Find them and map them to the default dtypes.\u001b[39;00m\n\u001b[1;32m 118\u001b[0m specified_dtypes \u001b[38;5;241m=\u001b[39m {} \u001b[38;5;28;01mif\u001b[39;00m dtype \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;28;01melse\u001b[39;00m dtype\n",
"File \u001b[0;32mcsv.pyx:210\u001b[0m, in \u001b[0;36mcudf._lib.csv.read_csv\u001b[0;34m()\u001b[0m\n",
"File \u001b[0;32mcsv.pyx:274\u001b[0m, in \u001b[0;36mpylibcudf.io.csv.read_csv\u001b[0;34m()\u001b[0m\n",
"\u001b[0;31mMemoryError\u001b[0m: std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/include/rmm/mr/device/cuda_memory_resource.hpp"
]
}
],
"source": [
"mean_age.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Persisting Data in the Cluster ###\n",
"As you can see, the previous operation, which read the entire 18GB csv into the GPUs' memory, did not retain the data in memory after completing the computational graph:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!nvidia-smi"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A typical Dask workflow, which we will utilize, is to persist data we would like to work with to the cluster and then perform fast operations on that persisted data. We do this with the `.persist` method. From the [Dask documentation](https://distributed.dask.org/en/latest/manage-computation.html#client-persist):\n",
"\n",
">The `.persist` method submits the task graph behind the Dask collection to the scheduler, obtaining Futures for all of the top-most tasks (for example one Future for each Pandas [*or cuDF*] DataFrame in a Dask[*-cudf*] DataFrame). It then returns a copy of the collection pointing to these futures instead of the previous graph. This new collection is semantically equivalent but now points to actively running data rather than a lazy graph.\n",
"\n",
"Below we persist `ddf` to the cluster so that it will reside in GPU memory for us to perform fast operations on. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf = ddf.persist()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see by executing `nvidia-smi` (after letting the `persist` finish), each GPU now has parts of the distributed dataframe in its memory:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!nvidia-smi"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Running `ddf.visualize` now shows that we no longer have operations in our task graph, only partitions of data, ready for us to perform operations:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"ddf.visualize(format='svg')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Computing operations on this data will now be much faster:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"ddf['age'].mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initial Data Exploration with Dask cuDF ##\n",
"The beauty of Dask is that working with your data, even though it is distributed and massive, is a lot like working with smaller in-memory data sets."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.head() # As a convenience, no need to `.compute` the `head()` method"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.count().compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.dtypes"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Exercise #1 - Counties North of Sunderland with Dask ###\n",
"Here we ask you to revisit an earlier exercise, but on the distributed data set. Hopefully, it's clear how similar the code is for single-GPU dataframes and distributed dataframes with Dask.\n",
"\n",
"Identify the latitude of the northernmost resident of Sunderland county (the person with the maximum `lat` value), and then determine which counties have any residents north of this resident. Use the `unique` method of a cudf `Series` to de-duplicate the result.\n",
"\n",
"**Instructions**: <br>\n",
"* Modify the `<FIXME>` only and execute the below cell to identify counties north of Sunderland. "
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"0 South Tyneside\n",
"0 Cumbria\n",
"0 County Durham\n",
"0 Gateshead\n",
"0 Northumberland\n",
"Name: county, dtype: object"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sunderland_residents = ddf.loc[ddf['county'] == 'Sunderland']\n",
"northmost_sunderland_lat = sunderland_residents['lat'].max()\n",
"counties_with_pop_north_of = ddf.loc[ddf['lat'] > northmost_sunderland_lat]['county'].unique()\n",
"results=counties_with_pop_north_of.compute()\n",
"results.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import IPython\n",
"app = IPython.Application.instance()\n",
"app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Well Done!** Let's move to the [next notebook](1-09_cudf-polars.ipynb). "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"./images/DLI_Header.png\" width=400/>"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.15"
}
},
"nbformat": 4,
"nbformat_minor": 4
}