Kế Hoạch Thực Thi Spark

Giới Thiệu

Sử dụng hàm explain() trên bất kỳ đối tượng DataFrame nào để xem dòng dữ liệu của DataFrame (hoặc cách Spark sẽ thực thi truy vấn này). Kế hoạch thực thi được đọc từ trên xuống dưới, phần trên là kết quả cuối cùng và phần dưới là nguồn (hoặc các bước xử lý) của dữ liệu.

Trong ví dụ dưới đây, hãy xem từ khóa đầu tiên của mỗi dòng: FileScan, Exchange, Sort. Việc sắp xếp dữ liệu thực sự là một biến đổi rộng vì các hàng sẽ cần được so sánh với nhau.

pythonSao chép mãflight_df = spark\
.read\
.option("inferSchema", "true")\ # suy đoán schema, có nghĩa là chúng ta muốn Spark đoán schema tốt nhất cho DataFrame của chúng ta
.option("header", "true")\
.csv("/data/flight-data/csv/2015-summary.csv")
flight_df.sort("count").explain()

Kế Hoạch Thực Thi (Physical Plan)

plaintextSao chép mã== Kế Hoạch Thực Thi ==
* Sắp Xếp [count#195 ASC NULLS FIRST], true, 0
+- Trao Đổi rangepartitioning(count#195 ASC NULLS FIRST, 200)
   +- *FileScan csv [DEST_COUNTRY_NAME#193,ORIGIN_COUNTRY_NAME#194,count#195]

Dòng thứ nhất là kế hoạch thực thi của Spark, nó hiển thị các bước mà Spark sẽ thực hiện để thực hiện truy vấn này. Trong ví dụ này, chúng ta thấy một Sort và một Exchange. Sorting thực sự là một biến đổi rộng, vì các hàng sẽ cần được so sánh với nhau để sắp xếp. Exchange cũng là một bước rộng, vì nó liên quan đến việc tráo đổi dữ liệu giữa các phân vùng. Dưới đó là một FileScan, cho biết dữ liệu sẽ được đọc từ một tệp CSV.

DataFrame trong Spark

Tạo DataFrame

createDataFrame

Khi schema là một danh sách các tên cột, loại của mỗi cột sẽ được suy đoán từ dữ liệu.

Khi schema là None, nó sẽ cố gắng suy đoán schema (tên cột và loại) từ dữ liệu, dữ liệu này phải là một RDD của Row, namedtuple hoặc dict.

pythonSao chép mã# Tạo Spark Session
spark = SparkSession.builder\
        .getOrCreate()

# Tạo Spark Context
sc = spark.sparkContext

# Định nghĩa schema
schema = StructType([
   StructField("name", StringType(), True), # True - Có thể là null
   StructField("age", IntegerType(), True)
])

# Tạo DataFrame Spark từ RDD của Row, namedtuple hoặc dict
rdd = sc.parallelize([('Alice', 1)]) # Spark context -> để tạo một phiên làm việc song song
spark_df = spark.createDataFrame(rdd, schema)
spark_df.collect()
# [Row(name='Alice', age=1)]

# Tạo DataFrame Spark từ DataFrame Pandas
spark_df = spark.createDataFrame(pandas_df, schema)

Đây là cách tạo DataFrame trong Spark từ một loạt các nguồn dữ liệu khác nhau. Bạn có thể sử dụng RDD của Row, namedtuple hoặc dict để tạo DataFrame, hoặc tạo từ DataFrame Pandas. Điều này cho phép bạn làm việc với các loại dữ liệu khác nhau trong môi trường Spark một cách linh hoạt.