跳至主要内容

TFX_組件筆記本互動範例_鐵人賽示範

Open In Colab

29.TFX 組件筆記本互動實作

  • 此為鐵人幫範例,內容源自TFX官方範例
  • 本示範將使用 TensorFlow Extended (TFX) 各組件完成機械學習端對端任務,之後您也可以透過 Apache Airflow 及 Apache Beam 編排。
  • ML中繼資料 (ML Metadata) 是保存 TFX 各組件執行歷程的重要資料庫,數據可保存在 MySQL 或 SQLite 資料庫,在 Colab 示範時是暫存在 SQLite。

安裝與設置 TFX 環境

import sys
if 'google.colab' in sys.modules:
!pip install --upgrade pip
!pip install -U tfx 
"請記得安裝完需重新啟動執行階段(Restart Runtime),再進行後續內容"

安裝完需重新啟動執行階段(Restart Runtime),再進行後續內容

import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

print(f'TensorFlow version: {tf.__version__}') # >= 2.5.1
print(f'TFX version: {tfx.__version__}') # >= 1.2.0
  • 設定 Pipeline 工作路徑
# TFX 模組安裝原始路徑_tfx_root
_tfx_root = tfx.__path__[0]

# 芝加哥計程車資料集路徑_taxi_root
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')

# 模型發布serving路徑_serving_model_dir
_serving_model_dir = os.path.join(tempfile.mkdtemp(), 'serving_model/taxi_simple')

# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)
  • 下載資料集,以芝加哥Taxi Trips dataset 進行示範,特徵如下,將使用這個數據集構建一個預測小費tips的模型。
pickup_community_areafaretrip_start_month
trip_start_hourtrip_start_daytrip_start_timestamp
pickup_latitudepickup_longitudedropoff_latitude
dropoff_longitudetrip_milespickup_census_tract
dropoff_census_tractpayment_typecompany
trip_secondsdropoff_community_areatips
# 建立/tmp/tfx-dataXXXXXXZ/檔案路徑_data_filepath
_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
# 查看資料集檔案確認內容
!head {_data_filepath}

創建 InteractiveContext

  • tfx.orchestration.experimental.interactive.interactive_context.InteractiveContext 允許在 notebook 環境中以互動方式查看 TFX 組件。
  • InteractiveContext 預設使用臨時的中繼資料。
    • 已有自己的 pipeline 可設定 pipe_root 參數。
    • 已有中繼資料庫可設定 metadata_connection_config 參數。
context = InteractiveContext()

互動式 TFX components 示範

  • 本範例將逐一示範各組建的工作,也透過 InteractiveContext() 逐一演示互動情形。

ExampleGen

  1. 將數據拆分為訓練集和評估集(默認情況下,2/3 訓練 + 1/3 評估)
  2. 將數據轉換為 tf.Example 格式(參閱說明)。
  3. 將數據複製到 _tfx_root 目錄中供其他組件訪問。
  • 本範例將 _data_root 的 CSV 資料集輸入至 ExampleGen

  • 注意:在這個 notebook 示範使用InteractiveContext.run()。在生產環境中,會預指定所有組件Pipeline以傳遞給協調器(請參閱構建 TFX 管道指南)。

example_gen = tfx.components.CsvExampleGen(input_base=_data_root)
context.run(example_gen, enable_cache=True)
  • ExampleGen 組件將輸出training examplesevaluation examples
artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)
  • 輸出前3筆資料觀察
# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(
example_gen.outputs['examples'].get()[0].uri,
'Split-train'
)

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [
os.path.join(train_uri, name)
for name in os.listdir(train_uri)
]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(
tfrecord_filenames,
compression_type="GZIP"
)

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
  • ExampleGen 已攝取資料,接續資料分析。

StatisticsGen

  • StatisticsGen 組件輸入 ExampleGen 數據後,將據以計算出資料集的統計數據。
  • StatisticsGenTFDV 模組功能之一。
  • context.run(statistics_gen) 觀察互動介面,.execution_id 版次累加至2,.component.inputs 組件輸入為 Examples , 輸出為 ExampleStatistics
statistics_gen = tfx.components.StatisticsGen(
examples=example_gen.outputs['examples'])
context.run(statistics_gen, enable_cache=True)
  • context.show(statistics_gen.outputs['statistics']) 如同 TFDV 工具以 Facets 視覺化統計資訊。
  • 可以觀察判讀可能異常的紅色值、資料分佈情形等。
context.show(statistics_gen.outputs['statistics'])

SchemaGen

  • SchemaGen組件會依據您的資料統計自動產生 Schema ,包含數據預期邊界、資料類型與屬性它還使用TensorFlow 數據驗證庫。
  • SchemaGen 同樣是 TFDV 模組功能之一。
  • 即便 Schema 自動生成已經很實用,但您仍應該會依據需求進行審查和修改。
  • SchemaGen 輸入為 StatisticsGen,默認情況下查看已拆分的訓練資料集。
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False)
context.run(schema_gen, enable_cache=True)
  • SchemaGen 執行後可透過 context.show(schema_gen.outputs['schema']) 查看 Schema 表格。
  • 表格呈現各特徵名稱、屬性、是否必須、所有值、Domain 及 邊界範圍等, 參見 SchemaGen 文件.。
context.show(schema_gen.outputs['schema'])

ExampleValidator

  • ExampleValidator 組件根據 Schema 的預期檢測數據中的異常。
  • ExampleValidator 同樣是 TFDV 模組功能之一。
  • ExampleValidator 的輸入是來自具有數據統計資訊的 StatisticsGen 以及具有數據定義 Schema 的 SchemaGen
  • ExampleValidator 的輸出 anomalies 是有無異常的判讀結果。
example_validator = tfx.components.ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
context.run(example_validator, enable_cache=True)
  • 執行 ExampleValidator 後可以產生異常情形的圖表,綠字 No anomalies found. 表示無異常。
  • 由於此為最初的數據集資訊,而且統計與 Schema 皆是由該數據產生,理應無異常。未來不同版次的資訊流可能會檢測出異常情形。
  • 資料驗證可用 Schema 保護未來數據,異常偵測可用於調試模型性能、了解數據如何隨時間演變以及識別數據錯誤。
context.show(example_validator.outputs['anomalies'])

Transform

  • Transform 組件為訓練和服務執行特徵工程。

  • Transform 使用TensorFlow Transform 模組。

  • Transform 輸入數據來自 ExampleGen 、 Schema 來自 SchemaGen ,以及自行定義如何進行特徵工程的模組。

  • 以下為自行定義的 Transform 程式碼範例,(有關 TensorFlow Transform API 的介紹,請參閱教程)。

  • Notebook 魔術指令 %%writefile ,可以將 cell 內的程式碼指定保存為檔案,該檔案可以用 Transform 組件將程式碼檔案做為模組輸入執行。

_taxi_constants_module_file = 'taxi_constants.py'
%%writefile {_taxi_constants_module_file}

NUMERICAL_FEATURES = ['trip_miles', 'fare', 'trip_seconds']

BUCKET_FEATURES = [
'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
'dropoff_longitude'
]
# tf.transform用於編碼每個特徵的桶數=10
FEATURE_BUCKET_COUNT = 10

CATEGORICAL_NUMERICAL_FEATURES = [
'trip_start_hour', 'trip_start_day', 'trip_start_month',
'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
'dropoff_community_area'
]

CATEGORICAL_STRING_FEATURES = [
'payment_type',
'company',
]

# tf.transform用於編碼VOCAB_FEATURES的詞彙術語數量=1000
VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized
# VOCAB_FEATURES are hashed.
OOV_SIZE = 10

# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def t_name(key):
"""
Rename the feature keys so that they don't clash with the raw keys when
running the Evaluator component.
Args:
key: The original feature key
Returns:
key with '_xf' appended
"""
return key + '_xf'
  • 接著編寫 preprocessing_fn 將原始數據轉換特徵。
_taxi_transform_module_file = 'taxi_transform.py'
%%writefile {_taxi_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft

# Imported files such as taxi_constants are normally cached, so changes are
# not honored after the first import. Normally this is good for efficiency, but
# during development when we may be iterating code it can be a problem. To
# avoid this problem during development, reload the file.
import taxi_constants
import sys
if 'google.colab' in sys.modules: # Testing to see if we're doing development
import importlib
importlib.reload(taxi_constants)

_NUMERICAL_FEATURES = taxi_constants.NUMERICAL_FEATURES
_BUCKET_FEATURES = taxi_constants.BUCKET_FEATURES
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_CATEGORICAL_NUMERICAL_FEATURES = taxi_constants.CATEGORICAL_NUMERICAL_FEATURES
_CATEGORICAL_STRING_FEATURES = taxi_constants.CATEGORICAL_STRING_FEATURES
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY


def _make_one_hot(x, key):
"""Make a one-hot tensor to encode categorical features.
Args:
X: A dense tensor
key: A string key for the feature in the input
Returns:
A dense one-hot tensor as a float list
"""
integerized = tft.compute_and_apply_vocabulary(x,
top_k=_VOCAB_SIZE,
num_oov_buckets=_OOV_SIZE,
vocab_filename=key, name=key)
depth = (
tft.experimental.get_vocabulary_size_by_name(key) + _OOV_SIZE)
one_hot_encoded = tf.one_hot(
integerized,
depth=tf.cast(depth, tf.int32),
on_value=1.0,
off_value=0.0)
return tf.reshape(one_hot_encoded, [-1, depth])


def _fill_in_missing(x):
"""Replace missing values in a SparseTensor.
Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
Args:
x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
in the second dimension.
Returns:
A rank 1 tensor where missing values of `x` have been filled in.
"""
if not isinstance(x, tf.sparse.SparseTensor):
return x

default_value = '' if x.dtype == tf.string else 0
return tf.squeeze(
tf.sparse.to_dense(
tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
default_value),
axis=1)


def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
outputs = {}
for key in _NUMERICAL_FEATURES:
# If sparse make it dense, setting nan's to 0 or '', and apply zscore.
outputs[taxi_constants.t_name(key)] = tft.scale_to_z_score(
_fill_in_missing(inputs[key]), name=key)

for key in _BUCKET_FEATURES:
outputs[taxi_constants.t_name(key)] = tf.cast(tft.bucketize(
_fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT, name=key),
dtype=tf.float32)

for key in _CATEGORICAL_STRING_FEATURES:
outputs[taxi_constants.t_name(key)] = _make_one_hot(_fill_in_missing(inputs[key]), key)

for key in _CATEGORICAL_NUMERICAL_FEATURES:
outputs[taxi_constants.t_name(key)] = _make_one_hot(tf.strings.strip(
tf.strings.as_string(_fill_in_missing(inputs[key]))), key)

# Was this passenger a big tipper?
taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
tips = _fill_in_missing(inputs[_LABEL_KEY])
outputs[_LABEL_KEY] = tf.where(
tf.math.is_nan(taxi_fare),
tf.cast(tf.zeros_like(taxi_fare), tf.int64),
# Test if the tip was > 20% of the fare.
tf.cast(
tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

return outputs
  • 將特徵工程程式傳遞給 Transform 組件轉換資料。
  • Transform組件將產生以下兩種類型的輸出:
    • transform_graph 是可以執行預處理操作的圖(此圖將包含在服務和評估模型中)。
    • transformed_examples 表示預處理的訓練和評估數據。
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_taxi_transform_module_file))
context.run(transform, enable_cache=True)
transform.outputs
  • 輸出的 transform_graph 同時指向包含3個子目錄的目錄。
    • transformed_metadata子目錄包含預處理數據的架構。
    • transform_fn子目錄包含實際的預處理圖。
    • metadata子目錄包含原始數據的架構。
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)
# Get the URI of the output artifact representing the transformed examples, which is a directory
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)

Trainer

  • Trainer組件負責訓練 TensorFlow 模型。

  • Trainer 預設使用 Estimator API ,如要使用 Keras API,您需要通過在 Trainer 的構造函數中設置來指定 custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor) ,參閱Generic Trainer

  • Trainer 的輸入來源:

    • 來自 SchemaGen 的 Schema。
    • 來自 Transform 的 graph。
    • 訓練參數。
    • 做為模組輸入的自定義程式碼。
  • 以下為用戶自定義模型代碼示範(參見 TensorFlow Keras API 介紹)。

  • 創立 taxi_trainer.py 之後將程式碼做為模組傳遞給 Trainer 組件並運行它來訓練模型。

_taxi_trainer_module_file = 'taxi_trainer.py'
%%writefile {_taxi_trainer_module_file}

from typing import Dict, List, Text

import os
import glob
from absl import logging

import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_transform import TFTransformOutput

# Imported files such as taxi_constants are normally cached, so changes are
# not honored after the first import. Normally this is good for efficiency, but
# during development when we may be iterating code it can be a problem. To
# avoid this problem during development, reload the file.
import taxi_constants
import sys
if 'google.colab' in sys.modules: # Testing to see if we're doing development
import importlib
importlib.reload(taxi_constants)

_LABEL_KEY = taxi_constants.LABEL_KEY

_BATCH_SIZE = 40


def _input_fn(file_pattern: List[Text],
data_accessor: tfx.components.DataAccessor,
tf_transform_output: tft.TFTransformOutput,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for tuning/training.

Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
tf_transform_output: A TFTransformOutput.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch

Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
tf_transform_output.transformed_metadata.schema)

def _get_tf_examples_serving_signature(model, tf_transform_output):
"""Returns a serving signature that accepts `tensorflow.Example`."""

# We need to track the layers in the model in order to save it.
# TODO(b/162357359): Revise once the bug is resolved.
model.tft_layer_inference = tf_transform_output.transform_features_layer()

@tf.function(input_signature=[
tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
])
def serve_tf_examples_fn(serialized_tf_example):
"""Returns the output to be used in the serving signature."""
raw_feature_spec = tf_transform_output.raw_feature_spec()
# Remove label feature since these will not be present at serving time.
raw_feature_spec.pop(_LABEL_KEY)
raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
transformed_features = model.tft_layer_inference(raw_features)
logging.info('serve_transformed_features = %s', transformed_features)

outputs = model(transformed_features)
# TODO(b/154085620): Convert the predicted labels from the model using a
# reverse-lookup (opposite of transform.py).
return {'outputs': outputs}

return serve_tf_examples_fn


def _get_transform_features_signature(model, tf_transform_output):
"""Returns a serving signature that applies tf.Transform to features."""

# We need to track the layers in the model in order to save it.
# TODO(b/162357359): Revise once the bug is resolved.
model.tft_layer_eval = tf_transform_output.transform_features_layer()

@tf.function(input_signature=[
tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
])
def transform_features_fn(serialized_tf_example):
"""Returns the transformed_features to be fed as input to evaluator."""
raw_feature_spec = tf_transform_output.raw_feature_spec()
raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
transformed_features = model.tft_layer_eval(raw_features)
logging.info('eval_transformed_features = %s', transformed_features)
return transformed_features

return transform_features_fn


def export_serving_model(tf_transform_output, model, output_dir):
"""Exports a keras model for serving.
Args:
tf_transform_output: Wrapper around output of tf.Transform.
model: A keras model to export for serving.
output_dir: A directory where the model will be exported to.
"""
# The layer has to be saved to the model for keras tracking purpases.
model.tft_layer = tf_transform_output.transform_features_layer()

signatures = {
'serving_default':
_get_tf_examples_serving_signature(model, tf_transform_output),
'transform_features':
_get_transform_features_signature(model, tf_transform_output),
}

model.save(output_dir, save_format='tf', signatures=signatures)


def _build_keras_model(tf_transform_output: TFTransformOutput
) -> tf.keras.Model:
"""Creates a DNN Keras model for classifying taxi data.

Args:
tf_transform_output: [TFTransformOutput], the outputs from Transform

Returns:
A keras Model.
"""
feature_spec = tf_transform_output.transformed_feature_spec().copy()
feature_spec.pop(_LABEL_KEY)

inputs = {}
for key, spec in feature_spec.items():
if isinstance(spec, tf.io.VarLenFeature):
inputs[key] = tf.keras.layers.Input(
shape=[None], name=key, dtype=spec.dtype, sparse=True)
elif isinstance(spec, tf.io.FixedLenFeature):
# TODO(b/208879020): Move into schema such that spec.shape is [1] and not
# [] for scalars.
inputs[key] = tf.keras.layers.Input(
shape=spec.shape or [1], name=key, dtype=spec.dtype)
else:
raise ValueError('Spec type is not supported: ', key, spec)

output = tf.keras.layers.Concatenate()(tf.nest.flatten(inputs))
output = tf.keras.layers.Dense(100, activation='relu')(output)
output = tf.keras.layers.Dense(70, activation='relu')(output)
output = tf.keras.layers.Dense(50, activation='relu')(output)
output = tf.keras.layers.Dense(20, activation='relu')(output)
output = tf.keras.layers.Dense(1)(output)
return tf.keras.Model(inputs=inputs, outputs=output)


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.

Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
tf_transform_output, _BATCH_SIZE)
eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
tf_transform_output, _BATCH_SIZE)

model = _build_keras_model(tf_transform_output)

model.compile(
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
metrics=[tf.keras.metrics.BinaryAccuracy()])

tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq='batch')

model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps,
callbacks=[tensorboard_callback])

# Export the model.
export_serving_model(tf_transform_output, model, fn_args.serving_model_dir)
trainer = tfx.components.Trainer(
module_file=os.path.abspath(_taxi_trainer_module_file),
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=tfx.proto.TrainArgs(num_steps=10000),
eval_args=tfx.proto.EvalArgs(num_steps=5000))
context.run(trainer, enable_cache=True)

使用 TensorBoard 分析訓練

  • 檢視 'Format-Serving' 目錄。
model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'Format-Serving')
pp.pprint(os.listdir(model_dir))
  • 可以透過 TensorBoard 分析模型訓練曲線。
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri

%load_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}

Evaluator

  • Evaluator 組件可評估模型性能。
  • Evaluator 組件為 TensorFlow Model Analysis (TFMA) 模組功能。
  • Evaluator 可以設定門檻值以比較並選擇較佳的模型。這在生產管道設置中很有用,您可以每天自動訓練和驗證模型。
  • Evaluator 的輸入:
    • 輸入資料集來自 ExampleGen
    • 訓練模型來自 Trainer 和切片配置。切片配置允許您根據特徵值對指標進行切片(例如,您的模型在早上 8 點和晚上 8 點開始的出租車行程中表現如何?)。
  • 在此筆記本範例只訓練一個模型,所以Evaluator自動將模型標記為“Good”。
# Imported files such as taxi_constants are normally cached, so changes are
# not honored after the first import. Normally this is good for efficiency, but
# during development when we may be iterating code it can be a problem. To
# avoid this problem during development, reload the file.
import taxi_constants
import sys
if 'google.colab' in sys.modules: # Testing to see if we're doing development
import importlib
importlib.reload(taxi_constants)

eval_config = tfma.EvalConfig(
model_specs=[
# This assumes a serving model with signature 'serving_default'. If
# using estimator based EvalSavedModel, add signature_name: 'eval' and
# remove the label_key.
tfma.ModelSpec(
signature_name='serving_default',
label_key=taxi_constants.LABEL_KEY,
preprocessing_function_names=['transform_features'],
)
],
metrics_specs=[
tfma.MetricsSpec(
# The metrics added here are in addition to those saved with the
# model (assuming either a keras model or EvalSavedModel is used).
# Any metrics added into the saved model (for example using
# model.compile(..., metrics=[...]), etc) will be computed
# automatically.
# To add validation thresholds for metrics saved with the model,
# add them keyed by metric name to the thresholds map.
metrics=[
tfma.MetricConfig(class_name='ExampleCount'),
tfma.MetricConfig(class_name='BinaryAccuracy',
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': 0.5}),
# Change threshold will be ignored if there is no
# baseline model resolved from MLMD (first run).
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={'value': -1e-10})))
]
)
],
slicing_specs=[
# An empty slice spec means the overall slice, i.e. the whole dataset.
tfma.SlicingSpec(),
# Data can be sliced along a feature column. In this case, data is
# sliced along feature column trip_start_hour.
tfma.SlicingSpec(
feature_keys=['trip_start_hour'])
])
# Use TFMA to compute a evaluation statistics over features of a model and
# validate them against a baseline.

# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = tfx.dsl.Resolver(
strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
model_blessing=tfx.dsl.Channel(
type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
'latest_blessed_model_resolver')
context.run(model_resolver, enable_cache=True)

evaluator = tfx.components.Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
baseline_model=model_resolver.outputs['model'],
eval_config=eval_config)
context.run(evaluator, enable_cache=True)
evaluator.outputs
context.show(evaluator.outputs['evaluation'])
  • 要切片顯示模型情形,需使用 TFMA 模組。
  • 在此示範將trip_start_hour切片視覺化,TFMA 支援許多其他可視化,例如公平指標和繪製模型性能的時間序列。要了解更多信息,請參閱教學
import tensorflow_model_analysis as tfma

# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(PATH_TO_RESULT)

# Show data sliced along feature column trip_start_hour.
tfma.view.render_slicing_metrics(
tfma_result, slicing_column='trip_start_hour')
  • 通過門檻值的模型會得到祝福 blessing ,第一次預設會自動取得,之後持續訓練過程會將取得祝福的模型再上線。
blessing_uri = evaluator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}
  • 現在也可以讀取經過驗證成功的紀錄。
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
print(tfma.load_validation_result(PATH_TO_RESULT))

Pusher

  • Pusher 組件通常位於 TFX 管道末端。
  • Pusher 組件檢查模型是否已通過驗證,如果是,則將模型導出至 _serving_model_dir
  • Pusher 將以 SavedModel 格式導出您的模型。
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=_serving_model_dir)))
context.run(pusher, enable_cache=True)
pusher.outputs
push_uri = pusher.outputs['pushed_model'].get()[0].uri
model = tf.saved_model.load(push_uri)

for item in model.signatures.items():
pp.pprint(item)

終於完成 TFX 所有組件的示範!