DEV Community

Cover image for Pub/sub adventures: From Pull to Push with Cloud Run
Danyel Cabello
Danyel Cabello

Posted on

Pub/sub adventures: From Pull to Push with Cloud Run

One important part of every ML Video app is process the video in background to create features.

There are several ways to do this, when we create the first architecture of Herman App, at 2017, we decided to implement a full cloud native solution so we use pub/sub queue and create a scalable container architecture that spawn subscribers that will pull the jobs from queue and publish the features after processing them.

This approach had some disadvantages:

  1. Pub/sub doesn't have simple way to avoid that two subscribers won't get the same message.
  2. It also requires some custom metrics in order to automatic spawn new subscribers
  3. The pub/sub library is very reliable but you need a persistent connection mechanism to the queue, sometimes you are getting disconnected or stuck if you don't handle the errors correctly.

But the things have been changed in the past two years: there are more interesting alternatives like create a complex Data Pipelines on Apache Airflow/Apache Bean/Google Dataflow or create Event driven containers on KNative/Google Cloud run.

Introducing KNative Google Cloud Run

Google Cloud Run was released on 2019, is an implementation of KNative on Google Kubernetes Engine.

This implementation abstracts a lot of the internal settings of the KNative Eventing, provides a single API endpoint that every time is called, the HTTP request is forwarded to your container, and you can configure how much traffic your container can handle, and if you want to completely turn off all your containers to save costs.

It also allows you to run your containers on your own Kubernetes cluster or on private Google Cloud Run Kubernetes cluster, the second one is more attractive since you will only pay for the CPU/memory that your container use but it has it owns disadvantages.

Is can be integrated seamlessly with pub/sub retry mechanism, if your process fail for some hiccup in the network and return a 500/400 etc, pub/sub will publish this message again to the subscriber and try again, based on the pub/sub retry policy that you established before.

Since our data pipeline currently is very simple, we decided to use Google Cloud Run, the migration was surprisingly very straight forward, you just need to add a boilerplate flask app that will receive the http request then call your actual code with the message content.

Furthermore, is very simple to integrate this with Pub/sub, you just need to add the new endpoint in Pub/sub as new Push subscription, another important feature is that all the security is handled by IAM Service accounts so you don't need to worry about implement it.

gcloud pubsub subscriptions create pushSubscription --topic videoTopic \
   --push-endpoint=CLOUD_RUN_URL/ \
   --push-auth-service-account=cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com
Enter fullscreen mode Exit fullscreen mode

Old

def handle_messages(message):
    data = json.loads(message.data.decode('utf-8'))
    handle_json_message(data)
    message.ack()

def main():
    subscriber = pubsub_v1.SubscriberClient()
    flow_control = pubsub_v1.types.FlowControl(max_messages=MAX_MESSAGES)
    custom_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
    custom_scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(custom_executor)

    subscription_path = subscriber.subscription_path(PROJECT,
                                                     PUBSUB_SUBSCRIPTION)
    logger.info(f"Starting Video Consumer {subscription_path}... ")
    try:
        future= subscriber.subscribe(subscription_path,
                                     callback=handle_messages,
                                     flow_control=flow_control,
                                     scheduler=custom_scheduler)
        future.result()
    except:
        future.cancel()
        logger.error("Reloading subscriber due to faillure...")
        sys.exit(1)
Enter fullscreen mode Exit fullscreen mode

Adding flask APP

def handle_run_message(message):
    data = json.loads(message)
    try:
        handle_json_message(data)
    except Exception as e:        
        logger.error("Error processing video")
        logger.exception(e)
        return False

@app.route("/", methods=["POST"])
def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        logger.error(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        logger.error(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    pubsub_message = envelope["message"]
    if isinstance(pubsub_message, dict) and "data" in pubsub_message:                 
        data = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()       
        logging.info(data)
        result = handle_run_message(data)
        if result is False:
            return ("", 500)

    return ("", 204)
Enter fullscreen mode Exit fullscreen mode

As you can see, with just a few lines you can migrate your Pub/Sub subscriber to cloud.

Disadvantages

Cloud Run is not a magic thing that will solve all your background processing problems, there are several limitation using this approach.

  1. There is timeout limit of 3600 seconds, so is not suitable for very long process.
  2. If you decided to your the private Google Cloud run Kubernetes instance instead of your own Kubernetes instance is very tricky to create a VPC network to connect your container with other ones.
  3. Pub/sub also has a Acknowledgement deadline of 600 seconds, therefore you need to create your own retry mechanism if your process overpass this limit.
  4. If your data pipeline is more complex that just call a simple container, maybe is better star thinking in implement a full Google Dataflow solution that can pass and process messages/data between different containers.

Conclusion

Google Cloud Run allow you to run containers on demand for simple background process, if I learned something here is: There are different alternatives to create Data Pipelines and you need to explore what is more suitable for your use case. I can leave here a simple recomendation.

  • Use Cloud Functions: If your process doesn't have any custom libraries or binaries
  • Use Cloud Run: If your process is containerized and the process is 0-3600 seconds
  • Use Data Flow: If your process is more complex and you need to run other process that depends on previous outputs

References

https://knative.dev/docs/getting-started/getting-started-eventing/
https://cloud.google.com/run/docs/tutorials/pubsub
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline

Top comments (0)