DeepSeek辅助编写的Oracle dmp转SQL脚本和CSV文件工具

AI14小时前发布 beixibaobao
2 0 0

一开始让他直接写,结果能输出表结构,但无法导出数据,所以在网上找了一个Java的dmp解析器交给他翻译。
输出如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Oracle DMP文件解析工具 - 基于Java版本的二进制解析
支持导出表结构(SQL)和数据(CSV)
"""
import struct
import sys
import os
import csv
import re
from datetime import datetime
from collections import defaultdict
from typing import List, Tuple, Optional
class DMPItemType:
    # Enumeration of the decoded value kinds a DMP data-row column can hold.
    # Values are internal tags only (not Oracle's own type codes; those are
    # mapped in OracleDMPParser.parse_data_row: 1/2/12/180/113).
    NULL = 0
    STRING = 1
    NUMBER = 2
    DATE = 3
    TIMESTAMP = 4
    BLOB = 5
class DMPItem:
    """A single decoded column value extracted from a DMP data row."""

    def __init__(self):
        # Raw payload bookkeeping: declared byte length and bytes collected.
        self.no_of_bytes = 0
        self.bytes = []
        # Decoded views; the type-specific parse_* helpers fill these in.
        for attr in ("item_type", "string_value", "number_value",
                     "date_value", "timestamp_value"):
            setattr(self, attr, None)
class DMPRow:
    """One decoded table row: an ordered list of DMPItem column values."""

    def __init__(self):
        self.items = []  # column values, left to right
class DMPTable:
    """Everything recovered for one table: name, DDL, columns and rows."""

    def __init__(self):
        # Identity and DDL text as found in the dump.
        self.table_name = None
        self.create_table_sql = None
        # Column names (from the INSERT stub) and decoded DMPRow objects.
        self.field_names = []
        self.data_rows = []
class OracleDMPParser:
    def __init__(self, dmp_file_path):
        self.dmp_file_path = dmp_file_path
        self.after_insert_statement = False
        self.current_table = None
        self.current_table_obj = None
        self.tables = []
        self.finished = False
        self.debug_to_stdout = False
        self.debug_hex_dump_to_stdout = False
        self.export_version = {'major': 0, 'minor': 0, 'patch': 0}
        self.export_user = None
        self.export_tablespace = None
        self.insert_statement_line = None
    def parse_file(self, table_names=None):
        """解析DMP文件"""
        with open(self.dmp_file_path, 'rb') as f:
            content = f.read()
        # 按行分割(保留二进制)
        lines = self.split_binary_lines(content)
        for i, line in enumerate(lines):
            self.parse_line(line, i, lines)
        # 如果指定了表名,进行过滤
        if table_names:
            table_names_lower = [t.lower() for t in table_names]
            self.tables = [t for t in self.tables if t.table_name.lower() in table_names_lower]
        return self.tables
    def split_binary_lines(self, data):
        """分割二进制行为行(0x0a作为分隔符)"""
        lines = []
        start = 0
        for i, byte in enumerate(data):
            if byte == 0x0a:  # 换行符
                lines.append(data[start:i+1])
                start = i + 1
        if start < len(data):
            lines.append(data[start:])
        return lines
    def clean_table_name(self, name):
        """清理表名,移除引号"""
        if name:
            return name.strip('"')
        return name
    def parse_line(self, bytes_data, line_num, all_lines):
        """解析一行"""
        if self.after_insert_statement and self.insert_statement_line is not None:
            # 应该是数据行
            if self.current_table_obj:
                rows = self.parse_data_row(bytes_data, line_num, all_lines)
                self.current_table_obj.data_rows.extend(rows)
                # 清除标记,避免重复解析
                if line_num > self.insert_statement_line + 1:
                    self.after_insert_statement = False
        # 尝试解码为字符串(用于识别SQL语句)
        try:
            test_string = bytes_data.decode('utf-8', errors='ignore')
            test_string = test_string.rstrip('x00nr')
        except:
            test_string = ""
        if test_string.startswith('TABLE '):
            table_name = test_string[6:].strip()
            self.current_table = self.clean_table_name(table_name)
        elif test_string.startswith('CREATE TABLE '):
            if self.current_table:
                self.current_table_obj = DMPTable()
                self.current_table_obj.table_name = self.current_table
                self.current_table_obj.create_table_sql = test_string
                self.tables.append(self.current_table_obj)
        elif test_string.startswith('INSERT INTO '):
            if self.current_table:
                self.after_insert_statement = True
                self.insert_statement_line = line_num
                # 解析字段名
                pattern = r'INSERT INTO "([^"]+)" (([^)]+)) VALUES'
                match = re.search(pattern, test_string)
                if match:
                    table_name = self.clean_table_name(match.group(1))
                    fields_str = match.group(2)
                    field_names = [f.strip().strip('"') for f in fields_str.split(',')]
                    if self.current_table_obj and self.current_table_obj.table_name == table_name:
                        self.current_table_obj.field_names = field_names
        elif 'EXPORT:V' in test_string:
            # 解析版本
            pattern = r'EXPORT:V(dd).(dd).(dd)'
            match = re.search(pattern, test_string)
            if match:
                self.export_version['major'] = int(match.group(1))
                self.export_version['minor'] = int(match.group(2))
                self.export_version['patch'] = int(match.group(3))
        elif test_string.startswith('U') and self.export_user is None and len(test_string) > 1:
            self.export_user = test_string[1:].strip()
        elif test_string.startswith('R') and self.export_tablespace is None and len(test_string) > 1:
            self.export_tablespace = test_string[1:].strip()
    def parse_data_row(self, bytes_data, line_num, all_lines) -> List[DMPRow]:
        """解析数据行"""
        rows = []
        if len(bytes_data) == 0:
            return rows
        if self.debug_hex_dump_to_stdout:
            print(self.byte_array_to_string(bytes_data))
        # 第一个字节是字段数量
        field_count = bytes_data[0] & 0xff
        if field_count == 0 or field_count > 100:  # 合理性检查
            return rows
        # 获取列类型
        column_types = []
        for i in range(field_count):
            offset = i * 4 + 2
            if offset < len(bytes_data):
                fc = bytes_data[offset] & 0xff
                if fc == 1:
                    column_types.append(DMPItemType.STRING)
                elif fc == 2:
                    column_types.append(DMPItemType.NUMBER)
                elif fc == 12:
                    column_types.append(DMPItemType.DATE)
                elif fc == 180:
                    column_types.append(DMPItemType.TIMESTAMP)
                elif fc == 113:
                    column_types.append(DMPItemType.BLOB)
                else:
                    column_types.append(None)
            else:
                column_types.append(None)
        # 找到数据开始位置
        skip_bytes = field_count * 4
        for i in range(len(bytes_data)-1, 0, -1):
            if i > 4 and bytes_data[i] == 0 and bytes_data[i-1] == 0 and bytes_data[i-2] == 0 and bytes_data[i-3] == 0:
                skip_bytes = i - 4
                break
        if self.debug_to_stdout:
            print(f"Skipping {skip_bytes} bytes")
        # 解析数据
        current_row = DMPRow()
        cur = None
        take_byte_count = 0
        include_next_batch = False
        null_count = 0
        has_started = False
        row_count = 0
        if True or self.export_version['major'] < 8:
            has_started = True
            rows.append(current_row)
            skip_bytes += 1
        i = skip_bytes
        while i < len(bytes_data):
            byte_val = bytes_data[i] & 0xff
            if byte_val == 0:
                null_count += 1
            else:
                null_count = 0
            if null_count == 4:
                if i > 0 and len(bytes_data) > i+4 and bytes_data[i] == 0 and bytes_data[i+1] == 0 and (bytes_data[i+2] & 0xff) == 0xff and (bytes_data[i+3] & 0xff) == 0xff and (bytes_data[i+4] & 0xff) == 0x0a:
                    break
                else:
                    has_started = True
            elif has_started:
                if len(bytes_data) > i+1 and byte_val == 0xfe and (bytes_data[i+1] & 0xff) == 0xff:
                    # NULL值
                    cur = DMPItem()
                    cur.item_type = DMPItemType.NULL
                    cur.no_of_bytes = 2
                    cur.bytes.extend([bytes_data[i], bytes_data[i+1]])
                    current_row.items.append(cur)
                    i += 1
                elif i > 0 and len(bytes_data) > i+4 and bytes_data[i] == 0 and bytes_data[i+1] == 0 and (bytes_data[i+2] & 0xff) == 0xff and (bytes_data[i+3] & 0xff) == 0xff:
                    # 结束标记
                    has_started = False
                elif i > 0 and bytes_data[i] == 0 and bytes_data[i-1] == 0:
                    # 新行标记
                    if current_row.items:
                        current_row = DMPRow()
                        rows.append(current_row)
                elif take_byte_count > 0:
                    cur.bytes.append(bytes_data[i])
                    take_byte_count -= 1
                    if take_byte_count == 0:
                        if self.export_version['major'] < 8 and len(current_row.items) == field_count:
                            current_row = DMPRow()
                            rows.append(current_row)
                elif len(bytes_data) > i+1 and byte_val > 0 and (bytes_data[i+1] & 0xff) == 0:
                    # 新列数据开始
                    take_byte_count = byte_val
                    if include_next_batch:
                        cur.no_of_bytes += take_byte_count
                        include_next_batch = False
                    else:
                        cur = DMPItem()
                        cur.no_of_bytes = take_byte_count
                        current_row.items.append(cur)
                    i += 1
                elif take_byte_count == 0 and len(bytes_data) > i+1 and byte_val > 0 and (bytes_data[i+1] & 0xff) == 0x80:
                    # 新列数据开始(特殊情况)
                    cur = DMPItem()
                    take_byte_count = byte_val
                    cur.no_of_bytes = take_byte_count
                    include_next_batch = True
                    current_row.items.append(cur)
                    i += 1
            i += 1
        # 过滤空行
        rows = [r for r in rows if r.items]
        # 转换数据值
        for row in rows:
            for col, item in enumerate(row.items):
                if col >= len(column_types) or column_types[col] is None:
                    continue
                # 截取正确的字节数
                if item.no_of_bytes > 0:
                    item.bytes = item.bytes[:min(len(item.bytes), item.no_of_bytes)]
                if item.item_type == DMPItemType.NULL:
                    pass
                elif column_types[col] == DMPItemType.NUMBER:
                    self.parse_number(item)
                elif column_types[col] == DMPItemType.STRING:
                    self.parse_string(item)
                elif column_types[col] == DMPItemType.DATE:
                    self.parse_date(item)
                elif column_types[col] == DMPItemType.TIMESTAMP:
                    self.parse_timestamp(item)
        return rows
    def parse_number(self, item: DMPItem):
        """解析数字"""
        item.item_type = DMPItemType.NUMBER
        if len(item.bytes) == 0:
            return
        bytes_list = list(item.bytes)
        # 检查是否为0
        if len(bytes_list) > 0 and (bytes_list[0] & 0xff) == 0x80:
            item.number_value = 0.0
            item.string_value = "0"
            return
        # 检查是否为负数
        is_negative = False
        if len(bytes_list) > 1 and ((bytes_list[-1] & 0xff) == 0x66 or (bytes_list[0] & 0xff) < 0xa0):
            is_negative = True
            # 移除第一个和最后一个字节(如果是0x66)
            if (bytes_list[-1] & 0xff) == 0x66:
                bytes_list = bytes_list[1:-1]
            else:
                bytes_list = bytes_list[1:]
            int_part = -1 * ((bytes_list[0] & 0xff) - 0x3f)
            if int_part > 0:
                while len(bytes_list) < int_part:
                    bytes_list.append(0x65)
                int_part_bytes = bytes_list[:int_part]
                v = "-" + ''.join([f"{-1*(b-101):02d}" for b in int_part_bytes])
                if len(bytes_list) > int_part:
                    dec_part_bytes = bytes_list[int_part:]
                    v += "." + ''.join([f"{-1*(b-101):02d}" for b in dec_part_bytes])
            else:
                v = "-0." + "00" * (-1 * int_part) + ''.join([f"{-1*(b-101):02d}" for b in bytes_list])
        else:
            # 正数
            int_part = (bytes_list[0] & 0xff) - 0xc0
            bytes_list = bytes_list[1:]
            if int_part > 0:
                while len(bytes_list) < int_part:
                    bytes_list.append(1)
                int_part_bytes = bytes_list[:int_part]
                v = ''.join([f"{b-1:02d}" for b in int_part_bytes])
                if len(bytes_list) > int_part:
                    dec_part_bytes = bytes_list[int_part:]
                    v += "." + ''.join([f"{b-1:02d}" for b in dec_part_bytes])
            else:
                v = "0." + "00" * (-1 * int_part) + ''.join([f"{b-1:02d}" for b in bytes_list])
        item.string_value = v
        try:
            if v:
                # 移除前导零
                if '.' in v:
                    parts = v.split('.')
                    if parts[0].startswith('0') and len(parts[0]) > 1:
                        parts[0] = parts[0].lstrip('0')
                        v = '.'.join(parts)
                item.number_value = float(v)
        except:
            pass
    def parse_string(self, item: DMPItem):
        """解析字符串"""
        item.item_type = DMPItemType.STRING
        if item.bytes:
            try:
                # 尝试解码,跳过无效字节
                item.string_value = bytes(item.bytes).decode('utf-8', errors='ignore')
                # 移除空字符
                item.string_value = item.string_value.rstrip('x00')
            except:
                item.string_value = ''.join([f"\x{b:02x}" for b in item.bytes])
    def parse_date(self, item: DMPItem):
        """解析日期"""
        item.item_type = DMPItemType.DATE
        if len(item.bytes) >= 7:
            year = (item.bytes[0] - 100) * 100 + (item.bytes[1] - 100)
            month = item.bytes[2]
            day = item.bytes[3]
            hour = item.bytes[4] - 1
            minute = item.bytes[5] - 1
            second = item.bytes[6] - 1
            try:
                item.date_value = datetime(year, month, day, hour, minute, second)
                item.string_value = item.date_value.strftime('%Y-%m-%d %H:%M:%S')
            except:
                item.string_value = f"{year:04d}-{month:02d}-{day:02d}{hour:02d}:{minute:02d}:{second:02d}"
    def parse_timestamp(self, item: DMPItem):
        """解析时间戳"""
        item.item_type = DMPItemType.TIMESTAMP
        if len(item.bytes) >= 7:
            year = (item.bytes[0] - 100) * 100 + (item.bytes[1] - 100)
            month = item.bytes[2]
            day = item.bytes[3]
            hour = item.bytes[4] - 1
            minute = item.bytes[5] - 1
            second = item.bytes[6] - 1
            nanos = 0
            if len(item.bytes) >= 11:
                nanos_bytes = item.bytes[7:11]
                nanos = struct.unpack('>I', bytes(nanos_bytes))[0]
            try:
                dt = datetime(year, month, day, hour, minute, second)
                item.timestamp_value = dt.replace(microsecond=nanos//1000)
                item.string_value = item.timestamp_value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            except:
                item.string_value = f"{year:04d}-{month:02d}-{day:02d}{hour:02d}:{minute:02d}:{second:02d}.{nanos:09d}"
    @staticmethod
    def byte_array_to_string(bytes_data):
        return ', '.join([f"{b:02x}" for b in bytes_data])
    def export_schema(self, output_dir):
        """导出表结构"""
        os.makedirs(output_dir, exist_ok=True)
        schema_file = os.path.join(output_dir, 'schema.sql')
        with open(schema_file, 'w', encoding='utf-8') as f:
            f.write("-- Oracle DMP 导出的表结构n")
            f.write(f"-- 导出时间: {datetime.now()}n")
            f.write(f"-- Oracle版本: V{self.export_version['major']:02d}.{self.export_version['minor']:02d}.{self.export_version['patch']:02d}n")
            if self.export_user:
                f.write(f"-- 导出用户: {self.export_user}n")
            if self.export_tablespace:
                f.write(f"-- 表空间: {self.export_tablespace}n")
            f.write("n")
            for table in self.tables:
                if table.create_table_sql:
                    # 只写入CREATE TABLE语句,不包含INSERT
                    lines = table.create_table_sql.split('n')
                    create_lines = []
                    for line in lines:
                        if line.strip() and not line.strip().startswith('INSERT'):
                            create_lines.append(line)
                        elif line.strip().startswith('CREATE'):
                            create_lines.append(line)
                    clean_sql = 'n'.join(create_lines)
                    f.write(f"-- 表: {table.table_name}n")
                    f.write(clean_sql)
                    f.write(";nn")
        print(f"表结构已导出到: {schema_file}")
        print(f"共导出 {len([t for t in self.tables if t.create_table_sql])} 个表")
    def export_data(self, output_dir):
        """导出数据到CSV"""
        os.makedirs(output_dir, exist_ok=True)
        data_dir = os.path.join(output_dir, 'data')
        os.makedirs(data_dir, exist_ok=True)
        total_rows = 0
        for table in self.tables:
            if not table.data_rows:
                print(f"表 {table.table_name}: 无数据")
                continue
            # 清理表名用于文件名
            safe_table_name = re.sub(r'[<>:"/\|?*]', '_', table.table_name)
            csv_file = os.path.join(data_dir, f"{safe_table_name}.csv")
            with open(csv_file, 'w', newline='', encoding='utf-8-sig') as f:
                writer = csv.writer(f)
                # 写入列名
                if table.field_names:
                    headers = table.field_names
                elif table.data_rows and table.data_rows[0].items:
                    headers = [f"COL{i+1}" for i in range(len(table.data_rows[0].items))]
                else:
                    headers = []
                if headers:
                    writer.writerow(headers)
                # 写入数据
                for row in table.data_rows:
                    row_values = []
                    for item in row.items:
                        if item.item_type == DMPItemType.NULL:
                            row_values.append('')
                        elif item.string_value is not None:
                            row_values.append(item.string_value)
                        elif item.number_value is not None:
                            # 处理数字格式
                            if item.number_value == int(item.number_value):
                                row_values.append(str(int(item.number_value)))
                            else:
                                row_values.append(str(item.number_value))
                        elif item.date_value is not None:
                            row_values.append(item.date_value.strftime('%Y-%m-%d %H:%M:%S'))
                        elif item.timestamp_value is not None:
                            row_values.append(item.timestamp_value.strftime('%Y-%m-%d %H:%M:%S.%f'))
                        else:
                            row_values.append('')
                    # 确保列数一致
                    while len(row_values) < len(headers):
                        row_values.append('')
                    while len(row_values) > len(headers):
                        row_values.pop()
                    writer.writerow(row_values)
            print(f"表 {table.table_name}: {len(table.data_rows)} 行数据 -> {csv_file}")
            total_rows += len(table.data_rows)
        print(f"n共导出 {len([t for t in self.tables if t.data_rows])} 个表,{total_rows} 行数据")
    def export_all(self, output_dir):
        """导出所有内容"""
        self.export_schema(output_dir)
        self.export_data(output_dir)
def main():
    """Interactive CLI entry point: prompt for paths, parse, then export.

    DMP file path and output directory may also be supplied as argv[1] and
    argv[2]; otherwise they are read from stdin.
    """
    print("=" * 60)
    print("Oracle DMP文件解析工具 (二进制解析版)")
    print("=" * 60)
    # DMP file path: first CLI argument or interactive prompt.
    if len(sys.argv) > 1:
        dmp_file = sys.argv[1]
    else:
        dmp_file = input("请输入DMP文件路径: ").strip()
    if not os.path.exists(dmp_file):
        print(f"文件不存在: {dmp_file}")
        return
    # Output directory: second CLI argument or prompt (default "output").
    if len(sys.argv) > 2:
        output_dir = sys.argv[2]
    else:
        output_dir = input("请输入输出目录(默认为当前目录下的output文件夹): ").strip()
        if not output_dir:
            output_dir = "output"
    parser = OracleDMPParser(dmp_file)
    try:
        # FIX: the leading literal "n" in several messages was a mangled "\n".
        print(f"\n正在解析文件: {dmp_file}")
        tables = parser.parse_file()
        print(f"找到 {len(tables)} 个表")
        for table in tables:
            print(f"  - {table.table_name}: {len(table.data_rows)} 行数据")
        if not tables:
            print("未找到任何表")
            return
        print("\n请选择操作:")
        print("1. 导出表结构(SQL)")
        print("2. 导出数据(CSV)")
        print("3. 导出全部(表结构+数据)")
        choice = input("请输入选择 (1/2/3,默认3): ").strip() or "3"
        if choice == "1":
            parser.export_schema(output_dir)
        elif choice == "2":
            parser.export_data(output_dir)
        else:
            parser.export_all(output_dir)
        print("\n解析完成!")
    except Exception as e:
        # Top-level boundary: report the error and keep the traceback visible.
        print(f"解析过程中出错: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

用java代码压缩包附带的exptest.dmp解析的运行结果

C:\d>python dmpbin2sql2.py
============================================================
Oracle DMP文件解析工具 (二进制解析版)
============================================================
请输入DMP文件路径: exptest.dmp
请输入输出目录(默认为当前目录下的output文件夹):
正在解析文件: exptest.dmp
找到 4 个表
  - IGNORE: 0 行数据
  - IGNORE2: 0 行数据
  - TABLE1: 7 行数据
  - TABLE2: 4 行数据
请选择操作:
1. 导出表结构(SQL)
2. 导出数据(CSV)
3. 导出全部(表结构+数据)
请输入选择 (1/2/3,默认3):
表结构已导出到: output\schema.sql
共导出 4 个表
表 IGNORE: 无数据
表 IGNORE2: 无数据
表 TABLE1: 7 行数据 -> output\data\TABLE1.csv
表 TABLE2: 4 行数据 -> output\data\TABLE2.csv
共导出 2 个表,11 行数据
解析完成!

output/schema.sql

-- Oracle DMP 导出的表结构
-- 导出时间: 2026-04-04 20:29:45.716883
-- Oracle版本: V12.01.00
-- 表空间: USERS
-- 表: IGNORE
CREATE TABLE "IGNORE" ("COLUMN1" VARCHAR2(20))  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 TABLESPACE "USERS" LOGGING NOCOMPRESS;
-- 表: IGNORE2
CREATE TABLE "IGNORE2" ("COLUMN1" VARCHAR2(20), "COLUMN2" VARCHAR2(21))  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 TABLESPACE "USERS" LOGGING NOCOMPRESS;
-- 表: TABLE1
CREATE TABLE "TABLE1" ("KEYCOL" NUMBER(*,0) NOT NULL ENABLE, "NUMCOL1" NUMBER(22, 0), "FLOATCOL1" FLOAT(126), "STRCOL1" VARCHAR2(200), "DATECOL1" DATE, "BLOBCOL1" BLOB, "TIMESTAMP1" TIMESTAMP (6))  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT) TABLESPACE "USERS" LOGGING NOCOMPRESS LOB ("BLOBCOL1") STORE AS SECUREFILE  (TABLESPACE "USERS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION AUTO NOCACHE LOGGING  NOCOMPRESS KEEP_DUPLICATES STORAGE(INITIAL 106496 NEXT 1048576 MINEXTENTS 1 BUFFER_POOL DEFAULT));
-- 表: TABLE2
CREATE TABLE "TABLE2" ("STRKEY" VARCHAR2(20) NOT NULL ENABLE, "FLOAT1" FLOAT(126), "DEC1" NUMBER(22, 5))  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT) TABLESPACE "USERS" LOGGING NOCOMPRESS;

data/TABLE1.csv

KEYCOL,NUMCOL1,FLOATCOL1,STRCOL1,DATECOL1,BLOBCOL1,TIMESTAMP1
01,0123,03.567890,STRING 1 TEST,,,
02,0456,01.23,STRING 2 TEST sadf asdf asdf asdf asdf,,,
03,-0.0000000000000000000000000000000000000000000000000000000000000000000000000001,-0.000000000000000000000000000000000000000000000000000000000000000000000000000123,,,,
04,-0.00000000000000000000000000000000000000000000000000000000000000000000000002,0.000000000330,,,,
05,123123123456,-0.00000000000000000000000000000000000000000000000000000000000000000000000330,,,,
06,-0.0000000000000000000000000000000000000000000000000000123123123456,,,,,
07,-0.000000000000000000000000000000000000000000000000000000000000000000000456,-0.00000000000000000000000000000000000020,,,,

data/TABLE2.csv

STRKEY,FLOAT1,DEC1
K1,,14
K2,,02.999920
K3,,-0.0000000000000000000000000000002320
K4,,
© 版权声明

相关文章