Python整合Spark实现数据分析
【1】版本与依赖
JDK 1.8 环境下使用的 PySpark 版本是 3.5.3,安装命令如下:
pip install pyspark==3.5.3
PySpark 需要依赖 MySQL 驱动(JAR 包),有两种引入方式:
1. 通过 spark.jars.packages 配置驱动的 Maven 坐标,Spark 会自动下载:
.config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26") \
2. 手动下载驱动 JAR 后放到项目根目录的 libs 目录下,并通过 spark.jars 指定其路径:
# Option 2: use a manually downloaded MySQL driver JAR.
# The path is resolved as <parent of this file's directory>/libs/<jar name>.
mysql_jar_path = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    'libs',
    'mysql-connector-java-8.0.26.jar',
)
# Only register the JAR when it is actually present on disk; otherwise the
# builder is left unchanged.
if os.path.exists(mysql_jar_path):
    builder = builder.config("spark.jars", mysql_jar_path)
    # Keep the replacement field on one line: a line break inside a
    # single-quoted f-string field is a syntax error before Python 3.12.
    app.logger.info(f"使用本地 MySQL 驱动: {mysql_jar_path}")
【2】扩展注入
applications/extensions/__init__.py 内容如下:
# applications/extensions/__init__.py
from flask import Flask
import os
import sys
import traceback
from .init_sqlalchemy import db, ma, init_databases
from .init_upload import init_upload
from .init_migrate import init_migrate
# Spark相关 - 直接使用变量,不用property
spark = None
def init_spark(app):
"""初始化 SparkSession"""
global spark # 声明使用全局变量
try:
app.logger.info("开始初始化 SparkSession...")
# 检查 Java 环境
import subprocess
try:
java_version = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode()
app.logger.info(f"Java版本: {
java_version}")
except Exception as e:
app.logger.error(f"Java未找到: {
str(e)}")
# 不返回None,继续尝试,可能Java已配置但命令无法执行
# 尝试导入 PySpark
try:
from pyspark.sql import SparkSession
import pyspark
app.logger.info(f"PySpark 版本: {
pyspark.__version__}")
app.logger.info(f"PySpark 路径: {
pyspark.__file__}")
except ImportError as e:
app.logger.error(f"导入 PySpark 失败: {
str(e)}")
return None
# 设置 Windows 环境变量
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
app.logger.info(f"PYSPARK_PYTHON: {
os.environ['PYSPARK_PYTHON']}")
app.logger.info(f"SPARK_LOCAL_IP: {
os.environ['SPARK_LOCAL_IP']}")
# 完整的 Spark 配置 - 包含资源控制和 MySQL 驱动
app.logger.info("正在创建 SparkSession...")
# 内存和资源控制配置
# 注意:反斜杠后面不能有注释,所以把所有注释放在配置之前或之后
builder = SparkSession.builder \
.appName("MovieAnalysis") \
.master("local[1]") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.driver.bindAddress", "127.0.0.1") \
.config("spark.ui.enabled", "false") \
.config("spark.sql.execution.arrow.pyspark.enabled", "false") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "10MB") \
.config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false") \
.config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1") \
.config("spark.sql.shuffle.partitions", "2") \
.config("spark.sql.files.maxPartitionBytes", "128MB") \
.config("spark.driver.memory", "1g") \
.config("spark.executor.memory", "1g") \
.
© 版权声明
文章版权归作者所有,未经允许请勿转载。