Snowflake & Saturn Cloud Launch 100x Faster Machine Learning Platform
Snowflake (NYSE:SNOW), the cloud data platform, is partnering with Saturn Cloud, integrating products, and pursuing a joint go-to-market strategy to help data science teams get results up to 100x faster.
Now, SQL users can go one step further to easily get advanced analytics and machine learning capabilities through Snowflake. Snowflake users will be able to build models with Dask, a Python-native parallel computing framework, and RAPIDS, a GPU data science framework that parallelizes across clusters with Dask.
The joint platform is behind groundbreaking speedups for data scientists, outperforming serial Python and Apache Spark by up to 100x. Senseye, an AI company that uses the tooling, recently revealed a model improvement that drove runtime down from 60 days to 11 hours. The 130x speedup enables them to make breakthroughs on their computer vision accuracy, with iterations possible at a daily pace.
An Example with NYC Taxi Data
Let’s illustrate this with an example that resembles the machine learning challenges most data scientists face. In this example, we want to understand how to improve ride-sharing in New York City. As you would expect, there are millions of rows of data to analyze, which can take days to process. We are going to use Snowflake and Saturn Cloud to get to performance fit for a business where time is money. We will use 3 years of NYC Taxi data stored in Snowflake. We do the feature engineering in Snowflake, load the data into a distributed GPU dataframe with dask_cudf, and then train a random forest model on GPUs using cuML. You can try this for yourself using the Snowflake quick-start project in Saturn Cloud.
Connecting to Dask and Snowflake
Let’s look at how it works. We’re using a 20-node Dask cluster of g4dn.xlarge instances on AWS.
Note: If your Dask clusters are hosted on a different platform, you will need to adapt the code chunk below.
A Saturn Cloud cluster is provisioned with the following code:
from dask.distributed import Client
from dask_saturn import SaturnCluster

n_workers = 20
cluster = SaturnCluster(
    n_workers=n_workers,
    scheduler_size='g4dnxlarge',
    worker_size='g4dnxlarge',
)
client = Client(cluster)
Next, we’ll connect to Snowflake. In our notebook, we read the relevant connection information from environment variables. You will need to set these to your own Snowflake connection details.
import os
import snowflake.connector

SNOWFLAKE_ACCOUNT = os.environ['SNOWFLAKE_ACCOUNT']
SNOWFLAKE_USER = os.environ['SNOWFLAKE_USER']
SNOWFLAKE_PASSWORD = os.environ['SNOWFLAKE_PASSWORD']
SNOWFLAKE_WAREHOUSE = os.environ['SNOWFLAKE_WAREHOUSE']
TAXI_DATABASE = os.environ['TAXI_DATABASE']
TAXI_SCHEMA = os.environ['TAXI_SCHEMA']

conn_info = {
    'account': SNOWFLAKE_ACCOUNT,
    'user': SNOWFLAKE_USER,
    'password': SNOWFLAKE_PASSWORD,
    'warehouse': SNOWFLAKE_WAREHOUSE,
    'database': TAXI_DATABASE,
    'schema': TAXI_SCHEMA,
}

# Connection used by the date query in the next section
conn = snowflake.connector.connect(**conn_info)
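A missing environment variable otherwise surfaces one at a time as a bare KeyError, so it can help to check them all up front. The following is a minimal sketch and not part of the original notebook: the helper name build_conn_info and the REQUIRED_VARS mapping are hypothetical, and only the variable names come from the code above.

```python
import os

# Maps each connection key to the environment variable that supplies it.
# The variable names mirror the snippet above; the helper itself is hypothetical.
REQUIRED_VARS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "TAXI_DATABASE",
    "schema": "TAXI_SCHEMA",
}


def build_conn_info(env=None):
    """Return the connection dict, or raise one error naming every missing variable."""
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {key: env[var] for key, var in REQUIRED_VARS.items()}
```

Passing a plain dictionary as env makes the helper easy to try out without touching the real environment.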
Loading data and feature engineering
We’re going to use Snowflake to compute some predictive features, and then have each of our 20 Dask workers load a piece of the resulting training set. The NYC taxi data has the pickup time of each ride, and the data is fairly evenly distributed across days, weeks, and months. Our approach here is to divide up the entire 3-year time period into 20 chunks based on the day of the ride, and load each chunk into a Dask worker. First, we write a function to calculate our date chunks.
import datetime
import pandas as pd

def get_dates(start, end):
    # Distinct ride dates in the requested window, in ascending order
    date_query = """
    SELECT
        DISTINCT(DATE(pickup_datetime)) as date
    FROM taxi_yellow
    WHERE
        pickup_datetime BETWEEN %s and %s
    ORDER BY date
    """
    dates_df = conn.cursor().execute(date_query, (start, end))
    columns = [x[0] for x in dates_df.description]
    dates_df = pd.DataFrame(dates_df.fetchall(), columns=columns)
    return dates_df['DATE'].tolist()

dates = sorted(get_dates('2017-01-01', '2019-12-31'))
# Number of days per chunk, so that we end up with at most n_workers chunks
chunks = (len(dates) // n_workers) + 1
date_groups = dates[::chunks]
date_starts = date_groups
date_ends = date_groups[1:] + [dates[-1] + datetime.timedelta(days=1)]
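To see what the chunking arithmetic produces, here is a small self-contained sketch that applies the same slicing to ten synthetic dates instead of the Snowflake results. The function name chunk_dates and the sample dates are illustrative assumptions; whether a boundary date belongs to one chunk or to both depends on the comparison used in the query that consumes these ranges.

```python
import datetime


def chunk_dates(dates, n_workers):
    """Split sorted dates into at most n_workers consecutive (start, end) ranges."""
    step = (len(dates) // n_workers) + 1
    starts = dates[::step]
    # Each range ends where the next begins; the last ends one day past the final date.
    ends = starts[1:] + [dates[-1] + datetime.timedelta(days=1)]
    return list(zip(starts, ends))


# Ten consecutive synthetic days split across 3 workers.
first = datetime.date(2017, 1, 1)
days = [first + datetime.timedelta(days=i) for i in range(10)]
ranges = chunk_dates(days, n_workers=3)
for start, end in ranges:
    print(start, "->", end)
```

Because the step is rounded up, the number of ranges never exceeds the number of workers, though it can be smaller when there are few dates.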
After calculating the date chunks, we set up a Snowflake query that performs feature engineering and put in bindings that we will use later for grabbing the appropriate date chunks.
First, I want to point out that we’re just scratching the surface of Snowflake’s analytical capabilities. That’s probably worth an entirely separate blog post, but we’re using a bunch of mathematical and date-based routines to highlight yearly, weekly, and daily seasonality (WEEKOFYEAR, DAYOFWEEKISO, and HOUR).
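To make the seasonality features concrete, here is a rough plain-Python equivalent of what those date functions compute for a single pickup timestamp. It is a sketch under assumptions: the output names are borrowed from the meta dictionary used later in this post, the pickup_week_hour formula is a guess at a combined day-and-hour feature, and WEEKOFYEAR is assumed to follow ISO week numbering (in Snowflake this depends on the WEEK_OF_YEAR_POLICY session parameter).

```python
import datetime


def seasonal_features(pickup):
    """Derive week, weekday, and hour features from one pickup timestamp."""
    _, iso_week, iso_weekday = pickup.isocalendar()
    return {
        "pickup_weekofyear": iso_week,    # yearly seasonality
        "pickup_weekday": iso_weekday,    # 1 = Monday ... 7 = Sunday, as in DAYOFWEEKISO
        "pickup_hour": pickup.hour,       # daily seasonality
        "pickup_minute": pickup.minute,
        # Assumed definition: hour of the week, counting from Monday 00:00
        "pickup_week_hour": (iso_weekday - 1) * 24 + pickup.hour,
    }


print(seasonal_features(datetime.datetime(2019, 7, 4, 18, 30)))
```

Computing these in the warehouse rather than in Python means each worker receives rows that are already model-ready.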
Now comes the fun part! Let’s load the data into a distributed GPU dataframe with Dask and RAPIDS.
import cudf
import dask_cudf
from dask import delayed
from numpy import dtype

@delayed
def load(conn_info, query, date_start, date_end, meta=None):
    # Each task opens its own connection on the worker that runs it
    with snowflake.connector.connect(**conn_info) as conn:
        taxi = conn.cursor().execute(query, (date_start, date_end)).fetch_pandas_all()
        taxi.columns = [x.lower() for x in taxi.columns]
        if meta:
            taxi = taxi.astype(meta)
        taxi = cudf.from_pandas(taxi)
    return taxi

meta = {
    'pickup_taxizone_id': dtype('float32'),
    'dropoff_taxizone_id': dtype('float32'),
    'passenger_count': dtype('float32'),
    'high_tip': dtype('int32'),
    'pickup_weekday': dtype('float32'),
    'pickup_weekofyear': dtype('float32'),
    'pickup_hour': dtype('float32'),
    'pickup_week_hour': dtype('float32'),
    'pickup_minute': dtype('float32')
}

taxi_delayed = [
    load(conn_info, query, date_start, date_end, meta=meta)
    for date_start, date_end in zip(date_starts, date_ends)
]
taxi = dask_cudf.from_delayed(
    taxi_delayed,
    meta
)
There’s a lot to unpack here, but we’ll walk through each step. Our load function is a Dask delayed function. This means that when you call it, nothing runs right away; instead, an object is returned that keeps track of the operations that need to happen and can be executed at the appropriate time. In the load function, we run the Snowflake query defined above, binding the start and end dates of the chunk to pull out of Snowflake. The results are pulled into Python using fetch_pandas_all(). This is an optimized routine that loads data from Snowflake directly using Arrow, which can be efficiently loaded into pandas. Finally, we turn it into a GPU dataframe.
The load function defines what needs to happen for one chunk of the larger dataframe. We’re creating a bunch of these calls which return a chunk of the training data set, and then passing them to dask_cudf.from_delayed, which creates a Dask GPU DataFrame out of many small GPU dataframes.
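The deferred-execution idea can be illustrated without a cluster. The sketch below is a plain-Python stand-in and not Dask's API: functools.partial plays the role of a delayed call, fake_load is a hypothetical substitute for the Snowflake query, and a list concatenation stands in for assembling partitions into one dataframe.

```python
from functools import partial

calls = []  # records which chunks were actually loaded


def fake_load(date_start, date_end):
    """Hypothetical stand-in for the Snowflake query: returns one fake row."""
    calls.append((date_start, date_end))
    return [f"rides from {date_start} to {date_end}"]


chunk_starts = ["2017-01-01", "2018-01-01", "2019-01-01"]
chunk_ends = ["2018-01-01", "2019-01-01", "2020-01-01"]

# Building the task list only records what to do; no load has run yet.
tasks = [partial(fake_load, s, e) for s, e in zip(chunk_starts, chunk_ends)]
loaded_before = len(calls)

# Executing the tasks triggers the loads; the results are combined afterwards.
partitions = [task() for task in tasks]
combined = [row for part in partitions for row in part]

print(loaded_before, len(calls), len(combined))
```

In the real pipeline, Dask's scheduler decides when each task runs and on which worker, so the chunks load in parallel rather than in a simple loop.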
You may also notice the meta parameter that we’re passing around. This isn’t strictly necessary, but many pandas routines infer datatypes based on the data. A chunk that has an integer field with no missing data may return that as an integer type, but another chunk that is missing some data points for that same field may cast it as a float. As a result, it’s useful to pass the metadata through and coerce types with it to ensure consistency across every chunk.
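The dtype drift is easy to reproduce with two tiny pandas frames. This is an illustrative sketch with made-up values; only the column name and the float32 target type are taken from the meta dictionary above.

```python
import pandas as pd

meta = {"passenger_count": "float32"}

# One chunk has complete data, the other has a missing value.
complete = pd.DataFrame({"passenger_count": [1, 2, 3]})
with_gaps = pd.DataFrame({"passenger_count": [1, None, 3]})

# Without coercion, pandas infers a different dtype for each chunk.
inferred = (complete["passenger_count"].dtype, with_gaps["passenger_count"].dtype)

# Coercing with the shared metadata makes the chunks agree.
coerced = (
    complete.astype(meta)["passenger_count"].dtype,
    with_gaps.astype(meta)["passenger_count"].dtype,
)

print("inferred:", inferred)
print("coerced: ", coerced)
```

Agreeing on dtypes before the chunks are combined avoids mismatches between partitions of the distributed dataframe.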
Snowflake Caching and Configuration
Snowflake has two primary caching mechanisms that are great for ML workloads. The first is the local disk cache, which stores the underlying data on the local disk of the machines that make up your Snowflake warehouse. The second is the result cache, which stores the results of previous queries so they can be accessed quickly.
For ML workloads, both caches are useful. The local disk cache can speed up every query, especially as data sizes grow. The result cache helps when you re-execute the same query, for example when iterating on the ML model without changing your feature engineering.
Snowflake tables can also be configured to use data clustering. In data clustering, one or more columns or expressions on the table can be designated as the clustering key. Data clustering co-locates similar rows and helps Snowflake understand what data to skip for a particular query.
In this article, we are disabling the result cache to show accurate benchmarks (the local disk cache cannot be disabled) and we are clustering on PICKUP_DATETIME since that’s how we’ve chosen to partition the result set.
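For reference, both settings can be applied from Python through a cursor. This is a hedged sketch rather than the exact statements we ran: USE_CACHED_RESULT is Snowflake's session parameter for the result cache and CLUSTER BY is its clustering-key syntax, while the helper name configure_for_benchmark is hypothetical and the table name is taken from the date query earlier in this post.

```python
# Turn off the result cache for the current session.
RESULT_CACHE_OFF = "ALTER SESSION SET USE_CACHED_RESULT = FALSE"
# Declare pickup_datetime as the clustering key (table name assumed from the date query).
CLUSTER_BY_PICKUP = "ALTER TABLE taxi_yellow CLUSTER BY (pickup_datetime)"


def configure_for_benchmark(cursor):
    """Run both statements on an open cursor and return them in execution order."""
    statements = [RESULT_CACHE_OFF, CLUSTER_BY_PICKUP]
    for statement in statements:
        cursor.execute(statement)
    return statements
```

With a live connection this would be called as configure_for_benchmark(conn.cursor()). Note that the session setting only applies to the connection it was issued on, so each worker connection would need it as well.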
Machine Learning
Now that we have our training dataset loaded, we can train our model.
from cuml.dask.ensemble import RandomForestClassifier

# Assumption: the target is the int32 high_tip column from meta,
# and every other column in meta is a feature
y_col = 'high_tip'
features = [col for col in meta if col != y_col]

taxi_train = taxi[features + [y_col]]
taxi_train[features] = taxi_train[features].astype("float32").fillna(-1)
taxi_train[y_col] = taxi_train[y_col].astype("int32").fillna(-1)
taxi_train = taxi_train.persist()
rfc = RandomForestClassifier(n_estimators=100, max_depth=10, seed=42)
_ = rfc.fit(taxi_train[features], taxi_train[y_col])
Here, taxi_train is a dask_cudf dataframe and we’re importing RandomForestClassifier from the RAPIDS cuML package. As a result, the training happens entirely on the GPU. Note the call to .persist(): this tells Dask to perform the Snowflake queries and data loading across the cluster and checkpoint the results in GPU memory. If we don’t do this, the Snowflake queries would be executed each time the random forest model needed to pull some data.
Summary and performance
We previously did the same exercise with Apache Spark. Loading data, feature engineering, and training a RandomForestClassifier on Spark took 38.5 minutes. Loading data and feature engineering in Snowflake, and training the RandomForestClassifier in Saturn Cloud with Dask and RAPIDS, takes 35 seconds. That’s a speedup of more than 60x with minimal work, and it has the added benefit of letting us use Python end to end.
In terms of computational cost, the Snowflake and Saturn Cloud solutions are quite cost-effective because the runtime is so short. Snowflake has a minimum charge of 1 minute. The x-large warehouse we’re running costs us $0.53, and the 20 g4dn.xlarge machines used in machine learning cost 12 cents for that time period. The equivalent Spark cluster costs $7.19 for the 38 minutes it takes to complete the workload.
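The comparison can be checked with a few lines of arithmetic. The sketch below uses only the timing and cost figures quoted in this section; the variable names are illustrative.

```python
# Runtimes quoted above
spark_minutes = 38.5
dask_seconds = 35
speedup = spark_minutes * 60 / dask_seconds

# Costs quoted above, in dollars
snowflake_cost = 0.53
gpu_cost = 0.12
spark_cost = 7.19
combined_cost = snowflake_cost + gpu_cost
cost_ratio = spark_cost / combined_cost

print(f"speedup: {speedup:.0f}x")
print(f"Snowflake + Saturn Cloud: ${combined_cost:.2f} vs Spark: ${spark_cost:.2f}")
print(f"cost ratio: {cost_ratio:.1f}x")
```

By these figures the runtime ratio works out to about 66x, and the combined Snowflake and Saturn Cloud bill is roughly one eleventh of the Spark cost.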
Get Started Today
If you work in data science and machine learning, you can try Saturn Cloud & Snowflake for free and start building models in minutes. You can also join our upcoming events about 100x faster data science.