With the help of an example, this blog post walks you through how to use the Azure Data Explorer Go SDK to ingest data from an Azure Blob storage container and query it programmatically. After a quick overview of how to set up an Azure Data Explorer cluster (and a database), we will explore the code to understand what's going on (and how), and finally test the application using a simple CLI.
The sample data is a CSV file that can be downloaded from here
The code is available on GitHub https://github.com/abhirockzz/azure-dataexplorer-go
What is Azure Data Explorer?
Azure Data Explorer (also known as Kusto) is a fast and scalable data exploration service for analyzing large volumes of diverse data from any data source, such as websites, applications, IoT devices, and more. This data can then be used for diagnostics, monitoring, reporting, machine learning, and additional analytics capabilities.
It supports several ingestion methods, including connectors to common services like Event Hub, programmatic ingestion using SDKs such as .NET and Python, and direct access to the engine for exploration purposes. It also integrates with analytics and modeling services for additional analysis and visualization of data, using tools such as Power BI.
Go SDK for Azure Data Explorer
The Go client SDK allows you to query, control and ingest into Azure Data Explorer clusters using Go. Please note that this SDK is for interacting with an Azure Data Explorer cluster (and related components such as tables). To create Azure Data Explorer clusters, databases etc., you should use the admin component (control plane) SDK, which is a part of the larger Azure SDK for Go.
API docs - https://godoc.org/github.com/Azure/azure-kusto-go
Before getting started, here is what you would need to try out the sample application
Pre-requisites
You will need a Microsoft Azure account. Go ahead and sign up for a free one!
Install the Azure CLI if you don't have it already (should be quick!)
Set up an Azure Data Explorer cluster, create a database and configure security
Start by creating a cluster using az kusto cluster create. Once that's done, create a database with az kusto database create, e.g.
az kusto cluster create -l "Central US" -n MyADXCluster -g MyADXResGrp --sku Standard_D11_v2 --capacity 2
az kusto database create --cluster-name MyADXCluster -g MyADXResGrp -n MyADXdb
az kusto database show --cluster-name MyADXCluster --name MyADXdb --resource-group MyADXResGrp
Create a Service Principal using az ad sp create-for-rbac
az ad sp create-for-rbac -n "test-datax-sp"
You will get a JSON response like the one below. Please note down the appId, password and tenant, as you will be using them in subsequent steps.
{
"appId": "fe7280c7-5705-4789-b17f-71a472340429",
"displayName": "test-datax-sp",
"name": "http://test-datax-sp",
"password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
"tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}
You will need to assign roles to the Service Principal so that it can access the database you just created. To do so using the Azure Portal, open the Azure Data Explorer cluster, navigate to Data > Databases and select the database. Choose Permissions from the left menu and click Add to proceed.
For more information, please refer to Secure Azure Data Explorer clusters in Azure
Code walk through
At a high level, this is what the sample code does:
- Connect to an Azure Data Explorer cluster (of course!)
- Create a table (and list them just to be sure)
- Create data mapping
- Ingest/load existing data from a CSV file in Azure Blob storage
- Run a query on the data you just ingested
Let's look at each of these steps
Connect to an Azure Data Explorer cluster
We use a Service Principal to authenticate to Azure Data Explorer and provide the Azure tenant ID, client ID and client secret (which were obtained after creating the principal using az ad sp create-for-rbac):
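// Authenticate with the Service Principal's client credentials.
// The variable is named authz so that it does not shadow the auth package.
authz := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
kc, err := kusto.New(kustoEndpoint, authz)
if err != nil {
	log.Fatalf("failed to create kusto client: %v", err)
}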
You can check out the code here
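The credentials and the endpoint come from the environment variables listed in the "Run the example" section. As a minimal sketch (not the sample's actual code), this is one way such settings could be loaded and validated before creating the client. The config type and the loadConfig helper are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// config holds the settings needed to connect to the cluster.
// Hypothetical type, not part of the SDK or the sample.
type config struct {
	ClientID, ClientSecret, TenantID, Endpoint string
}

// loadConfig reads the required settings through getenv (os.Getenv in a real
// run, a stub in tests) and reports every missing variable by name.
func loadConfig(getenv func(string) string) (config, error) {
	var missing []string
	get := func(key string) string {
		v := getenv(key)
		if v == "" {
			missing = append(missing, key)
		}
		return v
	}
	cfg := config{
		ClientID:     get("AZURE_SP_CLIENT_ID"),
		ClientSecret: get("AZURE_SP_CLIENT_SECRET"),
		TenantID:     get("AZURE_SP_TENANT_ID"),
		Endpoint:     get("KUSTO_ENDPOINT"),
	}
	if len(missing) > 0 {
		return config{}, fmt.Errorf("missing environment variables: %v", missing)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("will connect to", cfg.Endpoint)
}
```

Passing the lookup function in as a parameter keeps the helper easy to test without touching the real environment.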
Create table and data mappings
To create a table, we simply execute the .create table command:
func CreateTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("table %s created\n", kustoTable)
}
You can check out the code here
Notice how we use client.Mgmt to execute this operation, since it is a management command. Later, you will see how to execute a query to read data from Azure Data Explorer.
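Management commands in Kusto start with a dot (for example .show tables or .drop table StormEvents), while queries do not. The sketch below is only an illustration of that rule of thumb. The isMgmtCommand helper is a hypothetical name and is not something the SDK provides.

```go
package main

import (
	"fmt"
	"strings"
)

// isMgmtCommand reports whether stmt looks like a management command,
// i.e. its first non-space character is a dot.
// Hypothetical helper for illustration; the SDK does not route statements for you.
func isMgmtCommand(stmt string) bool {
	return strings.HasPrefix(strings.TrimSpace(stmt), ".")
}

func main() {
	statements := []string{".show tables", ".drop table StormEvents", "StormEvents | count"}
	for _, stmt := range statements {
		if isMgmtCommand(stmt) {
			fmt.Printf("%q would go through Mgmt\n", stmt)
		} else {
			fmt.Printf("%q would go through Query\n", stmt)
		}
	}
}
```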
To confirm, we run a command to list the tables in the database, i.e. .show tables:
func FindTable(kc *kusto.Client, kustoDB string) []TableInfo {
	var tables []TableInfo
	ri, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(testQuery))
	if err != nil {
		log.Fatalf("failed to execute query %s - %s", testQuery, err)
	}
	for {
		row, err := ri.Next()
		if err == io.EOF {
			break // all rows have been read
		}
		if err != nil {
			// row is not usable after an error, so stop instead of continuing
			log.Fatalf("failed to read row - %s", err)
		}
		var t TableInfo
		if err := row.ToStruct(&t); err != nil {
			log.Fatalf("failed to convert row - %s", err)
		}
		tables = append(tables, t)
	}
	return tables
}
...
type TableInfo struct {
Name string `kusto:"TableName"`
DB string `kusto:"DatabaseName"`
}
You can check out the code here
After executing the query, ToStruct is used to save the result to an instance of a user-defined TableInfo struct.
Once the table is created, we can configure the data mappings that are used during ingestion to map incoming data to columns inside Kusto tables:
func CreateMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping", err)
}
log.Printf("mapping %s created\n", kustoMappingRefName)
}
You can check out the code here
Ingest data from Azure Blob storage
To ingest data, we use the Ingestion client:
const blobStorePathFormat = "https://%s.blob.core.windows.net/%s/%s%s"
func CSVFromBlob(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
You can check out the code here
We have the path to the file in Azure Blob storage, and we pass it to the FromFile function along with the file type (CSV in this case) and the data mapping we just created (StormEvents_CSV_Mapping).
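Note that blobStorePathFormat places the token directly after the file name, so the SAS token is expected to carry its own leading "?". The sketch below shows how the URL is composed. The blobURL helper and its normalization of the "?" prefix are assumptions added for illustration and are not part of the sample. The token value in main is a shortened placeholder.

```go
package main

import (
	"fmt"
	"strings"
)

// Same format string as in the sample: account, container, file name, SAS token.
const blobStorePathFormat = "https://%s.blob.core.windows.net/%s/%s%s"

// blobURL builds the blob URL. As a convenience (an assumption of this sketch),
// it adds the leading "?" to the SAS token when the caller left it out.
func blobURL(account, container, file, sasToken string) string {
	if sasToken != "" && !strings.HasPrefix(sasToken, "?") {
		sasToken = "?" + sasToken
	}
	return fmt.Sprintf(blobStorePathFormat, account, container, file, sasToken)
}

func main() {
	// Placeholder token, not a working signature.
	fmt.Println(blobURL("kustosamplefiles", "samplefiles", "StormEvents.csv", "sp=r&sv=2018-03-28"))
}
```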
Query data
We fetch some data from the StormEvents table using the following query:
StormEvents | where EventType == 'Flood' and State == 'WASHINGTON' | sort by DamageProperty desc | project StartTime, EndTime, Source, DamageProperty
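If the pipe syntax is new to you, it may help to read the query as a filter, then a sort, then a column selection. Here is a rough in-memory equivalent in plain Go. The stormEvent type, the floodsInWashington function and the rows in main are made up for illustration, and the time columns are left out to keep the sketch short.

```go
package main

import (
	"fmt"
	"sort"
)

// stormEvent holds only the columns this sketch needs (time columns omitted).
type stormEvent struct {
	EventType      string
	State          string
	Source         string
	DamageProperty int
}

// floodsInWashington mirrors the "where" and "sort by ... desc" stages.
func floodsInWashington(events []stormEvent) []stormEvent {
	var matched []stormEvent
	for _, e := range events {
		// where EventType == 'Flood' and State == 'WASHINGTON'
		if e.EventType == "Flood" && e.State == "WASHINGTON" {
			matched = append(matched, e)
		}
	}
	// sort by DamageProperty desc
	sort.SliceStable(matched, func(i, j int) bool {
		return matched[i].DamageProperty > matched[j].DamageProperty
	})
	return matched
}

func main() {
	// Made-up rows, for illustration only.
	events := []stormEvent{
		{"Flood", "WASHINGTON", "Newspaper", 20000},
		{"Tornado", "KANSAS", "Public", 500},
		{"Flood", "WASHINGTON", "Public", 50000},
	}
	for _, e := range floodsInWashington(events) {
		// project: print only the selected columns
		fmt.Println(e.Source, e.DamageProperty)
	}
}
```

In the real application all of this work happens inside the cluster, which is the point of sending a query instead of fetching the whole table.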
This time, we use client.Query (not Mgmt) to read data from the table.
func Get(kc *kusto.Client, kustoDB string) []StormDetail {
	var events []StormDetail
	ri, err := kc.Query(context.Background(), kustoDB, kusto.NewStmt(query))
	if err != nil {
		log.Fatalf("failed to execute query %s - %s", query, err)
	}
	for {
		row, err := ri.Next()
		if err == io.EOF {
			break // all rows have been read
		}
		if err != nil {
			// row is not usable after an error, so stop instead of continuing
			log.Fatalf("failed to read row - %s", err)
		}
		var event StormDetail
		if err := row.ToStruct(&event); err != nil {
			log.Fatalf("failed to convert row - %s", err)
		}
		events = append(events, event)
	}
	return events
}
...
type StormDetail struct {
Start time.Time `kusto:"StartTime"`
End time.Time `kusto:"EndTime"`
From string `kusto:"Source"`
Damage int32 `kusto:"DamageProperty"`
}
Each row in the result is converted into a StormDetail struct using ToStruct.
You can check out the code here
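Both FindTable and Get follow the same read loop: call Next until it returns io.EOF, and treat any other error as a reason to stop. The sketch below isolates that pattern using an in-memory iterator. The rowIterator interface, the sliceIter type and the collect function are hypothetical stand-ins and are not SDK types.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// rowIterator is a hypothetical stand-in for the SDK's row iterator.
type rowIterator interface {
	Next() (string, error)
}

// sliceIter serves rows from memory and can simulate a failure.
type sliceIter struct {
	rows   []string
	pos    int
	failAt int // index at which Next fails; use -1 for "never"
}

func (s *sliceIter) Next() (string, error) {
	if s.pos == s.failAt {
		return "", errors.New("simulated read error")
	}
	if s.pos >= len(s.rows) {
		return "", io.EOF
	}
	row := s.rows[s.pos]
	s.pos++
	return row, nil
}

// collect drains the iterator: io.EOF ends the loop normally, any other
// error aborts and is returned together with the rows read so far.
func collect(it rowIterator) ([]string, error) {
	var out []string
	for {
		row, err := it.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, row)
	}
}

func main() {
	rows, err := collect(&sliceIter{rows: []string{"Flood", "Tornado"}, failAt: -1})
	fmt.Println(rows, err)
}
```

Returning the error to the caller, instead of logging and carrying on, avoids working with a row that was never populated.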
The list of StormDetails is then displayed to stdout in a user-friendly tabular format:
...
data := [][]string{}
for _, detail := range details {
data = append(data, []string{detail.Start.String(), detail.End.String(), detail.From, strconv.Itoa(int(detail.Damage))})
}
log.Println("StormEvents data....")
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Start Time", "End Time", "From", "Damage"})
for _, v := range data {
table.Append(v)
}
table.Render()
...
You can check out the code here
Finally, to drop the table, we use .drop table StormEvents
const dropTableQ = ".drop table StormEvents"
func dropTable(kc *kusto.Client) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(dropTableQ))
if err != nil {
log.Fatal("failed to drop table - ", err)
}
}
You can check out the code here
Run the example
Now that you have an idea of what's going on, let's try it out using a CLI
Set the required environment variables
export AZURE_SP_CLIENT_ID="<service principal client ID>"
export AZURE_SP_CLIENT_SECRET="<service principal client secret>"
export AZURE_SP_TENANT_ID="<tenant ID>"
#e.g. https://mykusto.southeastasia.kusto.windows.net
export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
Get the code and build it
git clone https://github.com/abhirockzz/azure-dataexplorer-go
cd azure-dataexplorer-go
go build -o azdatax
//to confirm
chmod a+x azdatax && ./azdatax
//output
CLI to test sample program for Azure Data Explorer
Usage:
azdatax [command]
Available Commands:
create-mapping creates a mapping named StormEvents_CSV_Mapping
create-table creates a table named StormEvents
drop-table drops the StormEvents table
get queries data from StormEvents
help Help about any command
ingest ingests CSV file from blob store
list-tables lists tables
Flags:
-h, --help help for azdatax
Use "azdatax [command] --help" for more information about a command.
Let's start by creating a table:
./azdatax create-table --dbname <name of the database you created initially>
//output
Connected to Azure Data Explorer
table StormEvents created
To list tables:
./azdatax list-tables --dbname <name of the database you created initially>
//output
Connected to Azure Data Explorer
Table name: StormEvents, Database name: testkustodb
To create the data mapping
./azdatax create-mapping --dbname <name of the database you created initially>
//output
Connected to Azure Data Explorer
mapping StormEvents_CSV_Mapping created
To ingest data:
./azdatax ingest --dbname <name of the database you created initially>
//output
Connected to Azure Data Explorer
Ingested file from - https://kustosamplefiles.blob.core.windows.net/samplefiles/StormEvents.csv?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D
Wait for a while for the ingestion to complete before you try to query the data (next step)
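If you would rather not guess how long to wait, one option is to poll until the table reports some rows. The sketch below shows the general idea with a stand-in function. The waitForRows helper and its countRows callback are hypothetical; in a real program the callback might run a query such as StormEvents | count, which is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForRows calls countRows up to maxAttempts times, sleeping for interval
// between attempts, and returns the first non-zero row count it sees.
// Hypothetical helper for illustration; it is not part of the sample or the SDK.
func waitForRows(maxAttempts int, interval time.Duration, countRows func() (int64, error)) (int64, error) {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		n, err := countRows()
		if err != nil {
			return 0, err
		}
		if n > 0 {
			return n, nil
		}
		if attempt < maxAttempts {
			time.Sleep(interval)
		}
	}
	return 0, errors.New("no rows visible after waiting")
}

func main() {
	// Stand-in for a real row-count query: reports rows on the third call.
	calls := 0
	fakeCount := func() (int64, error) {
		calls++
		if calls < 3 {
			return 0, nil
		}
		return 100, nil
	}
	n, err := waitForRows(5, 10*time.Millisecond, fakeCount)
	fmt.Println(n, err, "after", calls, "calls")
}
```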
To query data
./azdatax get --dbname <name of the database you created initially>
//output
Connected to Azure Data Explorer
StormEvents data....
+-------------------------------+-------------------------------+---------------------------+----------+
| START TIME | END TIME | FROM | DAMAGE |
+-------------------------------+-------------------------------+---------------------------+----------+
| 2007-12-02 23:00:00 +0000 UTC | 2007-12-05 23:00:00 +0000 UTC | Official NWS Observations | 50000000 |
| 2007-01-03 00:00:00 +0000 UTC | 2007-01-03 22:00:00 +0000 UTC | Newspaper | 20000 |
| 2007-12-03 03:00:00 +0000 UTC | 2007-12-05 19:00:00 +0000 UTC | Official NWS Observations | 12000 |
| 2007-01-03 00:00:00 +0000 UTC | 2007-01-03 22:00:00 +0000 UTC | Newspaper | 5000 |
| 2007-03-12 00:00:00 +0000 UTC | 2007-03-13 23:00:00 +0000 UTC | Public | 0 |
| 2007-03-12 00:00:00 +0000 UTC | 2007-03-14 23:00:00 +0000 UTC | Other Federal | 0 |
+-------------------------------+-------------------------------+---------------------------+----------+
Finally, to drop the StormEvents table:
./azdatax drop-table --dbname <name of the database you created initially>
//output
Connected to Azure Data Explorer
Table StormEvents dropped
Conclusion
Hopefully this was helpful in demonstrating how to use the Go SDK to interact with Azure Data Explorer. This is obviously just one of the ways! For more, dig into the documentation and happy exploring!