Dataflow テンプレートを使用する既存のパイプラインの更新は現在サポートされていません。
Cloud Dataflow テンプレートのドキュメントには上記のような記載がされていますが、Dataflowテンプレートの更新を行えば当然既存のDataflowも更新したいのが普通です。
ただGCP側にそのような機構はないようなのでterraformで無理やり更新できるようにしました。
下記のshell scriptテンプレート(wait_until_subscribe.sh.tmplとしてterraform(terragrunt)側で使用してするもの)を用意しておいて、
1
2
3
4
5
6
7
8
9
| while true
do
currentState=$(curl -H "Authorization: Bearer "$(gcloud auth print-access-token) https://dataflow.googleapis.com/v1b3/projects/${project_id}/locations/asia-northeast1/jobs/${job_id} | jq .currentState)
if [ `echo $currentState | grep JOB_STATE_RUNNING` ]
then
break
fi
sleep 10
done
|
terraform(terragrunt)側で下記のようにprovisinerとlifecycleのcreate_before_destroyを使うことで、
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| resource "google_dataflow_job" "to_big_query_data_source" {
count = local.dataflow_count
provisioner "local-exec" {
command = templatefile("${path.module}/wait_until_subscribe.sh.tmpl", { job_id = self.id, project_id = var.project_id })
}
name = "to-${var.service_name}-big-query-data-source-${local.template_version}"
template_gcs_path = "${local.dataflow_template_storage_path}/LogPubSubMessageToBigQueryDataSource"
temp_gcs_location = "${var.temporary_data_storage_path}/tmp/"
parameters = {
inputTopic = var.subscription_topic_name
outputDatasetPath = local.outputDatasetPath
}
lifecycle {
create_before_destroy = true
}
on_delete = local.on_delete
machine_type = var.machine_type
}
|
新しいバージョンのDataflowの作成を行い、実際に起動するまで待ってから旧バージョンのDataflowの削除を行うようにしました。
こうすることでデータの欠損を生じさせずにDataflowの更新を行うことができます。
データの重複については、自分の使用方法(詳しくはこちら)では前段でidを付与しているので、idでユニークにしてから使用するようにしています。