
Feature Engineering using TFX Pipeline and TensorFlow Transform

Hyo__ni 2024. 1. 1. 18:06

Transform input data and train a model with a TFX pipeline.

Unlike previous tutorials that used an already-preprocessed dataset, this time we use an unprocessed raw dataset.

 

1. Create a pipeline

  -  We will add a Transform component.

  -  A Transform component requires input data from an ExampleGen component and a schema from a SchemaGen component, and produces a "transform graph".

  -  The output will be used in a Trainer component.

  -  Transform can optionally also produce "transformed data", which is the materialized data after transformation.

  -  However, in this tutorial we will transform data during training, without materializing the intermediate transformed data.

 

Therefore, we have to define a preprocessing_fn separately to specify how the input data should be transformed (the pre-processing method).

 

2. Write preprocessing and training code 

preprocessing_fn:

For numeric features such as culmen_length_mm and body_mass_g, normalize the values with tft.scale_to_z_score (z-score normalization: subtract the feature's mean and divide by its standard deviation, with statistics computed over the whole dataset).

For the label feature, convert the string species name into a numeric index using tf.lookup.StaticHashTable.

import tensorflow as tf
import tensorflow_transform as tft

_FEATURE_KEYS = ['culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g']
_LABEL_KEY = 'species'


def preprocessing_fn(inputs):
  """Callback that tf.Transform traces to build the transform graph."""
  outputs = {}
  # Normalize each numeric feature to zero mean and unit variance.
  for key in _FEATURE_KEYS:
    outputs[key] = tft.scale_to_z_score(inputs[key])

  # Map species names to integer indices; unknown keys map to -1.
  table_keys = ['Adelie', 'Chinstrap', 'Gentoo']
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=table_keys,
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  outputs[_LABEL_KEY] = table.lookup(inputs[_LABEL_KEY])

  return outputs
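
As a quick sanity check, the lookup table behaves as follows when run standalone (a minimal snippet, not part of the pipeline module):

import tensorflow as tf

table_keys = ['Adelie', 'Chinstrap', 'Gentoo']
initializer = tf.lookup.KeyValueTensorInitializer(
    keys=table_keys,
    values=tf.cast(tf.range(len(table_keys)), tf.int64),
    key_dtype=tf.string,
    value_dtype=tf.int64)
table = tf.lookup.StaticHashTable(initializer, default_value=-1)

# Known species map to their index; anything else falls back to the default -1.
print(table.lookup(tf.constant(['Gentoo', 'Adelie', 'Emperor'])).numpy())  # [ 2  0 -1]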

run_fn: 

Transforms the input data during training using the transform graph produced by the Transform component. A condensed sketch is shown below.
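
A condensed sketch of such a run_fn, following the structure of the TFX penguin tutorial (the model-building helper _build_keras_model and the batch size are assumptions here, not part of this post):

import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
from tfx_bsl.public import tfxio


def run_fn(fn_args: tfx.components.FnArgs):
  # Wrap the transform graph artifact produced by the Transform component.
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
  transform_layer = tf_transform_output.transform_features_layer()

  # Because materialize=False, the Trainer reads RAW examples ...
  dataset = fn_args.data_accessor.tf_dataset_factory(
      fn_args.train_files,
      tfxio.TensorFlowDatasetOptions(batch_size=20),
      schema=tf_transform_output.raw_metadata.schema)

  # ... and applies preprocessing_fn on the fly during training.
  def apply_transform(raw_features):
    transformed = transform_layer(raw_features)
    label = transformed.pop('species')
    return transformed, label

  model = _build_keras_model()  # hypothetical helper that defines the Keras model
  model.fit(dataset.map(apply_transform).repeat(),
            steps_per_epoch=fn_args.train_steps)
  model.save(fn_args.serving_model_dir, save_format='tf')

In the full module, a serving signature is also attached that parses serialized tf.train.Example protos and runs them through the same transform layer; that is what makes the raw-feature inference in section 4 below work.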

 

3. Write a pipeline definition

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])

  # Import the schema.
  schema_importer = tfx.dsl.Importer(source_uri=schema_path,
                                     artifact_type=tfx.types.standard_artifacts.Schema).with_id('schema_importer')

  # Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(statistics=statistics_gen.outputs['statistics'],
                                                      schema=schema_importer.outputs['result'])

  # NEW: Transforms input data using preprocessing_fn in the 'module_file'.
  transform = tfx.components.Transform(examples=example_gen.outputs['examples'],
                                       schema=schema_importer.outputs['result'],
                                       materialize=False,
                                       module_file=module_file)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(module_file=module_file,
                                   examples=example_gen.outputs['examples'],
                                   transform_graph=transform.outputs['transform_graph'], # NEW: Pass transform_graph to the trainer.
                                   train_args=tfx.proto.TrainArgs(num_steps=100),
                                   eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(model=trainer.outputs['model'],
                                 push_destination=tfx.proto.PushDestination(
                                 filesystem=tfx.proto.PushDestination.Filesystem(
                                 base_directory=serving_model_dir)))

  components = [example_gen, statistics_gen, schema_importer, example_validator, transform, trainer, pusher]
  return tfx.dsl.Pipeline(pipeline_name=pipeline_name,
                          pipeline_root=pipeline_root,
                          metadata_connection_config=tfx.orchestration.metadata
                                                                  .sqlite_metadata_connection_config(metadata_path),
                          components=components)
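
The pipeline can then be executed with a local orchestrator; a minimal sketch, assuming the upper-case path variables and _module_file were defined earlier in the notebook:

tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(pipeline_name=PIPELINE_NAME,
                     pipeline_root=PIPELINE_ROOT,
                     data_root=DATA_ROOT,
                     schema_path=SCHEMA_PATH,
                     module_file=_module_file,
                     serving_model_dir=SERVING_MODEL_DIR,
                     metadata_path=METADATA_PATH))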

The Pusher component pushes the trained model to SERVING_MODEL_DIR, which is the serving_model/penguin-transform directory.

!find {SERVING_MODEL_DIR}

4. Run Inference

We can load the exported model and run inference on a few examples.

import os
import tensorflow as tf

# Find the model directory with the latest timestamp.
model_dirs = (item for item in os.scandir(SERVING_MODEL_DIR) if item.is_dir())
model_path = max(model_dirs, key=lambda i: int(i.name)).path

loaded_model = tf.keras.models.load_model(model_path)
inference_fn = loaded_model.signatures['serving_default']

# Prepare a serialized tf.train.Example carrying RAW feature values;
# the serving signature applies the transform graph before the model.
features = {
  'culmen_length_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[49.9])),
  'culmen_depth_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[16.1])),
  'flipper_length_mm': tf.train.Feature(int64_list=tf.train.Int64List(value=[213])),
  'body_mass_g': tf.train.Feature(int64_list=tf.train.Int64List(value=[5400])),
}
example_proto = tf.train.Example(features=tf.train.Features(feature=features))
examples = example_proto.SerializeToString()

result = inference_fn(examples=tf.constant([examples]))
print(result['output_0'].numpy())

Result: [[-7.1045403 -6.1917973  0.683368 ]]

 → The third element corresponds to the 'Gentoo' penguin, and the model predicted it to be the largest of the three.
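
To map the raw logits back to a species name, take the argmax over the output; a small follow-up snippet (the class order matches table_keys in preprocessing_fn):

species = ['Adelie', 'Chinstrap', 'Gentoo']  # same order as table_keys
predicted = species[int(tf.argmax(result['output_0'], axis=1)[0])]
print(predicted)  # 'Gentoo' for the logits above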

 

[Practice Code]

https://github.com/juooo1117/practice_AI_Learning/blob/main/MLOps/FeatureEngineering_TFXPipeline.ipynb
