Following this guide you will learn things like:
- How to load files from the Hadoop Distributed File System (HDFS) directly into memory
- How to move files from a local filesystem to HDFS
- How to set up a local Spark installation using conda
- How to load data from HDFS into a Spark or pandas DataFrame
- How to leverage libraries such as pyarrow, impyla, python-hdfs, ibis, etc.
First, let's import some libraries we will be using throughout this tutorial, especially pandas:
from pathlib import Path
import pandas as pd
import numpy as np
pyspark: Apache Spark
First of all, install findspark, and also pyspark if you are working on a local computer. If you are following this tutorial on a Hadoop cluster, you can skip the pyspark installation. For simplicity I will use the conda virtual environment manager (pro tip: create a virtual environment before starting and do not break your system Python install!).
conda install -c conda-forge findspark -y
conda install -c conda-forge pyspark -y
Spark setup with findspark
This works for both local and cluster mode; uncomment the line that matches your particular situation:
import findspark
# Local Spark
# findspark.init('/home/cloudera/miniconda3/envs/jupyter/lib/python3.7/site-packages/pyspark/')
# Cloudera cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')
Getting PySpark shell
To get a PySpark shell:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()
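Depending on how your Spark installation is configured, Hive support may need to be enabled explicitly for the Hive queries shown below to work; here is a minimal sketch (on the Cloudera parcel used in this tutorial it is typically already wired in):
# If Hive tables are not visible, build the session with Hive support enabled
spark = SparkSession.builder \
    .appName('example_app') \
    .master('local[*]') \
    .enableHiveSupport() \
    .getOrCreate()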
Let’s list the existing databases. I assume you are familiar with the Spark DataFrame API and its methods:
spark.sql("show databases").show()
+------------+
|databaseName|
+------------+
| __ibis_tmp|
| analytics|
| db1|
| default|
| fhadoop|
| juan|
+------------+
From pandas to Spark
The first integration covers how to move data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark. First, let’s load a pandas DataFrame. This one is about air quality in Madrid (just to satisfy your curiosity, but not important with regards to moving data from one place to another). You can download it here. Make sure you install pytables to read hdf5 data.
air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()
date | BEN | CH4 | CO | ... |
---|---|---|---|---|
2001-07-01 01:00:00 | 30.65 | NaN | 6.91 | ... |
2001-07-01 02:00:00 | 29.59 | NaN | 2.59 | ... |
2001-07-01 03:00:00 | 4.69 | NaN | 0.76 | ... |
2001-07-01 04:00:00 | 4.46 | NaN | 0.74 | ... |
2001-07-01 05:00:00 | 2.18 | NaN | 0.57 | ... |
Let’s make some changes to this DataFrame, like resetting the datetime index so we do not lose information when loading into Spark. The datetime column will also be converted to a string, as Spark has some issues working with dates (related to system locale, timezones, and so on).
air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')
We can simply load from pandas to Spark with createDataFrame:
air_quality_sdf = spark.createDataFrame(air_quality_df)
air_quality_sdf.dtypes
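createDataFrame infers the schema from the pandas dtypes. If inference ever gives you trouble, a schema can be passed explicitly; here is a small sketch for a subset of columns (column names taken from the air-quality data, the remaining pollutant columns would follow the same pattern, and air_quality_subset_sdf is just an example name):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schema for two of the columns; add one StructField per extra column
schema = StructType([
    StructField('date', StringType(), True),
    StructField('BEN', DoubleType(), True),
])
air_quality_subset_sdf = spark.createDataFrame(air_quality_df[['date', 'BEN']], schema=schema)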
Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be manipulated easily using PySpark methods:
air_quality_sdf.select('date', 'NOx').show(5)
+-------------------+------------------+
| date| NOx|
+-------------------+------------------+
|2001-07-01 01:00:00| 1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
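Beyond select, the usual DataFrame operations (filters, aggregations and so on) are available as well; for example, a quick sketch computing the average NOx concentration over the high-pollution hours (the 100 threshold is arbitrary):
from pyspark.sql import functions as F

# Keep only rows with NOx above 100 and average the remaining values
air_quality_sdf \
    .filter(F.col('NOx') > 100) \
    .agg(F.avg('NOx').alias('avg_NOx')) \
    .show()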
From Spark to Hive
To persist a Spark DataFrame into HDFS, where it can be queried using the default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporary view from that DataFrame:
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
Once the temporary view is created, it can be used from the Spark SQL engine to create a real table using create table as select. Before creating this table, I will create a new database called analytics to store it:
sql_drop_table = """
drop table if exists analytics.pandas_spark_hive
"""
sql_drop_database = """
drop database if exists analytics cascade
"""
sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet
as select to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""
print("dropping database...")
result_drop_db = spark.sql(sql_drop_database)
print("creating database...")
result_create_db = spark.sql(sql_create_database)
print("dropping table...")
result_droptable = spark.sql(sql_drop_table)
print("creating table...")
result_create_table = spark.sql(sql_create_table)
dropping database...
creating database...
dropping table...
creating table...
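As noted above, the temporary-view route is not the only strategy: the DataFrame writer API can persist the table directly. A minimal sketch, assuming the analytics database already exists:
# Write the Spark DataFrame straight into a Hive-managed parquet table
air_quality_sdf \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .saveAsTable('analytics.pandas_spark_hive')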
We can check the results using the Spark SQL engine, for example selecting the ozone pollutant concentration over time:
spark.sql("select * from analytics.pandas_spark_hive") \
.select("date_parsed", "O_3").show(5)
+-------------------+------------------+
| date_parsed| O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
pyarrow: Apache Arrow
Apache Arrow is an in-memory columnar data format created to support high-performance operations in Big Data environments (it can be seen as the in-memory equivalent of the parquet format). It is developed in C++, but its Python API is amazing, as you will see shortly. But first of all, please install it:
conda install pyarrow -y
In order to establish native communication with HDFS I will use the interface included in pyarrow. The only requirement is setting an environment variable pointing to the location of libhdfs. Remember we are in a Cloudera environment; in case you are using Hortonworks, you will have to find the proper location (believe me, it exists).
Establish connection
import pyarrow as pa
import os
os.environ['ARROW_LIBHDFS_DIR'] = \
'/opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/lib64/'
hdfs_interface = pa.hdfs.connect(host='localhost', port=8020, user='cloudera')
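Note that in more recent pyarrow releases this legacy pa.hdfs interface is deprecated in favour of pyarrow.fs.HadoopFileSystem; if you are on a newer version, a roughly equivalent connection (with a slightly different set of methods, e.g. get_file_info instead of ls) would look like this sketch, where hdfs_new is just an example name:
from pyarrow import fs

# New-style filesystem interface for HDFS
hdfs_new = fs.HadoopFileSystem(host='localhost', port=8020, user='cloudera')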
List files in HDFS
Let’s list the files persisted by Spark earlier. Remember that this data was previously loaded into a pandas DataFrame from a local file and then loaded into a Spark DataFrame. By default, Spark writes tables partitioned into many snappy-compressed files. In the HDFS path you can identify the database name (analytics) and the table name (pandas_spark_hive):
hdfs_interface.ls('/user/cloudera/analytics/pandas_spark_hive/')
['/user/cloudera/analytics/pandas_spark_hive/_SUCCESS',
'/user/cloudera/analytics/pandas_spark_hive/part-00000-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00001-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00002-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00003-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00004-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00005-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00006-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
'/user/cloudera/analytics/pandas_spark_hive/part-00007-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet']
Reading parquet files directly from HDFS
To read parquet files (or a folder full of files representing a table) directly from HDFS, I will use the PyArrow HDFS interface created before:
table = hdfs_interface \
.read_parquet('/user/cloudera/analytics/pandas_spark_hive/')
From HDFS to pandas (.parquet example)
Once the parquet files are read by the PyArrow HDFS interface, a Table object is created. We can easily go back to pandas using the to_pandas method:
table_df = table.to_pandas()
table_df.head()
id | date_parsed | date | BEN | ... |
---|---|---|---|---|
0 | 2001-06-30 23:00:00 | 2001-07-01 01:00:00 | 30.65 | ... |
1 | 2001-07-01 00:00:00 | 2001-07-01 02:00:00 | 29.59 | ... |
2 | 2001-07-01 01:00:00 | 2001-07-01 03:00:00 | 4.69 | ... |
3 | 2001-07-01 02:00:00 | 2001-07-01 04:00:00 | 4.46 | ... |
4 | 2001-07-01 03:00:00 | 2001-07-01 05:00:00 | 2.18 | ... |
And that is basically where we started, closing the cycle Python -> Hadoop -> Python.
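The cycle can also be closed without Spark: a pandas DataFrame can be written back to HDFS as parquet directly through the same interface. A minimal sketch (the destination path and variable names are just examples):
from pyarrow import parquet as pq

# Convert the pandas DataFrame to an Arrow Table and stream it into HDFS
table_out = pa.Table.from_pandas(table_df)
with hdfs_interface.open('/user/cloudera/analytics/madrid_roundtrip.parquet', 'wb') as writer:
    pq.write_table(table_out, writer)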
Uploading local files to HDFS
All kinds of HDFS operations are supported by the PyArrow HDFS interface, for example, uploading a bunch of local files to HDFS:
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data/'
for f in cwd.rglob('*.*'):
print(f'uploading {f.name}')
with open(str(f), 'rb') as f_upl:
hdfs_interface.upload(destination_path + f.name, f_upl)
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv
Let’s check that the files have been uploaded properly by listing the files in the destination path:
hdfs_interface.ls(destination_path)
['/user/cloudera/analytics/data/diamonds_test.csv',
'/user/cloudera/analytics/data/diamonds_train.csv',
'/user/cloudera/analytics/data/madrid.h5',
'/user/cloudera/analytics/data/sandp500.zip',
'/user/cloudera/analytics/data/stations.csv']
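Downloading works the other way around with the download method, which writes into a local file-like object opened for writing; a quick sketch (the local destination path is arbitrary):
# Copy a file from HDFS back to the local filesystem
with open('/tmp/diamonds_train_local.csv', 'wb') as f_local:
    hdfs_interface.download('/user/cloudera/analytics/data/diamonds_train.csv', f_local)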
From HDFS to pandas (.csv example)
For example, a .csv file can be loaded directly from HDFS into a pandas DataFrame using the open method and the standard pandas read_csv function, which is able to take a buffer as input:
diamonds_train = pd.read_csv(hdfs_interface.open('/user/cloudera/analytics/data/diamonds_train.csv'))
diamonds_train.head()
 | carat | cut | color | clarity | depth | table | price | x | y | z |
---|---|---|---|---|---|---|---|---|---|---|
0 | 1.21 | Premium | J | VS2 | 62.4 | 58.0 | 4268 | 6.83 | 6.79 | 4.25 |
1 | 0.32 | Very Good | H | VS2 | 63.0 | 57.0 | 505 | 4.35 | 4.38 | 2.75 |
2 | 0.71 | Fair | G | VS1 | 65.5 | 55.0 | 2686 | 5.62 | 5.53 | 3.65 |
3 | 0.41 | Good | D | SI1 | 63.8 | 56.0 | 738 | 4.68 | 4.72 | 3.00 |
4 | 1.02 | Ideal | G | SI1 | 60.5 | 59.0 | 4882 | 6.55 | 6.51 | 3.95 |
In case you are interested in all the methods and possibilities this library offers, please visit: https://arrow.apache.org/docs/python/filesystems.html#hdfs-api
python-hdfs: HDFS
Sometimes it is not possible to access the libhdfs native HDFS library (for example, when performing analytics from a computer that is not part of the cluster). In that case, we can rely on WebHDFS (the HDFS REST API). It is slower and not suitable for heavy Big Data loads, but an interesting option for light workloads. Let’s install a WebHDFS Python API:
conda install -c conda-forge python-hdfs -y
Establish WebHDFS connection
To establish the connection:
from hdfs import InsecureClient
web_hdfs_interface = InsecureClient('http://localhost:50070', user='cloudera')
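InsecureClient works against an unsecured cluster; for a Kerberos-secured cluster the hdfs package ships an extension client (it additionally requires the requests-kerberos package to be installed). A rough sketch, where web_hdfs_secure_interface is just an example name:
from hdfs.ext.kerberos import KerberosClient

# Authenticates using the Kerberos ticket of the current user
web_hdfs_secure_interface = KerberosClient('http://localhost:50070')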
List files in HDFS
Listing files is similar to the PyArrow interface; just use the list method with an HDFS path:
web_hdfs_interface.list('/user/cloudera/analytics/data')
['diamonds_test.csv',
'diamonds_train.csv',
'madrid.h5',
'sandp500.zip',
'stations.csv']
Uploading local files to HDFS using WebHDFS
More of the same thing:
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data_web_hdfs/'
for f in cwd.rglob('*.*'):
print(f'uploading {f.name}')
web_hdfs_interface.upload(destination_path + f.name,
str(f),
overwrite=True)
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv
Let’s check the upload is correct:
web_hdfs_interface.list(destination_path)
['diamonds_test.csv',
'diamonds_train.csv',
'madrid.h5',
'sandp500.zip',
'stations.csv']
Bigger files can also be handled by HDFS (with some limitations). These files are from the Kaggle Microsoft Malware competition and weigh a couple of GB each:
web_hdfs_interface.upload(destination_path + 'train.parquet', '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/train.pq', overwrite=True);
web_hdfs_interface.upload(destination_path + 'test.parquet', '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/test.pq', overwrite=True);
From HDFS to pandas using WebHDFS (.parquet example)
In this case, it is useful to use the PyArrow parquet module and pass a buffer to create a Table object. Afterwards, a pandas DataFrame can be easily created from the Table object using the to_pandas method:
from pyarrow import parquet as pq
from io import BytesIO
with web_hdfs_interface.read(destination_path + 'train.parquet') as reader:
microsoft_train = pq.read_table(BytesIO(reader.read())).to_pandas()
microsoft_train.head()
 | MachineIdentifier | ProductName | EngineVersion | ... |
---|---|---|---|---|
0 | 0000028988387b115f69f31a3bf04f09 | win8defender | 1.1.15100.1 | ... |
1 | 000007535c3f730efa9ea0b7ef1bd645 | win8defender | 1.1.14600.4 | ... |
2 | 000007905a28d863f6d0d597892cd692 | win8defender | 1.1.15100.1 | ... |
3 | 00000b11598a75ea8ba1beea8459149f | win8defender | 1.1.15100.1 | ... |
4 | 000014a5f00daa18e76b81417eeb99fc | win8defender | 1.1.15100.1 | ... |
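A .csv file can be pulled through WebHDFS in much the same way, since read returns a file-like object that pandas can consume directly; a minimal sketch using one of the files uploaded earlier (stations_df is just an example name):
# Stream a CSV from HDFS over WebHDFS straight into pandas
with web_hdfs_interface.read(destination_path + 'stations.csv') as reader:
    stations_df = pd.read_csv(reader)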
impyla: Hive + Impala SQL
Hive and Impala are two SQL engines for Hadoop. One is MapReduce-based (Hive), while Impala is a more modern and faster in-memory implementation created and open-sourced by Cloudera. Both engines can be fully leveraged from Python using one of their multiple APIs. In this case I am going to show you impyla, which supports both engines. Let’s install it using conda, and do not forget to install thrift_sasl version 0.2.1 (yes, it must be this specific version, otherwise it will not work):
conda install impyla thrift_sasl=0.2.1 -y
Establishing connection
from impala.dbapi import connect
from impala.util import as_pandas
From Hive to pandas
The API follows the classic ODBC standard, which will probably be familiar to you. impyla includes a utility function called as_pandas that easily parses results (a list of tuples) into a pandas DataFrame. Use it with caution: it has issues with certain data types and is not very efficient with Big Data workloads. Here we fetch results both ways:
hive_conn = connect(host='localhost', port=10000, database='analytics', auth_mechanism='PLAIN')
with hive_conn.cursor() as c:
c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
results = c.fetchall()
with hive_conn.cursor() as c:
c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
results_df = as_pandas(c)
Raw results are pretty similar to those you would get from, for example, the standard Python sqlite3 library:
results[:2]
[(datetime.datetime(2001, 7, 1, 1, 0),
'2001-07-01 01:00:00',
30.649999618530273,
nan,
6.909999847412109,
42.63999938964844,
nan,
nan,
381.29998779296875,
1017.0,
9.010000228881836,
158.89999389648438,
nan,
47.5099983215332,
nan,
76.05000305175781),
(datetime.datetime(2001, 7, 1, 2, 0),
'2001-07-01 02:00:00',
29.59000015258789,
nan,
2.5899999141693115,
50.36000061035156,
nan,
nan,
209.5,
409.20001220703125,
23.81999969482422,
104.80000305175781,
nan,
20.950000762939453,
nan,
84.9000015258789)]
And its pandas DataFrame version:
results_df.head()
pandas_spark_hive.id | pandas_spark_hive.date_parsed | pandas_spark_hive.date | ... |
---|---|---|---|
0 | 2001-07-01 01:00:00 | 2001-07-01 01:00:00 | ... |
1 | 2001-07-01 02:00:00 | 2001-07-01 02:00:00 | ... |
2 | 2001-07-01 03:00:00 | 2001-07-01 03:00:00 | ... |
3 | 2001-07-01 04:00:00 | 2001-07-01 04:00:00 | ... |
4 | 2001-07-01 05:00:00 | 2001-07-01 05:00:00 | ... |
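For result sets that do not fit comfortably in memory, fetchall and as_pandas are risky; the standard DB-API fetchmany method lets you stream results in bounded batches instead. A minimal sketch (the batch size and the per-batch processing are arbitrary):
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive')
    while True:
        batch = c.fetchmany(10000)  # at most 10,000 rows per round trip
        if not batch:
            break
        print(f'processing batch of {len(batch)} rows')  # replace with real per-batch work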
From Impala to pandas
Working with Impala follows the same pattern as Hive; just make sure you connect to the correct port, which by default is 21050 in this case:
impala_conn = connect(host='localhost', port=21050)
with impala_conn.cursor() as c:
c.execute('show databases')
result_df = as_pandas(c)
result_df
 | name | comment |
---|---|---|
0 | __ibis_tmp | |
1 | _impala_builtins | System database for Impala builtin functions |
2 | analytics | |
3 | db1 | |
4 | default | Default Hive database |
5 | fhadoop | |
6 | juan |
Ibis Framework: HDFS + Impala
Another alternative is Ibis Framework, a high-level API for a relatively large collection of data sources, including HDFS and Impala. It is built around the idea of using Python objects and methods to perform actions over those sources. Let’s install it the same way as the rest of the libraries:
conda install ibis-framework -y
Let’s create both HDFS and Impala interfaces (Impala needs an HDFS interface object in Ibis):
import ibis
hdfs_ibis = ibis.hdfs_connect(host='localhost', port=50070)
impala_ibis = ibis.impala.connect(host='localhost',
port=21050,
hdfs_client=hdfs_ibis,
user='cloudera')
Once the interfaces are created, actions can be performed by calling methods; there is no need to write more SQL. If you are familiar with ORMs (Object Relational Mappers), this is not exactly the same, but the underlying idea is pretty similar.
impala_ibis.invalidate_metadata()
impala_ibis.list_databases()
['__ibis_tmp',
'_impala_builtins',
'analytics',
'db1',
'default',
'fhadoop',
'juan']
From Impala to pandas
Ibis natively works over pandas, so there is no need to perform a conversion. Executing a table expression returns a pandas DataFrame object:
table = impala_ibis.table('pandas_spark_hive',
database='analytics')
table_df = table.execute()
table_df is a pandas DataFrame object.
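Since table is an Ibis expression, transformations can be chained with methods and are only executed at the end; a small sketch, assuming the NOx and date columns from the air-quality table (high_nox_df is just an example name):
# Filter, project and limit lazily, then execute to get a pandas DataFrame
expr = table[table.NOx > 100][['date', 'NOx']].limit(5)
high_nox_df = expr.execute()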
From pandas to Impala
Going from pandas to Impala can be done using Ibis by selecting the database with the Impala interface, setting up permissions (depending on your cluster setup), and using the create_table method, passing a pandas DataFrame object as an argument:
analytics_db = impala_ibis.database('analytics')
hdfs_ibis.chmod('/user/cloudera/analytics', '777')
analytics_db.create_table(table_name='diamonds',
obj=pd.read_csv('data/diamonds/diamonds_train.csv'),
force=True)
Reading the newly created table back results in:
analytics_db.table('diamonds').execute().head(5)
id | carat | cut | color | clarity | depth | table | price | x | y | z |
---|---|---|---|---|---|---|---|---|---|---|
0 | 1.21 | Premium | J | VS2 | 62.4 | 58.0 | 4268 | 6.83 | 6.79 | 4.25 |
1 | 0.32 | Very Good | H | VS2 | 63.0 | 57.0 | 505 | 4.35 | 4.38 | 2.75 |
2 | 0.71 | Fair | G | VS1 | 65.5 | 55.0 | 2686 | 5.62 | 5.53 | 3.65 |
3 | 0.41 | Good | D | SI1 | 63.8 | 56.0 | 738 | 4.68 | 4.72 | 3.00 |
4 | 1.02 | Ideal | G | SI1 | 60.5 | 59.0 | 4882 | 6.55 | 6.51 | 3.95 |
Final words
I hope you liked this tutorial. Using these methods you can tear down the wall between local computing with Python and the Hadoop distributed computing framework. In case you have any questions about the concepts explained here, please write a comment below or send me an email.