DeepSeek辅助编写的Oracle dmp转SQL脚本和CSV文件工具
一开始让它直接写,结果能输出表结构,但无法导出数据,所以在网上找了一个Java的dmp解析器交给它翻译。
输出如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Oracle DMP文件解析工具 - 基于Java版本的二进制解析
支持导出表结构(SQL)和数据(CSV)
"""
import struct
import sys
import os
import csv
import re
from datetime import datetime
from collections import defaultdict
from typing import List, Tuple, Optional
class DMPItemType:
    """Enumeration-style constants for the decoded kind of a column value."""
    NULL = 0        # explicit NULL marker (0xfe 0xff in the byte stream)
    STRING = 1
    NUMBER = 2
    DATE = 3
    TIMESTAMP = 4
    BLOB = 5


class DMPItem:
    """A single decoded column value taken from a binary data row."""

    def __init__(self):
        self.no_of_bytes = 0      # expected byte length of this value
        self.bytes = []           # raw bytes collected from the stream
        self.item_type = None     # one of DMPItemType.*, set during conversion
        # Decoded representations; only those matching item_type get filled.
        self.string_value = None
        self.number_value = None
        self.date_value = None
        self.timestamp_value = None


class DMPRow:
    """One decoded data row: an ordered list of DMPItem values."""

    def __init__(self):
        self.items = []


class DMPTable:
    """A table found in the dump: its name, DDL, column names and rows."""

    def __init__(self):
        self.table_name = None
        self.create_table_sql = None
        self.field_names = []
        self.data_rows = []


class OracleDMPParser:
    """Binary parser for classic Oracle ``exp`` dump (.dmp) files.

    Ported from a Java implementation. Recognizes the textual marker lines
    (TABLE / CREATE TABLE / INSERT INTO / EXPORT:V..) and decodes the binary
    row data that follows each INSERT statement.
    """

    def __init__(self, dmp_file_path):
        self.dmp_file_path = dmp_file_path
        self.after_insert_statement = False   # True while expecting binary data rows
        self.current_table = None             # name from the last TABLE marker
        self.current_table_obj = None         # DMPTable currently being populated
        self.tables = []                      # all DMPTable objects discovered
        self.finished = False
        self.debug_to_stdout = False
        self.debug_hex_dump_to_stdout = False
        self.export_version = {'major': 0, 'minor': 0, 'patch': 0}
        self.export_user = None
        self.export_tablespace = None
        self.insert_statement_line = None     # line index of the last INSERT INTO

    def parse_file(self, table_names=None):
        """Parse the DMP file and return the list of DMPTable objects.

        table_names: optional iterable of table names (case-insensitive)
        used to filter the result.
        """
        with open(self.dmp_file_path, 'rb') as f:
            content = f.read()
        # Split into lines while keeping the raw binary content.
        lines = self.split_binary_lines(content)
        for i, line in enumerate(lines):
            self.parse_line(line, i, lines)
        # Apply the optional table-name filter.
        if table_names:
            wanted = {t.lower() for t in table_names}
            self.tables = [t for t in self.tables if t.table_name.lower() in wanted]
        return self.tables

    def split_binary_lines(self, data):
        """Split raw bytes on 0x0a, keeping the delimiter with each line."""
        lines = []
        start = 0
        for i, byte in enumerate(data):
            if byte == 0x0a:  # newline
                lines.append(data[start:i + 1])
                start = i + 1
        if start < len(data):
            lines.append(data[start:])
        return lines

    def clean_table_name(self, name):
        """Strip surrounding double quotes from a table name."""
        if name:
            return name.strip('"')
        return name

    def parse_line(self, bytes_data, line_num, all_lines):
        """Dispatch one raw line: binary data row, DDL marker, or metadata."""
        if self.after_insert_statement and self.insert_statement_line is not None:
            # Lines following an INSERT statement carry binary row data.
            if self.current_table_obj:
                rows = self.parse_data_row(bytes_data, line_num, all_lines)
                self.current_table_obj.data_rows.extend(rows)
            # Stop treating lines as data once past the line after the INSERT.
            # NOTE(review): this only consumes two lines after INSERT; presumably
            # exp writes all row data in that window — confirm on larger dumps.
            if line_num > self.insert_statement_line + 1:
                self.after_insert_statement = False
        # Decode as text to recognize SQL/metadata marker lines.
        try:
            test_string = bytes_data.decode('utf-8', errors='ignore')
            # FIX: original had rstrip('x00nr') (escapes lost in the paste).
            test_string = test_string.rstrip('\x00\n\r')
        except Exception:
            test_string = ""
        if test_string.startswith('TABLE '):
            table_name = test_string[6:].strip()
            self.current_table = self.clean_table_name(table_name)
        elif test_string.startswith('CREATE TABLE '):
            if self.current_table:
                self.current_table_obj = DMPTable()
                self.current_table_obj.table_name = self.current_table
                self.current_table_obj.create_table_sql = test_string
                self.tables.append(self.current_table_obj)
        elif test_string.startswith('INSERT INTO '):
            if self.current_table:
                self.after_insert_statement = True
                self.insert_statement_line = line_num
                # Extract the quoted column list from the INSERT statement.
                # FIX: parentheses must be escaped in the pattern.
                match = re.search(r'INSERT INTO "([^"]+)" \(([^)]+)\) VALUES', test_string)
                if match:
                    table_name = self.clean_table_name(match.group(1))
                    fields_str = match.group(2)
                    field_names = [f.strip().strip('"') for f in fields_str.split(',')]
                    if self.current_table_obj and self.current_table_obj.table_name == table_name:
                        self.current_table_obj.field_names = field_names
        elif 'EXPORT:V' in test_string:
            # Version banner, e.g. EXPORT:V12.01.00.
            # FIX: original pattern lost its backslashes ('(dd).(dd).(dd)').
            match = re.search(r'EXPORT:V(\d\d)\.(\d\d)\.(\d\d)', test_string)
            if match:
                self.export_version['major'] = int(match.group(1))
                self.export_version['minor'] = int(match.group(2))
                self.export_version['patch'] = int(match.group(3))
        elif test_string.startswith('U') and self.export_user is None and len(test_string) > 1:
            self.export_user = test_string[1:].strip()
        elif test_string.startswith('R') and self.export_tablespace is None and len(test_string) > 1:
            self.export_tablespace = test_string[1:].strip()

    def parse_data_row(self, bytes_data, line_num, all_lines) -> List[DMPRow]:
        """Decode one binary line of row data into a list of DMPRow objects."""
        rows = []
        if len(bytes_data) == 0:
            return rows
        if self.debug_hex_dump_to_stdout:
            print(self.byte_array_to_string(bytes_data))
        # First byte holds the number of columns per row.
        field_count = bytes_data[0] & 0xff
        if field_count == 0 or field_count > 100:  # sanity check
            return rows
        # Column type codes live at a 4-byte stride after the header.
        column_types = []
        for i in range(field_count):
            offset = i * 4 + 2
            if offset < len(bytes_data):
                fc = bytes_data[offset] & 0xff
                if fc == 1:
                    column_types.append(DMPItemType.STRING)
                elif fc == 2:
                    column_types.append(DMPItemType.NUMBER)
                elif fc == 12:
                    column_types.append(DMPItemType.DATE)
                elif fc == 180:
                    column_types.append(DMPItemType.TIMESTAMP)
                elif fc == 113:
                    column_types.append(DMPItemType.BLOB)
                else:
                    column_types.append(None)
            else:
                column_types.append(None)
        # Locate where the data begins (after a trailing run of zero bytes).
        skip_bytes = field_count * 4
        for i in range(len(bytes_data) - 1, 0, -1):
            if i > 4 and bytes_data[i] == 0 and bytes_data[i - 1] == 0 and bytes_data[i - 2] == 0 and bytes_data[i - 3] == 0:
                skip_bytes = i - 4
                break
        if self.debug_to_stdout:
            print(f"Skipping {skip_bytes} bytes")
        # Walk the byte stream, building items and rows.
        current_row = DMPRow()
        cur = None
        take_byte_count = 0
        include_next_batch = False
        null_count = 0
        # The original guarded this on export version < 8 but forced it with
        # "if True or ...", so start the first row unconditionally.
        has_started = True
        rows.append(current_row)
        skip_bytes += 1
        i = skip_bytes
        while i < len(bytes_data):
            byte_val = bytes_data[i] & 0xff
            if byte_val == 0:
                null_count += 1
            else:
                null_count = 0
            if null_count == 4:
                # Four zeros: either the terminator sequence or a restart.
                if i > 0 and len(bytes_data) > i + 4 and bytes_data[i] == 0 and bytes_data[i + 1] == 0 and (bytes_data[i + 2] & 0xff) == 0xff and (bytes_data[i + 3] & 0xff) == 0xff and (bytes_data[i + 4] & 0xff) == 0x0a:
                    break
                else:
                    has_started = True
            elif has_started:
                if len(bytes_data) > i + 1 and byte_val == 0xfe and (bytes_data[i + 1] & 0xff) == 0xff:
                    # 0xfe 0xff marks an explicit NULL value.
                    cur = DMPItem()
                    cur.item_type = DMPItemType.NULL
                    cur.no_of_bytes = 2
                    cur.bytes.extend([bytes_data[i], bytes_data[i + 1]])
                    current_row.items.append(cur)
                    i += 1
                elif i > 0 and len(bytes_data) > i + 4 and bytes_data[i] == 0 and bytes_data[i + 1] == 0 and (bytes_data[i + 2] & 0xff) == 0xff and (bytes_data[i + 3] & 0xff) == 0xff:
                    # End-of-data marker.
                    has_started = False
                elif i > 0 and bytes_data[i] == 0 and bytes_data[i - 1] == 0:
                    # Row separator: start collecting a new row.
                    if current_row.items:
                        current_row = DMPRow()
                        rows.append(current_row)
                elif take_byte_count > 0:
                    # Still consuming the bytes of the current column value.
                    cur.bytes.append(bytes_data[i])
                    take_byte_count -= 1
                    if take_byte_count == 0:
                        if self.export_version['major'] < 8 and len(current_row.items) == field_count:
                            current_row = DMPRow()
                            rows.append(current_row)
                elif len(bytes_data) > i + 1 and byte_val > 0 and (bytes_data[i + 1] & 0xff) == 0:
                    # Length byte followed by 0x00: a new column value starts.
                    take_byte_count = byte_val
                    if include_next_batch:
                        cur.no_of_bytes += take_byte_count
                        include_next_batch = False
                    else:
                        cur = DMPItem()
                        cur.no_of_bytes = take_byte_count
                        current_row.items.append(cur)
                    i += 1
                elif take_byte_count == 0 and len(bytes_data) > i + 1 and byte_val > 0 and (bytes_data[i + 1] & 0xff) == 0x80:
                    # Length byte followed by 0x80: a continuation batch follows.
                    cur = DMPItem()
                    take_byte_count = byte_val
                    cur.no_of_bytes = take_byte_count
                    include_next_batch = True
                    current_row.items.append(cur)
                    i += 1
            i += 1
        # Drop rows that collected no items.
        rows = [r for r in rows if r.items]
        # Convert raw bytes into typed values per column.
        for row in rows:
            for col, item in enumerate(row.items):
                if col >= len(column_types) or column_types[col] is None:
                    continue
                # Truncate to the declared byte count.
                if item.no_of_bytes > 0:
                    item.bytes = item.bytes[:min(len(item.bytes), item.no_of_bytes)]
                if item.item_type == DMPItemType.NULL:
                    pass
                elif column_types[col] == DMPItemType.NUMBER:
                    self.parse_number(item)
                elif column_types[col] == DMPItemType.STRING:
                    self.parse_string(item)
                elif column_types[col] == DMPItemType.DATE:
                    self.parse_date(item)
                elif column_types[col] == DMPItemType.TIMESTAMP:
                    self.parse_timestamp(item)
        return rows

    def parse_number(self, item: DMPItem):
        """Decode Oracle's base-100 NUMBER encoding into string/float values.

        Layout: one exponent byte, then base-100 digit bytes; negative values
        store complemented digits and usually end with a 0x66 terminator.
        """
        item.item_type = DMPItemType.NUMBER
        if len(item.bytes) == 0:
            return
        bytes_list = list(item.bytes)
        # A lone 0x80 encodes exactly zero.
        if (bytes_list[0] & 0xff) == 0x80:
            item.number_value = 0.0
            item.string_value = "0"
            return
        # Negative numbers have exponent byte < 0xa0 / a 0x66 terminator.
        is_negative = False
        if len(bytes_list) > 1 and ((bytes_list[-1] & 0xff) == 0x66 or (bytes_list[0] & 0xff) < 0xa0):
            is_negative = True
        if is_negative:
            # FIX: the integer digit-pair count must be derived from the
            # exponent byte BEFORE it is stripped; the original computed it
            # from the first digit byte, producing garbage like -0.00..0123.
            int_part = -1 * ((bytes_list[0] & 0xff) - 0x3f)
            # Strip the exponent byte, and the 0x66 terminator if present.
            if (bytes_list[-1] & 0xff) == 0x66:
                bytes_list = bytes_list[1:-1]
            else:
                bytes_list = bytes_list[1:]
            if int_part > 0:
                # Pad with 0x65 (complemented zero digit) to int_part pairs.
                while len(bytes_list) < int_part:
                    bytes_list.append(0x65)
                int_part_bytes = bytes_list[:int_part]
                v = "-" + ''.join([f"{-1*(b-101):02d}" for b in int_part_bytes])
                if len(bytes_list) > int_part:
                    dec_part_bytes = bytes_list[int_part:]
                    v += "." + ''.join([f"{-1*(b-101):02d}" for b in dec_part_bytes])
            else:
                v = "-0." + "00" * (-1 * int_part) + ''.join([f"{-1*(b-101):02d}" for b in bytes_list])
        else:
            # Positive: exponent byte is 0xc0 + number of integer digit pairs.
            int_part = (bytes_list[0] & 0xff) - 0xc0
            bytes_list = bytes_list[1:]
            if int_part > 0:
                while len(bytes_list) < int_part:
                    bytes_list.append(1)  # stored zero digit
                int_part_bytes = bytes_list[:int_part]
                v = ''.join([f"{b-1:02d}" for b in int_part_bytes])
                if len(bytes_list) > int_part:
                    dec_part_bytes = bytes_list[int_part:]
                    v += "." + ''.join([f"{b-1:02d}" for b in dec_part_bytes])
            else:
                v = "0." + "00" * (-1 * int_part) + ''.join([f"{b-1:02d}" for b in bytes_list])
        item.string_value = v
        try:
            if v:
                # Trim a leading zero on the integer part of decimals.
                if '.' in v:
                    parts = v.split('.')
                    if parts[0].startswith('0') and len(parts[0]) > 1:
                        parts[0] = parts[0].lstrip('0')
                    v = '.'.join(parts)
                item.number_value = float(v)
        except (ValueError, TypeError):
            pass

    def parse_string(self, item: DMPItem):
        """Decode a raw byte run as UTF-8 text."""
        item.item_type = DMPItemType.STRING
        if item.bytes:
            try:
                item.string_value = bytes(item.bytes).decode('utf-8', errors='ignore')
                # FIX: strip NUL bytes, not the characters 'x' and '0'.
                item.string_value = item.string_value.rstrip('\x00')
            except Exception:
                # Fallback: hex-escape representation (\xNN per byte).
                item.string_value = ''.join([f"\\x{b:02x}" for b in item.bytes])

    def parse_date(self, item: DMPItem):
        """Decode Oracle's 7-byte DATE encoding (excess-100 century/year)."""
        item.item_type = DMPItemType.DATE
        if len(item.bytes) >= 7:
            year = (item.bytes[0] - 100) * 100 + (item.bytes[1] - 100)
            month = item.bytes[2]
            day = item.bytes[3]
            hour = item.bytes[4] - 1
            minute = item.bytes[5] - 1
            second = item.bytes[6] - 1
            try:
                item.date_value = datetime(year, month, day, hour, minute, second)
                item.string_value = item.date_value.strftime('%Y-%m-%d %H:%M:%S')
            except ValueError:
                # FIX: restore the missing space between date and time.
                item.string_value = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"

    def parse_timestamp(self, item: DMPItem):
        """Decode an 11-byte TIMESTAMP: DATE layout plus big-endian nanos."""
        item.item_type = DMPItemType.TIMESTAMP
        if len(item.bytes) >= 7:
            year = (item.bytes[0] - 100) * 100 + (item.bytes[1] - 100)
            month = item.bytes[2]
            day = item.bytes[3]
            hour = item.bytes[4] - 1
            minute = item.bytes[5] - 1
            second = item.bytes[6] - 1
            nanos = 0
            if len(item.bytes) >= 11:
                nanos_bytes = item.bytes[7:11]
                nanos = struct.unpack('>I', bytes(nanos_bytes))[0]
            try:
                dt = datetime(year, month, day, hour, minute, second)
                item.timestamp_value = dt.replace(microsecond=nanos // 1000)
                # Millisecond precision: trim the last 3 of %f's 6 digits.
                item.string_value = item.timestamp_value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            except ValueError:
                # FIX: restore the missing space between date and time.
                item.string_value = f"{year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}.{nanos:09d}"

    @staticmethod
    def byte_array_to_string(bytes_data):
        """Render bytes as comma-separated hex for debug dumps."""
        return ', '.join([f"{b:02x}" for b in bytes_data])

    def export_schema(self, output_dir):
        """Write all CREATE TABLE statements to <output_dir>/schema.sql."""
        os.makedirs(output_dir, exist_ok=True)
        schema_file = os.path.join(output_dir, 'schema.sql')
        with open(schema_file, 'w', encoding='utf-8') as f:
            f.write("-- Oracle DMP 导出的表结构\n")
            f.write(f"-- 导出时间: {datetime.now()}\n")
            f.write(f"-- Oracle版本: V{self.export_version['major']:02d}.{self.export_version['minor']:02d}.{self.export_version['patch']:02d}\n")
            if self.export_user:
                f.write(f"-- 导出用户: {self.export_user}\n")
            if self.export_tablespace:
                f.write(f"-- 表空间: {self.export_tablespace}\n")
            f.write("\n")
            for table in self.tables:
                if table.create_table_sql:
                    # Keep only the CREATE TABLE text, dropping INSERT lines.
                    create_lines = []
                    for line in table.create_table_sql.split('\n'):
                        if line.strip() and not line.strip().startswith('INSERT'):
                            create_lines.append(line)
                        elif line.strip().startswith('CREATE'):
                            create_lines.append(line)
                    clean_sql = '\n'.join(create_lines)
                    f.write(f"-- 表: {table.table_name}\n")
                    f.write(clean_sql)
                    f.write(";\n\n")
        print(f"表结构已导出到: {schema_file}")
        print(f"共导出 {len([t for t in self.tables if t.create_table_sql])} 个表")

    def export_data(self, output_dir):
        """Write each table's rows to <output_dir>/data/<table>.csv."""
        os.makedirs(output_dir, exist_ok=True)
        data_dir = os.path.join(output_dir, 'data')
        os.makedirs(data_dir, exist_ok=True)
        total_rows = 0
        for table in self.tables:
            if not table.data_rows:
                print(f"表 {table.table_name}: 无数据")
                continue
            # Replace characters that are illegal in Windows file names.
            safe_table_name = re.sub(r'[<>:"/\\|?*]', '_', table.table_name)
            csv_file = os.path.join(data_dir, f"{safe_table_name}.csv")
            with open(csv_file, 'w', newline='', encoding='utf-8-sig') as f:
                writer = csv.writer(f)
                # Header: declared field names, or COL1..COLn as a fallback.
                if table.field_names:
                    headers = table.field_names
                elif table.data_rows and table.data_rows[0].items:
                    headers = [f"COL{i+1}" for i in range(len(table.data_rows[0].items))]
                else:
                    headers = []
                if headers:
                    writer.writerow(headers)
                for row in table.data_rows:
                    row_values = []
                    for item in row.items:
                        if item.item_type == DMPItemType.NULL:
                            row_values.append('')
                        elif item.string_value is not None:
                            row_values.append(item.string_value)
                        elif item.number_value is not None:
                            # Print whole numbers without a trailing .0.
                            if item.number_value == int(item.number_value):
                                row_values.append(str(int(item.number_value)))
                            else:
                                row_values.append(str(item.number_value))
                        elif item.date_value is not None:
                            row_values.append(item.date_value.strftime('%Y-%m-%d %H:%M:%S'))
                        elif item.timestamp_value is not None:
                            row_values.append(item.timestamp_value.strftime('%Y-%m-%d %H:%M:%S.%f'))
                        else:
                            row_values.append('')
                    # Pad or trim so every row matches the header width.
                    while len(row_values) < len(headers):
                        row_values.append('')
                    while len(row_values) > len(headers):
                        row_values.pop()
                    writer.writerow(row_values)
            print(f"表 {table.table_name}: {len(table.data_rows)} 行数据 -> {csv_file}")
            total_rows += len(table.data_rows)
        print(f"\n共导出 {len([t for t in self.tables if t.data_rows])} 个表,{total_rows} 行数据")

    def export_all(self, output_dir):
        """Export both the schema SQL and the CSV data."""
        self.export_schema(output_dir)
        self.export_data(output_dir)
def main():
    """CLI/interactive entry point: parse a DMP file and export its contents.

    Usage: dmpbin2sql2.py [dmp_file] [output_dir]; any argument not given on
    the command line is prompted for interactively.
    """
    print("=" * 60)
    print("Oracle DMP文件解析工具 (二进制解析版)")
    print("=" * 60)
    # DMP file path: first CLI argument, or prompt.
    if len(sys.argv) > 1:
        dmp_file = sys.argv[1]
    else:
        dmp_file = input("请输入DMP文件路径: ").strip()
    if not os.path.exists(dmp_file):
        print(f"文件不存在: {dmp_file}")
        return
    # Output directory: second CLI argument, or prompt (default "output").
    if len(sys.argv) > 2:
        output_dir = sys.argv[2]
    else:
        output_dir = input("请输入输出目录(默认为当前目录下的output文件夹): ").strip()
        if not output_dir:
            output_dir = "output"
    parser = OracleDMPParser(dmp_file)
    try:
        # FIX: restore the '\n' prefixes lost in the paste ('f"n..."').
        print(f"\n正在解析文件: {dmp_file}")
        tables = parser.parse_file()
        print(f"找到 {len(tables)} 个表")
        for table in tables:
            print(f" - {table.table_name}: {len(table.data_rows)} 行数据")
        if not tables:
            print("未找到任何表")
            return
        print("\n请选择操作:")
        print("1. 导出表结构(SQL)")
        print("2. 导出数据(CSV)")
        print("3. 导出全部(表结构+数据)")
        choice = input("请输入选择 (1/2/3,默认3): ").strip() or "3"
        if choice == "1":
            parser.export_schema(output_dir)
        elif choice == "2":
            parser.export_data(output_dir)
        else:
            parser.export_all(output_dir)
        print("\n解析完成!")
    except Exception as e:
        print(f"解析过程中出错: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
用java代码压缩包附带的exptest.dmp解析的运行结果
C:\d>python dmpbin2sql2.py
============================================================
Oracle DMP文件解析工具 (二进制解析版)
============================================================
请输入DMP文件路径: exptest.dmp
请输入输出目录(默认为当前目录下的output文件夹):
正在解析文件: exptest.dmp
找到 4 个表
- IGNORE: 0 行数据
- IGNORE2: 0 行数据
- TABLE1: 7 行数据
- TABLE2: 4 行数据
请选择操作:
1. 导出表结构(SQL)
2. 导出数据(CSV)
3. 导出全部(表结构+数据)
请输入选择 (1/2/3,默认3):
表结构已导出到: output\schema.sql
共导出 4 个表
表 IGNORE: 无数据
表 IGNORE2: 无数据
表 TABLE1: 7 行数据 -> output\data\TABLE1.csv
表 TABLE2: 4 行数据 -> output\data\TABLE2.csv
共导出 2 个表,11 行数据
解析完成!
output/schema.sql
-- Oracle DMP 导出的表结构
-- 导出时间: 2026-04-04 20:29:45.716883
-- Oracle版本: V12.01.00
-- 表空间: USERS
-- 表: IGNORE
CREATE TABLE "IGNORE" ("COLUMN1" VARCHAR2(20)) PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 TABLESPACE "USERS" LOGGING NOCOMPRESS;
-- 表: IGNORE2
CREATE TABLE "IGNORE2" ("COLUMN1" VARCHAR2(20), "COLUMN2" VARCHAR2(21)) PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 TABLESPACE "USERS" LOGGING NOCOMPRESS;
-- 表: TABLE1
CREATE TABLE "TABLE1" ("KEYCOL" NUMBER(*,0) NOT NULL ENABLE, "NUMCOL1" NUMBER(22, 0), "FLOATCOL1" FLOAT(126), "STRCOL1" VARCHAR2(200), "DATECOL1" DATE, "BLOBCOL1" BLOB, "TIMESTAMP1" TIMESTAMP (6)) PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT) TABLESPACE "USERS" LOGGING NOCOMPRESS LOB ("BLOBCOL1") STORE AS SECUREFILE (TABLESPACE "USERS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION AUTO NOCACHE LOGGING NOCOMPRESS KEEP_DUPLICATES STORAGE(INITIAL 106496 NEXT 1048576 MINEXTENTS 1 BUFFER_POOL DEFAULT));
-- 表: TABLE2
CREATE TABLE "TABLE2" ("STRKEY" VARCHAR2(20) NOT NULL ENABLE, "FLOAT1" FLOAT(126), "DEC1" NUMBER(22, 5)) PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT) TABLESPACE "USERS" LOGGING NOCOMPRESS;
data/TABLE1.csv
KEYCOL,NUMCOL1,FLOATCOL1,STRCOL1,DATECOL1,BLOBCOL1,TIMESTAMP1
01,0123,03.567890,STRING 1 TEST,,,
02,0456,01.23,STRING 2 TEST sadf asdf asdf asdf asdf,,,
03,-0.0000000000000000000000000000000000000000000000000000000000000000000000000001,-0.000000000000000000000000000000000000000000000000000000000000000000000000000123,,,,
04,-0.00000000000000000000000000000000000000000000000000000000000000000000000002,0.000000000330,,,,
05,123123123456,-0.00000000000000000000000000000000000000000000000000000000000000000000000330,,,,
06,-0.0000000000000000000000000000000000000000000000000000123123123456,,,,,
07,-0.000000000000000000000000000000000000000000000000000000000000000000000456,-0.00000000000000000000000000000000000020,,,,
data/TABLE2.csv
STRKEY,FLOAT1,DEC1
K1,,14
K2,,02.999920
K3,,-0.0000000000000000000000000000002320
K4,,
© 版权声明
文章版权归作者所有,未经允许请勿转载。