Create Rolling Averages with Dask
Calculate rolling averages on data that is distributed across a Dask cluster
“I need to calculate a rolling average of a numerical column, in time series data. In pandas, I can do this with rolling(x).mean() with sorted values, but what do I do in Dask, with distributed data?”
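Doing this in Dask means paying attention to three things:
- Sort by index both within AND across partitions
- Know when to compute (convert to a pandas DataFrame in local memory) or persist (run the computations and keep the results in memory on the cluster)
- Run the calculations with attention to windows that cross partition boundaries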
This example walks you through these points and demonstrates how it’s done. We’ll use New York City taxi trip data and compute the 30-day rolling average of base fare prices.
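To see why crossing partitions matters, here is a minimal pure-Python sketch of a row-count rolling mean over data split into partitions. It is not how Dask implements rolling windows, and the helper names `rolling_mean` and `partitioned_rolling_mean` are made up for illustration. Rolling each partition on its own gives wrong values at the start of every partition; carrying the last `window - 1` values forward fixes that.

```python
def rolling_mean(values, window):
    """Trailing mean over up to `window` values (partial windows allowed)."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1): i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


def partitioned_rolling_mean(partitions, window):
    """Rolling mean over partitions, carrying lookback rows across boundaries."""
    out = []
    tail = []  # up to window - 1 most recent values from earlier partitions
    for part in partitions:
        padded = tail + part
        rolled = rolling_mean(padded, window)
        out.extend(rolled[len(tail):])  # drop results for the borrowed rows
        tail = padded[-(window - 1):] if window > 1 else []
    return out


partitions = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
flat = [v for part in partitions for v in part]

expected = rolling_mean(flat, 3)
naive = [v for part in partitions for v in rolling_mean(part, 3)]
correct = partitioned_rolling_mean(partitions, 3)

print(naive == expected)    # False
print(correct == expected)  # True
```

Dask's `rolling` takes care of this boundary bookkeeping for you, provided the index is sorted across partitions.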
Single Node
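import dask
# Synthetic time series with a sorted datetime index
timeseries = dask.datasets.timeseries()
# Roll only the numeric columns; `name` holds strings and has no mean
timeseries[['x', 'y']].rolling('1D').mean().compute()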
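The `'1D'` argument is a time-based window, and Dask's `rolling` follows the pandas API here. As a small illustration with made-up fares (pandas only, so it runs without a cluster), each row's window covers the 24 hours up to and including its own timestamp, and the datetime index has to be sorted before rolling.

```python
import pandas as pd

# Made-up fares with timestamps deliberately out of order
fares = pd.Series(
    [30.0, 10.0, 40.0, 20.0],
    index=pd.to_datetime([
        "2019-01-02 06:00",
        "2019-01-01 00:00",
        "2019-01-03 00:00",
        "2019-01-01 12:00",
    ]),
)

# Time-based windows need a monotonic index, so sort before rolling
fares = fares.sort_index()

# Each window is (t - 1 day, t]: the row itself plus anything in the prior 24 hours
rolled = fares.rolling("1D").mean()
print(rolled.tolist())  # [10.0, 15.0, 25.0, 35.0]
```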
Cluster
from dask_saturn import SaturnCluster
from dask.distributed import Client
cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)
client
import s3fs
# Anonymous (unauthenticated) access to the public bucket
s3 = s3fs.S3FileSystem(anon=True)
files_2019 = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv'
# List the files that match the pattern
s3.glob(files_2019)
import dask.dataframe as dd
taxi = dd.read_csv(
    files_2019,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)
# set_index sorts the rows by pickup time, within and across partitions
taxi = taxi.set_index("tpep_pickup_datetime")
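from dask.distributed import wait
# Keep rows with pickup times in this range; .loc slices the sorted index by label
taxi = taxi.loc["2019-01-01":"2020-01-01"]
# persist() runs the work so far on the cluster and keeps the result in worker memory
taxi = taxi.persist()
# Block until the persisted partitions are ready
_ = wait(taxi)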
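# 30-day rolling mean of the fare; windows may span several partitions
rolling_fares = taxi.fare_amount.rolling('30D').mean()
# Convert the Series to a one-column DataFrame so it can be joined
rolling_fares_df = rolling_fares.to_frame(name="fare_amount_rolled")
type(rolling_fares_df)
# Attach the rolled column to the original data by index
taxi_new = taxi.join(rolling_fares_df, how='outer')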
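One thing worth checking on real trip data (this caveat is an addition, not something the steps above address): an index join matches every pair of rows that share a timestamp, so repeated pickup times can multiply rows. The pandas-only sketch below uses made-up fares to show the effect, and compares it with assigning the rolled Series directly, which keeps one output row per input row. Dask DataFrames also have an `assign` method, but whether it is the better choice for the full taxi dataset is an assumption you would need to verify.

```python
import pandas as pd

# Made-up trips; two of them share the same pickup timestamp
idx = pd.to_datetime(["2019-01-01", "2019-01-01", "2019-01-02"])
trips = pd.DataFrame({"fare_amount": [10.0, 20.0, 30.0]}, index=idx)

rolled = trips["fare_amount"].rolling("2D").mean()
rolled_df = rolled.to_frame(name="fare_amount_rolled")

# Index join: the duplicated timestamp matches 2 x 2 rows
joined = trips.join(rolled_df, how="outer")
print(len(trips), len(joined))  # 3 5

# Assigning the Series keeps the original row count
assigned = trips.assign(fare_amount_rolled=rolled)
print(len(assigned))  # 3
```

Comparing `len(taxi)` with `len(taxi_new)` after the join is a quick way to see whether this applies to your data.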