Recently, I built a system with Cloud Workflows, which can orchestrate Google Cloud services such as Cloud Functions and Cloud Run. While debugging and during daily monitoring, I often wanted to examine its logs in Cloud Logging more efficiently.
In a workflow, you can easily write logs to Cloud Logging with sys.log. You can also see, in Cloud Logging, the logs of the APIs that the workflow calls. However, these logs are not associated with each other, so it is hard to trace the parent workflow logs and the child API logs in order from top to bottom. This was especially tough under high traffic, where I had to correlate the logs manually, using each entry's timestamp as a clue.
In this article, I show how to associate parent workflow logs with child API logs using the workflow execution ID and structured logs.
An example app
I created a simple workflow that calls a Cloud Function and a Cloud Run service once each. Both APIs are built with Flask.
All of the source code discussed in this article is available in the following repository for your reference.
https://github.com/koshilife/cloud-workflows-logging-example
Cloud Workflows
The following code is the workflow.
workflows/v1.yml
main:
  params: [input]
  steps:
    - set_trace_id:
        assign:
          - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - api_url_function: "https://asia-northeast1-xxx.cloudfunctions.net/logging-example-v1" # TODO set your environment
          - api_url_cloudrun: "https://logging-example-v1-xxx-an.a.run.app" # TODO set your environment
    - call_function:
        call: http.post
        args:
          url: ${api_url_function}
          body:
            workflow_execution_id: ${execution_id}
          auth:
            type: OIDC
        result: result_function
    - call_cloudrun:
        call: http.post
        args:
          url: ${api_url_cloudrun}
          body:
            workflow_execution_id: ${execution_id}
          auth:
            type: OIDC
        result: result_cloudrun
    - set_result:
        assign:
          - result_map:
              message: ${execution_id}
              function_execution_id: ${result_function.headers["Function-Execution-Id"]}
              function_trace_id: ${text.split(result_function.headers["X-Cloud-Trace-Context"], ";o=")[0]}
              cloudrun_trace_id: ${text.split(result_cloudrun.headers["X-Cloud-Trace-Context"], ";o=")[0]}
    - log_result:
        call: sys.log
        args:
          json: ${result_map}
    - return_output:
        return: ${result_map}
The expression ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} returns the workflow execution ID, which the workflow then passes to each API in the request body.
In addition, the workflow output includes each API's trace ID and the Cloud Functions execution ID, which I hoped would be useful clues when investigating.
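To make the trace header handling concrete, here is a minimal Python sketch of how an X-Cloud-Trace-Context value breaks down, assuming the header has the form TRACE_ID/SPAN_ID;o=OPTIONS. The function name parse_trace_context and the sample header value are hypothetical and are not part of the example app.

```python
def parse_trace_context(header):
    """Split an X-Cloud-Trace-Context value into its parts.

    Assumes the form 'TRACE_ID/SPAN_ID;o=OPTIONS'. Parts that are
    missing from the header come back as empty strings.
    """
    trace_and_span, _, options = header.partition(";o=")
    trace_id, _, span_id = trace_and_span.partition("/")
    return {"trace_id": trace_id, "span_id": span_id, "options": options}


if __name__ == "__main__":
    # Hypothetical header value, for illustration only.
    sample = "105445aa7843bc8bf206b12000100000/1;o=1"
    print(parse_trace_context(sample))
```

Note that splitting only on ";o=", as the workflow does, keeps the TRACE_ID/SPAN_ID part together, while splitting on "/" first, as the Python services do, isolates the trace ID.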
Cloud Functions
The following code is the Cloud Function.
functions/v1/main.py
import json
import os

from flask import jsonify, request

# Project ID used to build the trace resource name.
# Note: this variable may not be set automatically on newer runtimes,
# so set it at deploy time if needed.
GCP_PROJECT = os.environ.get('GCP_PROJECT')


def main(request):
    log('Cloud Functions msg1')
    log('Cloud Functions msg1', 'WARNING')
    return jsonify(dict(result='ok'))


def log(message, severity='INFO'):
    log_fields = {}
    # refs: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
    # Link the entry to the request's trace.
    if request and request.headers.get("X-Cloud-Trace-Context") and GCP_PROJECT:
        trace = request.headers["X-Cloud-Trace-Context"].split("/")
        log_fields["logging.googleapis.com/trace"] = f"projects/{GCP_PROJECT}/traces/{trace[0]}"
    # Label the entry with the parent workflow execution ID.
    request_json = request.get_json()
    if request_json and request_json.get("workflow_execution_id"):
        log_fields["logging.googleapis.com/labels"] = {'workflows.googleapis.com/execution_id': request_json["workflow_execution_id"]}
    payload = dict(message=message, severity=severity, **log_fields)
    # A JSON line written to stdout is ingested as a structured log.
    print(json.dumps(payload))
The key point is to use structured logs.
In each log entry, I set the logging.googleapis.com/labels field
to a label that holds the workflow execution ID received from the parent workflow.
refs: Special fields in structured payloads
I use workflows.googleapis.com/execution_id
as the label key because workflow execution logs use the same key.
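To show the shape of the resulting entry without deploying anything, here is a minimal sketch that builds the same payload as a plain dictionary. The function name build_log_entry, its parameters, and the sample values are hypothetical; only the field names come from the code above.

```python
import json

# Label key shared with the workflow execution logs.
EXECUTION_LABEL = "workflows.googleapis.com/execution_id"


def build_log_entry(message, severity="INFO", project=None,
                    trace_header=None, execution_id=None):
    """Return a dict shaped like the structured payload described above."""
    entry = {"message": message, "severity": severity}
    if project and trace_header:
        # Keep only the trace ID, i.e. the text before the first "/".
        trace_id = trace_header.split("/")[0]
        entry["logging.googleapis.com/trace"] = (
            f"projects/{project}/traces/{trace_id}"
        )
    if execution_id:
        entry["logging.googleapis.com/labels"] = {EXECUTION_LABEL: execution_id}
    return entry


if __name__ == "__main__":
    # All values below are placeholders for illustration.
    entry = build_log_entry(
        "Cloud Functions msg1",
        severity="WARNING",
        project="my-project",
        trace_header="105445aa7843bc8bf206b12000100000/1;o=1",
        execution_id="example-execution-id",
    )
    print(json.dumps(entry, indent=2))
```

Keeping the payload construction separate from the Flask request object makes it easy to check locally that the label key matches the one used in the query.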
Cloud Run
The following code is the Cloud Run service.
cloudruns/v1/main.py
import os
import json

from flask import Flask, jsonify, request

GCP_PROJECT = os.environ.get('GCP_PROJECT')

app = Flask(__name__)


@app.route("/", methods=["POST"])
def main():
    log('Cloud Run msg1')
    log('Cloud Run msg1', 'WARNING')
    return jsonify(dict(result='ok'))


def log(message, severity='INFO'):
    log_fields = {}
    # refs: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
    if request and request.headers.get("X-Cloud-Trace-Context") and GCP_PROJECT:
        trace = request.headers["X-Cloud-Trace-Context"].split("/")
        log_fields["logging.googleapis.com/trace"] = f"projects/{GCP_PROJECT}/traces/{trace[0]}"
    request_json = request.get_json()
    if request_json and request_json.get("workflow_execution_id"):
        log_fields["logging.googleapis.com/labels"] = {'workflows.googleapis.com/execution_id': request_json["workflow_execution_id"]}
    payload = dict(message=message, severity=severity, **log_fields)
    print(json.dumps(payload))


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
The point is the same as for Cloud Functions above:
I used structured logs and included the parent workflow execution ID.
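Since both services repeat the same log helper, one possible variation is to move the JSON formatting into a standard logging.Formatter. This is only a sketch of an alternative, not what the example app does; the class name StructuredFormatter, the helper make_logger, and the workflow_execution_id record attribute are all hypothetical names.

```python
import json
import logging
import sys

EXECUTION_LABEL = "workflows.googleapis.com/execution_id"


class StructuredFormatter(logging.Formatter):
    """Render each record as one JSON line with the execution ID label."""

    def format(self, record):
        payload = {
            "message": record.getMessage(),
            # Python level names such as INFO and WARNING are reused as severity.
            "severity": record.levelname,
        }
        # Hypothetical attribute, supplied by the caller through `extra`.
        execution_id = getattr(record, "workflow_execution_id", None)
        if execution_id:
            payload["logging.googleapis.com/labels"] = {
                EXECUTION_LABEL: execution_id
            }
        return json.dumps(payload)


def make_logger(name="workflow-example", stream=sys.stdout):
    """Create a logger that writes structured JSON lines to the stream."""
    logger = logging.getLogger(name)
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(StructuredFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    logger = make_logger()
    # The execution ID here is a placeholder value.
    logger.warning("Cloud Run msg1",
                   extra={"workflow_execution_id": "example-execution-id"})
```

In a request handler, the execution ID would come from the request body, as in the log function above, and be passed through extra on each call.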
Result
After executing the workflow, I could retrieve the parent workflow logs and the child API logs together with the following query in the Cloud Logging console.
labels."workflows.googleapis.com/execution_id"="<Workflow Execution ID>" OR
labels.execution_id="<Workflow Execution ID>"
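If you run this query often, a small helper can fill in the execution ID for you. The sketch below only builds the filter string shown above; the function name build_execution_filter is hypothetical, and the escaping of quotes and backslashes is a defensive assumption rather than something the example app needs.

```python
def build_execution_filter(execution_id):
    """Build the two-clause Cloud Logging filter for one workflow execution."""
    # Escape backslashes and double quotes so the ID stays inside the quotes.
    escaped = execution_id.replace("\\", "\\\\").replace('"', '\\"')
    return (
        f'labels."workflows.googleapis.com/execution_id"="{escaped}" OR\n'
        f'labels.execution_id="{escaped}"'
    )


if __name__ == "__main__":
    # Placeholder execution ID, for illustration only.
    print(build_execution_filter("example-execution-id"))
```

The printed text can be pasted into the query box of the Cloud Logging console.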
I also tried another example app that writes more logs than the one above. (The app is named v2; its source code is in the repository linked above.)
With the same query, I could see all of its logs together.
(Addition)
I wanted to simplify the Cloud Logging query to a one-liner, labels."workflows.googleapis.com/execution_id"="<Workflow Execution ID>", so I tried passing structured-log fields to sys.log in the workflow. However, it didn't work. My guess is that Cloud Workflows does not support structured logs yet.
The following sub-workflow is my attempt to use structured logs. (The full source code is in the repository linked above.)
log:
  params: [message]
  steps:
    - set_log_data:
        assign:
          - execution_id_map: {}
          - execution_id_map["workflows.googleapis.com/execution_id"]: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - data: {}
          - data["message"]: ${message}
          - data["logging.googleapis.com/labels"]: ${execution_id_map}
    - call_sys_log:
        call: sys.log
        args:
          json: ${data}