Use RAPIDS on a GPU cluster
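We perform the same machine learning exercise as in the previous notebook, except on a cluster of multiple GPUs with Dask. This exercise uses the following RAPIDS packages:
- dask-cudf, for GPU dataframes distributed across the cluster
- cuml, for GPU machine learning (its distributed estimators live in cuml.dask)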
Connect to Dask Cluster
This project has a Dask cluster defined for it, which you can start or connect to in the cell below. For more information about Dask clusters in Saturn Cloud, check out the docs.
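from dask.distributed import Client, wait
from dask_saturn import SaturnCluster

n_workers = 3
# Start (or connect to) this project's Saturn Cloud Dask cluster
cluster = SaturnCluster(n_workers=n_workers)
client = Client(cluster)
# Block until all requested workers have joined the cluster
client.wait_for_workers(n_workers)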
Load data
The code below loads the data into a dask-cudf dataframe. This is similar to a pandas or cudf dataframe, but it is distributed across the GPUs in the cluster.
import dask_cudf
taxi = dask_cudf.read_csv(
"s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv",
parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
storage_options={"anon": True},
assume_missing=True,
)
Many dataframe operations that you would execute on a pandas dataframe also work on a dask-cudf dataframe:
len(taxi)
taxi.head()
When we say that a dask-cudf dataframe is a distributed dataframe, we mean that it comprises multiple smaller cudf dataframes. Run the following to see how many of these pieces (called “partitions”) there are.
taxi
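Conceptually, a partitioned dataframe behaves like a list of smaller frames: its length is the sum of the partition lengths, and Dask’s head() reads only the first partition by default. The toy sketch below models this with plain Python lists of row dicts. `PartitionedFrame` and `from_rows` are hypothetical names, the rows are made up, and this is not how dask-cudf is implemented internally.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

Row = Dict[str, Any]


@dataclass
class PartitionedFrame:
    """Toy stand-in for a distributed dataframe: a list of partitions (row lists)."""

    partitions: List[List[Row]]

    @property
    def npartitions(self) -> int:
        return len(self.partitions)

    def __len__(self) -> int:
        # Total length is the sum of each partition's length.
        return sum(len(p) for p in self.partitions)

    def head(self, n: int = 5) -> List[Row]:
        # Only look at the first partition, like Dask's default head().
        return self.partitions[0][:n]


def from_rows(rows: List[Row], npartitions: int) -> PartitionedFrame:
    """Split rows into contiguous, roughly equal chunks (hypothetical helper)."""
    size, extra = divmod(len(rows), npartitions)
    parts, start = [], 0
    for i in range(npartitions):
        end = start + size + (1 if i < extra else 0)
        parts.append(rows[start:end])
        start = end
    return PartitionedFrame(parts)


rows = [{"PULocationID": i, "tip_amount": i * 0.5} for i in range(10)]
frame = from_rows(rows, npartitions=3)
print(frame.npartitions, len(frame), [len(p) for p in frame.partitions])
# → 3 10 [4, 3, 3]
```

In the real cluster, each partition is a cudf dataframe held in some worker’s GPU memory, so operations like len() run per partition and only the small per-partition results are combined.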
Train model
Now that the data are loaded, it’s time to build a model!
For this task, we’ll use the RandomForestClassifier from cuml.dask (notice the .dask!). If you’ve never used a random forest or need a refresher, consult “Forests of randomized trees” in the scikit-learn documentation.
The features (X) are the pickup location, dropoff location, and passenger count, with missing values filled with -1. The target (y) is 1 when the tip is more than $1 and 0 otherwise.
# Cast to 32-bit types for compatibility with older versions of cuml
X = taxi[["PULocationID", "DOLocationID", "passenger_count"]].astype("float32").fillna(-1)
y = (taxi["tip_amount"] > 1).astype("int32")
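To make the feature and target construction explicit, the sketch below applies the same transformation row by row in plain Python: each feature becomes a 32-bit float with missing values replaced by -1, and the label is 1 when the tip is strictly greater than $1. It assumes the standard-library `array` module’s typecodes `"f"` and `"i"` map to 32-bit C float and int (true on common platforms) as stand-ins for float32 and int32 columns. `make_xy` is a hypothetical helper, the rows are made up, and `tip_amount` is assumed to always be present.

```python
from array import array
from typing import Dict, List, Optional, Tuple

FEATURES = ["PULocationID", "DOLocationID", "passenger_count"]


def make_xy(rows: List[Dict[str, Optional[float]]]) -> Tuple[List[array], array]:
    """Hypothetical row-wise version of the X / y construction above."""
    X: List[array] = []
    y = array("i")  # 'i' is a C int: 32-bit on common platforms
    for row in rows:
        # Missing feature values become -1, mirroring fillna(-1).
        feats = [row.get(col) for col in FEATURES]
        X.append(array("f", [-1.0 if v is None else float(v) for v in feats]))
        # Label: 1 if the tip is strictly greater than $1, else 0.
        # Assumes tip_amount is always present.
        y.append(int(row["tip_amount"] > 1))
    return X, y


rows = [
    {"PULocationID": 161, "DOLocationID": 236, "passenger_count": 1, "tip_amount": 2.5},
    {"PULocationID": 48, "DOLocationID": None, "passenger_count": None, "tip_amount": 1.0},
]
X, y = make_xy(rows)
print([list(r) for r in X], list(y))
# → [[161.0, 236.0, 1.0], [48.0, -1.0, -1.0]] [1, 0]
```

Note that a tip of exactly $1.00 is labeled 0, because the comparison is strict.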
Dask evaluates computations lazily, so we persist X and y: this triggers the data loading and feature processing and keeps the results in GPU memory on the workers. wait() then blocks until that work has finished.
X, y = client.persist([X, y])
_ = wait([X, y])
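Lazy evaluation means each dataframe operation only records a step; nothing runs until a result is requested. The toy class below (hypothetical, not Dask’s API) records a chain of functions, runs the whole chain on `compute()`, and on `persist()` runs it once and caches the result so later computations reuse the materialized data. That is roughly the role `client.persist` plays above, with two differences: Dask keeps persisted partitions in worker (here GPU) memory, and it returns new persisted collections rather than mutating the originals, which is why the cell reassigns X and y.

```python
from typing import Any, Callable, Tuple


class Lazy:
    """Toy lazy pipeline (hypothetical, not Dask's API): steps run only on demand."""

    def __init__(self, load: Callable[[], Any], steps: Tuple[Callable[[Any], Any], ...] = ()):
        self._load = load
        self._steps = tuple(steps)
        self._persisted = False
        self._value: Any = None

    def map(self, fn: Callable[[Any], Any]) -> "Lazy":
        # Record one more step and return a new pipeline; nothing executes here.
        return Lazy(self._load, self._steps + (fn,))

    def compute(self) -> Any:
        # Reuse the materialized result if persist() has already run.
        if self._persisted:
            return self._value
        value = self._load()
        for fn in self._steps:
            value = fn(value)
        return value

    def persist(self) -> "Lazy":
        # Execute the whole pipeline once and keep the result.
        self._value = self.compute()
        self._persisted = True
        return self


loads = []


def load_data():
    loads.append(1)  # record each (expensive) load
    return [1.0, None, 3.0]


pipeline = Lazy(load_data).map(lambda xs: [-1.0 if x is None else x for x in xs])
print(len(loads))  # → 0: building the pipeline ran nothing
pipeline.persist()
pipeline.compute()
pipeline.compute()
print(len(loads))  # → 1: loaded once by persist(), reused afterwards
```

Without the persist step, every downstream computation (such as each pass the model makes over the data) would repeat the CSV read and feature processing.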
from cuml.dask.ensemble import RandomForestClassifier
# ignore_empty_partitions=True: workers that hold no data are skipped instead of raising an error
rfc = RandomForestClassifier(n_estimators=100, ignore_empty_partitions=True)
_ = rfc.fit(X, y)
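The cuml documentation describes its distributed random forest as embarrassingly parallel: with N trees and w workers, each worker builds roughly N/w trees on the data it holds locally. The sketch below models that split for the settings used here (100 trees, 3 workers) and shows how per-worker probability averages combine into a forest-wide average. `estimators_per_worker` and `combine_proba` are hypothetical helpers, the rule that leftover trees go to the first workers is an assumption, and cuml may combine trees differently internally.

```python
from typing import List


def estimators_per_worker(n_estimators: int, n_workers: int) -> List[int]:
    """Split trees across workers; assumes leftover trees go to the first workers."""
    base, extra = divmod(n_estimators, n_workers)
    return [base + (1 if i < extra else 0) for i in range(n_workers)]


def combine_proba(worker_probas: List[float], worker_trees: List[int]) -> float:
    """Average positive-class probability over all trees in the forest.

    Each worker reports the mean probability of its own trees, so the
    forest-wide mean is the tree-count-weighted mean of those values.
    """
    total = sum(worker_trees)
    return sum(p * n for p, n in zip(worker_probas, worker_trees)) / total


print(estimators_per_worker(100, 3))  # → [34, 33, 33]
print(combine_proba([0.5, 1.0, 0.0], [2, 1, 1]))  # → 0.5
```

Weighting by tree count matters: a plain mean of the three worker values above would give 0.5 here only by coincidence, and differs in general when workers build different numbers of trees.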
Calculate metrics
We’ll use another month of taxi data (February 2019) as the test set and calculate the ROC AUC score.
taxi_test = dask_cudf.read_csv(
"s3://nyc-tlc/trip data/yellow_tripdata_2019-02.csv",
parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
storage_options={"anon": True},
assume_missing=True,
)
# Build the test features and target exactly as for the training data
X_test = taxi_test[["PULocationID", "DOLocationID", "passenger_count"]].astype("float32").fillna(-1)
y_test = (taxi_test["tip_amount"] > 1).astype("int32")
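from cuml.metrics import roc_auc_score
# predict_proba returns one column per class; column 1 is the probability that the tip exceeds $1
preds = rfc.predict_proba(X_test)[1]
# compute() gathers the distributed results so roc_auc_score can score them
roc_auc_score(y_test.compute(), preds.compute())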
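For intuition about the number roc_auc_score returns: ROC AUC equals the probability that a randomly chosen positive example (tip over $1) receives a higher score than a randomly chosen negative one, with ties counting as half. The pure-Python sketch below computes it from average ranks (the Mann-Whitney U formulation). `roc_auc` is a hypothetical illustration only; on GPU data, cuml.metrics.roc_auc_score as used above is the practical choice.

```python
from typing import Sequence


def roc_auc(y_true: Sequence[int], scores: Sequence[float]) -> float:
    """ROC AUC via ranks: P(random positive outscores random negative), ties = 1/2."""
    order = sorted(range(len(scores)), key=lambda i: scores[i])
    ranks = [0.0] * len(scores)
    i = 0
    while i < len(order):
        j = i
        # Group indices that share the same score and give them their average rank.
        while j + 1 < len(order) and scores[order[j + 1]] == scores[order[i]]:
            j += 1
        avg_rank = (i + j) / 2 + 1  # ranks are 1-based
        for k in range(i, j + 1):
            ranks[order[k]] = avg_rank
        i = j + 1
    n_pos = sum(1 for t in y_true if t == 1)
    n_neg = len(y_true) - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("AUC is undefined when only one class is present")
    pos_rank_sum = sum(r for r, t in zip(ranks, y_true) if t == 1)
    u = pos_rank_sum - n_pos * (n_pos + 1) / 2  # Mann-Whitney U for the positives
    return u / (n_pos * n_neg)


print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # → 0.75
```

This is also why the notebook passes the positive-class probabilities rather than hard 0/1 predictions: AUC measures how well the scores rank positives above negatives.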