{ "cells": [ { "cell_type": "markdown", "id": "a5863a6f-d28f-4f3c-bceb-69a8d6482cc8", "metadata": {}, "source": [ " \"Header\" " ] }, { "cell_type": "markdown", "id": "e60bbdfc-6e2c-4275-ba3e-d72e70ab021b", "metadata": { "tags": [] }, "source": [ "# Enhancing Data Science Outcomes With Efficient Workflow #" ] }, { "cell_type": "markdown", "id": "b9e9d7e6-8b43-45f0-9de5-9889a67a229b", "metadata": {}, "source": [ "## 04 - NVTabular ##\n", "In this lab, you will learn the motivation behind doing data science on a GPU cluster. This lab covers the ETL, data exploration, and feature engineering steps of the data processing pipeline. Extract, transform, load, or [ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), is the process of converting raw data into a structure suitable for querying and analysis. Feature engineering, on the other hand, derives new features from the raw data so that they better represent the underlying problem to a model. \n", "\n", "**Table of Contents**\n", "
\n", "In this notebook, we will use NVTabular to perform feature engineering. This notebook covers the following sections: \n", "1. [NVTabular](#s4-1)\n", " * [Multi-GPU Scaling in NVTabular with Dask](#s4-1.1)\n", "2. [Operators](#s4-2)\n", "3. [Feature Engineering and Preprocessing with NVTabular](#s4-3)\n", " * [Defining the Workflow](#s4-3.1)\n", " * [Exercise #1 - Using NVTabular Operators](#s4-e1)\n", " * [Defining the Dataset](#s4-3.2)\n", " * [Fit, Transform, and Persist](#s4-3.3)\n", " * [Exercise #2 - Load Saved Workflow](#s4-e2)" ] }, { "cell_type": "markdown", "id": "2eaa7334-ffc5-4ab2-b76c-3bfb9d21b9cd", "metadata": {}, "source": [ "\n", "## NVTabular ##\n", "[NVTabular](https://nvidia-merlin.github.io/NVTabular/main/index.html) is a feature engineering and preprocessing library for tabular data that is designed to easily manipulate terabyte-scale datasets. It provides a high-level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS [cuDF](https://docs.rapids.ai/api/cudf/stable/) library. While NVTabular is built upon the RAPIDS cuDF library, it extends cuDF by removing the constraint that data must fit into GPU memory. The API documentation can be found [here](https://nvidia-merlin.github.io/NVTabular/main/api.html#). 
\n", "\n", "Core features of NVTabular include: \n", "* Process data easily by leveraging built-in or custom operators specifically designed for machine learning algorithms\n", "* Carry out computations on the GPU with best practices baked into the library, realizing significant acceleration\n", "* Provide a higher-level API that greatly reduces code complexity while delivering the same level of performance\n", "* Work on arbitrarily large datasets when used with [Dask](https://www.dask.org/)\n", "* Minimize the number of passes through the data with [lazy execution](https://en.wikipedia.org/wiki/Lazy_evaluation)\n", "\n", "In doing so, NVTabular helps data scientists and machine learning engineers to: \n", "* Process datasets that exceed GPU and CPU memory without having to worry about scale\n", "* Focus on what to do with the data, not how to do it, by using abstraction at the operation level\n", "* Prepare datasets quickly and easily for experimentation so that more models can be trained\n", "\n", "Data science is an iterative process that requires extensive, repeated experimentation. The ability to perform feature engineering and preprocessing quickly translates into faster iteration cycles, which helps us arrive at an optimal solution. " ] }, { "cell_type": "markdown", "id": "dbf181ea-5e26-48f5-a4eb-e2b95112f744", "metadata": {}, "source": [ "\n", "### Multi-GPU Scaling in NVTabular with Dask ###\n", "NVTabular supports multi-GPU scaling with [Dask-CUDA](https://github.com/rapidsai/dask-cuda) and `dask.distributed`[[doc]](https://distributed.dask.org/en/latest/). For multi-GPU execution, NVTabular uses [Dask-cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) for internal data processing. Parallel performance can depend strongly on the size of the partitions, the shuffling procedure used for data output, and the arguments used for transformation operations. 
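The op-composition style introduced in the next section relies on deferred execution: each `>>` appends an operation to a graph rather than running it immediately, so the whole pipeline can later be executed in a minimal number of passes. A tiny pure-Python mock (the `ColumnGroup` class below is a hypothetical stand-in, not NVTabular's implementation) illustrates the pattern:

```python
# Pure-Python mock (NOT NVTabular) of the deferred ">>" composition pattern:
# each ">>" records an operation instead of executing it immediately.
class ColumnGroup:
    def __init__(self, columns, ops=None):
        self.columns = list(columns)
        self.ops = list(ops or [])

    def __rshift__(self, op):
        # composing returns a NEW ColumnGroup; nothing runs yet (lazy)
        return ColumnGroup(self.columns, self.ops + [op])

    def transform(self, table):
        # a single pass applies every queued op to each selected column
        out = dict(table)
        for op in self.ops:
            for col in self.columns:
                out[col] = [op(v) for v in out[col]]
        return out

add_one = lambda v: v + 1
double = lambda v: v * 2

# build the graph lazily, then execute it once on a toy table
features = ColumnGroup(["price"]) >> add_one >> double
table = {"price": [1, 2, 3], "brand": ["a", "b", "c"]}
result = features.transform(table)
print(result["price"])  # [4, 6, 8]
```

In the real library the left-hand side is a list of column names and the right-hand side is an `op` instance, but the key idea is the same: composition builds a graph, and execution is deferred until the workflow runs.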
" ] }, { "cell_type": "markdown", "id": "2ed12dc2-5f40-4fc2-b7c0-2a8535c81d7e", "metadata": {}, "source": [ "\n", "## Operators ##\n", "NVTabular provides a number of built-in data transformations, called `ops`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/generated/nvtabular.ops.Operator.html). An `op` is applied to a `ColumnGroup` via the overloaded `>>` operator, which in turn returns a new `ColumnGroup`. A `ColumnGroup` is a list of column names as text. \n", "\n", "```\n", "features = [ column_name_1, column_name_2, ...] >> op1 >> op2 >> ...\n", "```\n", "\n", "Since the Dataset API can both ingest and output a Dask collection, it is straightforward to transform data either before or after an NVTabular workflow is executed. This means that some complex preprocessing operations that are not yet supported in NVTabular can still be accomplished with the Dask-cuDF API. \n", "\n", "Common operators include: \n", "* [Categorify](https://nvidia-merlin.github.io/NVTabular/main/api/ops/categorify.html) - transform categorical features into unique integer values\n", " * Can apply a frequency threshold to group low-frequency categories together\n", "* [TargetEncoding](https://nvidia-merlin.github.io/NVTabular/main/api/ops/targetencoding.html) - replace each categorical value with the group-specific mean of the target\n", " * Using `kfold=1` and `p_smooth=0` disables this additional logic\n", "* [Groupby](https://nvidia-merlin.github.io/NVTabular/main/api/ops/groupby.html) - transform a feature into the result of one or more groupby aggregations\n", " * **NOTE**: Does not move data between partitions, which means data should be shuffled by `groupby_cols`\n", "* [JoinGroupby](https://nvidia-merlin.github.io/NVTabular/main/api/ops/joingroupby.html) - add new features based on desired group-specific statistics of requested continuous features\n", " * Supported statistics include [`count`, `sum`, `mean`, `std`, `var`]. 
\n", "* [LogOp](https://nvidia-merlin.github.io/NVTabular/main/api/ops/log.html) - apply a log transform to continuous features\n", "* [FillMissing](https://nvidia-merlin.github.io/NVTabular/main/api/ops/fillmissing.html) - replace missing values with a pre-defined constant value\n", "* [Bucketize](https://nvidia-merlin.github.io/NVTabular/main/api/ops/bucketize.html) - transform continuous features into categorical features with bins based on provided bin boundaries\n", "* [LambdaOp](https://nvidia-merlin.github.io/NVTabular/main/api/ops/lambdaop.html) - enables custom row-wise dataframe manipulations with NVTabular\n", "* [Rename](https://nvidia-merlin.github.io/NVTabular/main/api/ops/rename.html) - rename columns\n", "* [Normalize](https://nvidia-merlin.github.io/NVTabular/main/api/ops/normalize.html) - standardize continuous features using the mean and standard deviation" ] }, { "cell_type": "code", "execution_count": 1, "id": "ef7617b1-c799-4c33-8c34-b339c7df649d", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/opt/conda/envs/rapids/lib/python3.9/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'\n", " warn(f\"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}\")\n", "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n", "Perhaps you already have a cluster running?\n", "Hosting the HTTP server on port 45589 instead\n", " warnings.warn(\n", "2026-02-11 13:20:08,688 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n", "2026-02-11 13:20:08,688 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n", "2026-02-11 13:20:08,712 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n", "2026-02-11 13:20:08,712 - distributed.preloading - INFO - 
Import preload module: dask_cuda.initialize\n", "2026-02-11 13:20:08,718 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n", "2026-02-11 13:20:08,718 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n", "2026-02-11 13:20:08,723 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n", "2026-02-11 13:20:08,723 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n" ] } ], "source": [ "# import dependencies\n", "import nvtabular as nvt\n", "from nvtabular.ops import * \n", "\n", "from dask.distributed import Client, wait\n", "from dask_cuda import LocalCUDACluster\n", "import dask_cudf\n", "import cudf\n", "import gc\n", "\n", "# instantiate a Client\n", "cluster=LocalCUDACluster()\n", "client=Client(cluster)" ] }, { "cell_type": "code", "execution_count": null, "id": "b4646966-9d65-4d44-ad9a-abc0c68763bc", "metadata": { "scrolled": true, "tags": [] }, "outputs": [], "source": [ "# get the machine's external IP address\n", "from requests import get\n", "\n", "ip=get('https://api.ipify.org').content.decode('utf8')\n", "\n", "print(f'Dask dashboard (status) is accessible on http://{ip}:8787/status')\n", "print(f'Dask dashboard (gpu) is accessible on http://{ip}:8787/gpu')" ] }, { "cell_type": "code", "execution_count": 2, "id": "6941e910-3611-4edc-ba89-a480e5386fb6", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " event_time event_type product_id category_id \\\n", "0 2020-03-01 04:54:09 purchase 10301400 2232732104888681081 \n", "1 2020-03-01 04:55:26 purchase 15700285 2232732094134485388 \n", "2 2020-03-01 04:54:46 purchase 21406331 2232732082063278200 \n", "3 2020-03-01 07:45:47 purchase 1004665 2232732093077520756 \n", "4 2020-03-02 05:26:04 purchase 21400996 2232732082063278200 \n", "\n", " category_code brand price user_id \\\n", "0 apparel.scarf bburago 19.280001 537144080 \n", "1 UNKNOWN UNKNOWN 154.190002 514686549 \n", "2 electronics.clocks casio 30.369999 522564661 \n", "3 construction.tools.light samsung 816.690002 596178054 \n", "4 electronics.clocks casio 81.159996 537131991 \n", "\n", " user_session \\\n", "0 053d5ad3-01c7-4dfb-9079-e121c33b0938 \n", "1 3c842e53-1e47-4941-83e0-2a27a8fdeaf1 \n", "2 cfa89b7f-5b34-4d65-a135-bb924d98af9c \n", "3 f84b2b78-50a0-4e34-ad8d-da60a6178091 \n", "4 b19b380b-2db7-4c6e-bb91-998556315d0a \n", "\n", " session_product ... cat_1 cat_2 cat_3 \\\n", "0 053d5ad3-01c7-4dfb-9079-e121c33b0938_10301400 ... scarf NA NA \n", "1 3c842e53-1e47-4941-83e0-2a27a8fdeaf1_15700285 ... NA NA NA \n", "2 cfa89b7f-5b34-4d65-a135-bb924d98af9c_21406331 ... clocks NA NA \n", "3 f84b2b78-50a0-4e34-ad8d-da60a6178091_1004665 ... tools light NA \n", "4 b19b380b-2db7-4c6e-bb91-998556315d0a_21400996 ... 
clocks NA NA \n", "\n", " date ts_hour ts_minute ts_weekday ts_day ts_month ts_year \n", "0 2020-03-01 4 54 6 1 3 2020 \n", "1 2020-03-01 4 55 6 1 3 2020 \n", "2 2020-03-01 4 54 6 1 3 2020 \n", "3 2020-03-01 7 45 6 1 3 2020 \n", "4 2020-03-02 5 26 0 2 3 2020 \n", "\n", "[5 rows x 22 columns]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# read data as Dask DataFrame\n", "ddf=dask_cudf.read_parquet('clean_parquet')\n", "\n", "# preview DataFrame\n", "ddf.head()" ] }, { "cell_type": "markdown", "id": "c0df8197-a6fe-46f3-8e77-231e7853b16e", "metadata": { "tags": [] }, "source": [ "\n", "## Feature Engineering and Preprocessing with NVTabular ##\n", "The typical steps for developing with NVTabular include: \n", "1. Design and Define Operations in the Pipeline\n", "2. Create Workflow\n", "3. Create Dataset\n", "4. Apply Workflow to Dataset\n", "\n", "
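The four steps above hinge on a two-pass pattern: `fit` streams over the dataset chunk by chunk to accumulate the statistics the operators need, and `transform` then applies those statistics, so no chunk ever has to hold the full dataset. The class and data below are a toy pure-Python stand-in for this pattern, not the NVTabular API:

```python
import math

# Toy illustration (NOT NVTabular) of the fit-then-transform workflow pattern:
# fit() accumulates dataset-wide statistics one chunk at a time, and
# transform() applies them in a second streaming pass.
class ToyNormalizeWorkflow:
    def fit(self, chunks):
        n = total = total_sq = 0.0
        for chunk in chunks:              # first pass: statistics only
            n += len(chunk)
            total += sum(chunk)
            total_sq += sum(v * v for v in chunk)
        self.mean = total / n
        self.std = math.sqrt(total_sq / n - self.mean ** 2)
        return self

    def transform(self, chunks):
        for chunk in chunks:              # second pass: apply the statistics
            yield [(v - self.mean) / self.std for v in chunk]

chunks = [[1.0, 2.0], [3.0, 4.0]]         # a "dataset" split into partitions
wf = ToyNormalizeWorkflow().fit(chunks)
out = [v for chunk in wf.transform(chunks) for v in chunk]
print(wf.mean)  # 2.5
```

This is exactly why the `Dataset` abstraction introduced below partitions data into GPU-memory-sized chunks: operators like `Normalize` and `Categorify` need dataset-wide statistics, which can be computed in this online fashion.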

" ] }, { "cell_type": "markdown", "id": "bedb235f-8d08-4a28-956c-a6298379a278", "metadata": {}, "source": [ "\n", "### Defining the Workflow ###\n", "We start by creating the `nvtabular.workflow.workflow.Workflow`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html), which defines the operations and preprocessing steps that we would like to perform on the data. \n", "\n", "We will perform the following feature engineering and preprocessing steps: \n", "* Categorify the categorical features\n", "* Log transform and normalize continuous features\n", "* Calculate group-specific `sum`, `count`, and `mean` of the `target` for categorical features\n", "* Log transform `price`\n", "* Calculate `product_id` specific relative `price` to average `price`\n", "* Target encode all categorical features\n", "\n", "One of the key advantages of using NVTabular is the high-level abstraction we can use, which simplifies code significantly. " ] }, { "cell_type": "code", "execution_count": 3, "id": "78b305b3-76f8-44ce-912d-f3adbdef7a69", "metadata": { "tags": [] }, "outputs": [], "source": [ "# assign features and label\n", "cat_cols=['brand', 'cat_0', 'cat_1', 'cat_2', 'cat_3']\n", "cont_cols=['price', 'ts_hour', 'ts_minute', 'ts_weekday']\n", "label='target'" ] }, { "cell_type": "code", "execution_count": 4, "id": "a527a25e-e6c2-4e27-a2af-ffdeb7992892", "metadata": { "tags": [] }, "outputs": [], "source": [ "# categorify categorical features\n", "cat_features=cat_cols >> Categorify()" ] }, { "cell_type": "markdown", "id": "fd1a2ac8-8b14-4664-9cd6-b34a2c98eea2", "metadata": {}, "source": [ "\n", "### Exercise #1 - Using NVTabular Operators ###\n", "We can use the `>>` operator to specify how columns will be transformed. We need to transform the `price` feature by performing the log transformation and normalization. \n", "\n", "**Instructions**:
\n", "* Review the documentation for the `LogOp()`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/ops/log.html) and `Normalize()`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/ops/normalize.html) operators. `LogOp` calculates the log of continuous columns; to handle the common case of zero-filled null values, it computes log(1+x) instead of log(x). `Normalize` standardizes features around 0 with a standard deviation of 1.\n", "* Modify the indicated placeholders only and execute the cell below to create a workflow. " ] }, { "cell_type": "code", "execution_count": 6, "id": "a734412d-c1f2-47bb-8016-fdc4e1c4c4b0", "metadata": { "tags": [] }, "outputs": [], "source": [ "# fill missing values, log transform, and normalize the price feature\n", "price = (\n", " ['price']\n", " >> FillMissing(0)\n", " >> LogOp()\n", " >> Normalize()\n", " >> LambdaOp(lambda col: col.astype(\"float32\"), dtype='float32')\n", ") " ] }, { "cell_type": "markdown", "id": "5b76f36b-d7a8-46c8-91c2-c68733dc1d99", "metadata": {}, "source": [ "There are several ways to create a relative-price-to-average feature. We will do so with the following steps: \n", "1. Calculate the average `price` per group\n", "2. Define a function to calculate the percentage difference\n", "3. 
Apply the user-defined function to `price` and the average `price`" ] }, { "cell_type": "code", "execution_count": 7, "id": "b0641d59-ad3e-4ffe-a7e3-144b4b98141b", "metadata": { "tags": [] }, "outputs": [], "source": [ "# relative price to the average price for the product_id\n", "# create product_id specific average price feature\n", "avg_price_product = ['product_id'] >> JoinGroupby(cont_cols=['price'], stats=[\"mean\"])\n", "\n", "# create user defined function to calculate percent difference\n", "def relative_price_to_avg(col, gdf):\n", " # introduce tiny number in case of 0\n", " epsilon = 1e-5\n", " col = ((gdf['price'] - col) / (col + epsilon)) * (col > 0).astype(int)\n", " return col\n", "\n", "# create product_id specific relative price to average\n", "relative_price_to_avg_product = (\n", " avg_price_product \n", " >> LambdaOp(relative_price_to_avg, dependency=['price'], dtype='float64') \n", " >> Rename(name='relative_price_product')\n", ")" ] }, { "cell_type": "code", "execution_count": 8, "id": "89136070-effd-4bd6-a7a8-8ae03cf40e31", "metadata": { "tags": [] }, "outputs": [], "source": [ "# create category_code specific average price feature\n", "avg_price_category = ['category_code'] >> JoinGroupby(cont_cols=['price'], stats=[\"mean\"])\n", "\n", "# create category_code specific relative price to average\n", "relative_price_to_avg_category = (\n", " avg_price_category \n", " >> LambdaOp(relative_price_to_avg, dependency=['price'], dtype='float64') \n", " >> Rename(name='relative_price_category')\n", ")" ] }, { "cell_type": "code", "execution_count": 9, "id": "c9c77298-7884-4d05-9d3c-b2c7c277fa7a", "metadata": { "tags": [] }, "outputs": [], "source": [ "# calculate group-specific statistics for categorical features\n", "ce_features=cat_cols >> JoinGroupby(stats=['sum', 'count'], cont_cols=label)\n", "\n", "# target encode\n", "te_features=cat_cols >> TargetEncoding(label)" ] }, { "cell_type": "markdown", "id": "ba7db9e8-9597-4193-a54b-78752ac9e651", "metadata": {}, "source": [ "We also add the target, i.e. 
`label`, to the set of returned columns. We can visualize our data processing pipeline with `graphviz` by calling `.graph`. The data processing pipeline is a DAG (directed acyclic graph). " ] }, { "cell_type": "code", "execution_count": 10, "id": "09666729-0cba-438e-aeb6-97bb97c274a1", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "data": { "image/svg+xml": [ "" ], "text/plain": [ "" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "features=cat_features+cont_cols+ce_features+te_features+price+relative_price_to_avg_product+relative_price_to_avg_category+[label]\n", "features.graph" ] }, { "cell_type": "markdown", "id": "696f1cdd-3a14-462c-9fea-e9f03a9a47d6", "metadata": {}, "source": [ "We are now ready to construct a `Workflow` that will run the operations we defined above. To enable distributed parallelism, the NVTabular `Workflow` must be initialized with a `dask.distributed.Client` object. Since NVTabular already uses Dask-cuDF for internal data processing, there are no other requirements for multi-GPU scaling. " ] }, { "cell_type": "code", "execution_count": 11, "id": "7ae89f60-b420-4871-ad5f-b7f292c56add", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/opt/conda/envs/rapids/lib/python3.9/site-packages/merlin/core/utils.py:361: FutureWarning: The `client` argument is deprecated from DaskExecutor and will be removed in a future version of NVTabular. 
By default, a global client in the same python context will be detected automatically, and `merlin.utils.set_dask_client` (as well as `Distributed` and `Serial`) can be used for explicit control.\n", " warnings.warn(\n" ] } ], "source": [ "# define our NVTabular Workflow with client to enable multi-GPU execution\n", "# for multi-GPU execution, the only requirement is that we specify a client when \n", "# initializing the NVTabular Workflow.\n", "workflow=nvt.Workflow(features, client=client)" ] }, { "cell_type": "markdown", "id": "88e03d7f-03b1-4968-855c-e8656e9ff062", "metadata": {}, "source": [ "\n", "### Defining the Dataset ###\n", "All external data need to be converted to the universal `nvtabular.io.dataset.Dataset`[[doc]](https://nvidia-merlin.github.io/NVTabular/v0.7.1/api/dataset.html) type. The main purpose of this class is to abstract away the raw format of the data, and to allow other NVTabular classes to reliably materialize a `dask.dataframe.DataFrame` collection and/or collection-based iterator on demand. \n", "\n", "The collection-based iterator is important when working with large datasets that do not fit into GPU memory since operations in the `Workflow` often require statistics calculated across the entire dataset. For example, `Normalize` requires measurements of the dataset mean and standard deviation, and `Categorify` requires an accounting of all the unique categories a particular feature can manifest. The `Dataset` object partitions the dataset into chunks that will fit into GPU memory to compute statistics in an online fashion. \n", "\n", "A `Dataset` can be initialized from a variety of different raw-data formats: \n", "1. With a parquet-dataset directory\n", "2. With a list of files\n", "3. 
From an existing cuDF DataFrame or `dask.dataframe.DataFrame` (in addition to data stored on disk)\n", "\n", "The data we pass to the `Dataset` constructor is usually the result of a query from some source, for example a data warehouse or data lake. The output is usually in Parquet, ORC, or CSV format. In our case, we have the data in parquet format saved on disk from previous steps. When initializing a `Dataset` from a directory path, the `engine` argument should be used to specify either `parquet` or `csv` format. If initializing a `Dataset` from a list of files, the engine can be inferred. \n", "\n", "Memory is an important consideration. The workflow will process data in chunks, so increasing the number of partitions will limit the memory footprint. Since we will initialize the `Dataset` with a DataFrame type (`cudf.DataFrame` or `dask.dataframe.DataFrame`), most of the parameters will be ignored and the partitions will be preserved. Otherwise, the data would be converted to a `dask.dataframe.DataFrame` with a maximum partition size of roughly 12.5% of the total memory on a single device by default. We can use the `npartitions` parameter to specify how many chunks the data should be split into. The partition size can be changed to a different fraction of total memory on a single device with the `part_mem_fraction` argument. Alternatively, a specific byte size can be specified with the `part_size` argument. \n", "\n", "
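As a back-of-envelope illustration of the default partition sizing described above — the 16 GiB device and 10 GiB dataset are hypothetical numbers, and `partition_plan` is a helper written just for this sketch, not part of NVTabular:

```python
# Illustrative arithmetic only: estimate partition size and count from the
# sizing rules described above (default part_mem_fraction of roughly 12.5%).
def partition_plan(dataset_bytes, device_mem_bytes, part_mem_fraction=0.125):
    part_size = int(device_mem_bytes * part_mem_fraction)  # bytes per partition
    npartitions = -(-dataset_bytes // part_size)           # ceiling division
    return part_size, npartitions

GiB = 1024 ** 3
part_size, nparts = partition_plan(dataset_bytes=10 * GiB,
                                   device_mem_bytes=16 * GiB)
print(part_size / GiB, nparts)  # 2.0 5
```

With these assumed numbers, 12.5% of a 16 GiB device gives 2 GiB partitions, so a 10 GiB dataset would be split into 5 partitions; passing a smaller `part_mem_fraction` or `part_size` would yield more, smaller partitions and a lower peak memory footprint.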

\n", "\n", "The NVTabular dataset should be created from Parquet files in order to get the best possible performance, preferably with a row group size of around 128MB. While NVTabular also supports reading from CSV files, reading CSV can be over twice as slow as reading from Parquet. It is recommended to convert a CSV dataset into Parquet format for use with NVTabular. " ] }, { "cell_type": "code", "execution_count": 12, "id": "25f69f56-7883-4630-a916-866c21984f77", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The Dataset is split into 4 partitions\n" ] } ], "source": [ "# create dataset\n", "dataset=nvt.Dataset(ddf)\n", "\n", "print(f'The Dataset is split into {dataset.npartitions} partitions')" ] }, { "cell_type": "markdown", "id": "a9d465b6-f695-43d5-b33c-adc4a19fc35b", "metadata": {}, "source": [ "\n", "### Fit, Transform, and Persist ###\n", "NVTabular follows a familiar API for pipeline operations. We can `.fit()` the workflow to a training set to calculate the statistics it requires. Afterwards, we can use it to `.transform()` the training and validation datasets. We will persist the transformed data to disk in parquet format for fast reading at train time. Importantly, we can use the `.save()`[[doc]](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html#nvtabular.workflow.workflow.Workflow.save) method so that our `Workflow` can be used during model inference. \n", "\n", "

\n", "\n", "Since the `Dataset` API can both ingest and output a Dask collection, it is straightforward to transform data either before or after an NVTabular workflow is executed. This means that some complex pre-processing operations, that are not yet supported in NVTabular, can still be accomplished with the `dask_cudf.DataFrame` API after the `Dataset` is converted with `.to_ddf`. " ] }, { "cell_type": "code", "execution_count": 13, "id": "2bff2a60-5892-4ee8-9277-3d5be0caff91", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/opt/conda/envs/rapids/lib/python3.9/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'\n", " warn(f\"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}\")\n", "/opt/conda/envs/rapids/lib/python3.9/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'\n", " warn(f\"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}\")\n", "/opt/conda/envs/rapids/lib/python3.9/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'\n", " warn(f\"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}\")\n", "/opt/conda/envs/rapids/lib/python3.9/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'\n", " warn(f\"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}\")\n" ] } ], "source": [ "# fit and transform dataset\n", "workflow.fit(dataset)\n", "output_dataset=workflow.transform(dataset)" ] }, { "cell_type": "code", "execution_count": 14, "id": 
"3eb48ab0-16a7-4b83-91d6-719af97acdca", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 32\n", "drwxr-xr-x 14 root root 4096 Feb 11 13:29 categories\n", "-rw-r--r-- 1 root root 187 Feb 11 13:29 metadata.json\n", "-rw-r--r-- 1 root root 21110 Feb 11 13:29 workflow.pkl\n" ] } ], "source": [ "# save the workflow\n", "workflow.save('nvt_workflow')\n", "\n", "!ls -l nvt_workflow" ] }, { "cell_type": "code", "execution_count": 15, "id": "c7ea31ca-4b58-409b-b8cd-f357dc2a1abc", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "rm: cannot remove 'processed_parquet/*': No such file or directory\n" ] } ], "source": [ "# remove existing parquet directory\n", "!rm -R processed_parquet/*\n", "\n", "# save output to parquet directory\n", "output_path='processed_parquet'\n", "output_dataset.to_parquet(output_path=output_path)" ] }, { "cell_type": "markdown", "id": "1e8ae451-3ba7-4a27-b8fa-d5d79d4768ab", "metadata": {}, "source": [ "If needed, we can convert the `Dataset` object to `dask.dataframe.DataFrame` to inspect the results. " ] }, { "cell_type": "code", "execution_count": 16, "id": "658f563b-76e0-4927-80e7-87a52d5f8d63", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
brandcat_0cat_1cat_2cat_3ts_hourts_minutets_weekdaybrand_target_sumbrand_count...cat_3_countTE_brand_targetTE_cat_0_targetTE_cat_1_targetTE_cat_2_targetTE_cat_3_targetpricerelative_price_productrelative_price_categorytarget
09526194345461271...24604050.2203510.3496300.2629900.3401220.410012-1.5839840.000000-0.6788531
155443455681333234973...24604050.3462850.2972940.2972940.3393050.4101210.031592-0.0341460.2619511
23471043454632598325...24604050.3873530.4020700.4297620.3407030.410679-1.2376760.000000-0.8835151
3333337456226354457906...24604050.4943550.4831190.4825310.4886410.4106791.350904-0.0173990.7727171
43471043526032598325...24604050.3969420.4013960.4279100.3393050.410121-0.473307-0.006232-0.6887081
\n", "

5 rows × 27 columns

\n", "
" ], "text/plain": [ " brand cat_0 cat_1 cat_2 cat_3 ts_hour ts_minute ts_weekday \\\n", "0 952 6 19 4 3 4 54 6 \n", "1 5 5 4 4 3 4 55 6 \n", "2 34 7 10 4 3 4 54 6 \n", "3 3 3 3 3 3 7 45 6 \n", "4 34 7 10 4 3 5 26 0 \n", "\n", " brand_target_sum brand_count ... cat_3_count TE_brand_target \\\n", "0 12 71 ... 2460405 0.220351 \n", "1 81333 234973 ... 2460405 0.346285 \n", "2 3259 8325 ... 2460405 0.387353 \n", "3 226354 457906 ... 2460405 0.494355 \n", "4 3259 8325 ... 2460405 0.396942 \n", "\n", " TE_cat_0_target TE_cat_1_target TE_cat_2_target TE_cat_3_target \\\n", "0 0.349630 0.262990 0.340122 0.410012 \n", "1 0.297294 0.297294 0.339305 0.410121 \n", "2 0.402070 0.429762 0.340703 0.410679 \n", "3 0.483119 0.482531 0.488641 0.410679 \n", "4 0.401396 0.427910 0.339305 0.410121 \n", "\n", " price relative_price_product relative_price_category target \n", "0 -1.583984 0.000000 -0.678853 1 \n", "1 0.031592 -0.034146 0.261951 1 \n", "2 -1.237676 0.000000 -0.883515 1 \n", "3 1.350904 -0.017399 0.772717 1 \n", "4 -0.473307 -0.006232 -0.688708 1 \n", "\n", "[5 rows x 27 columns]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# convert to DataFrame and preview\n", "output_dataset.to_ddf().head()" ] }, { "cell_type": "markdown", "id": "d8f1253e-19d0-4a36-b065-42324f5f3e2a", "metadata": {}, "source": [ "\n", "### Exercise #2 - Load Saved Workflow ###\n", "We can load a saved workflow, which will contain the graph, schema, and statistics. This is useful if the workflow should be applied to future datasets. \n", "\n", "**Instructions**:
\n", "* Review the [documentation](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html#nvtabular.workflow.workflow.Workflow.load) for the `.load()` _class_ method. \n", "* Modify the `` only and execute the cell below to create a workflow. \n", "* Execute the cell below to apply the graph of operators to transform the data. " ] }, { "cell_type": "code", "execution_count": 17, "id": "2748e6e8-a866-4c96-a534-decf2fe269c7", "metadata": { "tags": [] }, "outputs": [], "source": [ "# load workflow\n", "loaded_workflow=nvt.Workflow.load('nvt_workflow')" ] }, { "cell_type": "markdown", "id": "de1f6efe-e6cf-4576-88e6-6ae95dbd49b3", "metadata": {}, "source": [ "Click ... to show **solution**." ] }, { "cell_type": "code", "execution_count": 18, "id": "8ee80ee7-7162-4352-90d4-51bd2c6f1531", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
brandcat_0cat_1cat_2cat_3ts_hourts_minutets_weekdaybrand_target_sumbrand_count...cat_3_countTE_brand_targetTE_cat_0_targetTE_cat_1_targetTE_cat_2_targetTE_cat_3_targetpricerelative_price_productrelative_price_categorytarget
09526194345461271...24604050.2203510.3496300.2629900.3401220.410012-1.5839840.000000-0.6788531
155443455681333234973...24604050.3462850.2972940.2972940.3393050.4101210.031592-0.0341460.2619511
23471043454632598325...24604050.3873530.4020700.4297620.3407030.410679-1.2376760.000000-0.8835151
3333337456226354457906...24604050.4943550.4831190.4825310.4886410.4106791.350904-0.0173990.7727171
43471043526032598325...24604050.3969420.4013960.4279100.3393050.410121-0.473307-0.006232-0.6887081
\n", "

5 rows × 27 columns

\n", "
" ], "text/plain": [ " brand cat_0 cat_1 cat_2 cat_3 ts_hour ts_minute ts_weekday \\\n", "0 952 6 19 4 3 4 54 6 \n", "1 5 5 4 4 3 4 55 6 \n", "2 34 7 10 4 3 4 54 6 \n", "3 3 3 3 3 3 7 45 6 \n", "4 34 7 10 4 3 5 26 0 \n", "\n", " brand_target_sum brand_count ... cat_3_count TE_brand_target \\\n", "0 12 71 ... 2460405 0.220351 \n", "1 81333 234973 ... 2460405 0.346285 \n", "2 3259 8325 ... 2460405 0.387353 \n", "3 226354 457906 ... 2460405 0.494355 \n", "4 3259 8325 ... 2460405 0.396942 \n", "\n", " TE_cat_0_target TE_cat_1_target TE_cat_2_target TE_cat_3_target \\\n", "0 0.349630 0.262990 0.340122 0.410012 \n", "1 0.297294 0.297294 0.339305 0.410121 \n", "2 0.402070 0.429762 0.340703 0.410679 \n", "3 0.483119 0.482531 0.488641 0.410679 \n", "4 0.401396 0.427910 0.339305 0.410121 \n", "\n", " price relative_price_product relative_price_category target \n", "0 -1.583984 0.000000 -0.678853 1 \n", "1 0.031592 -0.034146 0.261951 1 \n", "2 -1.237676 0.000000 -0.883515 1 \n", "3 1.350904 -0.017399 0.772717 1 \n", "4 -0.473307 -0.006232 -0.688708 1 \n", "\n", "[5 rows x 27 columns]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# create dataset from parquet directory\n", "dataset=nvt.Dataset('clean_parquet', engine='parquet')\n", "\n", "# transform dataset\n", "loaded_workflow.transform(dataset).to_ddf().head()" ] }, { "cell_type": "code", "execution_count": 19, "id": "48f8ca09-920d-48e9-aa78-7fab88364953", "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "data": { "text/plain": [ "{'status': 'ok', 'restart': False}" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# clean GPU memory\n", "import IPython\n", "app = IPython.Application.instance()\n", "app.kernel.do_shutdown(restart=False)" ] }, { "cell_type": "markdown", "id": "5d921fa5-0f64-4e9c-bdc5-2c13432f1cc5", "metadata": {}, "source": [ " \"Header\" " ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 
(ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.16" } }, "nbformat": 4, "nbformat_minor": 5 }