From 417326498e135fc583063b9908a440f8887bc12d Mon Sep 17 00:00:00 2001 From: SEK1RO Date: Mon, 26 Jan 2026 20:58:26 +0300 Subject: [PATCH] ds: r9 --- ds/25-1/3/1_01_data_loading.md | 1116 +++++++++++++++++ ds/25-1/3/1_02_EDA.md | 654 ++++++++++ .../3/1_03_categorical_feature_engineering.md | 279 +++++ ds/25-1/3/1_04_nvtabular_and_mgpu.md | 341 +++++ ds/25-1/3/README.md | 7 + ds/25-1/3/memory_utilization.md | 92 ++ ds/25-1/r/9.Rmd | 161 +++ ds/25-1/r/r2.Rproj | 13 - 8 files changed, 2650 insertions(+), 13 deletions(-) create mode 100644 ds/25-1/3/1_01_data_loading.md create mode 100644 ds/25-1/3/1_02_EDA.md create mode 100644 ds/25-1/3/1_03_categorical_feature_engineering.md create mode 100644 ds/25-1/3/1_04_nvtabular_and_mgpu.md create mode 100644 ds/25-1/3/README.md create mode 100644 ds/25-1/3/memory_utilization.md create mode 100644 ds/25-1/r/9.Rmd delete mode 100644 ds/25-1/r/r2.Rproj diff --git a/ds/25-1/3/1_01_data_loading.md b/ds/25-1/3/1_01_data_loading.md new file mode 100644 index 0000000..69c81f1 --- /dev/null +++ b/ds/25-1/3/1_01_data_loading.md @@ -0,0 +1,1116 @@ + Header + +# Enhancing Data Science Outcomes With Efficient Workflow # + +## 01 - Data Loading ## +In this lab, you will learn the motivation behind doing data science on GPU clusters. This lab covers ETL, data exploration, and feature engineering steps of the data processing pipeline. Extract, transform, load, or [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), is the process where data is transformed into a proper structure for the purposes of querying and downstream analysis. [Feature engineering](https://en.wikipedia.org/wiki/Feature_engineering), on the other hand, is the process of converting raw data into features that can be used to develop machine learning models. + +

+ +**Table of Contents** +
In this notebook, we will load data into a Dask DataFrame, perform data cleaning, and persist the processed data for downstream analytics. This notebook covers the below sections:
1. [Background](#s1-1)
    * [RAPIDS, cuDF, and Dask-cuDF](#s1-1.1)
    * [Problem Scoping and Other Applications](#s1-1.2)
2. [Introduction to Dataset](#s1-2)
    * [Data Size](#s1-2.1)
3. [Data Loading](#s1-3)
    * [About CSV File Format](#s1-3.1)
    * [Exercise #1 - Understanding DataFrame Memory Usage](#s1-e1)
    * [Exploring Ways to Load Large Dataset on Single GPU](#s1-3.2)
    * [Dask-cuDF With Single GPU](#s1-3.3)
    * [Distributed cuDF DataFrame on Multi-GPU with Dask-cuDF](#s1-3.4)
    * [Exercise #2 - Starting a Dask Cluster](#s1-e2)
4. [Data Quality Check, Data Cleaning, and GroupBy Operations](#s1-4)
    * [Handling Null Values](#s1-4.1)
    * [Exercise #3 - Understanding Groupby Operations Behavior](#s1-e3)
    * [Data Imputation](#s1-4.2)
    * [Exercise #4 - Check Missing Values](#s1-e4)
    * [Exercise #5 - Static Fill Null Values](#s1-e5)
5. [Filter Relevant Data](#s1-5)
6. [Save to Parquet Format](#s1-6)
    * [Dask-cuDF to cuDF Conversion](#s1-6.1)
7. [Summary](#s1-7)


## Background ##
Unlike the fields of computer vision and conversational AI, which have already fully adopted GPU acceleration for both data processing and the training of machine learning models, much of the data science field is still predominantly powered by CPUs. This is due to the _tabular nature_ of the underlying data. Tabular data are ubiquitous and easily understood. They support the analytics enterprises rely on to thrive and succeed. However, as the amount of data continues to grow, traditional software and hardware can no longer support the processing speed needed.

With RAPIDS, we can achieve significant performance improvements for both data preprocessing and machine learning model training alike. RAPIDS uses high-level APIs to simplify access to CUDA interfaces for various operations. 
By keeping the data entirely on the GPU and in the same format throughout the data processing pipeline, there is no need to copy/move data between CPU and GPU memory. Minimizing data transfer helps us realize the full potential of GPU acceleration. The benefits of reduced computation time on tabular data for data science include:
* Allows more features to be discovered through experimentation
* Improves model performance through hyperparameter optimization
* Reduces cost and improves productivity


### RAPIDS, cuDF, and Dask-cuDF ###
Data manipulation can be done using the RAPIDS suite of software libraries. They parallelize compute across multiple cores in the GPU to achieve improved performance over traditional data processing tools. We can use cuDF to manipulate data using a DataFrame-style API and Dask-cuDF to distribute our computation across a GPU cluster. This is useful when working with large amounts of data that exceed the memory of a single GPU.


+ +**cuDF** [[doc]](https://docs.rapids.ai/api/cudf/stable/) is a Python GPU DataFrame library (built on the [Apache Arrow](https://arrow.apache.org/) columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating data. cuDF provides a [pandas](https://pandas.pydata.org/)-like API that will be familiar to data engineers and data scientists. This enables them to easily accelerate their workflows without going into the details of CUDA programming. cuDF also offers great interoperability with other tools in the GPU PyData ecosystem such as [CuPy](https://docs.rapids.ai/api/cudf/stable/user_guide/cupy-interop.html). + +RAPIDS uses **Dask** [[doc]](https://dask.org/) to scale data manipulation across multiple GPUs through **Dask-cuDF**. Dask is a Python distributed framework used to run workloads on both CPUs and GPUs. It is an open-source library designed to natively scale Python code. With Dask, data is split into partitions, processed by one or more nodes within a cluster, and aggregated when necessary. Dask handles distributing our data across multiple GPUs with **Dask-CUDA**, which is a library extending `Dask.distributed`[[doc]](https://distributed.dask.org/en/stable/). Dask consists of 3 main components: a **client**, a **scheduler**, and one or more **workers**. When the Dask scheduler assigns tasks to workers, they can run in parallel by individual workers. Of course, the tasks themselves will use **CUDA** for parallelized computation. For more information on how to use Dask with GPUs, please see the Dask-CUDA [documentation](https://dask-cuda.readthedocs.io/en/latest/). + +
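The client–scheduler–worker model above boils down to a simple pattern: split the data into partitions, process each partition independently (ideally in parallel), and aggregate the results at the end. As a minimal CPU-only sketch of that pattern — plain Python with a thread pool standing in for Dask workers, nothing here is Dask's actual API — consider:

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, n_partitions):
    """Split a list into roughly equal chunks, like Dask partitions a DataFrame."""
    size = -(-len(data) // n_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

def process_partition(partition):
    """Per-partition work; in Dask-cuDF this would run on a GPU worker."""
    return sum(x * x for x in partition)

data = list(range(1_000))
partitions = split_into_partitions(data, n_partitions=4)

# "workers" process partitions in parallel; the "scheduler" aggregates results
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(process_partition, partitions))

total = sum(partial_sums)  # final aggregation step
print(total)
```

The names `split_into_partitions` and `process_partition` are illustrative only — the point is the partition/process/aggregate shape that Dask automates.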

**Note**: cuDF is great for tabular datasets that fit nicely in GPU memory while Dask-cuDF becomes useful when the dataset is larger than memory. A quick introduction to cuDF and Dask-cuDF can be found via [10 Minutes to cuDF and Dask-cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min.html). Throughout the workshop, we will denote `pandas.DataFrame` as `df`, `cudf.DataFrame` as `gdf`, and `dask.dataframe.DataFrame` as `ddf`.


### Problem Scoping and Other Applications ###
For this lab, we will perform various data science tasks using an E-Commerce dataset. We set the primary objective as building a _classification model_ that can accurately predict if a customer will purchase an item. Throughout the lab, we'll closely examine each step that helps us improve our results. In doing so, we will be able to apply the techniques discussed to other use cases. For example, similar pipelines are used for:
* **Retail Analytics**: Some retailers need to make forecasts regularly for inventory and supply chain management purposes. The high frequency of this process, often weekly, puts constraints on time and demands high levels of acceleration to process large amounts of data. The ability to process data efficiently and develop powerful analytical tools enables higher sales and revenue.
* **Recommendation System**: Enterprises across industries rely on the ability to process inputs, build models, and run inference quickly to help users find relevant information.
* **Financial Modeling**: Investment management companies, banks, and venture capital firms rely on analyzing large datasets and performing analytics in near real-time to identify trends.


## Introduction to Dataset ##
We use the [e-Commerce Behavior Data from Multi-Category Store](https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store) from [REES46 Marketing Platform](https://rees46.com/) as our dataset. Below is a sample of the original data:


+ +**Description**: +* `event_time` - time when event happened (in UTC) +* `event_type` - events can be: + * `view` - a user viewed a product + * `cart` - a user added a product to shopping cart + * `remove_from_cart` - a user removed a product from shopping cart + * `purchase` - a user purchased a product +* `product_id` - ID of a product +* `category_id` - product's category ID +* `category_code` - product's category taxonomy (usually present for meaningful categories and skipped for different kinds of accessories) +* `brand` - brand name (downcased) +* `price` - float price of a product +* `user_id` - permanent user ID +* `user_session` - temporary user's session ID. + * session is changed every time a user comes back after a long pause + * a session can have multiple purchase events and is considered a single order + + +### Data Size ### +Performance often isn't a problem when working with a small dataset. However, runtime can be much longer and the kernel might fail due to insufficient memory if the dataset is large. Memory capacity is an important consideration for us since GPU memory can be smaller and more expensive than host memory. If we want to keep data processing in the GPU to take advantage of the acceleration, we need to be mindful about memory utilization. We start by examining the size of the source data files. + + +```python +# use ls command to list directory contents +# the options -s shows size and -h shows in readable format +!ls -sh data +``` + + +```python +# use gzip command to uncompress .csv.gz files +# the options -d decompresses and -k keeps the original file +!gzip -cdk data/2020-Mar.csv.gz > ./2020-Mar.csv +``` + + +```python +# the uncompressed csv file is much larger +!ls -sh 2020-Mar.csv +``` + +
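The large gap between the `.csv.gz` and uncompressed sizes comes from how repetitive CSV text is — the same brand names, event types, and category codes appear on millions of rows. This is easy to see with the standard library's `gzip` module on synthetic rows (not the actual dataset):

```python
import gzip

# synthetic CSV-like rows with highly repetitive categorical values
rows = 'event_time,event_type,brand\n'
rows += '2020-03-01 00:00:00 UTC,view,samsung\n' * 10_000
raw = rows.encode('utf-8')

compressed = gzip.compress(raw)
ratio = len(raw) / len(compressed)
print(f'raw: {len(raw)} bytes, gzipped: {len(compressed)} bytes, ratio: {ratio:.0f}x')
```

Real data compresses less dramatically than this perfectly repetitive example, but the principle — repeated categorical strings compress well on disk yet still cost full memory once loaded — is the same.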

Some categorical features, such as `event_type`, `category_code`, and `brand`, are stored as raw text. They could alternatively have been stored as integers with a separate mapping. When represented as strings, they require a significant amount of disk space and memory to process.


## Data Loading ##
**cuDF** and **Dask-cuDF** support all the common I/O formats such as `read_csv()`, `read_json()`, `read_parquet()`, `read_orc()`, and more. If the data fits comfortably on a single GPU, then cuDF on a single GPU should be used for data manipulation. cuDF is much faster than Dask-cuDF on small datasets since there is no orchestration overhead. On the other hand, if the workflow is to be distributed across multiple GPUs, data is spread across many files at once, or doesn't fit in memory on a single GPU, Dask-cuDF is recommended. More information about I/O operations can be found in the [documentation](https://docs.rapids.ai/api/cudf/stable/api_docs/io.html).


### About CSV File Format ###
CSV files are commonly used because they’re human-readable. While they continue to be a popular choice when storing data, CSV files are usually not the best file format for data analysis. It's best to start our analysis by first converting the files to a more desirable file format. The `.read_csv()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.read_csv.html) function for loading data is memory intensive and requires memory in excess of the original source. This is related to intermediary processes that create temporary data when parsing the file(s). The spike in memory is particularly noticeable when loading large datasets that contain many string values since they require much more working memory. In general, whether memory will be an issue depends on the size and types of data, but using Dask can help us deal with potential memory issues.


### Exercise #1 - Understanding DataFrame Memory Usage ###

**Instructions**:
+* Review the [memory_utilization notebook](memory_utilization.ipynb) to understand how memory usage is calculated. + + +### Exploring Ways to Load Large Dataset on Single GPU ### +Since our dataset is quite large, attempting to read the data as is into a single T4 GPU with cuDF will result in *going out of memory* (see sample error message): + +``` +MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /opt/conda/envs/rapids/include/rmm/mr/device/cuda_memory_resource.hpp +``` + +In order to read the data into a `cudf.DataFrame`, there are a few options: +1. Limit the amount of data read with `nrows` parameter, e.g. `gdf=cudf.read_csv('2020-Mar.csv', nrows=30000000)` +2. Limit the number of columns read with `usecols` parameter, e.g. `gdf=cudf.read_csv('2020-Mar.csv', usecols=['event_time', 'event_type'])` +3. Use pandas to read the CSV file using host memory, then create a `cuDF.DataFrame` from the `pandas.DataFrame` with `.from_pandas()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe.from_pandas), e.g. `gdf=cudf.from_pandas(pd.read_csv('2020-Mar.csv'))`. This relies on the host memory to perform many of the intermediary processes before the data is moved to the GPU. It is not recommended as reading large datasets with pandas can be inefficient. +4. Read the entire dataset with cuDF by enabling memory spilling from the GPU to host using RMM (managed memory), e.g. `cudf.set_allocator('managed')`, `gdf=cudf.read_csv('2020-Mar.csv')`. This is not recommended as it can corrupt driver context and require the whole machine to be rebooted. +5. Whether it's to analyze data on one GPU or spread the analysis across multiple GPUs for even faster results, Dask-cuDF is the recommended way of reading large datasets. 
Even with a single GPU, Dask-cuDF would be able to read large datasets by leveraging mechanisms that help manage memory, which include chunking the workload as well as **spilling** [[doc]](https://docs.rapids.ai/api/dask-cuda/stable/spilling/) from GPU memory to host memory and disk as needed. + + +### Dask-cuDF With Single GPU ### +When using Dask, it is always recommended to create a cluster first with `dask_cuda.LocalCUDACluster()`. Dask will run even if we don't create a cluster explicitly, but none of the optimizations introduced by [Dask-CUDA](https://docs.rapids.ai/api/dask-cuda/stable/index.html) will be available in such cases. Dask's default scheduler is a single-machine scheduler (as described [here](https://docs.dask.org/en/latest/scheduling.html)) that cannot run on multiple machines. We will use the distributed scheduler to work with a cluster. + +The Dask client is the primary entry point for `dask.distributed`. When we create a Dask client with `dask.distributed.Client()` and point it to the cluster, we enable compute to be done by the distributed system. Instantiating a `dask.distributed.Client` with no arguments will implicitly create a `dask.distributed.LocalCluster` and pass it as the argument (as described [here](https://docs.dask.org/en/latest/deploying-python.html#client)). While we can make a `LocalCluster` work with multiple GPUs, it is recommended to use the `LocalCUDACluster` variant that uses one GPU per process. To create a `LocalCUDACluster`, we will want to either use the `dask_cuda.LocalCUDACluster`[[doc]](https://docs.rapids.ai/api/dask-cuda/stable/api.html) API, launch Dask workers from the command line, or use the [Dask JupyterLab Extension](https://github.com/dask/dask-labextension) UI to start a cluster. We will use `dask_cuda.LocalCUDACluster()` to instantiate the cluster and discuss some important parameters. 
Using `dask-cuda-worker` or `LocalCUDACluster` will automatically launch one worker for each GPU available on the node from where it was executed, but we will first use Dask with a single GPU before scaling out to multiple GPUs. We can control the number of workers with the `CUDA_VISIBLE_DEVICES` parameter, or by defining it as an environment variable. Alternatively, the `n_workers` parameter can be used, but it must be smaller than or equal to the number of GPUs specified in `CUDA_VISIBLE_DEVICES`.

Importantly, Dask-CUDA supports spilling from device memory to host memory when the GPU can't fit more data. The spilling mechanism is triggered once the user-defined limit, `device_memory_limit`, is reached. The `device_memory_limit` parameter takes a string representing the number of bytes in human-readable format (e.g. `2GB`), a float representing a fraction of device memory, or an integer representing the number of bytes per worker. By default, this parameter is set to `0.8`, which means the cluster will spill to the host when memory usage becomes high. The `rmm_pool_size` parameter is used to specify the initial RMM pool size for each worker. More information about spilling from device can be found [here](https://docs.rapids.ai/api/dask-cuda/stable/spilling/). Memory spilling is important when doing complex operations like `.groupby()`, which can spike memory far higher than just the raw file size. On the other hand, if the workflow is not memory constrained, `device_memory_limit` can be set to `0` to disable spilling.

A separate parameter, `memory_limit`, sets the number of bytes of host memory each worker can use. It defaults to `auto`, which will assign host memory to each worker based on the calculation `min(1, num_threads / num_cores)`, where `num_threads` is `1` for GPU workers and `num_cores` is `n_workers * n_threads`. This will split the total system memory evenly between the workers and enable spilling to disk. 
When host memory is constrained, splitting an already low amount of host memory between workers can trigger Dask to [pause](https://distributed.dask.org/en/stable/worker-memory.html#pause-worker) or [kill](https://distributed.dask.org/en/stable/worker-memory.html#kill-worker) workers. More information about memory management can be found [here](https://distributed.dask.org/en/stable/worker-memory.html).

More information about `dask_cuda.LocalCUDACluster` can be found in the [documentation](https://docs.rapids.ai/api/dask-cuda/nightly/api/#dask_cuda.LocalCUDACluster).


```python
# import dependencies
from dask.distributed import Client, get_client
from dask_cuda import LocalCUDACluster
import dask_cudf
import cudf

# create cluster
cluster=LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0",  # use only GPU 0, i.e. a single worker (equivalent to n_workers=1)
    # rmm_pool_size="14GB",    # this GPU has 15GB of memory
    device_memory_limit=0.8,   # this is the default value so doesn't need to be stated explicitly
    dashboard_address=':8787'  # this is the default value so doesn't need to be stated explicitly
)

# instantiate client
Client(cluster)
```

Once a Dask client is created, we can use `dask.distributed.get_client()` and `dask.distributed.Client.scheduler_info()` to get back information about locally created clients and the workers in the cluster. The cluster state can be reset with the `dask.distributed.Client.restart()` method. More information about the `dask.distributed` API can be found [here](https://distributed.dask.org/en/stable/api.html). With the cluster and client set up, Dask will assign work to the GPU.


```python
# get existing client
client=get_client()

# get workers info
client.scheduler_info()
```
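The three value forms accepted by `device_memory_limit` (human-readable string, fraction, byte count) are easy to mix up. The helper below is a hypothetical illustration of their semantics only — `resolve_memory_limit` is not dask-cuda's actual parser and is not part of any library:

```python
def resolve_memory_limit(limit, total_bytes):
    """Normalize a memory limit to bytes (illustrative only, not dask-cuda's parser)."""
    units = {'KB': 1e3, 'MB': 1e6, 'GB': 1e9}
    if isinstance(limit, str):       # human-readable string, e.g. "2GB"
        for suffix, factor in units.items():
            if limit.upper().endswith(suffix):
                return int(float(limit[:-len(suffix)]) * factor)
        raise ValueError(f'unrecognized unit in {limit!r}')
    if isinstance(limit, float):     # fraction of total device memory, e.g. 0.8
        return int(limit * total_bytes)
    return int(limit)                # already a byte count per worker

total = 15 * 10**9  # e.g. a GPU with roughly 15 GB of memory

print(resolve_memory_limit('2GB', total))   # 2000000000
print(resolve_memory_limit(0.8, total))     # 12000000000
print(resolve_memory_limit(10**9, total))   # 1000000000
```

All three calls describe a spill threshold in bytes; only the notation differs.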

+ +The scheduler information pop-up shows that there is currently 1 worker in the cluster, along with its respective host and device memory. + +The Dask distributed scheduler provides live feedback via an interactive dashboard. We will use this [Dashboard Diagnostics](https://docs.dask.org/en/stable/dashboard.html) to monitor and gain insight on the Dask computations. The cluster's `dashboard_address` parameter is defaulted to `:8787`, where the dashboard will be served. We can navigate to the address to access the dashboard. Note that the localhost address (e.g. 127.0.0.1:8787) is not directly accessible. We have exposed it for public access through this workshop instance's URL. We recommend having the Dashboard open as it's useful to understand Dask behavior. Please refer to the [introduction notebook](1_00_introduction.ipynb) for how the plots are used. + +When using the Dask Dashboard Diagnostics, bad signs to watch out for include: +* Lots of white space in the task stream plot is a bad sign. This indicates that workers are idle, which might be caused by chunks being too small. +* Lots of red in the task stream plot is also a bad sign. While workers need some communication, too much red means there isn't much productive work going on except communication. +* Watch out for orange bars or grey bars (spilled to disk) in the worker memory plot. This is a sign of getting close to memory limit or spilling to disk, which can be caused by chunks being too big. + + +```python +# get the machine's external IP address +from requests import get + +ip=get('https://api.ipify.org').content.decode('utf8') + +print(f'Dask dashboard (status) is accessible on http://{ip}:8787/status') +print(f'Dask dashboard (gpu) is accessible on http://{ip}:8787/gpu') +``` + +Dask-cuDF extends Dask where necessary to allow its DataFrame partitions to be processed using cuDF GPU DataFrames. 
For example, when we call `dask_cudf.read_csv()`, the cluster’s GPU does the work of parsing the CSV file(s) by calling `cudf.read_csv()`. Dask reads the file(s) in **chunks** into **partitions** of the resulting `dask_cudf.core.DataFrame`. A Dask DataFrame contains multiple cuDF DataFrames and each `cudf.DataFrame` is referred to as a partition of the `dask_cudf.core.DataFrame`.

We can use the `chunksize` parameter to control the size of each partition and by extension how many partitions there should be. The default value of `chunksize` is `256 MiB` (or ~ `268.4 MB`), which is fairly low for most modern GPUs. Using default values, `.read_csv()` will create a `dask_cudf.core.DataFrame` with 30 partitions for this dataset, which happens to be approximately 7.3 GB / 268.4 MB. The decision on the `chunksize` will influence the number of partitions created. The number of partitions goes down when the `chunksize` increases. It is an important consideration since:
* The number of partitions should be the same as or a multiple of the number of workers so they can be tasked equally.
* If the `chunksize` is too small, there will be too many partitions and too many tasks. We will see performance decrease since every operation on every partition costs the scheduler some overhead (a few hundred microseconds) to process.
* If the `chunksize` is too large, we will likely run out of working memory and see performance decrease as data spills.


```python
%%time
# OPTION 5 - create Dask-cuDF DataFrame
import dask_cudf

ddf=dask_cudf.read_csv('2020-Mar.csv')
print(f'Total of {len(ddf)} records split across {ddf.npartitions} partitions. ')
```
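The partition count follows directly from the file size and the `chunksize`. A quick sanity check of that arithmetic, using the approximate sizes quoted above (the exact count Dask produces can differ slightly, since it splits on byte ranges and row boundaries):

```python
import math

file_size = 7.3e9        # ~7.3 GB uncompressed CSV (approximate)
chunksize = 256 * 2**20  # default 256 MiB, i.e. ~268.4 MB

# each chunk becomes one partition, so partitions ≈ ceil(size / chunksize)
n_partitions = math.ceil(file_size / chunksize)
print(n_partitions)
```

This lands in the neighborhood of the ~30 partitions observed; doubling `chunksize` roughly halves the partition count.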

+ +Even with a single GPU, `dask_cudf` is still able to process large CSV files. When there isn't enough device memory to contain all the data, Dask allows memory to spill to host memory or to disk. For this reason, we use `dask_cudf.read_csv()` in preference to `dask_cudf.from_cudf(cudf.read_csv())` since the former can handle CSV files larger than what would fit in memory on a single GPU and easily parallelize across multiple GPUs. + +Note that this screenshot shows the task stream of reading the csv file into 30 partitions, performing `len()` on each, and aggregating once at the end. All work is done on 1 worker as expected. +

```python
# shutdown kernel
import IPython

app=IPython.Application.instance()
app.kernel.do_shutdown(restart=True)
```


### Distributed cuDF DataFrame on Multi-GPU with Dask-cuDF ###
By default, Dask will only use a single GPU. To use multiple GPUs, we will start a cluster with `dask_cuda.LocalCUDACluster()` and assign a different GPU to each Dask worker process. We will also initialize a `dask.distributed.Client` and point it to the cluster. With a cluster of 4 workers set up, the Dask client will connect to and submit computations to the cluster.

When starting a cluster, the default value of `None` for the `CUDA_VISIBLE_DEVICES` and `n_workers` parameters will use all available GPUs and assign a different GPU to each Dask worker process. We can also explicitly pass `"0, 1, 2, 3"` or `[0, 1, 2, 3]` to `CUDA_VISIBLE_DEVICES`, or `4` to `n_workers`, to achieve the same result.


### Exercise #2 - Starting a Dask Cluster ###

**Instructions**:
* Modify the `<<<>>>` only and execute the cell below to create a cluster with 4 workers, one for each GPU.
* Execute the cell below and navigate to the cluster's dashboard URL. We will use Dashboard Diagnostics to monitor our Dask computations. Please refer to the [introduction notebook](1_00_introduction.ipynb) for how the plots are used.


```python
# import dependencies
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf
import dask_cudf
import cupy as cp
import gc
import matplotlib.pyplot as plt
from timeit import timeit

# create cluster
cluster=<<<>>>

# instantiate client
client=Client(cluster)
```

Click ... to show **solution**.

```python
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf
import dask_cudf
import cupy as cp
import gc
import matplotlib.pyplot as plt
from timeit import timeit

cluster=LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0, 1, 2, 3",  # equivalent to [0, 1, 2, 3] or None. By default, all GPUs will be used so doesn't need to be stated explicitly
    device_memory_limit=0.8,            # this is the default value so doesn't need to be stated explicitly
    dashboard_address=':8787'           # this is the default value so doesn't need to be stated explicitly
)

client=Client(cluster)
```


```python
# get the machine's external IP address
from requests import get

ip=get('https://api.ipify.org').content.decode('utf8')

print(f'Dask dashboard (status) is accessible on http://{ip}:8787/status')
print(f'Dask dashboard (gpu) is accessible on http://{ip}:8787/gpu')
```

With a cluster of 4 workers set up, Dask will distribute work over all GPUs in this machine. Dask can read data in parallel, even if there is only a single CSV file. This is done by instructing each worker to read distinct byte-ranges in the file. Generally, it's faster to read, compute, and write big datasets that are spread across multiple files with cluster computing. 
One of the easiest ways to make data loading more efficient is to avoid unnecessarily large single files. Often when we encounter a large file, the first thing to do is split it up into smaller files. Big files are a common performance bottleneck in big data processing pipelines. Dask can load collections of files all at once. Parallel I/O is a huge strength of Dask. If there is more than one source data file, we can read all CSV files in a directory into a single `dask_cudf.core.DataFrame` by using the asterisk (`*`) wildcard, such as:

```
dask_cudf.read_csv('data/*.csv')
```

The `.read_csv()` function accepts a `dtype` parameter to specify the data types for each column. Specifying dtypes directly is recommended as it's less error prone and more performant. Using the correct data type will prevent the allocation of unnecessary memory space and make a significant difference in memory usage. Choosing the right data type for numerical features is predicated on knowing each variable's range, which should be information given by the data provider. We have figured them out ahead of time based on a separate preview of the data. When we specify the `dtype` for all columns, Dask won't do any data type inference and we will avoid potential errors or performance slowdowns.

It is recommended to set the `chunksize` value based on the specific hardware. With the T4, V100, or Quadro 8000 GPUs, partitions of 2-3 GB are usually a good choice. 
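The `*` wildcard above is plain filename globbing: every matching file becomes (at least) one partition. A CPU-side sketch of the same idea with only the standard library, using temporary files in place of the real `data/*.csv`:

```python
import csv
import glob
import os
import tempfile

# create a few small CSV files standing in for data/*.csv
tmpdir = tempfile.mkdtemp()
for month in ('2020-Jan', '2020-Feb', '2020-Mar'):
    with open(os.path.join(tmpdir, f'{month}.csv'), 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['event_type', 'price'])
        writer.writerow(['view', '9.99'])

# expand the wildcard and load each file as its own "partition"
paths = sorted(glob.glob(os.path.join(tmpdir, '*.csv')))
partitions = []
for path in paths:
    with open(path, newline='') as f:
        partitions.append(list(csv.DictReader(f)))

print(f'{len(partitions)} partitions from {len(paths)} files')
```

`dask_cudf.read_csv('data/*.csv')` does this expansion for us — and, unlike this sequential sketch, reads the files in parallel across workers.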
+ + +```python +%%time +dtypes={'event_time': 'object', # to be converted to datetime64[ns] + 'event_type': 'object', # to be converted to category + 'product_id': 'int32', # max value is 1e8 + 'category_id': 'int64', # has to be positive value + 'category_code': 'object', # keep as string + 'brand': 'object', # to be converted to category + 'price': 'float32', # since the max value is 1e3, we could have used float16 if it were supported + 'user_id': 'int32', # has to be positive value + 'user_session': 'object' # keep as string + } + +ddf=dask_cudf.read_csv('2020-Mar.csv', dtype=dtypes, chunksize='2GB') +print(f'Total of {len(ddf)} records split across {ddf.npartitions} partitions. ') +``` + + +
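The memory saved by the narrower dtypes above can be seen on the CPU with the standard-library `array` module, which, like a DataFrame column, stores values as fixed-size C types (type code `'q'` is always 8 bytes; `'i'` is typically 4):

```python
from array import array

user_ids = list(range(100_000))

as_int64 = array('q', user_ids)  # 8 bytes per value, like the default int64
as_int32 = array('i', user_ids)  # typically 4 bytes per value, like the int32 dtype above

print(as_int64.itemsize * len(as_int64), 'bytes as 64-bit integers')
print(as_int32.itemsize * len(as_int32), 'bytes as 32-bit integers')
```

Halving the width of `product_id` and `user_id` alone saves gigabytes at the scale of this dataset, which is why the `dtypes` dictionary above is worth the upfront effort.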

When reading data from CSV files, which do not contain an accompanying schema, the data type is inferred for each column based on the data if it is not explicitly set by the user. DataFrames are created using the widest types (`int64`, `float64`) for numerical columns and the object type otherwise. In most cases, there are more appropriate data types to optimize memory utilization. Furthermore, if Dask and cuDF infer the data types based on the dataset, they can run into errors when there are missing values. For example, if the first chunk of data contains missing values for a column, it will be inferred as float. This becomes an issue when reading the entire dataset if the column actually contains `string` or `object`-typed data. This is another reason why it's advantageous to ascertain the schema of the dataset and explicitly define it for data loading.

The `datetime` data type is useful for conducting time-series analysis. cuDF and Dask-cuDF support `datetime`-typed columns. They allow users to reference and filter records based on specific timestamps. Since each `datetime` value is 64-bit and costs 8 bytes, storing a value as `datetime` is usually more memory efficient than storing it as the `object` data type, which would require 1 byte per character. In some cases, date values stored as `YYYYMMDD` or similar can be read and represented as an integer, which would be even more efficient.

The `category` data type is useful if a column contains a limited set of values. It uses integer values under the hood to represent the values in a column rather than the raw values. DataFrames use a separate mapping dictionary to map integer values to the raw ones. For columns with low cardinality, where the number of unique values is lower than 50% of the count, the `category` data type should be used. More information on supported data types can be found in the [documentation](https://docs.rapids.ai/api/cudf/nightly/user_guide/data-types.html). 
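The dictionary-encoding idea behind the `category` dtype can be sketched in a few lines of plain Python: store each distinct string once, and keep only small integer codes per row (cuDF's actual implementation is columnar and far more efficient; this is just the concept):

```python
values = ['view', 'cart', 'view', 'purchase', 'view', 'cart'] * 1_000

# build the category mapping: each unique value gets an integer code
categories = sorted(set(values))
code_of = {value: code for code, value in enumerate(categories)}

# the column is now just compact integer codes plus one small lookup table
codes = [code_of[v] for v in values]

# decoding reverses the mapping without any loss
decoded = [categories[c] for c in codes]
print(decoded == values)  # True
```

With only a handful of distinct values across millions of rows, storing one small integer per row instead of a full string is where the large savings for `event_type` come from.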
+ +In the following table, we show memory usage with and without specifying data types using this workshop's dataset. Overall, memory utilization is significantly reduced when specifying data types. We were able to benefit from a ~99% reduction in memory usage for the `event_type` column by loading it as `category` instead of `object` since there are only a few possible values (`view`, `cart`, `remove_from_cart`, or `purchase`). + +

+ +The memory requirements for the CSV file and the DataFrame are very different. A large contributor is the number of string columns. In Python, each string requires approximately 49 bytes, with an additional byte for each character. This is significantly more than the 1 byte required to store a character in the CSV file. In cuDF, string columns are represented in accordance with Apache Arrow memory specification and can be more efficient than in pandas. + +The below screenshot shows the task stream of reading the csv file into 4 partitions, performing `len()` on each, and aggregating once at the end. The tasks are distributed across 4 workers and 1 worker is assigned the task to aggregate. The GPU screenshot shows memory utilized, but these are mostly related to memory used for the CUDA context and computation results instead of the data. + +
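The per-string overhead quoted above can be checked directly with `sys.getsizeof` — the fixed per-object cost is 49 bytes on many CPython 3.x builds (the exact constant varies by Python version), with roughly one extra byte per ASCII character:

```python
import sys

empty = sys.getsizeof('')          # fixed per-string overhead in CPython
ascii_str = sys.getsizeof('view')  # same overhead plus the characters

print('overhead per string:', empty, 'bytes')
print('extra bytes for "view":', ascii_str - empty)  # 4, one per ASCII character
```

For a column of short strings like `event_type`, the fixed overhead dwarfs the actual text — another reason the Arrow-based columnar layout (and the `category` dtype) is so much cheaper.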

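The per-string overhead quoted above is easy to verify on the driver with the standard library; the exact numbers depend on the Python build, so treat the values in the comments as typical rather than guaranteed.

```python
import sys

# CPython string overhead: typically 49 bytes for an empty str on 64-bit builds,
# plus one byte per ASCII character
print(sys.getsizeof(''))      # typically 49
print(sys.getsizeof('view'))  # typically 53, i.e. 49 + 4 characters
```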
+ +Data loading with multiple GPUs was significantly faster. Next, we will convert the `event_time` feature to `datetime` with `cudf.to_datetime()`. This requires executing `cudf.to_datetime()` on each partition of the Dask DataFrame. The `dask_cudf.core.DataFrame.map_partitions()`[[doc]](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) applies a function on each DataFrame partition. Extra arguments and keywords can optionally be provided, which will be passed to the function for each partition. One important parameter is `meta`, which is used to specify the name and data type of the expected output. This can be specified in many forms. Without it, Dask will try to infer the output metadata, which can sometimes be expensive. For more information on metadata, please see `dask.dataframe.utils.make_meta`[[doc]](https://docs.dask.org/en/stable/generated/dask.dataframe.utils.make_meta.html). + +With `cudf.to_datetime()`, the format of the strings from which `datetime` objects will be created can be specified through the `format` parameter. This works well when the values are in a standard format such as [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601). Otherwise, the `infer_datetime_format` parameter should be used to enable a faster parsing method when possible. Similar results can be achieved by specifying `datetime64[ns]` as the data type when initially reading the CSV file, using the `parse_dates` parameter, or with `dask_cudf.core.Series.astype('datetime64[ns]')`. 

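The format string used for `event_time` in this notebook can be tested on the CPU with the standard library, since `strptime` follows the same directive syntax; the sample timestamp below is made up.

```python
from datetime import datetime

# literal text in the format string (the trailing "UTC") is matched as-is,
# alongside the %Y/%m/%d/%H/%M/%S directives
raw = '2019-10-01 00:00:07 UTC'
parsed = datetime.strptime(raw, '%Y-%m-%d %H:%M:%S UTC')
print(parsed.isoformat())  # 2019-10-01T00:00:07
```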
+ +Reading data as the `category` data type is currently not supported for CSV files, but these columns can be converted with `dask_cudf.core.DataFrame.categorize()`[[doc]](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.categorize.html) or individually with `dask_cudf.core.Series.astype('category')`[[doc]](https://docs.rapids.ai/api/cudf/legacy/api_docs/api/cudf.series.astype). + + +```python +# convert feature to datetime data type +ddf['event_time']=ddf['event_time'].map_partitions(cudf.to_datetime, format='%Y-%m-%d %H:%M:%S UTC', unit='ns', meta=('event_time', 'datetime64[ns]')) + +# alternative way to achieve similar results +# ddf['event_time']=ddf['event_time'].astype('datetime64[ns]') +``` + + +```python +# show the Dask DataFrame +ddf +``` + +The Dask DataFrame doesn't contain any values yet, but the structure is set up. This is the result of the [DAG](https://mathworld.wolfram.com/AcyclicDigraph.html). Rather than computing the results immediately, Dask records what we want to compute as tasks in a graph that will be run later in parallel. In fact, most Dask operations are delayed and execute [lazily](https://en.wikipedia.org/wiki/Lazy_evaluation). In other words, when the `.read_csv()` function is called, the task is scheduled but not yet executed. We can force a result with `.head()` or `.compute()`, causing the CSV files to be read. There are a few options for us to tell Dask to execute the task graph: +* `.compute()` - this call will process all the partitions and return the results to the scheduler for final aggregation and conversion to a `cudf.DataFrame`. This should be used sparingly and only on heavily reduced results, or it can cause the scheduler node to run out of memory. +* `.persist()` - this call executes the graph but instead of returning the results to the scheduler node, it persists them across the cluster in memory so we can reuse these intermediate results later without rerunning the same processes. 
In other words, calling `.persist()` evaluates a computation and stores the results in memory, so that further computations can be faster. +* `.head()`[[doc]](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.head.html) - this call will return `n` records back to the scheduler node. By default, this will only return results from the first partition. We can pass an integer to the `npartitions` parameter or `-1` to use all partitions. + +We can see how Dask is developing the task graph with the `.dask` attribute. Furthermore, we can visualize the task graph at any point with the `.visualize()` or `.dask.visualize()` methods. These work like the `.compute()` method, except that rather than computing the result, they produce an image of the task graph. These images are written to files. Since we are within a Jupyter notebook context, they will also be displayed as cell outputs. It is often helpful to inspect the task graph before and after graph optimizations are applied. Rendering the optimized task graph, enabled by setting the `optimize_graph` keyword, provides a much higher-level view of the tasks. By looking at the inter-connectedness of tasks, we can learn more about potential bottlenecks where parallelism may not be possible. Importantly, we can also identify areas where many tasks depend on each other, which may cause a great deal of communication. More details about visualizing task graphs can be found [here](https://docs.dask.org/en/stable/graphviz.html). + +

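The deferred-execution model can be sketched in a few lines of plain Python. The dict-of-tuples graph below is a toy loosely modeled on how Dask records tasks, not Dask's actual implementation:

```python
# each entry is (function, *dependency_keys); defining the graph runs nothing
graph = {
    'load':   (lambda: list(range(10)),),
    'double': (lambda xs: [x * 2 for x in xs], 'load'),
    'total':  (sum, 'double'),
}

def compute(key):
    """Walk the graph on demand, like calling .compute() on a Dask collection."""
    func, *deps = graph[key]
    return func(*(compute(dep) for dep in deps))

print(compute('total'))  # 90
```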
+ +Some functions don't follow the rules of delayed evaluation, and Dask executes them immediately when we run them. `len()` is one of those functions. Normally we don't want to execute the DAG with `.compute()` or other functions until the task graph has been completely defined. Deferring execution this way is very efficient and avoids filling up memory with intermediate results. For this lab, we will use `.compute()` periodically to inspect the results. Furthermore, we will use `.persist()` so some slow operations don't have to be repeated when we want to explore and learn the trade-offs between several options. + + +```python +ddf.visualize(rankdir='LR') +``` + + +```python +ddf.dask.visualize(rankdir='LR') +``` + + +```python +# persist data into distributed memory +ddf=ddf.persist() +wait(ddf) + +# visualize task graph after persist +ddf.dask.visualize(rankdir='LR') +``` + + +```python +# preview DataFrame +ddf.head(npartitions=-1) +``` + + +## Data Quality Check, Data Cleaning, and GroupBy Operations ## +An important and early step in the ETL process is to ensure that we have high quality data before it can be used for analytics. This will largely depend on upstream data providers, but it's prudent to perform some checks. We will perform two types of checks: +1. Handling missing (also referred to as `NA`, `nan`, or null) values +2. Filtering to relevant data + +Below are some facts we know about this dataset: +* Each `user_session` can only be associated with one unique `user_id` +* `price` can vary across time for the same `product_id`, even within the same `user_session` +* `category_id`, `category_code`, and `brand` values are mostly unique for each `product_id` +* Some `purchase` events occur without a preceding `cart` event +* Rows with the same `user_session` can be associated with multiple `product_id` values + + +### Handling Missing Values ### +Missing data can arise for many reasons and most datasets contain them. 
We can use `.isna()` or `.isnull()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe.isnull), followed by `.sum()`, to detect and count the number of missing values. This works logically since `.isna()` returns `True` when values are missing and `False` otherwise. We then call `.sum()` on the boolean values, which uses `1` for `True` values and `0` for `False` values. Note that we need to call `.compute()` to examine the output cuDF DataFrame. + +Null values propagate naturally through arithmetic operations between cuDF objects, but reductions and grouped calculations treat them specially. In some cases, it may not be necessary to treat them. For example, while summing data along a `cudf.Series`, `NA` values are treated as `0`. The `sum` and `mean` methods support a `skipna` parameter, which defaults to `True`. Cumulative methods like `.cumsum()` and `.cumprod()` also ignore `NA` values by default, and `NA` values are automatically excluded in `GroupBy` operations. + +There are several ways to handle null values: +* Fill with static value using `.fillna()` +* Infer from other data points +* Drop from dataset + + +```python +# check number of null values per column +ddf.isna().sum().compute() +``` + +There are missing values for `category_code` and `brand`. To deal with missing values, we can use `.fillna()` to fill missing values with a static value or based on some inference. As a last resort, we can drop records with missing values with `.dropna()`. + +We will further investigate these columns to see if any of the missing values can be inferred. Specifically, knowing that each `product_id` should be associated with one `category_code`, we want to see if `category_code` is missing for all records with the same `product_id`. If any `category_code` value exists for records that share the same `product_id` value, we will be able to fill the missing values for all records with that `product_id`. 
There are several ways to approach this. We break down the procedure into the following steps: +1. Get a list of `product_id` values from records that have null `category_code` values +2. Get all records associated with the previous list of `product_id` values +3. Count the number of `category_code` values for each group of records that share the same `product_id` value - a `0` is expected if `category_code` is missing for all members of a group + +**Step 1**. Get a list of `product_id` values from records that have null `category_code` values + +We can use `dask_cudf.core.DataFrame.loc` to get a subset of the DataFrame. The `.loc` property selects rows and columns by label or [boolean mask](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/#boolean-indexing). It uses the syntax `.loc[row(s), column(s)]`, `.loc[boolean_mask]`, `.loc[boolean_mask, column(s)]`, or `.loc[row]`. For the boolean mask, we use `dask_cudf.core.Series.isnull()` so we get all the rows that have missing `category_code` values. We have to call `.compute()` here since we need the resulting list. + + +```python +# get list of product_id values with null category_code values +null_category_products=ddf.loc[ddf['category_code'].isnull(), 'product_id'].unique() +null_category_products_list=null_category_products.compute() +null_category_products_list +``` + + +```python +# visualize graph +null_category_products.visualize(rankdir='LR') +``` + +**Step 2**. Get all records associated with the previous list of `product_id` values + +There are quite a few products that do not have an associated `category_code` value. We will use `dask_cudf.core.DataFrame.loc` again to get a subset of the DataFrame. For the boolean mask, we use `dask_cudf.core.Series.isin()` so we get all the rows with a `product_id` value that is within the previous `product_id` values list. We have to call `.compute()` here to see the result. 
+ + +```python +# get records associated with missing category_code +null_category_products_ddf=ddf.loc[ddf['product_id'].isin(null_category_products_list), ['product_id', 'category_code']] +``` + + +```python +# visualize graph +display(null_category_products_ddf.visualize(rankdir='LR')) +``` + +**Step 3**. Count the number of `category_code` values for each group of records that share the same `product_id` value - a 0 is expected if category_code is missing for all members of a group + +Groupby operations are common for ETL and analysis in which we split data into groups, apply a function to each group independently, and then combine the results back together. + +

+ +The typical syntax for a groupby operation is `ddf.groupby('col')`, followed by `.aggregate()`, `.transform()`, or `.apply()`. `.groupby()` can cause memory to spike when computing. This is related to the intermediate `DataFrameGroupBy` objects that are created, and is especially apparent when there are a lot of groups in the data. Even if the groups have very few records, there will be a lot of overhead. Being aware of this behavior and of the "groupiness" of the data can help avoid memory spilling. + +_Shuffle_ is a frequent challenge that arises in distributed computing. It involves redistributing rows across all partitions, which requires all-to-all communication between workers. Shuffling is necessary when performing _sorting_, _grouping_, and _indexing_ operations, because each row needs to be compared with every other row in the entire Dask DataFrame to determine its correct relative position. This is a time-expensive operation, because it necessitates transferring large amounts of data. Therefore, groupby operations require special consideration, and failure to consider how they work may result in significant slowdowns. At a high level, we want to avoid communication and transfer of data across partitions. Furthermore, the performance of Dask's groupby operations will largely depend on the number of groups and how they are organized in the partitions. + +Generally, there are three types of groupby operations: +* `.aggregate(func)` or `agg(func)` - aggregate using one of the built-in operations or a custom one. Several built-in aggregation methods are supported such as `sum`, `count`, `mean`, `var`, `std`, `max`, `min`, etc. The full list can be found [here](https://docs.rapids.ai/api/cudf/stable/user_guide/groupby/#aggregation) + + Dask uses an ACA (apply-concat-apply) procedure for groupby aggregations. This is usually an efficient operation and generally quite fast because it can be broken down into well known operations. 
The data doesn't have to move around too much and we can pass around small intermediate values across workers. Each stage is effectively a reduction and data from each group is eventually collapsed into a single row based on `func`. By default, groupby aggregations return the result as a single partition DataFrame. Since the results are usually quite small, this is usually a good choice. However, this becomes an issue when there are many groups and the result doesn't fit in memory. We can use the `split_out` parameter to split the output into multiple partitions. In the below example, we illustrate `ddf.groupby('group')['value'].sum()`. By default, `split_out` is set to 1 (left). We can compare this with `split_out` being set to 2 (right). If computed, Dask will: + 1) perform a sum aggregation on each partition + 2) concatenate the results into `split_out` partitions + 3) perform a sum aggregation on the concatenated results. + +

+ +* `.transform(func)` - apply `func` group-wise without changing its shape and return a DataFrame filled with the transformed values. Good examples of this are `.cumsum()` and `.diff()`. In addition to custom functions, `.transform()` also supports many of the built-in aggregation methods such as `sum`, `count`, `mean`, `var`, `std`, `max`, `min`, etc. If `func` returns a scalar value, it will be broadcast across the group to have the same index as the input subframe (group). The returned object will have the same index and shape as the original. + +* `.apply(func)` - apply `func` group-wise and combine the results after. The returned object can have any arbitrary size and shape. + + Some groupby operations require all data from a given group. This is an _expensive_ operation that requires shuffling and will force a great deal of communication amongst partitions. This should be avoided whenever a groupby aggregation suffices. The `meta` parameter is a prescription of the names and data types of the output from the computation. This is required because `.apply()` is flexible enough that it can produce just about anything from a DataFrame. If we don't provide a `meta`, then Dask actually computes part of the data to see what the types should be. We can avoid this pre-computation, which can be expensive, and be more explicit when we know what the output should look like. This parameter is available on all functions/methods that take user-provided callables such as `DataFrame.map_partitions()` and `DataFrame.apply()`. The `meta` parameter can be a zero-row version of the output (dataframe or series), or just the types. 

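The apply-concat-apply procedure can be imitated on the CPU with pandas by splitting a frame into two "partitions" by hand; this is a toy sketch with made-up data, not Dask's implementation:

```python
import pandas as pd

df = pd.DataFrame({'group': ['a', 'b', 'a', 'b', 'a', 'c'],
                   'value': [1, 2, 3, 4, 5, 6]})

# pretend the frame is stored as two partitions
partitions = [df.iloc[:3], df.iloc[3:]]

# apply: aggregate each partition independently (cheap, no data movement)
partials = [part.groupby('group')['value'].sum() for part in partitions]

# concat + apply: combine the small intermediate results, then aggregate again
result = pd.concat(partials).groupby(level=0).sum()

print(result.to_dict())  # {'a': 9, 'b': 6, 'c': 6}
```

Only the small per-partition sums cross the partition boundary, which is why aggregations such as `sum`, `count`, and `mean` stay cheap; `split_out` controls how many partitions that final aggregation is split into.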
+ +Now that we have a subset of the original DataFrame that only contains records with `product_id`s that are partially or fully missing `category_code` values, we want to see if we can infer the missing values. We will try this using various approaches and discuss the differences: +1. `df.count()` - we will rely on null values returning 0 for `.count()`. Without performing a `.groupby()`, we do not know which specific `product_id` can be inferred. +2. `df.groupby().agg('count')` or `df.groupby().count()` - we will rely on null values returning 0 for `.count()`. +3. `df.groupby().agg('count', split_out)` or `df.groupby().count(split_out)` - we will see the result of `split_out`. +4. `df.notnull().groupby().agg('sum')` or `df.notnull().groupby().sum()` - we will rely on using `0` for `False` values. +5. `df.groupby().transform('count')` - we will rely on null values returning 0 for `.count()`. +6. `df.groupby().transform(lambda x: x.count())` - we will rely on null values returning 0 for `.count()`. +7. `df.groupby().apply(lambda x: x.count())` - we will rely on null values returning 0 for `.count()`. +8. `df.groupby().apply(lambda x: x.isnull().all())` - we will rely on null values returning `True` for `.isnull().all()`. + +

+ +`cudf.DataFrame.groupby` does not sort by default for better performance. + + +```python +# since we want to compare multiple approaches, we persist the data into distributed memory, where new tasks will start from +null_category_products_ddf=null_category_products_ddf.persist() +wait(null_category_products_ddf) + +# create a dictionary to compare options +groupby_options={} +``` + + +```python +# OPTION 1 +time=timeit( + lambda: null_category_products_ddf['category_code'].count().compute(), + number=1 +) +groupby_options['count()']=time +print(f'Took {time} seconds') + +print(f"There are {null_category_products_ddf['category_code'].count().compute()} products with at least one category_code populated. ") +``` + + +```python +# visualize graph +null_category_products_ddf['category_code'].count().visualize(rankdir='LR') +``` + +Based on the count being 0, we know that `category_code` isn't populated for any of these records. + + +```python +# OPTION 2 +null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].agg('count') + +# alternative way to achieve similar results +# null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].count() + +time=timeit( + lambda: (null_category_products_count_ddf>0).sum().compute(), + number=1 +) +groupby_options['groupby().agg("count")']=time +print(f'Took {time} seconds') +``` + + +```python +# visualize graph +null_category_products_count_ddf.visualize(rankdir='LR') +``` + + +```python +# OPTION 3 +null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].agg('count', split_out=2) + +# alternative way to achieve similar results +# null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].count(split_out=2) + +time=timeit( + lambda: (null_category_products_count_ddf>0).sum().compute(), + number=1 +) +groupby_options['groupby().agg("count", split_out=2)']=time 
+print(f'Took {time} seconds') +``` + + +```python +# visualize graph +null_category_products_count_ddf.visualize(rankdir='LR') +``` + + +```python +# OPTION 4 +null_category_products_count_ddf=null_category_products_ddf.notnull().groupby(null_category_products_ddf['product_id'])['category_code'].agg('sum') + +time=timeit( + lambda: null_category_products_count_ddf.sum().compute(), + number=1 +) +groupby_options['notnull().groupby().agg("sum")']=time +print(f'Took {time} seconds') +``` + + +```python +# visualize graph +null_category_products_count_ddf.visualize(rankdir='LR') +``` + + +```python +# OPTION 5 +null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].transform('count', meta=('category_code_count', 'int32')) + +time=timeit( + lambda: null_category_products_count_ddf.sum().compute(), + number=1 +) +groupby_options['groupby().transform("count")']=time +print(f'Took {time} seconds') +``` + + +```python +# visualize graph +null_category_products_count_ddf.visualize(rankdir='LR') +``` + + +```python +# OPTION 6 +null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].transform(lambda x: x.count(), meta=('category_code_count', 'int32')) + +time=timeit( + lambda: null_category_products_count_ddf.sum().compute(), + number=1 +) +groupby_options['groupby().transform(lambda x: x.count())']=time +print(f'Took {time} seconds') +``` + + +```python +# visualize graph +null_category_products_count_ddf.visualize(rankdir='LR') +``` + + +```python +# OPTION 7 +null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].apply(lambda x: x.count(), meta=('category_code_count', 'int32')) + +time=timeit( + lambda: null_category_products_count_ddf.sum().compute(), + number=1 +) +groupby_options['groupby().apply(lambda x: x.count())']=time +print(f'Took {time} seconds') +``` + + +```python +# visualize graph 
+display(null_category_products_count_ddf.visualize(rankdir='LR')) +``` + + +```python +# OPTION 8 +null_category_products_count_ddf=null_category_products_ddf.groupby('product_id')['category_code'].apply(lambda x: x.isnull().all(), meta=('category_code_null', 'boolean')) + +time=timeit( + lambda: (~null_category_products_count_ddf).sum().compute(), + number=1 +) +groupby_options['groupby().apply(lambda x: x.isnull().all())']=time +print(f'Took {time} seconds') +``` + + +```python +# visualize graph +null_category_products_count_ddf.visualize(rankdir='LR') +``` + + +### Exercise #3 - Understanding GroupBy Operations Behavior ### + +**Instructions**:
+* Execute the below cell to plot the performance comparison across various groupby operations. +* Describe why there is a discrepancy between the options in the cell below. + + +```python +# show groupby_options dictionary +plt.bar(groupby_options.keys(), groupby_options.values()) +plt.xticks(rotation=30, ha='right') +plt.ylabel('Compute Time (s)') +``` +Type your answer here: + +The plot shows that `.groupby().apply()` is slow and expensive. This is due to the shuffle and data transfer between partitions. The time to complete scales with the number of groups in each partition. +Click ... to show **solution**. + + +The `GroupBy.apply()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.core.groupby.groupby.groupby.apply#cudf.core.groupby.groupby.GroupBy.apply) function will apply a Python transformation function over the grouped chunk. We can achieve faster results with `GroupBy.agg()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.core.groupby.groupby.groupby.agg#cudf.core.groupby.groupby.GroupBy.agg). Upon examining the _profile_ of each task graph, we can see `GroupBy.agg()` is more efficient. More information about the `GroupBy` API can be found [here](https://docs.rapids.ai/api/cudf/stable/api_docs/groupby.html). + + +We used `.compute()` to create a few `cudf.DataFrame`s in order to inspect the results, but this should be avoided when possible. We use `del` to release these objects and free up GPU memory. What is left in GPU memory is the original `dask_cudf.core.DataFrame` that was persisted. + + +```python +# clean up +del null_category_products, null_category_products_list, null_category_products_ddf, null_category_products_count_ddf +gc.collect() +``` + +

+ +Note that `df.groupby('col')` is syntactic sugar for `df.groupby(df['col'])`, designed to make things easier to read and write. Additionally, `DataFrame.groupby()` returns a `DataFrameGroupBy` while `DataFrame.groupby()['col']` returns a `SeriesGroupBy`. Understanding the types of objects returned by each method, and their shapes, will help us program effectively with DataFrames, especially with `.apply()`. + + +### Data Imputation ### +The `.fillna()` method is used to fill missing values. There are several considerations when attempting to infer missing values: +* A common technique is to replace missing data with statistical estimates such as mean, median, or mode. Alternatively, we can perform interpolation or extrapolation to fill the missing data. Linear interpolation is the simplest method but other options can also be useful. We can impute with `mean` by: + + ``` + df['value'].fillna(df[['group', 'value']].groupby('group')['value'].transform('mean')) + ``` + + or + + ``` + df[['group', 'value']].groupby('group')['value'].transform(lambda x: x.fillna(x.mean())) + ``` + +* For time-series data or when sequencing matters, last observation carried forward is a common method for imputation. This method replaces missing values with the last observed value. A similar technique is next observation carried backward. We can use the `method` parameter: + * `ffill`: Ffill or forward-fill propagates the last observed non-null value forward until another non-null value is encountered + * `bfill`: Bfill or backward-fill propagates the first observed non-null value backward until another non-null value is met +* Another imputation technique replaces all occurrences of missing values within a variable explicitly with an arbitrary value. + * We can choose a value to replace all missing values. 
Such a substitution could be -999, for example, to clearly indicate that the value is missing + +When data is missing, we can optionally create an additional binary variable to indicate whether data was missing for that observation. This may help capture the significance of missing values. + + +### Exercise #4 - Check Missing Values ### +We will try a similar process with `brand`. + +**Instructions**:
+* Modify the `<<<>>>` only and execute the below cell to see if we can impute missing `brand` values. + + +```python +# check if brand can be inferred +# get list of product_ids with missing brand values +null_brand_products=ddf.loc[ddf['brand'].isnull(), 'product_id'].unique() +null_brand_products_list=null_brand_products.compute() + +# get records associated with missing brand values +null_brand_products_ddf=ddf.loc[ddf['product_id'].isin(null_brand_products_list), ['product_id', 'brand']] + +# perform groupby aggregation +null_brand_products_count_ddf=<<<>>> + +print(f"There are {(null_brand_products_count_ddf>0).sum().compute()} products with at least one brand populated. ") +``` + +```python +null_brand_products=ddf.loc[ddf['brand'].isnull(), 'product_id'].unique() +null_brand_products_list=null_brand_products.compute() + +null_brand_products_ddf=ddf.loc[ddf['product_id'].isin(null_brand_products_list), ['product_id', 'brand']] + +null_brand_products_count_ddf=null_brand_products_ddf.groupby('product_id')['brand'].agg('count') + +print(f"There are {(null_brand_products_count_ddf>0).sum().compute()} products with at least one brand populated. ") +``` +Click ... for **solution**. + +Unlike `category_code`, there are several `product_id` groups with partially missing `brand` values. Therefore we have an opportunity to infer a value to fill the null values with. For string categorical values, the _mode_ is a common option. The mode calculates the value that appears most often. Since this is not a supported aggregation, according to the [documentation](https://docs.rapids.ai/api/cudf/stable/user_guide/groupby/), we have to use `.groupby().apply()`. We will replace null values with the most frequently occurring value per `product_id` group. The `.groupby().transform()` and `.groupby().apply()` methods scale poorly with a large number of groups, therefore we only want to perform this procedure on a subset of relevant records. + +There are several ways to approach this. 
We break down the procedure into the following steps: +1. Get records associated with multiple `brand` values +2. Generate lookup table with `product_id` values and their most frequent `brand` values (example below) + +   | product_id | missing_brand_mode | +   | ----------- | ------------------- | +   | 1002701 | huawei | +   | 1004976 | samsung | + +3. Merge original DataFrame with lookup table, fill missing `brand` values, and delete lookup table +4. Check missing `brand` values + +**Step 1**. Get records associated with multiple `brand` values + + +```python +# get list of product_id values with multiple brand values +multi_brand_products_list=null_brand_products_count_ddf[(null_brand_products_count_ddf>0)].index.compute() + +# get records associated with multiple brand values +multi_brand_products_ddf=ddf.loc[ddf['product_id'].isin(multi_brand_products_list), ['product_id', 'brand']] + +# preview DataFrame +multi_brand_products_ddf.head() +``` + +**Step 2**. Generate lookup table with `product_id` values and their most frequent `brand` values + +**Note**: `.groupby().apply()` requires shuffling and scales very poorly with a large number of groups. + + +```python +# create lookup table +# impute missing values with mode +multi_brand_products_lookup=multi_brand_products_ddf.groupby('product_id').apply(lambda g: g.mode(), meta={'product_id': 'int32', 'brand': 'object'}) +multi_brand_products_lookup.columns=['product_id', 'missing_brand_mode'] + +# preview lookup table +multi_brand_products_lookup.head() +``` + + +```python +multi_brand_products_lookup.visualize(rankdir='LR') +``` + +**Step 3**. Merge original DataFrame with lookup table and fill missing `brand` values + +Merging data requires special consideration. We will try this using various approaches and discuss the differences: +1. Join a Dask DataFrame with a cuDF DataFrame +2. Join a Dask DataFrame with another Dask DataFrame with multiple partitions +3. Join a Dask DataFrame with another Dask DataFrame on a single partition + + +```python +# since we want to compare multiple approaches, we persist the data into distributed memory, where new tasks will start from +multi_brand_products_lookup=multi_brand_products_lookup.persist() +wait(multi_brand_products_lookup) + +# load small DataFrame in memory +multi_brand_products_lookup_in_memory=multi_brand_products_lookup.compute() + +# create a dictionary to compare options +merge_options={} +``` + + +```python +# time +time=timeit( + lambda: wait(ddf.merge(multi_brand_products_lookup_in_memory, how='left', on='product_id').persist()), + number=1 +) +merge_options['merge_cudf_dataframe']=time +print(f'Took {time} seconds') +``` + + +```python +# time +time=timeit( + lambda: wait(ddf.merge(multi_brand_products_lookup, how='left', on='product_id').persist()), + number=1 +) +merge_options['merge_dask_dataframe_multi_partitions']=time +print(f'Took {time} seconds') +``` + + +```python +multi_brand_products_lookup=multi_brand_products_lookup.repartition(npartitions=1).persist() +wait(multi_brand_products_lookup) + +# time +time=timeit( + lambda: wait(ddf.merge(multi_brand_products_lookup, how='left', on='product_id').persist()), + number=1 +) +merge_options['merge_dask_dataframe']=time +print(f'Took {time} seconds') +``` + + +```python +# show merge_options dictionary +plt.bar(merge_options.keys(), merge_options.values()) +plt.xticks(rotation=30, ha='right') +plt.ylabel('Compute Time (s)') +``` + +

+ +Joining a Dask DataFrame with a small cuDF DataFrame is fast as it can be done in parallel. Each partition in the Dask DataFrame can be joined against the single cuDF DataFrame without incurring overhead. When joining a left Dask DataFrame with a right Dask DataFrame, we can turn the right DataFrame into a single partition using `.repartition()` to get fast results if we can't or don't want to load the right DataFrame into memory. These two approaches are effectively equivalent, so there is no meaningful difference in performance between them. However, merging two multi-partition Dask DataFrames is an expensive operation in a distributed computing context. If Dask doesn't know how the data is partitioned, it needs to move all of the data around by performing a shuffle so that rows with matching values in the joining column(s) end up in the same partition. This is an extremely memory-intensive process and often causes the workers to run out of memory or spill. This is a situation we want to avoid. + + +```python +# merge with lookup table +ddf=ddf.merge(multi_brand_products_lookup_in_memory, how='left', on='product_id') + +# fill missing value with mode +ddf['brand']=ddf['brand'].fillna(ddf['missing_brand_mode']) + +# drop reference data +ddf=ddf.drop(columns='missing_brand_mode') + +# persist data into distributed memory +ddf=ddf.persist() +wait(ddf) +``` + +**Step 4**. 
Check missing `brand` values + + +```python +# check if brand can be inferred +# get list of product_ids with missing brand values +null_brand_products=ddf.loc[ddf['brand'].isnull(), 'product_id'].unique() +null_brand_products_list=null_brand_products.compute() + +# check if imputation was effective +# get records associated with missing brand values +null_brand_products_ddf=ddf.loc[ddf['product_id'].isin(null_brand_products_list), ['product_id', 'brand']] + +# perform groupby aggregation +null_brand_products_count_ddf=null_brand_products_ddf.groupby('product_id')['brand'].agg('count') + +print(f"There are {(null_brand_products_count_ddf>0).sum().compute()} products with at least one brand populated. ") +``` + + +```python +# clean up +del multi_brand_products_list, multi_brand_products_ddf, multi_brand_products_lookup, multi_brand_products_lookup_in_memory +del null_brand_products, null_brand_products_list, null_brand_products_ddf, null_brand_products_count_ddf +gc.collect() +``` + +
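The lookup-merge-fillna imputation used in the steps above can be illustrated on a toy table. This sketch uses pandas, whose API cuDF mirrors; the column names follow the ones above, and the data is made up:

```python
import pandas as pd

# toy events table: product 1 has recoverable brand values, product 2 does not
df = pd.DataFrame({'product_id': [1, 1, 1, 2],
                   'brand': ['acme', None, 'apex', None]})

# build a per-product lookup of the most frequent observed brand
lookup = (df.dropna(subset=['brand'])
            .groupby('product_id')['brand']
            .agg(lambda s: s.mode()[0])
            .rename('missing_brand_mode')
            .reset_index())

# merge the lookup in, fill nulls from it, then drop the helper column
df = df.merge(lookup, how='left', on='product_id')
df['brand'] = df['brand'].fillna(df['missing_brand_mode'])
df = df.drop(columns='missing_brand_mode')

print(df)
```

Product 2 has no observed brand at all, so its null survives the imputation — which is exactly the kind of remaining gap the check above counts.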

+We could have done the same procedure on the entire dataset, but filtering down to only affected records first will significantly reduce the compute and memory needed. + + +### Exercise #5 - Static Fill Null Values ### +We can use the `.fillna()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe.fillna/) method to fill null values. + +**Instructions**:
* Modify the `` only and execute the below cell to fill null values with the string literal value `UNKNOWN`.


```python
# fill with static value
ddf[['category_code', 'brand']]=ddf[['category_code', 'brand']].<<<>>>
```
ddf[['category_code', 'brand']]=ddf[['category_code', 'brand']].fillna('UNKNOWN')
Click ... to show **solution**.

Sometimes it may be necessary to drop records (rows or columns) with missing values. This should be the last resort.


```python
# drop records with missing user_session value
ddf=ddf.dropna(subset=['user_session'])
```


```python
# check data
ddf.isna().sum().compute()
```


## Filter Relevant Data ##
The dataset contains a lot of noisy `view` records, which we can exclude. Furthermore, we will apply additional data cleaning procedures such as:
* Filter out `purchase` records that were not `cart`
* Filter out `cart` records that were also `purchase`


```python
# filter out view records
ddf=ddf[ddf['event_type']!='view']

# create unique identifier for session_product
ddf['session_product']=ddf['user_session']+'_'+ddf['product_id'].astype(str)

# get DataFrame for purchase records
ddf_purchase=ddf[ddf['event_type']=='purchase']
purchase_session_product=ddf_purchase['session_product'].compute()

# get DataFrame for cart records
ddf_cart=ddf[ddf['event_type']=='cart']
cart_session_product=ddf_cart['session_product'].compute()

# filter out purchase records that were not cart
ddf_purchase=ddf_purchase[ddf_purchase['session_product'].isin(cart_session_product)]

# filter out cart records that were also purchase
ddf_cart=ddf_cart[~ddf_cart['session_product'].isin(purchase_session_product)]

# set target
ddf_cart['target']=0
ddf_purchase['target']=1

# concatenate DataFrames
ddf=dask_cudf.concat([ddf_purchase, ddf_cart])

# shuffle on event_time to make partitions even
ddf=ddf.shuffle('event_time', npartitions=4)
ddf=ddf.persist()
```


```python
# clean up
del purchase_session_product
del cart_session_product
del ddf_cart
del ddf_purchase
gc.collect()
```

cuDF supports powerful `string`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/string_handling.html#) and `datetime` methods to handle common use cases.


```python
# split category_code into separate columns
ddf[['cat_0', 'cat_1', 'cat_2', 'cat_3']]=ddf['category_code'].str.split('.', n=3, expand=True).fillna('NA')

# extract datetime values
ddf['date']=ddf['event_time'].dt.floor('D')
ddf['ts_hour']=ddf['event_time'].dt.hour
ddf['ts_minute']=ddf['event_time'].dt.minute
ddf['ts_weekday']=ddf['event_time'].dt.weekday
ddf['ts_day']=ddf['event_time'].dt.day
ddf['ts_month']=ddf['event_time'].dt.month
ddf['ts_year']=ddf['event_time'].dt.year
```


## Save to Parquet Format ##
After processing the data, we persist it for later use. [Apache Parquet](https://parquet.apache.org/) is a columnar binary format and has become the de-facto standard for the storage of large volumes of tabular data. Converting to the Parquet file format is important, and CSV files should generally be avoided in data products. In fact, many developers will start their analysis by first converting CSV files to the Parquet file format. There are many reasons to use the Parquet format for analytics:
* The columnar nature of Parquet files allows for column pruning, which often yields big query performance gains.
* It uses metadata to store the schema and supports more advanced data types such as categorical, datetimes, and more. This means that importing data would not require schema inference or manual schema specification.
* It captures metadata related to row-group level statistics for each column. This enables predicate pushdown filtering, which is a form of query pushdown that allows computations to happen at the “database layer” instead of the “execution engine layer”. In this case, the database layer is Parquet files in a filesystem, and the execution engine is Dask. 
* It supports flexible compression options, making it more compact to store and more portable than a database.

We will use `.to_parquet(path)`[[doc]](https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html#dask-dataframe-to-parquet) to write to Parquet files. By default, files will be created in the specified output directory using the convention `part.0.parquet`, `part.1.parquet`, `part.2.parquet`, ... and so on for each partition in the DataFrame. This can be changed using the `name_function` parameter. Outputting multiple files lets Dask write to multiple files in parallel, which is faster than writing to a single file.

<br>
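As a sketch of the `name_function` parameter: it is a callable that maps each partition index to a file name. The naming scheme below is purely illustrative:

```python
# hypothetical naming scheme: a prefix plus a zero-padded partition index
def name_function(partition_index: int) -> str:
    return f'events-{partition_index:04d}.parquet'

# Dask would call this once per partition, roughly as in:
# ddf.to_parquet('clean_parquet', name_function=name_function)
print([name_function(i) for i in range(3)])
```

Zero-padding keeps the files lexicographically ordered when listed in the output directory.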

+ +

When working with large datasets, decoding and encoding are often expensive tasks. This challenge tends to compound as the data size grows. A common pattern in data science is to subset the dataset by columns, row slices, or both. Moving these filtering operations to the read phase of the workflow can: 1) reduce I/O time, and 2) reduce the amount of memory required, which is important for GPUs where memory can be a limiting factor. The Parquet file format enables filtered reading through **column pruning** and **statistic-based predicate filtering** to skip portions of the data that are irrelevant to the problem. Below are some tips for writing Parquet files:
* When writing data, sorting it by the columns that are expected to be filtered on most often, or by the columns with the highest cardinality, can lead to meaningful performance benefits. The metadata calculated for each row group will then enable predicate pushdown filters to the fullest extent.
* Writing the Parquet format, which requires reprocessing entire datasets, can be expensive. The format works remarkably well for read-intensive applications and low-latency data storage and retrieval.
* Partitions in a Dask DataFrame can write out files in parallel, so multiple Parquet files are written simultaneously.


```python
# clean up output directory
!rm -rf clean_parquet/*

# write to parquet
ddf.to_parquet('clean_parquet', index=False, overwrite=True)
```


```python
# show output directory
!ls -alh clean_parquet
```

The original data in CSV is uncompressed and requires more disk space. We were able to reduce the disk space requirement greatly by using the Parquet file format.


### Dask-cuDF to cuDF DataFrame Conversion ###
In general, it's recommended to avoid calling `.compute()` on large DataFrames, restricting its use to cases where the post-processed result needs to be inspected. 
Running the `.head()` or `.compute()` method on a `dask_cudf.DataFrame` will return a `cudf.DataFrame`, which means the data needs to fit comfortably on a single GPU. Furthermore, converting a Dask-cuDF DataFrame to a cuDF DataFrame means losing out on all the benefits of Dask. Instead of `.compute()`, users can call the `.persist()` method to persist data into distributed memory.

For data that fits into memory, cuDF can often be faster to use than a Dask DataFrame. Even for large datasets, there may be a point in the process where the data is reduced to a more manageable level. We can consider switching to cuDF at that point.


```python
# convert Dask-cuDF DataFrame to cudf DataFrame
gdf=ddf.compute()

# preview DataFrame
display(gdf.head())

# clean up
del gdf
```


```python
# clean GPU memory
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(restart=False)
```


## Summary ##
Understanding the data and the desired result can help set up computations efficiently to optimize performance.
* Dask enables processing large datasets by using partitioning and memory-spill mechanisms
* Getting an optimal partition size can have an important impact on performance
* Groupby operations that do not require shuffling and result in a reduction are efficient
* Merging with a DataFrame that is in a single partition or in memory is efficient
* When choosing a data format, it can be beneficial to choose one that is conducive to analytics

**Well Done!** Let's move to the [next notebook](1_02_EDA.ipynb).

 Header
diff --git a/ds/25-1/3/1_02_EDA.md b/ds/25-1/3/1_02_EDA.md
new file mode 100644
index 0000000..2f628af
--- /dev/null
+++ b/ds/25-1/3/1_02_EDA.md
@@ -0,0 +1,654 @@
 Header

# Enhancing Data Science Outcomes With Efficient Workflow #

## 02 - Data Exploration and Data Visualization ##
In this lab, you will learn the motivation behind doing data science on a GPU cluster. 
This lab covers the ETL, data exploration, and feature engineering steps of the data processing pipeline. Extract, transform, load, or [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), is the process where data is transformed into a proper structure for the purposes of querying and analysis. Feature engineering, on the other hand, involves the extraction and transformation of raw data. + +

+ +**Table of Contents** +
In this notebook, we will load data from Parquet file format into a Dask DataFrame and perform various data transformations and exploratory data analysis. This notebook covers the below sections:
1. [Quick Recap](#s2-1)
2. [Data Exploration and Data Visualization](#s2-2)
 * [Plotly](#s2-2.1)
 * [Summarize](#s2-2.2)
 * [Visualizing Distribution](#s2-2.3)
 * [Exercise #1 - Histogram with GPU](#s2-e1)
 * [Exercise #2 - Histogram with Log Scale](#s2-e2)
 * [GroupBy Summarize](#s2-2.4)
 * [Exercise #3 - Probability Bar Chart](#s2-e3)
 * [User Features]()
 * [Exercise #4 - Custom GroupBy Aggregation](#s2-e4)
 * [Exercise #5 - Time-Series Analysis](#s2-e5)
 * [Pivot Table](#s2-2.5)
3. [Summary](#s2-3)


## Quick Recap ##
So far, we've identified several sources of hidden slowdowns when working with Dask and cuDF:
* Reading data without a schema or specifying `dtype`
* Having too many partitions due to small `chunksize`
* Memory spilling due to partitions being too large
* Performing groupby operations on too many groups scattered across multiple partitions

Going forward, we will continue to learn how to use Dask and RAPIDS efficiently.


## Data Exploration and Data Visualization ##
Exploratory data analysis involves identifying the predictor/feature variables and the target/class variable. We use this time to understand the distribution of the features and identify potentially problematic outliers. Data exploration helps users understand the data in order to better tackle a problem. It can be a way to ascertain the validity of the data as we begin to look for useful features that will help in the following stages of the development workflow.


### Plotly ###
**Plotly** [[Doc]](https://plotly.com/) is a popular library for graphing and data dashboards. Plotly uses `plotly.graph_objects` to create figures for data visualization. Graph objects can be created using `plotly.express` or from the ground up. 
In order for Plotly to make a graph, data needs to be on the host, not the GPU. If the dataset is small, it may be more efficient to use `pandas` instead of `cudf` or `Dask-cuDF`. However, if the dataset is large, sending data to the GPU is a great way to speed up computation before sending it to the host for visualization. When using GPU acceleration and Plotly, only move the GPU DataFrame(s) to the host at the end with `to_pandas()`, as opposed to converting the entire GPU DataFrame(s) to pandas DataFrame(s) immediately. This will allow us to take advantage of GPU acceleration for processing.

For more information about how to use Plotly, we recommend [this guide](https://plotly.com/python/getting-started/).

We start by initiating the `LocalCUDACluster()` and Dask `Client()`, followed by loading data from the Parquet files into a Dask DataFrame.


```python
# import dependencies
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf
import dask_cudf
import numpy as np
import cupy as cp
import plotly.express as px
import gc

# create cluster
cluster=LocalCUDACluster()

# instantiate client
client=Client(cluster)
```


```python
# get the machine's external IP address
from requests import get

ip=get('https://api.ipify.org').content.decode('utf8')

print(f'Dask dashboard (status) address is: http://{ip}:8787/status')
print(f'Dask dashboard (GPU) address is: http://{ip}:8787/gpu')
```


```python
# read data
ddf=dask_cudf.read_parquet('clean_parquet')

print(f'Total of {len(ddf)} records split across {ddf.npartitions} partitions. ')

ddf.dtypes
```

The Parquet file format includes metadata to inform `Dask-cuDF` which data types to use for each column.


```python
# create continuous and categorical column lists
continuous_cols=['price', 'target', 'ts_hour', 'ts_minute', 'ts_weekday', 'ts_day', 'ts_month', 'ts_year']
categorical_cols=['event_type', 'category_code', 'brand', 'user_session', 'session_product', 'cat_0', 'cat_1', 'cat_2', 'cat_3', 'product_id', 'category_id', 'user_id']
```


```python
# preview DataFrame
ddf.head()
```


### Summarize ###
We can use the `describe()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe.describe/) method to generate summary statistics for continuous features.


```python
# generate summary statistics for continuous features
ddf[continuous_cols].describe().compute().to_pandas().apply(lambda s: s.apply('{0:.2f}'.format))
```

For categorical values, we are often interested in the [cardinality](https://en.wikipedia.org/wiki/Cardinality) of each feature. Cardinality is the number of unique elements a set contains. We use `.nunique()` to get the number of possible values for each categorical feature, as it will inform how they can be encoded for machine learning model consumption.


```python
# count number of unique values for categorical features
ddf[categorical_cols].nunique().compute()
```

Note that in the previous step, we added `read_parquet()` to the task graph but did not `.persist()` data in memory. Recall that the Dask DataFrame APIs lazily build up the task graph until `.compute()` is called. The result of `.compute()` is a cuDF DataFrame and should be small.

**Observations**:
* The dataset has an ~41% purchase rate
* All data come from March of 2020
* Price has a very large standard deviation


### Visualizing Distribution ###
A histogram is a graph that shows the frequency of data using rectangles. 
It's used to visualize the distribution of the data so we can quickly approximate concentration, skewness, and variability. We will use histograms to identify popularity characteristics in the dataset.

When using Plotly or other visualization libraries, it's best to keep the data on the GPU for as long as possible and only move the data to the host when needed. For example, instead of relying on Plotly Express's `.histogram()`[[doc]](https://plotly.com/python/histograms/) function, we can use the `.value_counts()` method to count the number of occurrences. We can then pass the results to the `.bar()` function to generate a frequency bar chart. This can yield faster results by enabling GPU acceleration. Furthermore, we can use `.nlargest()` to limit the number of bars in the chart.

We want to visualize the distribution for specific features. Now that the data is in Parquet file format, we can use column pruning to only read in one column at a time to reduce the memory burden.


```python
%%time
# set cat column of interest
cat_col='cat_0'

# read data
ddf=dask_cudf.read_parquet('clean_parquet', columns=cat_col)

# create histogram
px.histogram(
    # move data to CPU
    ddf[cat_col].compute().to_pandas()
).update_layout(
    yaxis_title='Frequency Count',
    xaxis_title=cat_col,
    title=f'Distribution of {cat_col}'
)
```


### Exercise #1 - Histogram with GPU ###
Instead of generating the histogram on the CPU, we can use `.value_counts()` to achieve similar results.

**Instructions**:
+* Modify the `` only and execute the below cell to visualize the frequency of each `cat_0` value. +* Compare the performance efficiency with the previous CPU approach. + + +```python +%%time +# set cat column of interest +cat_col='cat_0' +n_bars=25 + +# read data +ddf=dask_cudf.read_parquet('clean_parquet', columns=cat_col) + +# create frequency count DataFrame +cat_count_df=<<<>>> + +# create histogram +px.bar( + # move data to CPU + cat_count_df.compute().to_pandas() +).update_layout( + yaxis_title='Frequency Count', + xaxis_title=cat_col, + title=f'Distribution of {cat_col}' +) +``` +%%time +cat_col='cat_0' +n_bars=25 +ddf=dask_cudf.read_parquet('clean_parquet', columns=cat_col) + +cat_count_df=ddf[cat_col].value_counts().nlargest(n_bars) + +px.bar( + cat_count_df.compute().to_pandas() +).update_layout( + yaxis_title='Frequency Count', + xaxis_title=cat_col, + title=f'Distribution of {cat_col}' +) +Click ... to show **solution**. + +Using cuDF to calculate the frequency is much more efficient. For continuous features, we often have to bin the values into buckets. We can use `cudf.Series.digitize()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.series.digitize/), but it's not implemented for `dask_cudf.core.Series`, so we have to use `.map_partitions()` to perform the `cudf.Series.digitize()` method on each partition. 
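As a quick aside on what `digitize` computes: each value is mapped to the index of the bin it falls into. NumPy's `np.digitize` has the same semantics and can stand in for `cudf.Series.digitize` on toy data:

```python
import numpy as np

prices = np.array([3.0, 49.9, 50.0, 260.0])
bins = np.array([0, 50, 100, 500], dtype='float32')

# with the default right=False, bin i covers bins[i-1] <= x < bins[i]
bin_ids = np.digitize(prices, bins)
print(bin_ids)  # [1 1 2 3]
```

Counting the resulting bin ids with `.value_counts()` is what turns this into a histogram.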
```python
%%time
# set cont column of interest
cont_col='price'

# read data
ddf=dask_cudf.read_parquet('clean_parquet', columns=cont_col)

# set bins
bins=np.array(range(-1, 10000, 50)).astype('float32')

# create frequency count DataFrame
cont_hist_df=ddf[cont_col].map_partitions(lambda p: p.digitize(bins)).value_counts()

# create histogram
px.bar(
    # move data to CPU
    cont_hist_df.compute().to_pandas(),
).update_xaxes(
    tickmode='array',
    tickvals=np.array(range(1, len(bins))),
    ticktext=[f'{int(bins[idx])} - {int(bins[idx+1])}' for idx, bin in enumerate(bins[:-1])],
).update_layout(
    yaxis_title='Frequency Count',
    xaxis_title=f'{cont_col} Bin',
    title=f'Distribution of {cont_col}'
)
```


### Exercise #2 - Histogram with Log Scale ###
`price` is positively skewed. We might be able to visualize the distribution better by creating bins on a logarithmic scale. We will create bin ranges using `numpy.logspace()` and pass them to `cudf.Series.digitize()`.

**Instructions**:
* Modify the `` only and execute the below cell to visualize the frequency of `price` bins in log scale.


```python
# set cont column of interest
cont_col='price'

# read data
ddf=dask_cudf.read_parquet('clean_parquet', columns=cont_col)

# set bins
bins=np.logspace(0, 5).astype('float32')

# create frequency count DataFrame
cont_hist_df=ddf['price'].map_partitions(<<<>>>).value_counts()

# create histogram
px.bar(
    # move data to CPU
    cont_hist_df.compute().to_pandas(),
).update_xaxes(
    tickmode='array',
    tickvals=np.array(range(1, len(bins))),
    ticktext=[f'{int(bins[idx])} - {int(bins[idx+1])}' for idx, bin in enumerate(bins[:-1])]
).update_layout(
    yaxis_title='Frequency Count',
    xaxis_title=f'{cont_col} Bin',
    title=f'Distribution of {cont_col}'
)
```
cont_col='price'

ddf=dask_cudf.read_parquet('clean_parquet', columns=cont_col)

bins=np.logspace(0, 5).astype('float32')

cont_hist_df=ddf['price'].map_partitions(lambda p: p.digitize(bins)).value_counts()

px.bar(
    cont_hist_df.compute().to_pandas(),
).update_xaxes(
    tickmode='array',
    tickvals=np.array(range(1, len(bins))),
    ticktext=[f'{int(bins[idx])} - {int(bins[idx+1])}' for idx, bin in enumerate(bins[:-1])]
).update_layout(
    yaxis_title='Frequency Count',
    xaxis_title=f'{cont_col} Bin',
    title=f'Distribution of {cont_col}'
)
Click ... to show **solution**.

**Observations**:
* The vast majority of the products are below $300.


### GroupBy Summarize ###
We can use a variety of groupby aggregations to learn about the data. The aggregations supported by cuDF, as described [here](https://docs.rapids.ai/api/cudf/stable/user_guide/groupby/#aggregation), are very efficient. We might be interested in exploring several variations. To make the execution more efficient, we can `.persist()` the data into memory after reading from the source. Subsequent operations will not require loading from the source again. 
For example, we can visualize the probability of an event for each category. When the target column is a binary indicator, we can do this quickly by calculating the aggregate mean: for a categorical feature with binary outcomes, the arithmetic mean of the target is the _probability_ of a positive outcome.

We use `.groupby()` on `cat_0`, followed by `.agg('mean')` on `target`, to determine the probability of a positive outcome for each `cat_0` group.


```python
# read data
ddf=dask_cudf.read_parquet('clean_parquet')

# persist data in memory
ddf=ddf.persist()
wait(ddf)
```


```python
# set cat column of interest
cat_col='cat_0'
n_bars=25

# create groupby probability DataFrame
cat_target_df=ddf.groupby(cat_col)['target'].agg({'target': 'mean'}).nlargest(n_bars)

# create bar chart
px.bar(
    # move data to CPU
    cat_target_df.compute().to_pandas()
).update_layout(
    yaxis_title='Probability',
    xaxis_title=cat_col,
    title=f'Probability of {cat_col}'
)
```

Some categories have a higher probability than others.

Other groupby aggregations include:
* What time of the week is the busiest

```
# show probability of each ts_weekday and ts_hour group
ddf.groupby(['ts_weekday', 'ts_hour'])['target'].agg({'target': 'mean'})
```
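The group-mean-as-probability idea can be checked on a toy example; pandas is used here as a stand-in, since cuDF mirrors its groupby API and the data is made up:

```python
import pandas as pd

# a binary target per category
df = pd.DataFrame({'cat_0': ['a', 'a', 'a', 'b', 'b'],
                   'target': [1, 0, 1, 0, 0]})

# for a 0/1 target, the group mean is the empirical probability of a positive outcome
prob = df.groupby('cat_0')['target'].mean()
print(prob)  # a -> 0.666..., b -> 0.0
```

Group `a` has two positives out of three records, hence a probability of 2/3.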

+ +`.groupby().size()` or `.groupby().agg('size')` is very similar to `.value_counts()`. + + +### Exercise #3 - Probability Bar Chart ### + +**Instructions**:
* Modify the `` only and execute the below cell to visualize the probability of each `ts_hour` value.


```python
# set cat column of interest
cat_col=<<<>>>
n_bars=25

# create groupby probability DataFrame
cat_target_df=ddf.groupby(cat_col)['target'].agg({'target': 'mean'}).nlargest(n_bars)

# create bar chart
px.bar(
    # move data to CPU
    cat_target_df.compute().to_pandas()
).update_layout(
    yaxis_title='Probability',
    xaxis_title=cat_col,
    title=f'Probability of {cat_col}'
)
```
cat_col='ts_hour'
n_bars=25
cat_target_df=ddf.groupby(cat_col)['target'].agg({'target': 'mean'}).nlargest(n_bars)

px.bar(
    cat_target_df.compute().to_pandas()
).update_layout(
    yaxis_title='Probability',
    xaxis_title=cat_col,
    title=f'Probability of {cat_col}'
)
Click ... to show **solution**.

Some aggregations, such as `median`, `mode`, and `nunique`, require all data within the same group to be in memory for calculation. For these operations, `.groupby().apply()` is used. Because `.groupby().apply()` performs a shuffle, these operations scale poorly with large numbers of groups.

We use `.groupby()` on `brand` and `.apply()` on `SeriesGroupBy['user_id'].nunique()` to get the number of unique customers that have interacted with each brand.


```python
# set cat columns of interest
cat_col='brand'
group_statistic='user_id'

# create groupby summarize DataFrame
product_frequency=ddf.groupby(cat_col)[group_statistic].apply(lambda g: g.nunique(), meta=(f'{group_statistic}_count', 'int32')).nlargest(25)

# create bar chart
px.bar(
    # move data to CPU
    product_frequency.compute().to_pandas()
).update_layout(
    yaxis_title=f'Number of Unique {group_statistic}',
    xaxis_title=cat_col,
    title=f'Number of Unique {group_statistic} per {cat_col}'
)
```


```python
# visualize graph
product_frequency.visualize(rankdir='LR')
```

Certain brands and categories have higher penetration and a higher probability of a positive outcome. 
Other groupby aggregations include:
* How many unique `user_id` are in each `product_id` group

```
# show how many customers interacted with each product
ddf.groupby('product_id')['user_id'].apply(lambda g: g.nunique(), meta=('nunique', 'int32'))
```

* How many unique `cat_0` are in each `brand` group

```
# show how many categories of product each brand carries
ddf.groupby('brand')['cat_0'].apply(lambda g: g.nunique(), meta=('nunique', 'int32'))
```

* How many unique `product_id` are in each `user_session` group

```
# show how many products are viewed in each session
ddf.groupby('user_session')['product_id'].apply(lambda g: g.nunique(), meta=('nunique', 'int32'))
```

* How many unique `product_id` are in each `user_id` group

```
# show how many products each user interacts with
ddf.groupby('user_id')['product_id'].apply(lambda g: g.nunique(), meta=('nunique', 'int32'))
```
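These per-group `nunique` counts can be sanity-checked with a small pandas stand-in (in Dask-cuDF the same result needs the `.apply(..., meta=...)` form shown above); the data here is made up:

```python
import pandas as pd

df = pd.DataFrame({'brand': ['acme', 'acme', 'acme', 'apex'],
                   'user_id': [10, 10, 11, 12]})

# number of distinct users that interacted with each brand
unique_users = df.groupby('brand')['user_id'].nunique()
print(unique_users)  # acme -> 2, apex -> 1
```

Note that repeated interactions by the same user (user 10 with acme) count only once.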

For Dask, we can ensure the result of `.groupby()` is sorted using the `sort` parameter.

Sometimes we want to perform custom aggregations that are not yet supported. For custom aggregations, we can use `.groupby().apply()` and user-defined functions. For example, we might be interested in the range of `price` for each `category_code`. Arithmetically, this is done by taking the difference between the group-specific maximum and the group-specific minimum. We can normalize the range by dividing it by the group-specific mean.

It's best to avoid using `.groupby().apply()` when possible. Similar results can be calculated by using `.groupby().agg()` to obtain the `max`, `min`, and `mean` separately, then applying a row-wise calculation with `.apply()`. This can be more efficient.


```python
%%time
# set cat column of interest
cat_col='category_code'

# define group-wise function
def normalized_range(group):
    return (group.max()-group.min())/group.mean()

# create groupby apply DataFrame
normalized_range_df=ddf.groupby(cat_col)['price'].apply(normalized_range, meta=('normalize_range', 'float64')).nlargest(25)

# create bar chart
px.bar(
    # move data to CPU
    normalized_range_df.compute().to_pandas()
).update_layout(
    yaxis_title='Normalize Range',
    xaxis_title=cat_col,
    title=f'Normalize Range of price per {cat_col}'
)
```


```python
# visualize graph
normalized_range_df.visualize(rankdir='LR')
```


### Exercise #4 - Custom GroupBy Aggregation ###

**Instructions**:
* Modify the `` only and execute the below cell to visualize the normalized range of each `category_code`.
* Compare the performance efficiency with the previous `.groupby().apply()` approach.


```python
%%time
# set cat column of interest
cat_col='category_code'

# define row-wise function
def normalized_range(group):
    return <<<>>>

# create groupby aggregate DataFrame
normalized_range_df=ddf.groupby(cat_col)['price'].agg({'price': ['max', 'min', 'mean']}).apply(normalized_range, axis=1, meta=('normalize_range', 'float64')).nlargest(25)

# create bar chart
px.bar(
    # move data to CPU
    normalized_range_df.compute().to_pandas()
).update_layout(
    yaxis_title='Normalize Range',
    xaxis_title=cat_col,
    title=f'Normalize Range of price per {cat_col}'
)
```
cat_col='category_code'

def normalized_range(group):
    return (group['max']-group['min'])/group['mean']

normalized_range_df=ddf.groupby(cat_col)['price'].agg({'price': ['max', 'min', 'mean']}).apply(normalized_range, axis=1, meta=('normalize_range', 'float64')).nlargest(25)

px.bar(
    normalized_range_df.compute().to_pandas()
).update_layout(
    yaxis_title='Normalize Range',
    xaxis_title=cat_col,
    title=f'Normalize Range of price per {cat_col}'
)
Click ... to show **solution**.


```python
# visualize graph
normalized_range_df.visualize(rankdir='LR')
```

We can apply predicate pushdown filters when reading from Parquet files with the `filters` parameter. This enables Dask-cuDF to skip row-groups and files where _none_ of the rows can satisfy the criteria. This works well when the partition layout was planned around the filter column. Since this is not the case for our dataset, we will apply a separate filter after importing the data. 
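That the `.agg()`-plus-row-wise route computes the same normalized range as the `.groupby().apply()` route can be verified on a toy table (pandas stand-in, made-up prices):

```python
import pandas as pd

df = pd.DataFrame({'category_code': ['x', 'x', 'y', 'y'],
                   'price': [10.0, 30.0, 5.0, 15.0]})

# group-wise apply: (max - min) / mean computed inside each group
via_apply = df.groupby('category_code')['price'].apply(
    lambda g: (g.max() - g.min()) / g.mean())

# aggregate the pieces once, then combine them row-wise
stats = df.groupby('category_code')['price'].agg(['max', 'min', 'mean'])
via_agg = (stats['max'] - stats['min']) / stats['mean']

# both routes agree; the agg route avoids the expensive group-wise shuffle
assert (via_apply == via_agg).all()
```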
### Exercise #5 - Time-Series Analysis ###
When dealing with time-series data, cuDF provides powerful `.rolling()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe.rolling/) and `.resample()`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe.resample/) methods to perform window operations. Functionally, they behave very similarly to `.groupby()` operations. We use `.resample()` followed by `.interpolate()` to find the frequency and probability over the entire span of the dataset.

We use `.map_partitions()` to perform `.resample()` on each partition. Because `.map_partitions()` doesn't perform a shuffle first, we manually perform a `.shuffle()` to ensure all members of each group are together. Once we have all the needed data in the same partitions, we can use `.map_partitions()` and pass it the cuDF DataFrame `.resample()` operation.

**Instructions**:
* Execute the below cell to clear memory.
* Execute the cell below to read data into memory and shuffle based on `ts_day`. This ensures that all records belonging to the same group are in the same partition.
* Execute the cell below to show the user shopping behavior.
* Modify the `resample_frequency` to various frequencies to look for more obvious patterns.


```python
del ddf
gc.collect()
```


```python
# read data with predicate pushdown
ddf=dask_cudf.read_parquet('clean_parquet', filters=[('ts_day', "<", 15)])

# apply filtering
ddf=ddf[ddf['ts_day']<15]

# shuffle first on ts_day
ddf=ddf.shuffle('ts_day')
```


```python
# set resample frequency
resample_frequency='3h'

# get time-series DataFrame
activity_amount_trend=ddf.map_partitions(lambda x: x.resample(resample_frequency, on='event_time').size().interpolate('linear'))
purchase_rate_trend=ddf.map_partitions(lambda x: x.resample(resample_frequency, on='event_time')['target'].mean().interpolate('linear'))

# create scatter plot
px.scatter(
    # move data to CPU
    activity_amount_trend.compute().to_pandas().sort_index(),
    color=purchase_rate_trend.compute().to_pandas().sort_index()
).update_traces(
    mode='markers+lines'
).update_layout(
    yaxis_title='Number of Records',
    xaxis_title='event_time',
    title=f'Amount of Transactions Over Time'
)
```


### Pivot Table ###
When data is small enough to fit on a single GPU, it's often faster to perform data transformation with cuDF. Below we will read a few numerical columns, which fit nicely in memory. We use `.pivot_table()` to find the probability and frequency for each `ts_hour` and `ts_weekday` group. 
```python
# read data
gdf=cudf.read_parquet('clean_parquet', columns=['ts_weekday', 'ts_hour', 'target'])
```


```python
# create pivot table
activity_amount=gdf.pivot_table(index=['ts_weekday'], columns=['ts_hour'], values=['target'], aggfunc='size')['target']

# create heatmap
px.imshow(
    # move data to CPU
    activity_amount.to_pandas(),
    title='there is more activity in the day'
).update_layout(
    title=f'Number of Records Heatmap'
)
```


```python
# create pivot table
purchase_rate=gdf[['target', 'ts_weekday', 'ts_hour']].pivot_table(index=['ts_weekday'], columns=['ts_hour'], aggfunc='mean')['target'].to_pandas()

# create heatmap
px.imshow(
    # move data to CPU
    purchase_rate,
    title='there is potentially a higher purchase rate in the evening'
).update_layout(
    title=f'Probability Heatmap'
)
```

**Observations**:
* Behavior changes with `ts_weekday` and `ts_hour` - e.g. during the week, users will not stay up late as they work the next day.


## Summary ##
* `.groupby().apply()` requires shuffling, which is time-expensive. When possible, try to use `.groupby().agg()` instead
* Keeping data processing on the GPU can help generate visualizations quickly
* Use predicate filtering and column pruning to reduce the amount of data read into memory. When the data size is small, processing with cuDF can be more efficient than Dask-cuDF
* Use `.persist()` if subsequent operations are exploratory
* `.map_partitions()` does not involve shuffling


```python
# clean GPU memory
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
```

**Well Done!** Let's move to the [next notebook](1_03_categorical_feature_engineering.ipynb). 
+ + Header diff --git a/ds/25-1/3/1_03_categorical_feature_engineering.md b/ds/25-1/3/1_03_categorical_feature_engineering.md new file mode 100644 index 0000000..e5d7b8c --- /dev/null +++ b/ds/25-1/3/1_03_categorical_feature_engineering.md @@ -0,0 +1,279 @@ + Header + +# Enhancing Data Science Outcomes With Efficient Workflow # + +## 03 - Feature Engineering for Categorical Features ## +In this lab, you will learn the motivation behind doing data science on a GPU cluster. This lab covers the ETL, data exploration, and feature engineering steps of the data processing pipeline. Extract, transform, load, or [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), is the process where data is transformed into a proper structure for the purposes of querying and analysis. Feature engineering, on the other hand, involves the extraction and transformation of raw data. + +

+ +**Table of Contents** +
+In this notebook, we will load data from Parquet file format into a Dask DataFrame and create additional features for machine learning model training. This notebook covers the below sections:
+1. [Quick Recap](#s3-1)
+2. [Feature Engineering](#s3-2)
+    * [User Defined Functions](#s3-2.1)
+3. [Feature Engineering Techniques](#s3-3)
+    * [One-Hot Encoding](#s3-3.1)
+    * [Combining Categories](#s3-3.2)
+    * [Categorify / Label Encoding](#s3-3.3)
+    * [Count Encoding](#s3-3.4)
+    * [Target Encoding](#s3-3.5)
+    * [Embeddings](#s3-3.6)
+4. [Summary](#s3-4)
+
+
+## Quick Recap ##
+So far, we've identified several sources of hidden slowdowns when working with Dask and cuDF:
+* Reading data without a schema or specifying `dtype`
+* Having too many partitions due to a small `chunksize`
+* Memory spilling due to partitions being too large
+* Performing groupby operations on too many groups scattered across multiple partitions
+
+Going forward, we will continue to learn how to use Dask and RAPIDS efficiently.
+
+
+## Feature Engineering ##
+Feature engineering converts raw data into numeric vectors for model consumption. This is generally referred to as encoding, which transforms categorical data into continuous values. When encoding categorical values, there are three primary methods:
+* Label encoding, when the categories have no ordered relationship
+* Ordinal encoding, when the categories have an ordered relationship
+* One-hot encoding, when the categorical data is binary in nature
+
+Additionally, we can create numerous sets of new features from existing ones, which are then tested for effectiveness during model training. Feature engineering is an important step when working with tabular data as it can improve a machine learning model's ability to learn faster and extract patterns. Feature engineering can be a time-consuming process, particularly when the dataset is large and each processing cycle takes a long time. The ability to perform feature engineering efficiently enables more exploration of useful features.
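To make the three encoding methods concrete, here is a small CPU sketch using pandas (cuDF mirrors this API on the GPU); the `size` column and its ordering are made up for illustration:

```python
import pandas as pd

df = pd.DataFrame({'size': ['small', 'large', 'medium', 'small']})

# label encoding: an arbitrary integer per category (no order implied)
df['size_label'] = df['size'].astype('category').cat.codes

# ordinal encoding: integers that respect a known order
order = {'small': 0, 'medium': 1, 'large': 2}
df['size_ordinal'] = df['size'].map(order)

# one-hot encoding: one binary column per category
one_hot = pd.get_dummies(df['size'], prefix='size')
print(one_hot.columns.tolist())  # ['size_large', 'size_medium', 'size_small']
```

Note that label encoding assigns codes alphabetically here, so the integers carry no meaning, while the ordinal mapping encodes the small < medium < large relationship explicitly.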
+
+
+### User-Defined Functions ###
+Like many tabular data processing APIs, cuDF provides a range of composable, DataFrame-style operators. While out-of-the-box functions are flexible and useful, it is sometimes necessary to write custom code, or **user-defined functions** (UDFs), that can be applied to rows, columns, and other groupings of the cells making up the DataFrame.
+
+Users can execute UDFs on `cudf.Series` with:
+* `cudf.Series.apply()` or
+* Numba's `forall` syntax [(link)](https://docs.rapids.ai/api/cudf/stable/user_guide/guide-to-udfs.html#lower-level-control-with-custom-numba-kernels)
+
+Users can execute UDFs on `cudf.DataFrame` with:
+* `cudf.DataFrame.apply()`
+* `cudf.DataFrame.apply_rows()`
+* `cudf.DataFrame.apply_chunks()`
+* `cudf.rolling().apply()`
+* `cudf.groupby().apply_grouped()`
+
+Note that applying UDFs directly with Dask-cuDF is not yet implemented. For now, users can use `map_partitions` to apply a function to each partition of the distributed DataFrame.
+
+Currently, the use of string data within UDFs is provided through the `string_udf` library. This is powerful for use cases such as string splitting, regular expressions, and tokenization. The topic of handling string data is discussed extensively [here](https://docs.rapids.ai/api/cudf/stable/user_guide/guide-to-udfs.html#string-data). In addition to `Series.str`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/string_handling.html), cuDF also supports `Series.list`[[doc]](https://docs.rapids.ai/api/cudf/stable/api_docs/list_handling.html) for applying custom transformations.
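As a CPU illustration of the element-wise UDF pattern (cuDF's `Series.apply` follows the same shape on the GPU; the function below is a made-up example), note how the UDF has to handle missing values itself:

```python
import pandas as pd

s = pd.Series([0.5, 2.0, None])

# a scalar UDF applied element-wise; the null check mirrors
# the care needed when handling missing values inside UDFs
def clip_and_scale(x):
    if x != x:  # True only for NaN
        return -1.0
    return min(x, 1.0) * 10

print(s.apply(clip_and_scale).tolist())  # [5.0, 10.0, -1.0]
```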

+
+Below are some tips:
+* `apply` works by applying the provided function to each group sequentially, and concatenating the results together. This can be very slow, especially for a large number of small groups. For a small number of large groups, it can give acceptable performance.
+* With cuDF, we can also combine NumPy or CuPy methods into the procedure.
+* Related to `apply`, iterating over a cuDF Series, DataFrame, or Index is not supported. This is because iterating over data that resides on the GPU yields extremely poor performance, as GPUs are optimized for highly parallel operations rather than sequential operations. In the vast majority of cases, it is possible to avoid iteration and use an existing function or method to accomplish the same task. If iteration is truly unavoidable, copy the data from GPU to host with `.to_arrow()` or `.to_pandas()`, iterate there, and then copy the result back to GPU using `.from_arrow()` or `.from_pandas()`.
+
+
+## Feature Engineering Techniques ##
+Below is a list of common feature engineering techniques.
+
+
+```python
+from dask.distributed import Client, wait
+from dask_cuda import LocalCUDACluster
+import cudf
+import dask.dataframe as dd
+import dask_cudf
+import gc
+
+# instantiate a Client
+cluster=LocalCUDACluster()
+client=Client(cluster)
+```
+
+
+```python
+# get the machine's external IP address
+from requests import get
+
+ip=get('https://api.ipify.org').content.decode('utf8')
+
+print(f'Dask dashboard (status) is accessible on http://{ip}:8787/status')
+print(f'Dask dashboard (gpu) is accessible on http://{ip}:8787/gpu')
+```
+
+
+```python
+# read data as Dask-cuDF DataFrame
+ddf=dask_cudf.read_parquet('clean_parquet')
+ddf=ddf.categorize(columns=['brand', 'cat_0', 'cat_1', 'cat_2', 'cat_3'])
+```
+
+
+```python
+ddf=ddf.persist()
+```

+Did you get an error message? This notebook depends on the processed source file from previous notebooks.
+
+
+### One-Hot Encoding ###
+**One-Hot Encoding**, also known as dummy encoding, creates several binary columns to indicate a row belonging to a specific category. It works well for categorical features that are not ordinal and have low cardinality. With one-hot encoding, each row gets a 1 in the single column matching its category and 0 everywhere else.
+
+For example, we can use `cudf.get_dummies()` to perform one-hot encoding on one of the categorical columns.
+
+
+One-hot encoding doesn't work well for categorical features when the cardinality is large, as it results in high dimensionality. This is particularly an issue for neural network optimizers. Furthermore, data should not be saved in one-hot encoded format. If needed, it should only be used temporarily for specific tasks.
+
+
+```python
+def one_hot(df, cat):
+    temp=dd.get_dummies(df[cat])
+    return dask_cudf.concat([df, temp], axis=1)
+```
+
+
+```python
+one_hot(ddf, 'cat_0').head()
+```
+
+
+### Combining Categories ###
+
+**Combining categories** creates new features that better identify patterns when the categories independently don't provide enough information to predict the target. It's also known as _cross column_ or _cross product_. It's a common data preprocessing step for machine learning since it reduces the cost of model training. It's also common for exploratory data analysis. Properly combined categorical features encourage more effective splits in tree-based methods than considering each feature independently.
+
+For example, while `ts_weekday` and `ts_hour` may independently have no significant patterns, we might observe more obvious patterns if the two features are combined into `ts_weekday_hour`.
+
+
+When deciding which categorical features should be combined, it's important to balance the number of categories used, the number of observations in each combined category, and information gain.
Combining features reduces the number of observations per resulting category, which can lead to overfitting. Typically, combining low-cardinality categories is recommended. Otherwise, experimentation is needed to discover the best combinations.
+
+
+```python
+def combine_cats(df, left, right):
+    df['-'.join([left, right])]=df[left].astype('str').str.cat(df[right].astype('str'))
+    return df
+```
+
+
+```python
+combine_cats(ddf, 'ts_weekday', 'ts_hour').head()
+```
+
+
+### Categorify and Grouping ###
+
+**Categorify**, also known as *Label Encoding*, converts features into continuous integers. Typically, it converts the values into monotonically increasing positive integers from 0 to *C*, the cardinality. It enables numerical computations and can also reduce memory utilization if the original feature contains string values. Categorify is a necessary preprocessing step for feeding categorical features into the embedding layers of deep learning models.
+
+Categorifying works well when the feature is ordinal, and is sometimes necessary when the cardinality is large. Categories with low frequency can be grouped together to prevent the model from overfitting on sparse signals. When categorifying a feature, we can apply a frequency threshold: categories that occur more often than the threshold are encoded as distinct integers, while all infrequent categories are mapped together to a shared `other` category (and unseen values to a special `unknown` category). This handy functionality keeps the model from overfitting to sparse signals.
+
+
+```python
+def categorify(df, cat, freq_threshold):
+    freq=df[cat].value_counts()
+    freq=freq.reset_index()
+    freq.columns=[cat, 'count']
+
+    # reset index on the frequency dataframe for a new sequential index
+    freq=freq.reset_index()
+    freq.columns=[cat+'_Categorify', cat, 'count']
+
+    # apply the frequency threshold to group low-frequency categories together
+    freq_filtered=freq[freq['count']>freq_threshold]
+
+    # add 2 to the new index as we want to use index 0 for others and 1 for unknown
+    freq_filtered[cat+'_Categorify']=freq_filtered[cat+'_Categorify']+2
+    freq_filtered=freq_filtered.drop(columns=['count'])
+
+    # merge original dataframe with newly created dataframe to obtain the categorified value
+    df=df.merge(freq_filtered, how='left', on=cat)
+
+    # fill null values with 0 to represent low frequency categories grouped as other
+    df[cat + '_Categorify'] = df[cat + '_Categorify'].fillna(0)
+    return df
+```
+
+
+```python
+categorify(ddf, 'cat_0', 10).head()
+```
+
+
+### Count Encoding ###
+
+*Count Encoding* represents a feature based on its frequency. This can be interpreted as the popularity of a category.
+
+For example, we can count the frequency of `user_id` with `cudf.Series.value_counts()`. This creates a feature that can help a machine learning model learn the behavior of low-frequency users as a group.
+
+
+```python
+def count_encoding(df, cat):
+    count_df=df[cat].value_counts()
+    count_df=count_df.reset_index()
+    count_df.columns=[cat, cat+'_CE']
+    df=df.merge(count_df, on=cat)
+    return df
+```
+
+
+```python
+count_encoding(ddf, 'user_id').head()
+```
+
+
+### Target Encoding ###
+
+**Target Encoding** represents a categorical feature based on its effect on the target variable. One common technique is to replace values with the probability of the target given a category. Target encoding creates a new feature, which can be used by the model for training.
The advantage of target encoding is that it processes the categorical features into a form that is more easily consumed by the model during training and validation.
+
+Mathematically, target encoding on a binary target can be written as:
+
+p(t = 1 | x = c_i)
+
+For a binary classifier, we can calculate the probability when the target is `true` or `1` by taking the mean for each category group. This is also known as *Mean Encoding*.
+
+In other words, it calculates statistics, such as the arithmetic mean, from a target variable grouped by the unique values of one or more categorical features.
+
+
+*Leakage*, also known as data leakage or target leakage, occurs when training a model with information that would not be available at the time of prediction. This inflates the model's performance score and overestimates the model's utility. For example, including "temperature_celsius" as a feature when training and predicting "temperature_fahrenheit".
+
+
+```python
+def target_encoding(df, cat):
+    te_df=df.groupby(cat)['target'].mean().reset_index()
+    te_df.columns=[cat, cat+'_TE']
+    df=df.merge(te_df, on=cat)
+    return df
+```
+
+
+```python
+target_encoding(ddf, 'brand').head()
+```
+
+
+### Embeddings ###
+
+Deep learning models often apply **Embedding Layers** to categorical features. Over the past few years, this has become an increasingly popular technique for encoding categorical features. Since the embeddings need to be trained through a neural network, we will cover this in the next lab.
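As a sanity check of the count- and target-encoding logic described above, here is a toy pandas version (the notebook's helpers do the equivalent on Dask-cuDF via `value_counts`, `groupby`, and merges):

```python
import pandas as pd

df = pd.DataFrame({
    'brand':  ['a', 'a', 'b', 'b', 'b'],
    'target': [1,   0,   1,   1,   0],
})

# count encoding: the popularity (frequency) of each category
df['brand_CE'] = df.groupby('brand')['target'].transform('size')

# target (mean) encoding: p(target = 1 | brand)
df['brand_TE'] = df.groupby('brand')['target'].transform('mean')

print(df['brand_CE'].tolist())  # [2, 2, 3, 3, 3]
```

Brand `a` appears twice with one positive, so its target encoding is 0.5; brand `b` appears three times with two positives, giving 2/3.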
+ + +```python +ddf=one_hot(ddf, 'cat_0') +ddf=combine_cats(ddf, 'ts_weekday', 'ts_hour') +ddf=categorify(ddf, 'product_id', 100) +ddf=count_encoding(ddf, 'user_id') +ddf=count_encoding(ddf, 'product_id') +ddf=target_encoding(ddf, 'brand') +ddf=target_encoding(ddf, 'product_id') +ddf.head() +``` + + +```python +# clean GPU memory +import IPython +app = IPython.Application.instance() +app.kernel.do_shutdown(True) +``` + +**Well Done!** Let's move to the [next notebook](1_04_nvtabular_and_mgpu.ipynb). + + Header diff --git a/ds/25-1/3/1_04_nvtabular_and_mgpu.md b/ds/25-1/3/1_04_nvtabular_and_mgpu.md new file mode 100644 index 0000000..a33a0bb --- /dev/null +++ b/ds/25-1/3/1_04_nvtabular_and_mgpu.md @@ -0,0 +1,341 @@ + Header + +# Enhancing Data Science Outcomes With Efficient Workflow # + +## 04 - NVTabular ## +In this lab, you will learn the motivation behind doing data science on a GPU cluster. This lab covers the ETL, data exploration, and feature engineering steps of the data processing pipeline. Extract, transform, load, or [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), is the process where data is transformed into a proper structure for the purposes of querying and analysis. Feature engineering, on the other hand, involves the extraction and transformation of raw data. + +

+ +**Table of Contents** +
+In this notebook, we will use NVTabular to perform feature engineering. This notebook covers the below sections: +1. [NVTabular](#s4-1) + * [Multi-GPU Scaling in NVTabular with Dask](#s4-1.1) +2. [Operators](#s4-2) +3. [Feature Engineering and Preprocessing with NVTabular](#s4-3) + * [Defining the Workflow](#s4-3.1) + * [Exercise #1 - Using NVTabular Operators](#s4-e1) + * [Defining the Dataset](#s4-3.2) + * [Fit, Transform, and Persist](#s4-3.3) + * [Exercise #2 - Load Saved Workflow](#s4-e2) + + +## NVTabular ## +[NVTabular](https://nvidia-merlin.github.io/NVTabular/main/index.html) is a feature engineering and preprocessing library for tabular data that is designed to easily manipulate terabyte scale datasets. It provides high-level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS [cuDF](https://docs.rapids.ai/api/cudf/stable/) library. While NVTabular is built upon the RAPIDS cuDF library, it improves cuDF since data is not limited to GPU memory capacity. The API documentation can be found [here](https://nvidia-merlin.github.io/NVTabular/main/api.html#). 
+
+Core features of NVTabular include:
+* Easily process data by leveraging built-in or custom operators specifically designed for machine learning algorithms
+* Carry out computations on the GPU with best practices baked into the library, allowing us to realize significant acceleration
+* Provide a higher-level API to greatly simplify code complexity while still providing the same level of performance
+* Work on arbitrarily large datasets when used with [Dask](https://www.dask.org/)
+* Minimize the number of passes through the data with [lazy execution](https://en.wikipedia.org/wiki/Lazy_evaluation)
+
+In doing so, NVTabular helps data scientists and machine learning engineers to:
+* Process datasets that exceed GPU and CPU memory without having to worry about scale
+* Focus on what to do with the data and not how to do it by using abstraction at the operation level
+* Prepare datasets quickly and easily for experimentation so that more models can be trained
+
+Data science can be an iterative process that requires extensive repeated experimentation. The ability to perform feature engineering and preprocessing quickly translates into faster iteration cycles, which can help us to arrive at an optimal solution.
+
+
+### Multi-GPU Scaling in NVTabular with Dask ###
+NVTabular supports multi-GPU scaling with [Dask-CUDA](https://github.com/rapidsai/dask-cuda) and `dask.distributed`[[doc]](https://distributed.dask.org/en/latest/). For multi-GPU, NVTabular uses [Dask-cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) for internal data processing. The parallel performance can depend strongly on the size of the partitions, the shuffling procedure used for data output, and the arguments used for transformation operations.
+
+
+## Operators ##
+NVTabular has already implemented several data transformations, called `ops`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/generated/nvtabular.ops.Operator.html).
An `op` can be applied to a `ColumnGroup` via the overloaded `>>` operator, which in turn returns a new `ColumnGroup`. A `ColumnGroup` is a list of column names as text.
+
+```
+features = [ column_name_1, column_name_2, ...] >> op1 >> op2 >> ...
+```
+
+Common operators include:
+* [Categorify](https://nvidia-merlin.github.io/NVTabular/main/api/ops/categorify.html) - transform categorical features into unique integer values
+  * Can apply a frequency threshold to group low-frequency categories together
+* [TargetEncoding](https://nvidia-merlin.github.io/NVTabular/main/api/ops/targetencoding.html) - transform categorical features into the group-specific mean of the target
+  * Using `kfold=1` and `p_smooth=0` disables this additional logic
+* [Groupby](https://nvidia-merlin.github.io/NVTabular/main/api/ops/groupby.html) - transform a feature into the result of one or more groupby aggregations
+  * **NOTE**: Does not move data between partitions, which means data should be shuffled by `groupby_cols`
+* [JoinGroupby](https://nvidia-merlin.github.io/NVTabular/main/api/ops/joingroupby.html) - add new features based on desired group-specific statistics of requested continuous features
+  * Supported statistics include [`count`, `sum`, `mean`, `std`, `var`].
+* [LogOp](https://nvidia-merlin.github.io/NVTabular/main/api/ops/log.html) - apply a log transform to continuous features
+* [FillMissing](https://nvidia-merlin.github.io/NVTabular/main/api/ops/fillmissing.html) - replace missing values with a constant pre-defined value
+* [Bucketize](https://nvidia-merlin.github.io/NVTabular/main/api/ops/bucketize.html) - transform continuous features into categorical features by binning on provided bin boundaries
+* [LambdaOp](https://nvidia-merlin.github.io/NVTabular/main/api/ops/lambdaop.html) - enable custom row-wise DataFrame manipulations within NVTabular
+* [Rename](https://nvidia-merlin.github.io/NVTabular/main/api/ops/rename.html) - rename columns
+* [Normalize](https://nvidia-merlin.github.io/NVTabular/main/api/ops/normalize.html) - standardize continuous features using the mean and standard deviation
+
+
+```python
+# import dependencies
+import nvtabular as nvt
+from nvtabular.ops import *
+
+from dask.distributed import Client, wait
+from dask_cuda import LocalCUDACluster
+import dask_cudf
+import cudf
+import gc
+
+# instantiate a Client
+cluster=LocalCUDACluster()
+client=Client(cluster)
+```
+
+
+```python
+# get the machine's external IP address
+from requests import get
+
+ip=get('https://api.ipify.org').content.decode('utf8')
+
+print(f'Dask dashboard (status) is accessible on http://{ip}:8787/status')
+print(f'Dask dashboard (gpu) is accessible on http://{ip}:8787/gpu')
+```
+
+
+```python
+# read data as Dask DataFrame
+ddf=dask_cudf.read_parquet('clean_parquet')
+
+# preview DataFrame
+ddf.head()
+```
+
+
+## Feature Engineering and Preprocessing with NVTabular ##
+The typical steps for developing with NVTabular include:
+1. Design and Define Operations in the Pipeline
+2. Create Workflow
+3. Create Dataset
+4. Apply Workflow to Dataset
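The `>>` composition used to build pipelines can be mimicked with a toy deferred-execution sketch (purely illustrative; these are not the real NVTabular classes):

```python
class Op:
    """Toy operator: `>>` chains ops into a deferred pipeline,
    mimicking how `cols >> op1 >> op2` builds a graph that only
    executes when explicitly run."""
    def __init__(self, fn, parent=None):
        self.fn, self.parent = fn, parent

    def __rshift__(self, other):
        # return a new node wired onto the current chain
        return Op(other.fn, parent=self)

    def run(self, value):
        # walk up the chain, then apply this node's function
        if self.parent is not None:
            value = self.parent.run(value)
        return self.fn(value)

cols = Op(lambda v: v)                      # identity root, like a ColumnGroup
double = Op(lambda v: [x * 2 for x in v])
shift = Op(lambda v: [x + 1 for x in v])

pipeline = cols >> double >> shift          # nothing executes yet (lazy)
print(pipeline.run([1, 2, 3]))              # [3, 5, 7]
```

The key idea is that chaining only records the graph; work happens in a single explicit pass, which is what lets NVTabular minimize passes over the data.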

+ + +### Defining the Workflow ### +We start by creating the `nvtabular.workflow.workflow.Workflow`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html), which defines the operations and preprocessing steps that we would like to perform on the data. + +We will perform the following feature engineering and preprocessing steps: +* Categorify the categorical features +* Log transform and normalize continuous features +* Calculate group-specific `sum`, `count`, and `mean` of the `target` for categorical features +* Log transform `price` +* Calculate `product_id` specific relative `price` to average `price` +* Target encode all categorical features + +One of the key advantages of using NVTabular is the high-level abstraction we can use, which simplifies code significantly. + + +```python +# assign features and label +cat_cols=['brand', 'cat_0', 'cat_1', 'cat_2', 'cat_3'] +cont_cols=['price', 'ts_hour', 'ts_minute', 'ts_weekday'] +label='target' +``` + + +```python +# categorify categorical features +cat_features=cat_cols >> Categorify() +``` + + +### Exercise #1 - Using NVTabular Operators ### +We can use the `>>` operator to specify how columns will be transformed. We need to transform the `price` feature by performing the log transformation and normalization. + +**Instructions**:
+* Review the documentation for the `LogOp()`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/ops/log.html) and `Normalize()`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/ops/normalize.html) operators. +* Modify the ``s only and execute the cell below to create a workflow. + + +```python +# log transform +price = ( + ['price'] + >> FillMissing(0) + >> <<<>>> + >> <<<>>> + >> LambdaOp(lambda col: col.astype("float32"), dtype='float32') +) +``` +price = ( + ['price'] + >> FillMissing(0) + >> LogOp() + >> Normalize() + >> LambdaOp(lambda col: col.astype("float32"), dtype='float32') +) +Click ... to show **solution**. + +There are several ways to create a feature for relative `price` to average. We will do so with the below steps: +1. Calculate average `price` per group. +2. Define a function to calculate the percentage difference +3. Apply the user defined function to `price` and average `price` + + +```python +# relative price to the average price for the product_id +# create product_id specific average price feature +avg_price_product = ['product_id'] >> JoinGroupby(cont_cols =['price'], stats=["mean"]) + +# create user defined function to calculate percent difference +def relative_price_to_avg(col, gdf): + # introduce tiny number in case of 0 + epsilon = 1e-5 + col = ((gdf['price'] - col) / (col + epsilon)) * (col > 0).astype(int) + return col + +# create product_id specific relative price to average +relative_price_to_avg_product = ( + avg_price_product + >> LambdaOp(relative_price_to_avg, dependency=['price'], dtype='float64') + >> Rename(name='relative_price_product') +) +``` + + +```python +avg_price_category = ['category_code'] >> JoinGroupby(cont_cols =['price'], stats=["mean"]) + +# create product_id specific relative price to average +relative_price_to_avg_category = ( + avg_price_category + >> LambdaOp(relative_price_to_avg, dependency=['price'], dtype='float64') + >> Rename(name='relative_price_category') +) +``` + + +```python +# 
calculate group-specific statistics for categorical features
+ce_features=cat_cols >> JoinGroupby(stats=['sum', 'count'], cont_cols=label)
+
+# target encode
+te_features=cat_cols >> TargetEncoding(label)
+```
+
+We also add the target, i.e. `label`, to the set of returned columns. We can visualize our data processing pipeline with `graphviz` by calling `.graph`. The data processing pipeline is a DAG (directed acyclic graph).
+
+
+```python
+features=cat_features+cont_cols+ce_features+te_features+price+relative_price_to_avg_product+relative_price_to_avg_category+[label]
+features.graph
+```
+
+We are now ready to construct a `Workflow` that will run the operations we defined above. To enable distributed parallelism, the NVTabular `Workflow` must be initialized with a `dask.distributed.Client` object. Since NVTabular already uses Dask-cuDF for internal data processing, there are no other requirements for multi-GPU scaling.
+
+
+```python
+# define our NVTabular Workflow with client to enable multi-GPU execution
+# for multi-GPU execution, the only requirement is that we specify a client when
+# initializing the NVTabular Workflow.
+workflow=nvt.Workflow(features, client=client)
+```
+
+
+### Defining the Dataset ###
+All external data needs to be converted to the universal `nvtabular.io.dataset.Dataset`[[doc]](https://nvidia-merlin.github.io/NVTabular/v0.7.1/api/dataset.html) type. The main purpose of this class is to abstract away the raw format of the data, and to allow other NVTabular classes to reliably materialize a `dask.dataframe.DataFrame` collection and/or collection-based iterator on demand.
+
+The collection-based iterator is important when working with large datasets that do not fit into GPU memory, since operations in the `Workflow` often require statistics calculated across the entire dataset.
For example, `Normalize` requires measurements of the dataset mean and standard deviation, and `Categorify` requires an accounting of all the unique categories a particular feature can manifest. The `Dataset` object partitions the dataset into chunks that will fit into GPU memory to compute statistics in an online fashion. + +A `Dataset` can be initialized from a variety of different raw-data formats: +1. With a parquet-dataset directory +2. With a list of files +3. In addition to handling data stored on disk, a `Dataset` can also be initialized from an existing cuDF DataFrame, or from a `dask.dataframe.DataFrame` + +The data we pass to the `Dataset` constructor is usually the result of a query from some source, for example a data warehouse or data lake. The output is usually in Parquet, ORC, or CSV format. In our case, we have the data in parquet format saved on the disk from previous steps. When initializing a `Dataset` from a directory path, the engine should be used to specify either `parquet` or `csv` format. If initializing a `Dataset` from a list of files, the engine can be inferred. + +Memory is an important consideration. The workflow will process data in chunks, therefore increasing the number of partitions will limit the memory footprint. Since we will initialize the `Dataset` with a DataFrame type (`cudf.DataFrame` or `dask.dataframe.DataFrame`), most of the parameters will be ignored and the partitions will be preserved. Otherwise, the data would be converted to a `dask.dataframe.DataFrame` with a maximum partition size of roughly 12.5% of the total memory on a single device by default. We can use the `npartitions` parameter for specifying into how many chunks we would like the data to be split. The partition size can be changed to a different fraction of total memory on a single device with the `part_mem_fraction` argument. Alternatively, a specific byte size can be specified with the `part_size` argument. + +
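The chunk-wise, "online" statistics idea can be sketched in plain pandas/NumPy (a CPU illustration, not NVTabular's internals): sufficient statistics are accumulated per chunk, so the full column never has to sit in memory at once.

```python
import numpy as np
import pandas as pd

def chunked_mean_std(chunks):
    # accumulate count, sum, and sum of squares one chunk at a time
    n = s = ss = 0.0
    for chunk in chunks:
        n += len(chunk)
        s += chunk.sum()
        ss += (chunk ** 2).sum()
    mean = s / n
    std = np.sqrt(ss / n - mean ** 2)  # population standard deviation
    return mean, std

col = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
mean, std = chunked_mean_std([col[:3], col[3:]])
print(mean)  # 3.5
```

The result matches a single-pass computation over the whole column, which is why splitting the data into memory-sized partitions does not change the fitted statistics.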

+
+The NVTabular dataset should be created from Parquet files in order to get the best possible performance, preferably with a row group size of around 128MB. While NVTabular also supports reading from CSV files, reading CSV can be over twice as slow as reading from Parquet. It's recommended to convert a CSV dataset into Parquet format for use with NVTabular.
+
+
+```python
+# create dataset
+dataset=nvt.Dataset(ddf)
+
+print(f'The Dataset is split into {dataset.npartitions} partitions')
+```
+
+
+### Fit, Transform, and Persist ###
+NVTabular follows a familiar API for pipeline operations. We can `.fit()` the workflow to a training set to calculate the statistics for this workflow. Afterwards, we can use it to `.transform()` the training set and the validation dataset. We will persist the transformed data to disk in parquet format for fast reading at train time. Importantly, we can use the `.save()`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html#nvtabular.workflow.workflow.Workflow.save) method so that our `Workflow` can be used during model inference.
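The fit-then-transform contract can be sketched with a toy normalizer (a hypothetical class, just to show the pattern of computing statistics once on the training set and reusing them everywhere else):

```python
import pandas as pd

class ToyNormalize:
    """fit() learns statistics from the training data;
    transform() applies those same statistics to any dataset."""
    def fit(self, df, col):
        self.mean_ = df[col].mean()
        self.std_ = df[col].std()
        return self

    def transform(self, df, col):
        out = df.copy()
        out[col] = (df[col] - self.mean_) / self.std_
        return out

train = pd.DataFrame({'price': [10.0, 20.0, 30.0]})
valid = pd.DataFrame({'price': [20.0]})

norm = ToyNormalize().fit(train, 'price')
print(norm.transform(valid, 'price')['price'].tolist())  # [0.0]
```

Because the validation set is transformed with the *training* statistics, the same design choice that prevents leakage also makes a saved workflow reusable at inference time.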

+ +Since the `Dataset` API can both ingest and output a Dask collection, it is straightforward to transform data either before or after an NVTabular workflow is executed. This means that some complex pre-processing operations, that are not yet supported in NVTabular, can still be accomplished with the `dask_cudf.DataFrame` API after the `Dataset` is converted with `.to_ddf`. + + +```python +# fit and transform dataset +workflow.fit(dataset) +output_dataset=workflow.transform(dataset) +``` + + +```python +# save the workflow +workflow.save('nvt_workflow') + +!ls -l nvt_workflow +``` + + +```python +# remove existing parquet directory +!rm -R processed_parquet/* + +# save output to parquet directory +output_path='processed_parquet' +output_dataset.to_parquet(output_path=output_path) +``` + +If needed, we can convert the `Dataset` object to `dask.dataframe.DataFrame` to inspect the results. + + +```python +# convert to DataFrame and preview +output_dataset.to_ddf().head() +``` + + +### Exercise #2 - Load Saved Workflow ### +We can load a saved workflow, which will contain the graph, schema, and statistics. This is useful if the workflow should be applied to future datasets. + +**Instructions**:
+* Review the [documentation](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html#nvtabular.workflow.workflow.Workflow.load) for the `.load()` _class_ method.
+* Modify the `` only and execute the cell below to create a workflow.
+* Execute the cell below to apply the graph of operators to transform the data.
+
+
+```python
+# load workflow
+loaded_workflow=<<<>>>
+```
+loaded_workflow=nvt.Workflow.load('nvt_workflow')
+Click ... to show **solution**.
+
+
+```python
+# create dataset from parquet directory
+dataset=nvt.Dataset('clean_parquet', engine='parquet')
+
+# transform dataset
+loaded_workflow.transform(dataset).to_ddf().head()
+```
+
+
+```python
+# clean GPU memory
+import IPython
+app = IPython.Application.instance()
+app.kernel.do_shutdown(restart=False)
+```
+
+ Header
diff --git a/ds/25-1/3/README.md b/ds/25-1/3/README.md
new file mode 100644
index 0000000..2a5b833
--- /dev/null
+++ b/ds/25-1/3/README.md
+h
+
+Some categorical features, such as `event_type`, `category_code`, and `brand`, are stored as raw text.
+Prefer Dask-cuDF over cuDF if the dataset doesn't fit into memory
+none of the optimizations introduced by Dask-CUDA will be available in such cases (without dask_cuda.LocalCUDACluster())
+dask.distributed.Client
+spilling uses host memory when the device memory limit is reached
diff --git a/ds/25-1/3/memory_utilization.md b/ds/25-1/3/memory_utilization.md
new file mode 100644
index 0000000..d0ba2c8
--- /dev/null
+++ b/ds/25-1/3/memory_utilization.md
+ Header
+
+
+### Memory Utilization ###
+Memory utilization on a DataFrame depends largely on the data types for each column.
+

+ +We can use `DataFrame.memory_usage()` to see the memory usage for each column (in bytes). Most of the common data types have a fixed size in memory, such as `int`, `float`, `datetime`, and `bool`. Memory usage for these data types is the respective memory requirement multiplied by the number of data points. For `string` data types, the memory usage reported is the number of data points times 8 bytes. This accounts for the 64-bit required for the pointer that points to an address in memory but doesn't include the memory used for the actual string values. The actual memory required for a `string` value is 49 bytes plus an additional byte for each character. The `deep` parameter provides a more accurate memory usage report that accounts for the system-level memory consumption of the contained `string` data type. + +Separately, we've provided a `dli_utils.make_decimal()` function to convert memory size into units based on powers of 2. In contrast to units based on powers of 10, this customary convention is commonly used to report memory capacity. More information about the two definitions can be found [here](https://en.wikipedia.org/wiki/Byte#Multiple-byte_units). 
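To make the per-string overhead concrete, here is a quick sketch using `sys.getsizeof`. The 49-byte constant mentioned above is a CPython implementation detail for a 64-bit build and can differ across versions, so the sketch only relies on the one-byte-per-ASCII-character growth:

```python
import sys

# size of an empty str object (includes the fixed per-string overhead)
empty_size = sys.getsizeof('')

# each additional ASCII character adds one byte on top of that overhead
abc_size = sys.getsizeof('abc')
per_char_bytes = abc_size - empty_size

print(empty_size, abc_size, per_char_bytes)
```

On a typical CPython 64-bit build this prints `49 52 3`, matching the 49-bytes-plus-one-per-character rule described above.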
```python
# import dependencies
import pandas as pd
import sys
import random

# import utility
from dli_utils import make_decimal

# import data
df=pd.read_csv('2020-Mar.csv')

# preview DataFrame
df.head()
```


```python
# convert feature to datetime data type
df['event_time']=pd.to_datetime(df['event_time'])
```


```python
# lists each column at 8 bytes/row
memory_usage_df=df.memory_usage(index=False)
memory_usage_df.name='memory_usage'
dtypes_df=df.dtypes
dtypes_df.name='dtype'

# show each column uses roughly number of rows * 8 bytes
# 8 bytes for 64-bit numerical data as well as 8 bytes to store a pointer for the object data type
byte_size=len(df) * 8 * len(df.columns)

print(f'Total memory use is {byte_size} bytes or ~{make_decimal(byte_size)}.')

pd.concat([memory_usage_df, dtypes_df], axis=1)
```


```python
# lists each column's full memory usage
memory_usage_df=df.memory_usage(deep=True, index=False)
memory_usage_df.name='memory_usage'

byte_size=memory_usage_df.sum()

# show total memory usage
print(f'Total memory use is {byte_size} bytes or ~{make_decimal(byte_size)}.')

pd.concat([memory_usage_df, dtypes_df], axis=1)
```


```python
# alternatively, use sys.getsizeof() instead
byte_size=sys.getsizeof(df)

print(f'Total memory use is {byte_size} bytes or ~{make_decimal(byte_size)}.')
```


```python
# check random string-typed column
string_cols=[col for col in df.columns if df[col].dtype=='object']
column_to_check=random.choice(string_cols)

overhead=49
pointer_size=8

# item==item is False only for NaN (NaN != NaN), so the check filters out missing values
# a NaN entry is counted as 32 bytes: a 24-byte float object plus an 8-byte pointer
print(f'{column_to_check} column uses : {sum([(len(item)+overhead+pointer_size) if item==item else 32 for item in df[column_to_check].values])} bytes of memory.')
```

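String interning, which comes up in the discussion of string storage, can also be observed directly. In this sketch, `sys.intern` is used to force the sharing, because automatic interning of string literals is a CPython implementation detail:

```python
import sys

# build two equal strings at runtime; these are normally two distinct objects
a = ''.join(['br', 'and'])
b = ''.join(['br', 'and'])

# sys.intern() returns a single shared copy for equal strings,
# so both names end up pointing at the same object in memory
ia = sys.intern(a)
ib = sys.intern(b)

print(a == b, ia is ib)
```

Interning lets repeated values (such as categorical labels in a column) share one object instead of paying the full per-string cost for every occurrence.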
When Python stores a string, it actually uses memory for the overhead of the Python object, metadata about the string, and the string itself. The amount of memory usage we calculated includes temporary objects that get deallocated after the initial import. It's important to note that Python has memory optimization mechanisms for strings: when the same string is created multiple times, Python may cache or "intern" it in memory and reuse it for later string objects.

 Header

diff --git a/ds/25-1/r/9.Rmd b/ds/25-1/r/9.Rmd
new file mode 100644
index 0000000..f679659
--- /dev/null
+++ b/ds/25-1/r/9.Rmd
@@ -0,0 +1,161 @@
---
title: "Lab9: Decision trees"
author: "Vladislav Litvinov"
output:
  pdf_document:
    toc_float: TRUE
---
# Data preparation
```{r}
setwd('/home/sek1ro/git/public/lab/ds/25-1/r')
survey <- read.csv('survey.csv')

train_df = survey[1:600,]
test_df = survey[601:750,]
```
# Building classification tree
The decision formula is MYDEPV ~ Price + Income + Age.

Use three-fold cross-validation and the information gain splitting index.
Which features were actually used to construct the tree?
Plot the tree using the "rpart.plot" package.

Three-fold cross-validation splits the data into three folds A, B, C and makes three runs:
Run 1: train on B + C, test on A
Run 2: train on A + C, test on B
Run 3: train on A + B, test on C

This yields three values of the metric (accuracy, F1, MSE, etc.).
Their average is the final estimate of the model's quality.

rpart drops features on its own when they do not improve a split's information gain.

CP table — relates tree complexity to error:
Root node error — error with no splits
nsplit — number of splits
rel error — training error relative to the root
xerror — cross-validation error
xstd — standard deviation of xerror

type — placement of the split labels
extra — extra information shown in the nodes
fallen.leaves — aligns the leaves at the bottom of the plot

$H(x) = -x\log(x) - (1 - x)\log(1 - x)$
$Gain(A) = Info(S) - Info(S_A)$ — we maximize the gain.

Early stopping: limit the tree depth; require a minimum number of examples per node.

Pruning:
Build the full tree, in which each leaf contains examples of a single class.
Compute two measures: the model's relative accuracy and its absolute error.
Remove the leaves and nodes whose removal affects the model's accuracy the least and increases the error the least.


```{r}
library(rpart)
tree = rpart(
  MYDEPV ~ Price + Income + Age,
  data = train_df,
  method = "class",
  parms = list(split = "information"),
  control = rpart.control(xval = 3)
)
printcp(tree)

library(rpart.plot)

rpart.plot(
  tree,
  type = 1,
  extra = 106,
  #6 Class models: the probability of the second class only. Useful for binary responses.
  #100 display the percentage of observations in the node.
  fallen.leaves = TRUE
)
```
Score the model with the training data and create the model's confusion matrix. Which class of MYDEPV was the model better able to classify?
```{r}
pred_class = predict(tree, train_df, type="class")

conf_mat = table(
  Actual = train_df$MYDEPV,
  Predicted = pred_class
)

conf_mat
print(diag(conf_mat) / rowSums(conf_mat))
```
Define the resubstitution error rate, and then calculate it using the confusion matrix from the previous step. Is it a good indicator of predictive performance? Why or why not?
The resubstitution error rate is the fraction of incorrect predictions on the same data the model was trained on.
```{r}
print(1 - sum(diag(conf_mat)) / sum(conf_mat))
```
ROC curve - Receiver Operating Characteristic
x - FPR = FP / (FP + TN)
y - TPR = TP / (TP + FN)
```{r}
pred_prob = predict(tree, train_df, type="prob")[,2]

library(ROCR)
pred = prediction(pred_prob, train_df$MYDEPV)
perf = performance(pred, "tpr", "fpr")

plot(perf)
abline(a = 0, b = 1)

auc_perf = performance(pred, measure = "auc")
auc_perf@y.values[[1]]
```
Score the model with the testing data. How accurate are the tree's predictions?
Repeat part (a), but set the splitting index to the Gini coefficient splitting index. How does the new tree compare to the previous one?

The Gini index shows how often a randomly chosen example from the training set would be misclassified.

$Gini(Q) = 1 - \sum_i p_i^2$ — splits are chosen to maximize the decrease of this impurity:
0 — all examples belong to a single class
the maximum — all classes are equiprobable
For the binary case: $Gini(x) = 1 - x^2 - (1 - x)^2$
```{r}
pred_test = predict(tree, test_df, type="class")
conf_mat_test = table(Actual = test_df$MYDEPV, Predicted = pred_test)
conf_mat_test
print(diag(conf_mat_test) / rowSums(conf_mat_test))

tree_gini = rpart(
  MYDEPV ~ Price + Income + Age,
  data = train_df,
  method = "class",
  parms = list(split = "gini")
)

printcp(tree_gini)

rpart.plot(
  tree_gini,
  type = 1,
  extra = 106,
  fallen.leaves = TRUE
)
```
One way to prune a tree is according to the complexity parameter associated with the smallest cross-validation error. Prune the new tree in this way using the "prune" function. Which features were actually used in the pruned tree? Why were certain variables not used?
+```{r} +best_cp <- tree_gini$cptable[which.min(tree_gini$cptable[, "xerror"]), "CP"] +best_cp + +pruned_tree = prune(tree_gini, cp = best_cp) + +printcp(pruned_tree) + +rpart.plot(pruned_tree) +``` +Create the confusion matrix for the new model, and compare the performance of the model before and after pruning. +```{r} +pruned_pred = predict(pruned_tree, test_df, type="class") +pruned_conf_mat = table(Actual = test_df$MYDEPV, Predicted = pruned_pred) +pruned_conf_mat +print(diag(pruned_conf_mat) / rowSums(pruned_conf_mat)) +``` \ No newline at end of file diff --git a/ds/25-1/r/r2.Rproj b/ds/25-1/r/r2.Rproj deleted file mode 100644 index 8e3c2eb..0000000 --- a/ds/25-1/r/r2.Rproj +++ /dev/null @@ -1,13 +0,0 @@ -Version: 1.0 - -RestoreWorkspace: Default -SaveWorkspace: Default -AlwaysSaveHistory: Default - -EnableCodeIndexing: Yes -UseSpacesForTab: Yes -NumSpacesForTab: 2 -Encoding: UTF-8 - -RnwWeave: Sweave -LaTeX: pdfLaTeX