DB World國際中文電子雜誌
作 者:楊先民
審 稿:張智凱
前言
上期提到了 Azure Data Factory的一些轉換方式,本期要來實作在映射資料流中使用轉換
現在您已將資料移動到 Azure Data Lake Store Gen2,您已準備好構建映射資料流將通過 Spark 叢集群大規模轉換您的資料,然後將其加載到資料倉儲中。
主要任務如下:
1. 準備環境
2. 添加資料源
3.使用映射資料流轉換
4. 寫入資料接收器
任務 1:準備環境
1.打開Data Flow Debug:位於authoring頂部的Data Flow Debug滑塊模塊上。
注意:
資料流叢集需要 5-7 分鐘來預熱。
2. 添加資料流活動。在活動視窗中,打開移動和轉換並拖動資料流活動到pipeline畫布上。在彈出的畫面中,點擊新增新資料流並選擇映射資料流,然後點擊確定。點擊 pipeline 選項卡並拖動從你的複製活動到資料流活動的綠色框以新增成功條件。你會在畫布中查看以下內容:

任務 2:添加資料來源
1. 添加一個 ADLS 來源。雙擊畫布中的 Mapping Data Flow 對象。點擊添加資料流畫布中的來源按鈕。在來源數據集下拉列表中,選擇你的 ADLSG2 資料集以便在你的複製活動中使用

● 如果你的數據集指向包含其他文件的文件夾,你可能需要新增另一個數據集或利用參數化確保僅讀取 moviesDB.csv 文件。
● 如果你尚未在 ADLS 中導入架構,但已提取數據,請轉至數據集的“架構”選項卡,然後點擊“導入架構”,以便你的資料流知道架構投影。
調試叢集預熱後,通過“數據預覽”選項卡驗證數據是否正確加載。
點擊刷新按鈕後,映射資料流將顯示數據外觀的快照就像在每次轉換時一樣。
任務 3:使用映射資料流轉換
1. 添加選擇轉換以重命名和刪除列。在數據預覽中,你可以注意到“Rotton Tomatoes”列拼寫錯誤。正確命名並刪除未使用的評級列,你可以通過點擊你旁邊的 + 圖像來添加選擇轉換ADLS 源節點並在 Schema 修飾符下選擇 Select。

在名稱字段中,將“Rotton”更改為“Rotten”。要刪除評級列,請將鼠標懸停在其上並點擊在垃圾桶圖像上。

2.添加過濾器轉換以過濾掉不需要的年份。例如你只對電影感興趣是在1951 年之後製作的。你可以添加過濾器轉換以通過點擊選擇轉換旁邊的 + 圖像並在行修飾符下選擇過濾器。
點擊表達式框以打開表達式構建器並輸入你的過濾條件。使用映射資料流表達式語言的語法,toInteger(year) > 1950 將轉換如果該值高於 1950,則將年份值字符串為整數並過濾行。

你可以使用表達式構建器的嵌入式數據預覽視窗來驗證你的條件是否有效適當。

3. 添加派生轉換來計算主要類型。你可能已經注意到,流派column 是一個由“|”字符分隔的字符串。如果你只關心每列中的第一個流派,你可以通過 Derived Column 轉換派生一個名為 PrimaryGenre 的新列點擊過濾器轉換旁邊的 + 圖像,然後在架構修改器下選擇派生。
與過濾器轉換類似,派生列使用 Mapping Data Flow 表達式builder 來指定新列的值。

在這種情況下,你嘗試從流派列中提取第一個流派,其格式為‘流派1|流派2|...|流派N’。使用定位函數獲取“|”的第一個從 1 開始的索引在裡面流派字符串。使用iif函數,如果這個指標大於1,就可以計算出主要流派通過 left 函數,該函數將字符串中的所有字符返回到索引的左側。否則,該PrimaryGenre 值等於流派字段。你可以通過表達式生成器驗證輸出數據預覽視窗。
4. 通過窗口變換對電影進行排名。假設你對一部電影的排名感興趣年份為其特定類型。你可以添加一個窗口轉換來定義基於窗口的彙總通過點擊派生列轉換旁邊的 + 圖像並點擊窗口在架構修飾符下。要做到這一點,請指定你正在窗口化的內容,你是什麼排序依據,範圍是什麼,以及如何計算新窗口列。在這個例子中,我們將PrimaryGenre 和 year 的窗口,範圍無限,按爛番茄降序排序,合併計算一個名為 RatingsRank 的新列,該列等於每部電影在其範圍內的排名特定流派-年份。




5. 使用彙總轉換來彙總評級。既然你已經收集並導出了所有你需要的數據,我們可以添加一個彙總轉換來計算基於通過點擊窗口轉換旁邊的 + 圖像並點擊彙總來所需的組在架構修飾符下。正如你在窗口轉換中所做的那樣,讓我們按 PrimaryGenre 對電影進行分組和年份

在“彙總”選項卡中,你可以按列對指定的分組進行彙總計算。為了每個流派和年份,讓我們獲得爛番茄的平均評分,最高和最低評分電影(利用窗口功能)和每組中的電影數量。彙總顯著減少轉換流中的行數,並且只傳播轉換中指定的分組依據和彙總列。

6. 通過 Alter Row Transformation 指定 Upsert 條件。如果你正在寫一個表格接收器,你可以使用 Alter Row 轉換為行指定插入、刪除、更新和更新插入策略點擊彙總轉換旁邊的 + 圖像,然後點擊行下的更改行修飾符。由於你始終在插入和更新,因此你可以指定所有行將始終上插入。

任務 4:寫入數據接收器
1. 寫入 Azure Synapse Analytics Sink。
現在你已經完成了所有的轉換邏輯,你已準備好寫入接收器。
1. 通過點擊 Upsert 轉換旁邊的 + 圖像並點擊下面的 Sink 來添加 Sink目的地。
2. 在 Sink 選項卡中,通過 + New 按鈕新增一個新的資料倉儲數據集。
3. 從磁貼列表中選擇 Azure Synapse Analytics。
4. 選擇一個新的鏈接服務並配置你的 Azure Synapse Analytics 連接以連接到在之前所新增的 DWDB 資料庫。完成後點擊新增。

5. 在數據集配置中,選擇 Create new table 並輸入 Dbo 的 schema 和評分的表名。完成後確定。
6. 由於指定了 upsert 條件,你需要轉到“設置”選項卡並選擇“允許”upsert' 基於關鍵列 PrimaryGenre 和 year。

至此,你已經完成了構建你的 8 個轉換映射資料流。是時候運行pipeline並查看結果了!

任務 5:執行 pipeline
1. 轉到畫布中的 pipeline1 選項卡。因為資料流中的 Azure Synapse Analytics 使用 PolyBase,你必須指定 blob 或 ADLS 暫存文件夾。在執行資料流活動的設置選項卡中,打開 PolyBase 並選擇你的 ADLS 鏈接服務並指定臨時文件夾路徑。

2. 在發布pipeline之前,運行另一個調試運行以確認它按預期工作。看著在“輸出”選項卡上,你可以在兩個活動運行時監控它們的狀態。
3. 兩個活動成功後,你可以點擊資料流活動旁邊的眼鏡圖像更深入地了解資料流運行。
4. 如果你使用本實驗中描述的相同邏輯,你的資料流將向你的 SQL DW 寫入 737 行。你可以進入 SQL Server Management Studio 以驗證pipeline是否正常工作並查看寫了什麼。

小結
本期主要介紹如何利用在Azure Data Factory中使用免程式的大規模資料轉換下集,步驟其實相當麻煩,但是至少可以大概了解其操作方式,也大概心裡有個底吧。