Deploying Data Pipelines at Saturn Cloud with Dask and Prefect

How you can automate your complex tasks using Saturn Cloud, Dask, and Prefect

Let’s talk about how we deploy our own internal data pipelines on Saturn Cloud. This article covers how we do that and some lessons learned. It also assumes that you’re already a fan of Prefect and Dask.

Use Dask! But only when you need it

Scaling up should be progressive. The more you scale, the more inherent complexity you deal with. I believe most jobs should be written with Pandas first, then Dask on a local cluster, and finally Dask on a multi-node cluster (if you really need it). The way we do this is by passing in parameters that determine which type of Dask cluster we’re using.

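from dask.distributed import Client, LocalCluster
from dask_saturn import SaturnCluster
from prefect import Flow, resource_manager


@resource_manager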
class LocalDaskResource:
    def setup(self):
        self.cluster = LocalCluster(n_workers=1, threads_per_worker=15)
        Client(self.cluster)

    def cleanup(self, resource):
        self.cluster.close()


@resource_manager
class SaturnDaskResource:
    def __init__(self, teardown=True):
        self.teardown = teardown

    def setup(self):
        self.cluster = SaturnCluster(n_workers=1, threads_per_worker=15)
        Client(self.cluster)

    def cleanup(self, resource):
        if self.teardown:
            self.cluster.close()


def make_flow(mode, storage=None):
    if mode == "SaturnCluster":
        def resource():
            return SaturnDaskResource(teardown=False)
    else:
        def resource():
            return LocalDaskResource()

    with Flow("...", storage=storage) as flow:
        with resource():
            ...

    return flow

We wrote this flow to backfill (and keep current) a job that loads usage data from S3 and passes it into Snowflake. When developing the flow we used a multi-node Saturn Dask cluster, but in our deployment we only process the most recent 24 hours’ worth of data, so it’s much easier to use a LocalCluster.
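The article does not show how the time ranges are chosen, so the following is only a minimal sketch of one way to split the work. The helper names `latest_window` and `backfill_windows` are hypothetical. The idea is that a backfill walks over many 24-hour windows (a good fit for a multi-node cluster), while the deployed flow only needs the latest one (a good fit for a LocalCluster).

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional, Tuple

Window = Tuple[datetime, datetime]


def latest_window(now: Optional[datetime] = None, hours: int = 24) -> Window:
    """Return the (start, end) window covering the most recent `hours` hours."""
    end = now or datetime.now(timezone.utc)
    return end - timedelta(hours=hours), end


def backfill_windows(start: datetime, end: datetime, hours: int = 24) -> Iterator[Window]:
    """Yield consecutive windows of at most `hours` hours covering [start, end)."""
    step = timedelta(hours=hours)
    cursor = start
    while cursor < end:
        # The final window is clipped so it never runs past `end`.
        yield cursor, min(cursor + step, end)
        cursor += step
```

Each window could then be passed to the flow as a parameter, so the same tasks serve both the backfill and the daily run.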

Don’t force yourself to use Jupyter

The data science world has standardized on Jupyter as its IDE of choice. This isn’t so bad now that Jupyter Lab has a decent text editor you can use to work on Python scripts and libraries, in addition to working in notebooks.

I love emacs. When writing these flows I worked locally on my laptop, but I connected to Saturn clusters to offload the expensive computations. Since then, I’ve switched to SSHing into my Jupyter instance so that I can run emacs there (this is also how our VS Code and PyCharm integrations work). I’ve found it helpful to make my development environment exactly match my production environment. Additionally, having a development machine that’s more powerful than my laptop has been really nice.

There are many data science platforms out there that focus on Notebooks. Notebooks have their place, but they will never completely replace writing code.

Prefect has multiple deployment patterns–you don’t need to limit yourself

We’ve been building out our Prefect Cloud integration for some time. Our integration provides a Storage object for your flows, registers them with Prefect Cloud, and also shows you all the Saturn Cloud logs for your flows. Your flows will be deployed in a Kubernetes pod with the Prefect Cloud Kubernetes Agent and can also use a Saturn Dask cluster. We do a lot of work to make sure that your Prefect Flow runs in a pod that matches up precisely with the Jupyter environment you used to create it, without you needing to configure any of your own infrastructure.

Sounds great, right?

I’m planning on passing on our integration for some of our newer flows.

For a variety of reasons, we have a few flows that need to run every minute, and for those a Kubernetes Agent doesn’t make sense. Spinning pods up and down is wasted overhead when a flow needs to run every minute.

Instead, I’m leveraging Saturn deployments. Saturn can execute long-running, always-on tasks (these are often used to serve ML models or data science dashboards). I’m running a Saturn deployment that, instead of running a webserver, runs a Prefect Local Agent. I label the agent, and I label my flows so that they run on it.

For my hourly and daily flows, the k8s agent still makes sense. But for something that runs every minute, the always-on Local Agent makes more sense. Don’t restrict yourself to thinking there is only one way to do things. There are multiple types of agents, and you might need more than one.
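As a rough sketch of that decision rule, the snippet below picks an agent label from a flow's schedule interval. The five-minute cutoff, the label strings, and both function names are assumptions made up for illustration, not values we actually use. The second function assumes the run-config API from Prefect 0.14 to 1.x (`LocalRun` and `KubernetesRun`) and imports it lazily, so the label logic can be tried without Prefect installed.

```python
from datetime import timedelta

# Hypothetical cutoff: below this, pod startup is assumed to cost more than it saves.
POD_STARTUP_CUTOFF = timedelta(minutes=5)


def agent_label(interval: timedelta) -> str:
    """Pick the label of the agent that should pick up a flow run on this interval."""
    return "always-on-local" if interval < POD_STARTUP_CUTOFF else "k8s"


def run_config_for(interval: timedelta):
    """Build a labeled run config (assumes Prefect 0.14 to 1.x is installed)."""
    from prefect.run_configs import KubernetesRun, LocalRun

    label = agent_label(interval)
    if label == "always-on-local":
        return LocalRun(labels=[label])
    return KubernetesRun(labels=[label])
```

A flow would then be labeled with something like `flow.run_config = run_config_for(timedelta(minutes=1))`, and the Local Agent inside the Saturn deployment would be started with the matching label so that it only picks up those flows.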

Working with Research and Production

In the past I’ve struggled with the logistics of working with Prefect Cloud flows. Do I just write them in a notebook? If I write them in a notebook, is that what I use to make production deployments? If I move my flows to Python code, how do I explore my flows interactively?

I’ve settled on using click to solve this problem (really any command line interface will do).

import click


@click.group()
def cli():
    pass


@cli.command()
def register():
    flow = make_flow(...)
    flow.storage.build()
    flow.register(...)


@cli.command()
# A string default: any value other than "SaturnCluster" selects the local cluster.
@click.option("--mode", default="LocalCluster")
def run(mode):
    flow = make_flow(mode)
    flow.run()


if __name__ == "__main__":
    cli()

  • I have a CLI command that registers the flow. I call it from my development machine, but it can just as easily be triggered from any CI system.
  • A separate command runs the flow. It takes parameters, so I can run the flow with a LocalCluster (to simulate production) or a Saturn cluster (if I want to run it on the full dataset).
  • Since flow creation is encapsulated in a function, if I want to explore the flow interactively, I can import that function into a notebook, or just import and run the individual tasks.
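Since any command line interface will do, here is a minimal sketch of the same layout using only the standard library's `argparse`. The `make_flow` below is a stand-in that returns a string rather than a real Prefect flow, and the program name, `MODES` tuple, and return values are all illustrative assumptions.

```python
import argparse
from typing import List, Optional

MODES = ("LocalCluster", "SaturnCluster")


def make_flow(mode: str) -> str:
    """Stand-in for the real make_flow; returns a description, not a Prefect Flow."""
    return f"flow({mode})"


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="pipeline")
    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("register", help="build storage and register the flow")
    run = sub.add_parser("run", help="run the flow in this process")
    # Restricting choices turns a typo in --mode into an error instead of a silent fallback.
    run.add_argument("--mode", choices=MODES, default="LocalCluster")
    return parser


def main(argv: Optional[List[str]] = None) -> str:
    args = build_parser().parse_args(argv)
    if args.command == "register":
        # The deployed flow uses the local cluster, as described earlier.
        return "registered " + make_flow("LocalCluster")
    return "ran " + make_flow(args.mode)
```

Calling `main(["run", "--mode", "SaturnCluster"])` from a notebook or a test exercises the same code path as the command line, which keeps interactive exploration and production entry points in one place.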

Next steps

With that, you have an overview of how we deploy data pipelines on Saturn Cloud and some things we’ve learned. For more on deployments, you can check out these pages:

Create and Use Deployments and Jobs

Create and Deploy Dashboards

To try this out right away and use Saturn Cloud for free, you can get started here.