スクエニ ITエンジニア ブログ

機械学習での Feature Store の利用

Graph database を扱ってみた、前回 の続きです。

もともとは Spanner が graph database に対応 したというのを見たのが発端だったのですが、そこから Vertex AI samples > neo4j/graph_paysim (Jupyter Notebook) に入り、graph database に関しては見終わったものの、この例の後半にある Vertex AI を使用する部分には触れられなかったので、今回はそこを見てみましょう。という経緯です。

Spanner の graph database 機能には一切関係なくなっていますがまあいいでしょう!

サンプルコードが何をしているかを見る

前回記事で触れられなかった、Vertex AI samples > neo4j/graph_paysim (Jupyter Notebook) の後半で何をしているかを読んでみます。

Train and deploy a model with Vertex AI

  • 前半で作成した train.csv を GCS に置く
  • それを dataset とし、AutoML を使って train job を実行する。
    • prediction type は classification で target column は is_fraudster
  • model を paysim-prediction-model として保存、deploy する

Loading Data into Vertex AI Feature Store

  • ここで Feature Store が登場。まだ存在しなければつくる。
  • GCS に置いた train.csv から feature values を import する

Sending a prediction using features from the feature store

  • Feature Store から entity_id(=nodeId)="5" の entity の feature value (embedding) を read する
  • read した feature value に num_transactions total_dollar_amnt を追加
  • model endpoint に predict を request する

… ざっと並べましたが、だいたいこんな感じの処理に見えます。

実装してみる

せっかくなので今回も Google Cloud を使用せずにやってみます。使いたくないというわけではないのですが、あまり慣れていない機能をわからないままに使うと気づかないうちに費用がかかったり思わぬ問題が出たりということもあるので、まずは local での練習からはじめてみるのもよいものです。

※ 以下ご注意ください

  • 以降でご紹介するコードは、Microsoft Copilot によって生成されたものを目的に合わせて調整したものです
  • わたしはデータ分析の専門家・データエンジニアでは ありません。初歩的なところで誤りがあるかもしれません
  • IT エンジニアとして業務をサポートするため、領域の理解を深めることを目的として行っているものとご理解ください

以下、前回 の環境からそのまま動作させることを想定しています。古くない Python 環境であればそのまま同様に動くと思います。

Feature store にデータを保存する

ここでは Vertex AI Feature Store の代わりに OSS (Apache-2.0 license) の feature store である Feast を使います。

feast.dev より引用

以下のような directory 構成を想定しています。

workspace/
└─ feature_repo/
    ├─ data/ (以下を実行時に自動的に作成される)
    └─ feature_store.yaml

feature_store.yaml は以下のような内容にしておきます。

project: paysim
registry: data/registry.db
provider: local
online_store:
    type: sqlite
    path: data/online_store.db
entity_key_serialization_version: 2

online store は、ここでは SQLite を指定していますが、他に Redis や DynamoDB、Postgres など 様々対応している ようです。対応する機能にも差があるようですので、データ量や速度、ユーザー数、メンテナンス性などもあわせて検討するとよさそうです。

次に、csv を load して Feast へ保存するまでのコードです。

import pandas as pd
from feast import FeatureStore, Entity, FeatureView, Field, ValueType, FileSource
from feast.types import Int64, Float32, Array
from feast.data_format import ParquetFormat
from datetime import datetime

# CSV ファイルを読み込む
df = pd.read_csv('train.csv')

# 行番号 (1~) を entity key として追加
df['node_id'] = df.index + 1
# FeatureView には timestamp が必要なので dummy を入れる
df['event_timestamp'] = datetime.utcnow()

# column embedding_[0-15] をひとつの embedding にまとめる
emb_columns = [f'embedding_{i}' for i in range(16)]
df['embedding'] = df[emb_columns].apply(lambda row: row.values.tolist(), axis=1)
df.drop(columns=emb_columns, inplace=True)

# CSV ファイルを Parquet ファイルに変換する
df.to_parquet('train.parquet')

# Entity key
e1 = Entity(name="node_id", value_type=ValueType.INT64, description="Node ID")

# データソースの設定
file_source = FileSource(
    path="train.parquet",
    file_format=ParquetFormat()
)

# Feature View の設定
feature_view = FeatureView(
    name="paysim_train",
    entities=[e1],
    ttl=None,
    source=file_source,
    online=True,
    schema=[
        Field(name="is_fraudster", dtype=Int64),
        Field(name="num_transactions", dtype=Int64),
        Field(name="total_transaction_amnt", dtype=Float32),
        # alpha feature https://docs.feast.dev/reference/alpha-vector-database
        Field(name="embedding", dtype=Array(Float32)),
    ]
)

# Feature Store の設定
fs = FeatureStore(repo_path="feature_repo/")

# Entity と Feature View を登録
fs.apply([e1, feature_view])

# データをロード
fs.materialize_incremental(end_date=datetime.utcnow())

基本的には train.csv にあるデータに node_id をつけてそのまま Feast へ保存しているだけです。vector data を dtype=Array(Float32) として保存できるように最近なったというのを見かけてしまいそれを試したく、カラム調整など行っている関係で多少長くなってしまっています。

ちなみに train.csv は以下のようなかたちをしています。

is_fraudster,num_transactions,total_transaction_amnt,embedding_0,embedding_1,embedding_2,embedding_3,embedding_4,embedding_5,embedding_6,embedding_7,embedding_8,embedding_9,embedding_10,embedding_11,embedding_12,embedding_13,embedding_14,embedding_15
1,4,118919.1199857998,0.0,-3.641206660631724e-07,0.0,0.0,-3.641206660631724e-07,-3.641206660631724e-07,3.641206660631724e-07,3.641206660631724e-07,3.641206660631724e-07,0.0,0.0,0.0,-0.0749993696808815,-0.07500189542770386,0.0749993696808815,0.0749993696808815
1,80,7484459.618641878,0.02352503128349781,-0.023524967953562737,2.529249476523887e-09,5.642404232730769e-08,-0.02352495677769184,-6.034454003156497e-08,-3.0876390333389736e-09,0.02352495864033699,-6.277139918964281e-11,-5.563631333416197e-09,-1.257205228810676e-09,-6.579878686352458e-08,-1.7745630741119385,-1.7745740413665771,1.7745630741119385,1.7745630741119385

Feature store からデータを取得して利用する

以下のようにすると Feast からデータを取得できます。

import pandas as pd
from feast import FeatureStore

fs = FeatureStore(repo_path="feature_repo/")

entity_df = pd.DataFrame({"node_id": list(range(1, 3301))})
features = fs.get_online_features(
    features=[
        "paysim_train:is_fraudster",
        "paysim_train:num_transactions",
        "paysim_train:total_transaction_amnt",
        "paysim_train:embedding",
    ],
    entity_rows=entity_df.to_dict(orient="records")
).to_df()

print(features)

機械学習タスクに入ります。以下のように処理を続けます。

# ひとつの field に配列として保持されている embedding を独立した field にばらす
# embedding -> embedding_1, embedding_2, ...
embedding_df = pd.DataFrame(features['embedding'].tolist(), index=features.index)
embedding_df.columns = [f'embedding_{i+1}' for i in range(embedding_df.shape[1])]
features = pd.concat([features, embedding_df], axis=1).drop(columns=['embedding'])

# (A) (後述のための印)

# 特徴量とラベルの分割
X = features.drop('is_fraudster', axis=1)
y = features['is_fraudster']

# 3. データの分割
# データをトレーニングセットとテストセットに分割します。
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 4. モデルの選択とトレーニング
# ここでは、ランダムフォレストを例にとります。
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 5. モデルの評価
# テストセットを使ってモデルの性能を評価します。
from sklearn.metrics import accuracy_score, classification_report
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy}')
print(classification_report(y_test, y_pred))

# 6. モデルの保存
# トレーニングしたモデルを保存します。
import joblib
joblib.dump(model, 'fraud_detection_model.pkl')

基本的にコメントで書いているままですが、Feast から取得したデータを使って model の train をし、分割しておいた test data を使用して model の評価をしています。

実行する際には scikit-learn を install してください。

pip install scikit-learn

Feature store からデータを取得して利用する (predict のみする)

さっきまでの処理で model の作成は終わったので、今度はそれを利用するだけのコードです。

↑ のコードの # (A) の部分まではおなじで、以降に以下のように処理を続けます。

# 保存されたモデルの読み込み
model = joblib.load('fraud_detection_model.pkl')

# feature store から取得したデータから答え (label) を分離する
X = features.drop('is_fraudster', axis=1)
y_true = features['is_fraudster']

# 予測の実行
predictions = model.predict(X)

# 精度スコアと分類レポートの生成
accuracy = accuracy_score(y_true, predictions)
report = classification_report(y_true, predictions)

print(f'Accuracy: {accuracy}')
print('Classification Report:')
print(report)

また、Feast から取得するデータは以下のように適当な一部を選択するように変更します。

# entity_df = pd.DataFrame({"node_id": list(range(1, 3301))}) # ここを変更
entity_df = pd.DataFrame({"node_id": [0, 5, 10, 15, 18, 20]})
features = fs.get_online_features(
    ...

もともと答え (label) を持っているデータからそれを落として predict にかけるという自作自演的な内容になっていますが、データ利用のデモが目的ですのでまあこんなものでしょう。

上記を実行した結果のレポート出力は以下のようになります。

Accuracy: 1.0
Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2
           1       1.00      1.00      1.00         4

    accuracy                           1.00         6
   macro avg       1.00      1.00      1.00         6
weighted avg       1.00      1.00      1.00         6

100% 正確!まあ、自作自演なので…

おまけ : 生データを覗いてみる

今回は SQLite で保存しているので、どのようなデータになっているか中を覗いてみます。

$ sudo apt install sqlite3
  ...
$ sqlite3 ./feature_repo/data/online_store.db 
  ...

sqlite> SELECT name FROM sqlite_master WHERE type='table';
paysim_paysim_train

sqlite> .schema paysim_paysim_train
CREATE TABLE paysim_paysim_train (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp,  PRIMARY KEY(entity_key, feature_name));
CREATE INDEX paysim_paysim_train_ek ON paysim_paysim_train (entity_key);

sqlite> SELECT hex(entity_key), feature_name FROM paysim_paysim_train LIMIT 10 ;
020000006E6F64655F696404000000080000000000000000000000|embedding
020000006E6F64655F696404000000080000000000000000000000|is_fraudster
020000006E6F64655F696404000000080000000000000000000000|num_transactions
020000006E6F64655F696404000000080000000000000000000000|total_transaction_amnt
020000006E6F64655F696404000000080000000001000000000000|embedding
020000006E6F64655F696404000000080000000001000000000000|is_fraudster
020000006E6F64655F696404000000080000000001000000000000|num_transactions
020000006E6F64655F696404000000080000000001000000000000|total_transaction_amnt
020000006E6F64655F696404000000080000000002000000000000|embedding
020000006E6F64655F696404000000080000000002000000000000|is_fraudster

feature store の project が paysim で feature view が paysim_train なので paysim_paysim_train のような名前になっているようです。table の構造は単純で、中身を覗くのもわかりやすそうです。(SQLite 以外では見ていないので形が違う可能性があります)

まとめ

慣れないながら、機械学習のワークフローの中で feature store をどのように利用するかを見てみました。feature store は要するに、機械学習で使用するのに適した機能を持つデータベースなのかなという印象を持ちました。IT エンジニアとしては、こういったものを適切にセットアップして業務の効率化に寄与できるといいですね。

この記事を書いた人

記事一覧
SQUARE ENIXでは一緒に働く仲間を募集しています!
興味をお持ちいただけたら、ぜひ採用情報ページもご覧下さい!