Spark Session

Giới thiệu

SparkSession là gì?

Bước đầu tiên trong bất kỳ Ứng dụng Spark nào là tạo một SparkSession. SparkSession hoạt động như quá trình điều khiển để kiểm soát ứng dụng Spark. Khi bạn khởi đầu Spark trong chế độ tương tác, một SparkSession được tạo một cách ngầm định để quản lý Ứng dụng Spark. Tuy nhiên, khi khởi động nó thông qua một ứng dụng độc lập, bạn cần tự tạo đối tượng SparkSession trong mã ứng dụng của bạn.

SparkSession so với SparkContext

Trong Spark 1.X, có hai ngữ cảnh hiệu quả: SparkContext và SQLContext. SparkContext tập trung vào kiểm soát tinh tế hơn của các trừng tượng trung tâm của Spark, trong khi SQLContext xử lý các công cụ cấp cao hơn như Spark SQL. Tuy nhiên, trong Spark 2.X, cộng đồng đã hợp nhất hai API này thành SparkSession tập trung để quản lý tốt hơn và ngăn chặn xung đột ngữ cảnh. Mặc dù SparkContext và SQLContext vẫn có sẵn trong SparkSession, nhưng hiếm khi được sử dụng trực tiếp.

Tạo SparkSession

Một SparkSession là điểm vào tất cả các chức năng trong Spark và là bắt buộc nếu bạn muốn xây dựng một DataFrame trong PySpark.

pythonSao chép mãspark = SparkSession \
        .builder \
        .appName("SparkExample") \
        .enableHiveSupport() \  # Để làm việc với Hive metastore.
        .config("spark.memory.offHeap.enabled","true") \ # Bộ nhớ cache dữ liệu trong bộ nhớ off-heap để tránh lưu trữ nó trực tiếp trên đĩa.
        .config("spark.memory.offHeap.size","10g") \
        .getOrCreate() # Tạo Spark session với các cấu hình đã chỉ định.

Một ví dụ khác với nhiều cấu hình hơn:

pythonSao chép mãdef create_spark_session():
    '''
    Hàm để tạo một Spark session.
    Output:
        SparkSession
    '''
    try:
        spark = SparkSession \
             .builder \
             .appName("SparkExample") \
             .enableHiveSupport() \ # Để làm việc với Hive metastore.
             .config("spark.driver.memory", "9g") \
             .config("spark.executor.cores", "2") \
             .config("spark.executor.memory", "9g") \
             .config("spark.yarn.queue", "root.tnm.ada_analytics_tnm") \ # Cần cập nhật với tên Queue YARN.

        custom_spark_config = {
            "spark.driver.memory": "9g",
            "spark.executor.cores": "2",
            "spark.executor.memory": "9g",
            "spark.yarn.queue": "root.tnm.ada_analytics_tnm", # Cần cập nhật với tên Queue YARN.
         }

        for key, value in custom_spark_config.items():
            spark = spark.config(key, value)

        spark = spark.getOrCreate() # Tạo Spark session với các cấu hình đã chỉ định.

        return spark

    except RuntimeError as e:
        print('Lỗi khi tạo Spark context')
        print(e)

Hàm này tạo một Spark session với các cấu hình đã chỉ định, hỗ trợ kiểm soát tinh tế hơn đối với ứng dụng Spark.