NEWSニュース&ブログ

【Microsoft Fabric】データパイプラインを使ってみた

この記事は最終更新から1年以上経過しています。内容が古くなっている可能性があります。

投稿日:2024/01/15

本記事の概要

Microsoft Fabric(以下 Fabric)はデータ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンスまでのすべてをカバーする企業向けのオールインワン分析ソリューションです。
具体的にどのようなサービスなのかは、Microsoftの公式サイトや、こちらの記事をご参照ください。
前回の記事ではLakehouse内のデータをFabricのData Factoryの機能であるデータパイプラインを使用し、Synapse Data Warehouse(以下 DWH)に取り込み、変換処理を行ったうえでPower BIで可視化する検証を行いました。
今回はデータパイプラインをどのように作成したのかをご紹介します。

データパイプラインの構成画面

データパイプラインの詳細は下記の通りです。
①Copy DimCustomer/Copy DimGeography
 LakehouseのDimCustomer.csv /DimGeography.csv をDWHのテーブルとしてコピー
②Create number_of_customers_in_each_city
 上記2テーブルに対しての変換処理用SQLを実行し、number_of_customers_in_each_cityに格納

①~③の詳細について説明していきます。

①Copy DimCustomer/Copy DimGeography

[データのコピー]ではデータのソースと宛先を中心に設定を行っています。
設定内容はCopy DimCustomer/Copy DimGeography共にほとんど変わりません。
ソースの設定は以下の通りです。
設定
データストアの種類 データソースを意味します。今回はワークスペースを選択します。
ワークスペースのデータストアの種類 [レイクハウス]を選択します。他にもData Warehouseなども選択することができます。
レイクハウス 対象のレイクハウスを選択します。
ルートフォルダ Lakehouse内にはテーブル/ファイル形式がありますので、どちらかを選択します。
ファイルパスの種類 ファイルパスを選択します。
ファイルパス 実際のファイルパスを指定します。[参照]を押下することでGUIで指定することも可能です。
指定したフォルダとサブフォルダ内のすべてのファイルを再帰的に処理するか選択できます。
ファイル形式 ファイル形式の設定を指定できます。今回はCSVとしています。
他にも Binary, Excel, JSON, Parquet, XMLなどを指定することができます。


次は宛先の設定です。
設定
データストアの種類 ワークスペースを選択します。
ワークスペースのデータストアの種類 [Data Warehouse]を選択します。
Data Warehouse 対象のData Warehouseを選択します。
テーブルオプション [テーブルの自動作成]を選択します。事前に作成しておくといったことも可能です。
テーブル DWHに作成する[スキーマ].[テーブル名]を指定します。


データのコピーに失敗した際は[失敗]にステータスが遷移するように設定しています。
エラーメッセージやエラーコードを指定することができます。

②Create number_of_customers_in_each_city

今回データの変換では[スクリプト]を使用します。
設定で下記設定を行いました。
設定
データストアの種類 ワークスペースを選択します。
Data Warehouse 対象のData Warehouseを選択します。
スクリプト [Query]を指定します。


スクリプトの内容はSQLを貼り付けることができます。
## [dbo].[number_of_customers_in_each_city]の全データ削除。
delete [dbo].[number_of_customers_in_each_city];
## tmp01に変換データを格納し、[dbo].[number_of_customers_in_each_city]にinsertを行う。
with tmp01 as(
SELECT 
cus.CustomerKey,
cus.GeographyKey,
cus.FirstName,
cus.LastName,
cus.EmailAddress,
geo.City,
cus.AddressLine1,
cus.AddressLine2,
GETDATE() as update_time
FROM [dbo].[DimCustomer] as cus
INNER JOIN [dbo].[DimGeography] as geo
ON cus.GeographyKey = geo.GeographyKey)
insert into [dbo].[number_of_customers_in_each_city]
select City,count(*) as cn 
from tmp01
group by City
order by cn desc;
tmp01では、DimCustomerとDimGeographyを結合し、顧客の情報と都市の情報を結合した一時的なテーブルを作成しています。
insert文では、一時テーブルであるtmp01から都市ごとの顧客数を集計し、その結果を新しいテーブルnumber_of_customers_in_each_cityに挿入しています。

スクリプトも実行に失敗した際は[失敗]にステータスが遷移するように設定しています。

最後に

今回は前回の記事で検証を行ったデータパイプライン内の構成と設定内容についての詳細をご紹介いたしました。
データパイプラインでは他にも使用できるアクティビティがいくつも用意されているので、引き続き検証していきたいです。


中川 智文(2022年入社)
株式会社システムサポート フューチャーイノベーション事業本部 ソリューションサービス事業部所属
Oracle Database, SQL Serverを中心とした案件に従事