如何使用 Azure Databrick來進行資料的擷取、轉換和載入資料

by adonisy 9. 十一月 2020 09:38

如何使用 Azure Databrick來進行資料的擷取、轉換和載入資料

以下文章,轉錄自:

https://docs.microsoft.com/zh-tw/azure/databricks/scenarios/databricks-extract-load-sql-data-warehouse

主要是為了更了解 Azure Databrick更實際的應用,也讓各位可以參考一下:

 

前言:

在雲端的環境中,充滿了各式各樣的資料來源,也需要把不同來源的資料內容,轉成另一種格式的資料,在微軟正統的作法,是使用

Azure Data Factory,也就是資料工廠來完成,不過事實上當然也可以使用 Azure Databrick來做,因為只要是能夠寫程式讀取資料的,當是

適用於這個範圍,所以此範例中,我們將使用 Azure Databricks 執行 ETL (擷取、轉換及載入資料) 作業。 將資料從 Azure Data Lake Storage Gen2 

擷取至 Azure Databricks,並在 Azure Databricks 中執行資料轉換,然後將轉換後的資料載入 Azure Synapse Analytics(也就是資料倉儲的資料庫)。

 

本教學課程中的步驟會使用適用於 Azure Databricks Azure Synapse 連接器,將資料轉送至 Azure Databricks 接著,

當資料在 Azure Databricks 叢集和 Azure Synapse 之間進行轉換時,此連接器會使用 Azure Blob 儲存體作為暫時儲存體。

 

下圖顯示此應用程式流程:

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

 注意:

本範例不適用 Azure 免費試用版的訂用帳戶 如果您有免費帳戶,請移至您的設定檔,並將訂用帳戶變更為隨用隨付 如需詳細資訊,

請參閱 Azure 免費帳戶 

然後,為您所在區域的 vCPU 移除消費限制要求增加配額 當您建立 Azure Databricks 工作區時,您可以選取 [試用版 (進階 - 14 天的免費 DBU)]

 定價層,讓工作區可免費存取進階 Azure Databricks DBU 14 天。

 

首先,我們先要建立一個 Azure Databricks的服務,這個請在 Azure 的入口網站選取「建立資源」

 

 

[Azure Databricks 服務] 下方提供下列值,以建立 Databricks 服務


建立帳戶需要幾分鐘的時間。 若要監視作業狀態,請檢視頂端的進度列。

選取 [釘選到儀表板] ,然後選取 [建立]

接下來

 

Azure Databricks 中建立 Spark 叢集

 

  1.  Azure 入口網站中,移至您所建立的 Databricks 服務,然後選取 [啟動工作區
  2. 系統會將您重新導向至 Azure Databricks 入口網站。 在入口網站中選取 [叢集
  3. [新增叢集] 頁面上,提供值以建立叢集。
  4. 填寫下列欄位的值,然後接受其他欄位的預設值:
  5. 輸入叢集的名稱。
  6. 確定您已選取 [在停止活動__分鐘後終止] 核取方塊。 若未使用叢集,請提供據以終止叢集的持續時間 (以分鐘為單位)
  7. 選取 [建立叢集]  叢集執行後,您就可以將 Notebook 連結至叢集,並執行 Spark 作業。

 

 

 

Azure Data Lake Storage Gen2 帳戶中建立檔案系統

在本節中,您會在 Azure Databricks 工作區中建立 Notebook,然後執行程式碼片段以設定儲存體帳戶

  Azure 入口網站中,移至您所建立的 Azure Databricks 服務,然後選取 [啟動工作區]

 在左側選取 [工作區]   [工作區] 下拉式清單選取 [建立] > [Notebook]

 

 

[建立 Notebook] 對話方塊中,輸入 Notebook 的名稱。 選取 [Scala] 作為語言,然後選取您先前建立的 Spark 叢集

 

 

 

選取 [建立]

 下列程式碼區塊會設定在 Spark 工作階段中存取的任何 ADLS Gen 2 帳戶的預設服務主體認證。 第二個程式碼區塊會帳戶名稱附加至設定,以指定特定 ADLS Gen 2 帳戶的認證。 請將其中一個程式碼區塊複製並貼到 Azure Databricks Notebook 的第一個資料格。

 工作階段組態

 

 

val appID = "<appID>"
val secret = "<secret>"
val tenantID = "<tenant-id>"

spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
帳戶組態

 

 

val storageAccountName = "<storage-account-name>"
val appID = "<app-id>"
val secret = "<secret>"
val fileSystemName = "<file-system-name>"
val tenantID = "<tenant-id>"

spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
  1. 在此程式碼區塊中,請將此程式碼區塊中的 <app-id><secret><tenant-id>  <storage-account-name> 預留位置值取代為您在執行本教學課程的必要條件時所收集到的值。 請將 <file-system-name> 預留位置值取代為您要為檔案系統指定的任何名稱。
    • <app-id>  <secret> 來自於您在建立服務主體時向 Active Directory 註冊的應用程式。
    • <tenant-id> 來自於您的訂用帳戶。
    • <storage-account-name> 是您 Azure Data Lake Storage Gen2 儲存體帳戶的名稱。
  2.  SHIFT + ENTER 鍵以執行此區塊中的程式碼

Azure Data Lake Storage Gen2 帳戶中內嵌範例資料

開始本節之前,您必須先完成下列必要條件:

 

Notebook 資料格中輸入下列程式碼:

 

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

 

在資料格中按 SHIFT + ENTER 執行程式碼。

 

現在,在此資料格下方的新資料格中輸入下列程式碼,並將括號中的值取代為您稍早使用的相同值:

 

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

 

在資料格中按 SHIFT + ENTER 執行程式碼。


 

Azure Data Lake Storage Gen2 帳戶擷取資料

您現在可以將 json 檔案範例以資料框架的形式載入 Azure Databricks 在新的資料格中貼上下列程式碼。 請將括號內顯示的預留位置取代為您自己的值。

 

val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
  1.  SHIFT + ENTER 鍵以執行此區塊中的程式碼。
  2. 執行下列程式碼,以查看資料框架的內容

 

df.show()

您現在已將資料從 Azure Data Lake Storage Gen2 擷取至 Azure Databricks

 

轉換 Azure Databricks 中的資料

原始範例資料 small_radio_json.json 檔案擷取了廣播電台的聽眾,並且有不同資料行。 在本節中,您會將資料轉換為僅從資料集擷取特定資料行。

首先,您僅從已建立的資料框架中擷取 firstNamelastNamegenderlocation  level 資料行。

 

val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
specificColumnsDf.show()

 

您會取得如下列程式碼片段所示的輸出:

您可以進一步轉換此資料,將 level 資料行重新命名為 subscription_type

 

 

val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
renamedColumnsDF.show()

 

您會取得如下列程式碼片段所示的輸出。


將資料載入至 Azure Synapse

在本節中,您會將已轉換的資料上傳至 Azure Synapse 您可以使用適用於 Azure Databricks Azure Synapse 連接器,直接將資料框架以資料表形式上傳至 Synapse Spark 集區。

如前所述,在 Azure Databricks Azure Synapse 之間上傳資料時,Azure Synapse 連接器會使用 Azure Blob 儲存體作為暫時儲存體。 因此,一開始您需提供要連線到儲存體帳戶的組態。 您必須已建立屬於本文必要條件的帳戶。

提供可從 Azure Databricks 存取 Azure 儲存體帳戶的組態。

 

val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
val blobContainer = "<blob-container-name>"
val blobAccessKey =  "<access-key>"

 

指定在 Azure Databricks Azure Synapse 之間移動資料時所要使用的暫存資料夾。

 

val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"

 

執行下列程式碼片段,儲存組態中的 Azure Blob 儲存體存取金鑰。 此動作可確保您無須在 Notebook 中以純文字保留存取金鑰。

 

 

val acntInfo = "fs.azure.account.key."+ blobStorage
sc.hadoopConfiguration.set(acntInfo, blobAccessKey)

 

提供用來連線至 Azure Synapse 執行個體的值。 您必須已建立 Azure Synapse Analytics 服務做為必要條件。 請使用 dwServer 的完整伺服器名稱。 例如: <servername>.database.windows.net 

 

//Azure Synapse related settings
val dwDatabase = "<database-name>"
val dwServer = "<database-server-name>"
val dwUser = "<user-name>"
val dwPass = "<password>"
val dwJdbcPort =  "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass

 

執行下列程式碼片段,將已轉換的資料框架 renamedColumnsDF 以資料表形式載入 Azure Synapse 中。 此程式碼片段會在 SQL 資料庫中建立名為 SampleTable 的資料表。

 

spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")

renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()

 

連線至 SQL 資料庫,並確認您看到名為 SampleTable 的資料庫。 

執行 SELECT 查詢,以確認資料表的內容。 資料表中應包含與 renamedColumnsDF 資料框架相同的資料。


由以上的範例,我們可以了解 Azure Databrick,除了可以做機械學習的運算平台之外,還可以當成 ETL的工具

不過我個人覺得使用 Azure Databrick的成本太高,如果是我的話,還是乖乖的使用 Azure Data Factory 來做 ETL吧!

Tags:

NoSQL | SQL Server資料庫 | 楊先民Adonis Young

不允許評論

NET Magazine國際中文電子雜誌

NET Magazine國際中文電子版雜誌,由恆逸資訊創立於2000,自發刊日起迄今已發行超過500篇.NET相關技術文章,擁有超過40000名註冊讀者群。NET Magazine國際中文電子版雜誌希望藉於電子雜誌與NET Developer達到共同學習與技術新知分享,歡迎每一位對.NET 技術有興趣的朋友們多多支持本雜誌,讓作者群們可以有持續性的動力繼續爬文。<請加入免費訂閱>

月分類Month List