これは TECHSCORE Advent Calendar 2019 の7日目の記事です。
Amazon Simple Storage Service という名の通り、S3 は提供されているサービス内容は非常にシンプルなのですが利用時の用途が多岐にわたります。
利用用途が多岐にわたるという事は、注意して管理しないとカオスに陥る可能性があり、「一時的に置いているつもりだった」「そのうちに対応するつもりだった」という野良データがいつの間にか業務に組み込まれてしまい簡単に手が出せなくなる事態に発展する場合もあります。
私が普段利用している AWS アカウントの中で最も運用歴の長いものにも、何やらよろしくないデータが存在することが分かりました。
AWS を適切に利用出来ているかコストの面から調査をしている担当者から「S3 のストレージ利用量、勢いよく増加している理由は何?」と聞かれて即答できず、調査してみると該当の S3 バケットを発見しました。
未圧縮のCSVデータがS3コストを押し上げ
このバケットには心当たりがありました。まさに「そのうちに対応するつもりだった」実績データが未圧縮の状態で置かれています。
業務で急ぎ必要になり抽出するようになった CSV データで、圧縮処理やライフサイクル設定をせずに運用を開始。その後、このデータを基に後続処理が誕生してしまい誰も手を付けられなくなってしまった野良データの1つでした。
ひっそりと運用されている割にははっきりと右肩上がりでストレージ利用量が増えていきます。
「そのうち」は今でしょ!ということで、良い機会とばかりコスト削減対応を実施するための調査を行いました。
目指す成果
- 運用にファイル圧縮を追加し、S3 ストレージ料金をコスト削減する
- 追加処理の工数をできるだけ小さくする
- 後続処理への影響をできるだけ小さくする
S3 のコストは適切に利用していれば安価なものなので(執筆時点の2019年12月では、S3標準ストレージの場合でも 最初の 50 TB/月は0.025USD/GB ※東京リージョンの場合)、修正に工数をかけても得られる削減効果は結局小さくなってしまいます。できるだけ手間暇かけずに実現するという視点を見失わないことが重要と認識しました。
現在運用されている後続処理では Apache Spark でデータをアクセス&集計しています。今後もこの利用方法は変わらない前提とします。
圧縮処理が簡単という点だけに注目すると gzip がよさそうだと思われました。AWS Glue では gzip は未解凍のまま処理可能なので後続処理への影響も小さくできそうです。
他の形式としては、Apache Parquet への変換も候補としました。2018年10月より Amazon Kinesis Data Firehose でサポートが開始されており、運用に組み込むための工数が小さく済みそうです。
Firehose を利用する場合のインプットデータは JSON である必要があり、CSV など別のデータ形式の場合は事前に変換が必要ですが、変換用 AWS Lambda を組み込んだ blueprint が公開されています。うまくはまれば No コーディングで対応可能となっています。
同じく列指向フォーマットである Apache ORC もサポートされていますが、今回は Parquet 変換を考えます。
サイズの計測:圧縮サイズ
現在の運用では100バイト程度から100メガバイト足らずの CSV ファイルが数多く存在する状態になっています。大きなサイズのファイルで試しても運用に即していないので、サンプルデータでは運用データに近いサイズになるようにしました。
同じ CSV ファイルを gzip 圧縮したもの、Parquet に変換したもの(Parquet はデフォルトで Snappy 圧縮されます)を用意し、サイズを計測しました。
サンプルデータ
データ長は長くありません。データ件数を増やすことでファイルサイズを大きくします。
項目 | データサンプル |
---|---|
id | 1 |
clientKey | aaa |
itemCode | 00001 |
itemCount | 1 |
itemPrice | 1000 |
createdAt | 2019-12-01T11:00:18.398000 |
計測結果
CSVデータ件数 | CSV | Parquet | gzip | ||
---|---|---|---|---|---|
ヘッダ+1件 | 116 | 4592 | 3958.62% | 136 | 117.24% |
ヘッダ+100件 | 6059 | 6178 | 101.96% | 852 | 14.06% |
ヘッダ+1千件 | 61827 | 24206 | 39.15% | 9170 | 14.83% |
ヘッダ+1万件 | 627721 | 78872 | 12.56% | 89030 | 14.18% |
ヘッダ+10万件 | 6276697 | 230500 | 3.67% | 889481 | 14.17% |
ヘッダ+100万件 | 62766465 | 1741330 | 2.77% | 8893356 | 14.17% |
ファイスサイズ(単位:バイト)と、CSV ファイルサイズに対して何パーセント程度のサイズになったのかを記載しています。
gzip が早々に頭打ちになったのに比べて、Parquet は継続的にサイズを小さくし続けていることが分かります。更にデータ件数が増えれば、gzip の10分の1、もとの CSV ファイルの100分の1程度にもなりそうです。ただし、ある程度のファイルサイズが無ければ恩恵を受けられないことも分かりました。
ベンチマーク:Spark 読み込みなど
処理前と処理後の時間の差分を所要時間として計測しました。(以下のコード例では時間計測部分のコードは省略しています。)
検証した環境の python と spark のバージョンは以下の通りです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
$ python --version Python 2.7.5 $ pyspark --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_171 Branch Compiled by user on 2019-08-27T21:21:38Z Revision Url Type --help for more information. |
データ読み込み
以下のような python コードで、dataframe にデータを読み込みました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#!/usr/bin/env python # -*- coding: utf-8 -*- from pyspark.context import SparkContext from pyspark.sql import SQLContext sc = SparkContext() sqlContext = SQLContext(sc) # CSVの場合 df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("./data/csv/data.csv") # Parquetの場合( ディレクトリ配下に1つに Parquet ファイル ) df = sqlContext.read.parquet("./data/parquet/") |
データ件数 | CSV | Parquet |
---|---|---|
ヘッダ+10万件 | 4.958 秒 | 1.570 秒 |
ヘッダ+100万件 | 7.095 秒 | 2.857 秒 |
dataframe への読み込みは Parquet の圧勝でした。
現実的な運用では1件や2件のファイルを読み込むことは無いと思い小さなファイル件数では試していませんが、CSV と Parquet でさほど変わらない結果から件数が大きくなるにつれて差異が大きくなっていくのではないかと予想しています。
現在の運用では処理速度は重要な要件ではありませんが、あまりにも遅延するのは困ります。
簡単な集計処理やJOINの計測も実施しました。
結論から言いますと、準備したデータ件数程度だと差異はほとんど見られません。
念のため dataframe で処理する場合と SQL で処理する場合でも比較してみましたが、こちらも差異はほとんどありませんでした。
GroupBy + Count
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# dtaframe の場合 df.groupBy("clientKey").count().sort("clientKey").show() # SQL の場合 df.registerTempTable("sample_data") sqlContext.sql("SELECT clientKey, COUNT(*) FROM sample_data GROUP BY clientKey ORDER BY clientKey").show() # 実行結果 +----------+--------+ | clientKey|count(1)| +----------+--------+ | aaa| 100000| | bbb| 100000| | ccc| 100000| | ddd| 100000| | eee| 100000| | fff| 100000| | ggg| 100000| | hhh| 100000| | iii| 100000| | jjj| 100000| +----------+--------+ |
データ件数 | CSV | Parquet | ||
---|---|---|---|---|
dataframe | SQL | dataframe | SQL | |
ヘッダ+10万件 | 0.463 秒 | 0.587 秒 | 0.452 秒 | 0.562 秒 |
ヘッダ+100万件 | 0.645 秒 | 0.707 秒 | 0.423 秒 | 0.555 秒 |
乗算 + GroupBy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# dtaframe の場合 df.select("clientKey", df.itemCount * df.itemPrice).groupBy("clientKey").sum().sort("clientKey").show() # SQL の場合 sqlContext.sql("SELECT clientKey, SUM(itemCount * itemPrice) FROM sample_data GROUP BY clientKey ORDER BY clientKey").show() # 実行結果 +----------+----------------------------+ | clientKey|sum((itemCount * itemPrice))| +----------+----------------------------+ | aaa| 71923762200| | bbb| 71923762200| | ccc| 71923762200| | ddd| 71923762200| | eee| 71923762200| | fff| 71923762200| | ggg| 71923762200| | hhh| 71923762200| | iii| 71923762200| | jjj| 71923762200| +----------+----------------------------+ |
データ件数 | CSV | Parquet | ||
---|---|---|---|---|
dataframe | SQL | dataframe | SQL | |
ヘッダ+10万件 | 0.656 秒 | 0.611 秒 | 0.504 秒 | 0.477 秒 |
ヘッダ+100万件 | 0.883 秒 | 0.893 秒 | 0.620 秒 | 0.617 秒 |
JOIN + 乗算 + GroupBy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
#dataframe の場合 df_name = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("./data/csv/name.csv") df.join(df_name, df.clientKey == df_name.clientKey, "inner").select("clientName", df.itemCount * df.itemPrice).groupBy("clientName").sum().sort("clientName").show() # SQL の場合 df_name.registerTempTable("sample_data_name") sqlContext.sql(""" SELECT b.clientName, SUM(a.itemCount * a.itemPrice) FROM sample_data as a JOIN sample_data_name as b ON a.clientKey = b.clientKey GROUP BY b.clientName ORDER BY b.clientName """).show() # 実行結果 +---------------+----------------------------+ | clientName|sum((itemCount * itemPrice))| +---------------+----------------------------+ |クライアントaaa| 71923762200| |クライアントbbb| 71923762200| |クライアントccc| 71923762200| |クライアントddd| 71923762200| |クライアントeee| 71923762200| |クライアントfff| 71923762200| |クライアントggg| 71923762200| |クライアントhhh| 71923762200| |クライアントiii| 71923762200| |クライアントjjj| 71923762200| +---------------+----------------------------+ |
データ件数 | CSV | Parquet | ||
---|---|---|---|---|
dataframe | SQL | dataframe | SQL | |
ヘッダ+10万件 | 0.741 秒 | 0.759 秒 | 0.832 秒 | 0.834 秒 |
ヘッダ+100万件 | 1.109 秒 | 1.107 秒 | 1.123 秒 | 1.056 秒 |
まとめ
私たちのアカウントで S3 コストを押し上げる原因になっている未圧縮の CSV ファイルに対するコスト削減策としては、以下の対策が有効であることが分かりました。
- Parquet 変換を行う
- 細かなファイルを分散させて管理するのではなくて、ある程度大きなサイズになるようにする
他にも S3 のコスト削減策としてはストレージクラスの見直しも有効です。
Amazon S3 ストレージクラス
Amazon S3 の料金
既存のストレージ変更には移動の際に料金がかかるので注意が必要ですが、新しくバケットを追加する際には是非ともストレージクラスを使い分けたいと思います。
S3 のコスト削減をするためにとるべき修正方針は、運用によってケースバイケースまさに千差万別です。
今回のブログでお役に立てそうな情報はあまり無いかも知れませんが、同じ悩みを抱えておられる方の何かのヒントになれば幸いです。