SparkのRDDとcontextを共有するために Livy Spark REST Job Server APIを使用する方法

Published on 12 February 2016 in Hue 3.10 / Programming / Spark / Tutorial - 4 minutes read - Last modified on 04 February 2020

(元のブログ記事はこちらです)

Livyは任意の場所からApache Sparkを使用するためのオープンソースのRESTインターフェースです。LivyはローカルまたはYARNで実行される、Spark ContextのPython, Scala, Rのコード、あるいはプログラムのスニペットの実行をサポートしています。

エピソード1では、対話的なシェルAPIの使用方法を以前に説明しました 。

このフォローアップでは、より具体的な例のために実際のAPIを置いてみます:RDDまたはコンテキストの共有をシミュレートしてみましょう!

 

RESTサーバを起動する

これは以前の記事のセクションに記載されています 。

 

Spark Contextを共有する…

LivyはユーザーにリモートのSparkセッションを提供しています。Sparkのセッションは通常それぞれ(またはノートブックずつ)に一つあります:

# Client 1
curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"1 + 1"}'
# Client 2
curl localhost:8998/sessions/1/statements -X POST -H 'Content-Type: application/json' -d '{"code":"..."}'
# Client 3
curl localhost:8998/sessions/2/statements -X POST -H 'Content-Type: application/json' -d '{"code":"..."}'

livy_shared_contexts2

…なので、RDDを共有する

ユーザーが同じセッションを指していた場合、それらは同じSpark Contextで相互作用します。このコンテキストはそれ自身がいくつかのRDDを管理するでしょう。ユーザーは単純に同じセッションID、例えば0を使用してこれらのコマンドを発行する必要があります:

# Client 1
curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"1 <a href="https://cdn.gethue.com/uploads/2015/10/livy_multi_contexts.png"><img class="aligncenter size-large wp-image-3340" src="https://cdn.gethue.com/uploads/2015/10/livy_multi_contexts-1024x566.png" alt="livy_multi_contexts" width="1024" height="566" data-wp-pid="3340" /></a>+ 1"}'
# Client 2
curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"..."}'
# Client 3
curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"..."}' 

livy_multi_rdds2

 

…どこからでもアクセス

これで、シンプルに保ちながら、より洗練させることができます。メモリ内で共有されたキー/バリューストアをシミュレートしたい、と想像してみてください。あるユーザーはリモートのLivy PySparkセッションで名前を付けたRDDを開始することができ、誰もがそこにアクセスできます。

livy_shared_rdds_anywhere2

よりきれいに見せるため、それを数行のPythonでラップしてShareableRddで呼び出すことができます 。その後、ユーザーは直接セッションに接続して値をセット、あるいは取得できます。

class ShareableRdd():

def __init__(self):
self.data = sc.parallelize([])

def get(self, key):
return self.data.filter(lambda row: row[0] == key).take(1)

def set(self, key, value):
new_key = sc.parallelize([[key, value]])
self.data = self.data.union(new_key)

set()はshared RDDに値を追加しget()で取得します。

srdd = ShareableRdd()

srdd.set('ak', 'Alaska')
srdd.set('ca', 'California')
srdd.get('ak')

REST APIを直接使用している場合、単にこれらのコマンドでアクセスすることができます。

curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"srdd.get(\"ak\")"}'
{"id":3,"state":"running","output":null}
curl localhost:8998/sessions/0/statements/3
{"id":3,"state":"available","output":{"status":"ok","execution_count":3,"data":{"text/plain":"[['ak', 'Alaska']]"}}}

%jsonマジックキーワードを追加することにより、戻ってくるデータを直接JSON形式できれいに取得することさえもできます:

curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"data = srdd.get(\"ak\")\n%json data"}'
{"id":4,"state":"running","output":null}
curl localhost:8998/sessions/0/statements/4
{"id":4,"state":"available","output":{"status":"ok","execution_count":2,"data":{"application/json":[["ak","Alaska"]]}}}

注意

%json srdd.get("ak")の対応は道半ばです!

 

任意の言語からでも

LivyはシンプルなREST APIを提供しているので、任意の言語でのshared RDDの機能を提供するために、その周辺に少しのラッパーを素早く実装することができます。では通常のPythonでやってみましょう:

pip install requests
python
そして、Pythonシェルで単なるラッパーを宣言します。
import requests
import json

class SharedRdd():
"""
Perform REST calls to a remote PySpark shell containing a Shared named RDD.
"""
def __init__(self, session_url, name):
self.session_url = session_url
self.name = name

def get(self, key):
return self._curl('%(rdd)s.get("%(key)s")' % {'rdd': self.name, 'key': key})

def set(self, key, value):
return self._curl('%(rdd)s.set("%(key)s", "%(value)s")' % {'rdd': self.name, 'key': key, 'value': value})

def _curl(self, code):
statements_url = self.session_url + '/statements'
data = {'code': code}
r = requests.post(statements_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
resp = r.json()
statement_id = str(resp['id'])
while resp['state'] == 'running':
r = requests.get(statements_url + '/' + statement_id)
resp = r.json()
return r.json()['data']

インスタンス化してShareableRddを含んだライブセッションを指すようにします :

states = SharedRdd('http://localhost:8998/sessions/0', 'states')

そして、RDDと透過的にやりとりします。

states.get('ak')
states.set('hi', 'Hawaii')

これです!この共有したRDDの例はREST APIの機能の優れたデモで、どの製品も共有されたRDDをサポートする主張しています。共有されたRDDは、ロード、保存、RDDのリストAPIを介しての探索もサポートできます。

すべてのコードの例はgithubで利用可能で、Livyの最新版で動作します。

Livy Spark REST APIについてさらに学習したい方はユーザーリストやアムステルダムのSpark Summitで気軽に直接質問を送ってください。!


comments powered by Disqus

More recent stories

10 June 2021
Hue4.10(新しいSQLエディタコンポーネント、REST API、小さなファイルのインポート、Slackアプリなど)がリリースされました!
Read More
29 May 2021
Sqlスクラッチパッドコンポーネントとパブリック REST API を使用して、5 分で独自の SQL エディター (BYOE) を構築する
Read More
26 May 2021
改善されたHueのImporter -- ファイルの選択、方言の選択、テーブルの作成
Read More