Python整合Spark实现数据分析

【1】版本与依赖

在 JDK 1.8 环境下,PySpark 使用 3.5.3 版本,安装命令如下:

pip install pyspark==3.5.3

PySpark 访问 MySQL 时需要依赖 MySQL 驱动,引入方式有两种:

1. 通过 spark.jars.packages 配置,由 Spark 自动下载驱动:

 .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26") \

2.手动下载后放到项目根目录libs下:

# Prefer a manually downloaded MySQL connector jar when one exists in the
# ``libs`` directory located two levels above this file; otherwise the
# builder is left untouched.
mysql_jar_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'libs', 'mysql-connector-java-8.0.26.jar')
if os.path.exists(mysql_jar_path):
    # Hand the local jar to Spark through the "spark.jars" setting.
    builder = builder.config("spark.jars", mysql_jar_path)
    # The replacement field must stay on one line: a line break inside the
    # braces of a single-quoted f-string is a SyntaxError before Python 3.12.
    app.logger.info(f"使用本地 MySQL 驱动: {mysql_jar_path}")

【2】扩展注入

applications/extensions/__init__.py 内容如下:

# applications/extensions/__init__.py
from flask import Flask
import os
import sys
import traceback
from .init_sqlalchemy import db, ma, init_databases
from .init_upload import init_upload
from .init_migrate import init_migrate
# Spark-related state: kept as a plain module-level variable rather than a
# property. It starts out as None.
spark = None
def init_spark(app):
    """初始化 SparkSession"""
    global spark  # 声明使用全局变量
    try:
        app.logger.info("开始初始化 SparkSession...")
        # 检查 Java 环境
        import subprocess
        try:
            java_version = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode()
            app.logger.info(f"Java版本: {
     java_version}")
        except Exception as e:
            app.logger.error(f"Java未找到: {
     str(e)}")
            # 不返回None,继续尝试,可能Java已配置但命令无法执行
        # 尝试导入 PySpark
        try:
            from pyspark.sql import SparkSession
            import pyspark
            app.logger.info(f"PySpark 版本: {
     pyspark.__version__}")
            app.logger.info(f"PySpark 路径: {
     pyspark.__file__}")
        except ImportError as e:
            app.logger.error(f"导入 PySpark 失败: {
     str(e)}")
            return None
        # 设置 Windows 环境变量
        os.environ['PYSPARK_PYTHON'] = sys.executable
        os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
        os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
        app.logger.info(f"PYSPARK_PYTHON: {
     os.environ['PYSPARK_PYTHON']}")
        app.logger.info(f"SPARK_LOCAL_IP: {
     os.environ['SPARK_LOCAL_IP']}")
        # 完整的 Spark 配置 - 包含资源控制和 MySQL 驱动
        app.logger.info("正在创建 SparkSession...")
        # 内存和资源控制配置
        # 注意:反斜杠后面不能有注释,所以把所有注释放在配置之前或之后
        builder = SparkSession.builder \
            .appName("MovieAnalysis") \
            .master("local[1]") \
            .config("spark.driver.host", "127.0.0.1") \
            .config("spark.driver.bindAddress", "127.0.0.1") \
            .config("spark.ui.enabled", "false") \
            .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "10MB") \
            .config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false") \
            .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1") \
            .config("spark.sql.shuffle.partitions", "2") \
            .config("spark.sql.files.maxPartitionBytes", "128MB") \
            .config("spark.driver.memory", "1g") \
            .config("spark.executor.memory", "1g") \
            .
© 版权声明

相关文章