HadoopのノートブックとSpark、SQLを使用して、ベイエリアの自転車の共有を分析する

Published on 01 October 2015 in Hive / Hue 3.10 / Impala / Metastore / Spark / Tutorial / Video - 6 minutes read - Last modified on 04 February 2020

以前の記事で 、私たちはベイエリアの自転車の共有データから移動データをシームレスにインデックス化、視覚化し、ダッシュボードに気象データを追加して、分析を補足するためにSparkを使用する方法について説明しました。

このチュートリアルでは、ベイエリアの自転車の共有(BABS: Bay Area Bike Share)システムの、ピークの使用量をより深く研究するためにノートブックアプリを使用します。

http://www.bayareabikeshare.com/datachallengeから最新のデータセットをダウンロードします 。本記事では2014年2月から2013年8月までのデータを使用しています。

メタストアアプリでCSVデータをインポートする

BABSデータセットには、駅、移動、利用できるかどうか、および気象データを含む4つのCSVが含まれています。Hueのメタストアのインポートウィザードを使用して、これらのデータセットを簡単にインポートし、CSVのヘッダからスキーマを推測してテーブルを作成することができます。

File Upload to Metastore

Metastore Sample

インポートウィザードは、移動データの「期間(duration)」フィールドを、TINYINTからINTに変更するような、任意のフィールド名や種類を上書きする機会を提供します。

Metastore Schema Fields

Hadoopノートブックでの対話的な分析

電光石火のImpalaクエリ

私たちはクラスタにデータをインポートしているので、データの大量処理を実行するための新しいノートブックを作成することができます。始めるには、Impalaを使用していくつかの簡単な探索クエリを実行します。

 

移動データに基づいて、最も人気のある出発駅の上位10件を見つけてみましょう:

SELECT startterminal, startstation, COUNT(1) AS count FROM bikeshare.trips GROUP BY startterminal, startstation ORDER BY count DESC LIMIT 10

impala_query

結果が返ってくれば、私たちはこのデータを簡単に可視化できます。棒グラフは簡単な COUNT … GROUP BY クエリでうまく動作します。

Impala Bar Graph

サンフランシスコのカルトレイン(Townsend at 4th: タウンセンド 4th)が最も一般的な出発駅だったようです。それでは、SFカルトレインのタウンゼント駅から出発する移動で、最も人気のあった終着駅はどこなのかを特定してみましょう。結果を地図上に可視化できるように緯度と経度の座標を取得します。

SELECT
 s.station_id,
 s.name,
 s.lat,
 s.long,
 COUNT(1) AS count
FROM `bikeshare`.`trips` t
JOIN `bikeshare`.`stations` s ON s.station_id = t.endterminal
WHERE t.startterminal = 70
GROUP BY s.station_id, s.name, s.lat, s.long
ORDER BY count DESC LIMIT 10

Bike Share Map

地図の可視化では、SFカルトレインの駅から出発する最もポピュラーな移動の目的地のほとんど駅にかなり近く、金融地区とSOMAの周りにクラスタ化されていることを示しています。

Hiveでの長時間実行するクエリ

長時間実行するSQLクエリ、またはHiveの組み込み関数を使用する必要があるクエリの場合、この分析を実行するために、ノートブックにHiveのスニペットを追加することができます。

Fカルトレイン駅の移動データをさらに掘り下げ、移動の合計数と、1時間単位でグループ化した移動の平均の時間(分)をみつけたいとしましょう。

移動データは出発時間をSTRINGとして保存しているので、インラインSQLクエリ内で時間を抽出するため、いくつかの文字列操作を適用する必要があります。外部のクエリで移動の回数と平均の期間を集約します。

SELECT
    hour,
    COUNT(1) AS trips,
    ROUND(AVG(duration) / 60) AS avg_duration
FROM (
    SELECT
        CAST(SPLIT(SPLIT(t.startdate, ' ')[1], ':')[0] AS INT) AS hour,
        t.duration AS duration
    FROM `bikeshare`.`trips` t
    WHERE
        t.startterminal = 70
        AND
        t.duration IS NOT NULL
    ) r
GROUP BY hour
ORDER BY hour ASC;

このデータはデータのいくつかの数値の面を生成するので、X軸に時間、Y軸に移動の数、散布の大きさで平均の期間を、散布グラフを使用して結果を可視化することができます。それでは、SFカルトレイン駅で、時間ごとに利用できる内訳を分析するために、別のHiveのスニペットを追加してみましょう:

Bike Share Scatter Plot

Let’s add another Hive snippet to analyze an hour-by-hour breakdown of availability at the SF Caltrain Station:

SELECT
  hour,
  ROUND(AVG(bikes_available)) AS avg_bikes,
  ROUND(AVG(docks_available)) AS avg_docks
FROM (
  SELECT
    r.time AS time,
    CAST(SUBSTR(r.time, 12, 2) AS INT) AS hour,
    CAST(r.bikes_available AS INT) AS bikes_available,
    CAST(r.docks_available AS INT) AS docks_available
  FROM `bikeshare`.`rebalancing` r
  JOIN `bikeshare`.`stations` s ON r.station_id = s.station_id
  WHERE
    r.station_id = 70
    AND
    SUBSTR(r.time, 15, 2) = '00'
  ) t
GROUP BY hour
ORDER BY hour ASC;

私たちは、自転車が利用できるのが午前6時から低下し、午後6時頃に回復している傾向があることを示す結果を、線グラフとして可視化します。

Bike Share Availability Line Graph

PySparkによる強力なデータ解析

ある時点で、あなたのデータ分析がSQLでのリレーショナルな分析の限界を超えるかもしれず、また、それ以上の表現力、本格的なAPIが必要になるかもしれません。

HueのSparkのノートブックは、ユーザーがカスタムのScala、Python (pyspark)、およびSpark APIを利用するRのコードで、調査のためのSQL分析を混在させることができます。

例えば、pysparkスニペットをオープンし、Hiveのwarehouseから直接旅行データをロードし、SFカルトレイン駅からスタートする旅行の平均数を特定するために、filter、map、およびreduceByKey操作のシーケンスを適用することができます:

trips = sc.textFile('/user/hive/warehouse/bikeshare.db/trips/201402_trip_data.csv')

trips = trips.map(lambda line: line.split(","))

station_70 = trips.filter(lambda x: x[4] == '70')

# Emit tuple of ((date, hour), 1)
trips_by_day_hour = station_70.map(lambda x: ((x[2].split()[0], x[2].split()[1].split(':')[0]), 1))

trips_by_day_hour = trips_by_day_hour.reduceByKey(lambda a, b: a+b)

# Emit tuple of (hour, count)
trips_by_hour = trips_by_day_hour.map(lambda x: (int(x[0][1]), x[1]))

avg_trips_by_hour = trips_by_hour.combineByKey( (lambda x: (x, 1)),
 (lambda x, y: (x[0] + y, x[1] + 1)),
 (lambda x, y: (x[0] + y[0], x[1] + y[1]))
 )
avg_trips_by_hour = avg_trips_by_hour.mapValues(lambda v : v[0] / v[1]) 

avg_trips_sorted = sorted(avg_trips_by_hour.collect())
%table avg_trips_sorted

Notebook pyspark bar graph

 

ご覧のように、Hueのノートブックアプリは、強力なツールを組み合わせ、簡単にインタラクティブなデータ分析と可視化ができるようになります。Sparkのノートブックの動作についてもっと知りたい方はLivy、Spark REST Job serverについてご覧いただき、ニューヨークのHadoop WorldとアムステルダムのSpark Summitでお会いしましょう!

ノートブックアプリへの数々の刺激的な改善をお楽しみに!そしていつものようにいつものように、コメントとフィードバックは hue-userメーリングリストや@gethueまでお気軽に!

 


役立つヒント

引用符のあるCSVデータのインポート

(201402_status_data.csvという名前の)BABSの利用可能性(リバランス)データは引用符を使用しています。このような場合は、BeeswaxエディタでHive内にテーブルを作成し、HiveのOpenCSV Row SERDEを使用するのが簡単です:

CREATE TABLE rebalancing(station_id int, bikes_available int, docks_available int, time string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = """,
"escapeChar" = "\"
)
STORED AS TEXTFILE;

その後、テーブルにCSVファイルをインポートするためにメタストアに戻ることができます。手作業でヘッダー行を削除する必要があることに注意してください。

Impalaのメタストアのキャッシュをリセットする

新しいデータベースやテーブルを作成したり、Impalaのスニペットでそれらのクエリを計画している場合は、INVALIDATE METADATA、最初にメタストアのキャッシュをリセットするコマンドの実行をお勧めします。そうしないと、データベースやテーブルが認識されないというエラーが発生することがあります。


comments powered by Disqus

More recent stories

10 June 2021
Hue4.10(新しいSQLエディタコンポーネント、REST API、小さなファイルのインポート、Slackアプリなど)がリリースされました!
Read More
29 May 2021
Sqlスクラッチパッドコンポーネントとパブリック REST API を使用して、5 分で独自の SQL エディター (BYOE) を構築する
Read More
26 May 2021
改善されたHueのImporter -- ファイルの選択、方言の選択、テーブルの作成
Read More