The proliferation of data is real, and Zocdoc isn’t immune. In an effort to better understand our users, both patients and doctors, product teams across the technology organization have been creating robust event streams and persisting them to the data lake. The contents of each record vary by activity and use case, ranging from searches initiated and appointments booked to sponsored-results decision engine operations and insurance card uploads.
Creating these real-time streams is an important part of living up to Zocdoc’s goal of being a data-driven organization, using metrics and data mining to improve the product. With multiple producers generating granular data every second, however, we need a way to clean up and standardize things where we can: an ETL process that leaves the data exposed to analysts a little less data swamp and a little more data lake.
ETL rules to live by
There are a few guidelines that we try to follow when building our ETL pipelines. They should be …
Reproducible
Failures are inevitable, but when one happens, how easy will it be to restart your pipeline? Or perhaps your transformation code has a bug in it. Is that bug permanent, or will you be able to rerun the process? This is related to the concept of idempotence, which is crucial in ETL. In short, it means that you should be able to safely rerun the pipeline as many times as needed.
Scalable
If your data grows 100x, will your process grow with it? Nothing is worse than doing work twice; aim for a horizontally scalable system and do it once.
Reusable
There’s never a bad time to mention that reusable code is a good thing. In the ETL context, consider what your actual goal is: you want to apply some business logic to your data. Note that in this three-part process, your focus is just on T, the transform. Getting data from a source (E) and saving it elsewhere (L) are two steps that are chock full of boilerplate to eliminate. Despite it being a data lake, you’ll still want to stay DRY. Sorry.
For the data science team here at Zocdoc, ETL generally falls into two broad buckets. First, there is the stream or mini-batch processing that is most closely associated with near real time analytics. The other category is large, infrequent batch processing that is often more exploratory and complex and usually done with tools like Spark. Here we’ll focus on the former. Metrics pipelines that are well structured and quick to deploy are particularly important for us because they free up a lot more time for the data mining tasks that our team enjoys so much.
The system that we built to turn our potential data swamp into a beautiful data lake is called Aqueduct. We leverage queues to track new data notifications, the serverless power of lambda to quickly deploy transformation code, and S3 for scalable storage. The following diagram shows how Aqueduct works.
When new data is persisted by any of the event producers, a notification is pushed using SNS. The Aqueduct stack then takes over with an SQS message queue subscribed to the data object creation notifications. This means that whenever a data object is added to a source, we’re keeping track of it in a large distributed list. Then, on a fixed schedule, a lambda function is triggered that gathers some or all of the messages currently in the queue for processing. We’ve made the frequency of the trigger, as well as the number of messages retrieved, configurable, since both should vary with the volume of data being produced. Once a set of messages has been collected, the lambda function batches them and distributes the work to a set of worker lambdas that do the actual data transformations. When the work is finished, the transformed data is persisted to its final destination and the original message is deleted from the queue. The batch size is also configurable.
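As a rough illustration, the fan-out step of the scheduler lambda reduces to a pure batching helper. The function name is hypothetical, and the surrounding SQS retrieval and worker invocation (via boto3) are omitted:

```python
from typing import List

def batch_messages(messages: List[dict], batch_size: int) -> List[List[dict]]:
    """Split messages pulled from the queue into fixed-size batches,
    one batch per worker lambda invocation. Illustrative sketch; the
    actual SQS receive and lambda invoke calls are omitted."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    return [messages[i:i + batch_size]
            for i in range(0, len(messages), batch_size)]
```

With the batch size exposed as a stack parameter, tuning throughput becomes a configuration change rather than a code change.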
Many sample deployments using SNS and lambda take advantage of direct invocation, in which each notification directly triggers the function. The simplicity of the architecture is desirable, but there are tradeoffs. First, at the time of writing, if an S3 bucket event directly triggers a lambda, you are limited to only one such configuration on the bucket. If another team wants to build an event-driven system based upon an S3 event from that bucket, they’re out of luck. Having SNS topics associated with S3 events, however, allows for multiple subscribers.
The other benefit of putting SQS between data creation and the processing code is driven by risk aversion. Along with direct invocation of a lambda by S3 on object creation, it is also possible to have S3 send out an SNS notification which then triggers the lambda function. With both of these configurations you hand control of your resources over to the data producers, which can backfire … badly.
Consider a situation that is rare but, as we were recently reminded, most certainly possible: S3 is down. The many data streams delivering messages via systems such as Kinesis Streams and Kinesis Firehose will hold those messages until S3 is back up. At that point, all of the data will essentially be dropped in at once, and for each object there will be a lambda call. Depending on how much data you usually process, you’ll quickly become familiar with the account-level lambda concurrency limits.
While S3 going down twice might seem a bit far-fetched, the idea that we’ll write a bug every now and again certainly isn’t. When a lambda fails, the retry rules vary based on the type of trigger. In the case of S3 directly triggering the lambda, it will essentially retry indefinitely, causing a cascade of failing lambdas. This is a very fast way to find out what the lambda account limits are. Even if the person on pager duty is super fast to respond, there’s no easy way to stop a failing lambda of this type. The best bet, or at least the somewhat desperate one that may have been done once at Zocdoc, is to completely delete the lambda.
In the case of SNS as the caller, things are a bit cleaner; a lambda will retry twice when it fails. That doesn’t mean there isn’t a catch. Once the retries have been exhausted, the event will be discarded unless you’ve added a Dead Letter Queue or some equivalent to your stack. You’ll then need to deploy a means to reprocess those messages when your bug fix goes out. There’s no free lunch.
So, having SQS subscribe to a topic and writing an extra lambda to retrieve and batch messages is some additional work. The benefit: on failure, it’s easy. There’s no need to set retry rules if you don’t want to. The message can happily stay in the queue until you fix the bug. There are even easily configured CloudWatch notifications for SQS queue depth as well as lambda failures. The event isn’t lost, and there’s no real rush to fix anything since the events will be there for up to seven days. When S3 went down and was subsequently repaired, there was nothing to do in Aqueduct.
Some SQS tips
A standard SQS queue supports neither FIFO ordering nor exactly-once processing out of the box. The former isn’t much of an issue for our use case in Aqueduct, though FIFO queues are now available should our needs change. The latter, however, is very important. There’s a high probability that Aqueduct will occasionally process the same message more than once. If, however, your code adheres to the ETL guidelines above, the process is idempotent, making this a moot point. Any message processed a second time will leave the destination exactly the same.
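One simple way to get that idempotence is to make the destination a pure function of the source object, so a reprocessed message overwrites the same output instead of creating a duplicate. The key layout below is hypothetical, not Aqueduct’s actual scheme:

```python
import hashlib

def destination_key(source_key: str, prefix: str = "clean") -> str:
    """Derive a deterministic destination key from the source key.

    Because the output location depends only on the input, rerunning the
    transform for the same message overwrites the same object, so a
    duplicate delivery leaves the destination exactly the same.
    (Key layout here is illustrative only.)
    """
    digest = hashlib.sha256(source_key.encode("utf-8")).hexdigest()[:12]
    filename = source_key.rsplit("/", 1)[-1]
    return f"{prefix}/{digest}/{filename}"
```

Any scheme works as long as the mapping is deterministic and the write is an overwrite rather than an append.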
Knowing that the process is idempotent still leaves a bit to be desired. It may seem wasteful, even with cheap, dynamically allocated resources like lambda, to redo the same work multiple times. The chances of this redundancy can be reduced with a bit of planning and some time spent in the AWS docs. When you deploy your queue, pay attention to the Visibility Timeout parameter. After a message is taken from the queue, that parameter dictates how long before it becomes available again. It won’t be deleted, but it also won’t be returned by the queue to another caller during that window. So, if your worker lambda can process the batch and delete the messages before the visibility timeout is up, those items shouldn’t be processed twice.
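The arithmetic behind that tuning can be sketched as a back-of-the-envelope helper. All of the constants here are assumptions to be measured for your workload, not Aqueduct’s real numbers:

```python
def safe_batch_size(visibility_timeout_s: float,
                    per_message_s: float,
                    overhead_s: float = 5.0,
                    safety_factor: float = 0.8) -> int:
    """Largest batch a worker can process *and* delete before its messages
    become visible to other callers again. safety_factor leaves headroom
    for variance in processing time; overhead_s covers startup and the
    delete calls. All values are illustrative."""
    budget = visibility_timeout_s * safety_factor - overhead_s
    if budget <= 0:
        return 0
    return int(budget // per_message_s)
```

For example, a 300-second visibility timeout and roughly 2 seconds per message would suggest batches of no more than about 117 messages under these assumptions.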
Aqueduct practices what we preach
Earlier we mentioned three high level guidelines for ETL processes. How does Aqueduct stack up?
Is it reproducible?
If you’re using SQS, there’s really no other choice. It isn’t exactly-once processing, so your code needs to be idempotent anyway. If you want to redo some work, just put a message back in the queue and the process will produce exactly the same result.
Is it scalable?
It’s a distributed queue and lambdas, so of course! If you have 100x more data, then simply increase the frequency of message retrieval and the number of messages you take at any interval. If that’s still not enough, increase batch sizes and visibility timeouts. You can get a lot of mileage out of those parameters.
Is it reusable?
Thanks to infrastructure-as-code frameworks like CloudFormation and the generic APIs of S3, SQS, and lambda, absolutely. We have a generic template for the whole stack that is parameterized merely by the location of the data source and the location of the data destination. There are only two lambda functions, and they are never edited. The Aqueduct worker lambda passes ingested data through a generic transformer interface that lives in a separate library we call Purify. As promised, we don’t rewrite E or L, just extend T.
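The shape of that separation might look roughly like the sketch below. All names are hypothetical stand-ins, not the real Purify interface; in the real stack, the extract step would read the S3 objects named in a message batch and the load step would write results back to S3:

```python
from typing import Callable, Dict, List

Record = Dict[str, object]

def run_pipeline(extract: Callable[[], List[Record]],
                 transform: Callable[[Record], Record],
                 load: Callable[[List[Record]], None]) -> int:
    """Generic E and L around a pluggable T. Everything here is an
    illustrative in-memory sketch of the separation of concerns."""
    records = extract()
    transformed = [transform(record) for record in records]
    load(transformed)
    return len(transformed)

# A new pipeline only supplies the transform; E and L never change.
sink: List[Record] = []
processed = run_pipeline(
    extract=lambda: [{"query": "  Dentist  "}],
    transform=lambda r: {"query": str(r["query"]).strip().lower()},
    load=sink.extend,
)
```

Because E and L are fixed, deploying a new metrics pipeline amounts to writing one transform function and filling in two template parameters.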
While the volume, granularity, and potential messiness of data may grow over time, the solutions to deal with it need not be any more sophisticated. In fact, ETL pipelines are only getting easier to design and deploy given the suite of products now offered. Using off-the-shelf AWS products fits with our broader move to the cloud and pays dividends in the form of scalability, ease of deployment, and low maintenance. These are, of course, some of the main ingredients of a happy development environment.
About the author
Rob is a hockey and corgi enthusiast … also, he enjoys data science related things.