Dataflowテンプレートを使用していても既存のDatafowを(データの欠損を生じさせずに)更新する

Dataflow テンプレートを使用する既存のパイプラインの更新は現在サポートされていません。

Cloud Dataflow テンプレートのドキュメントには上記のような記載がされていますが、Dataflowテンプレートの更新を行えば当然既存のDataflowも更新したいのが普通です。

ただGCP側にそのような機構はないようなのでterraformで無理やり更新できるようにしました。

下記のshell scriptテンプレート(wait_until_subscribe.sh.tmplとしてterraform(terragrunt)側で使用してするもの)を用意しておいて、

1
2
3
4
5
6
7
8
9
while true
do
  currentState=$(curl -H "Authorization: Bearer "$(gcloud auth print-access-token) https://dataflow.googleapis.com/v1b3/projects/${project_id}/locations/asia-northeast1/jobs/${job_id} | jq .currentState)
  if [ `echo $currentState | grep JOB_STATE_RUNNING` ]
  then
    break
  fi
  sleep 10
done

terraform(terragrunt)側で下記のようにprovisinerとlifecycleのcreate_before_destroyを使うことで、

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
resource "google_dataflow_job" "to_big_query_data_source" {
  count = local.dataflow_count
  provisioner "local-exec" {
    command = templatefile("${path.module}/wait_until_subscribe.sh.tmpl", { job_id = self.id, project_id = var.project_id })
  }

  name              = "to-${var.service_name}-big-query-data-source-${local.template_version}"
  template_gcs_path = "${local.dataflow_template_storage_path}/LogPubSubMessageToBigQueryDataSource"
  temp_gcs_location = "${var.temporary_data_storage_path}/tmp/"
  parameters = {
    inputTopic        = var.subscription_topic_name
    outputDatasetPath = local.outputDatasetPath
  }

  lifecycle {
    create_before_destroy = true
  }

  on_delete    = local.on_delete
  machine_type = var.machine_type
}

新しいバージョンのDataflowの作成を行い、実際に起動するまで待ってから旧バージョンのDataflowの削除を行うようにしました。

こうすることでデータの欠損を生じさせずにDataflowの更新を行うことができます。

データの重複については、自分の使用方法(詳しくはこちら)では前段でidを付与しているので、idでユニークにしてから使用するようにしています。

Built with Hugo
テーマ StackJimmy によって設計されています。