明道云集成篇----《数据库集成》读数据

集成 数据库集成数据源MySQL&Oracle  收藏
9 / 1379

项目没有接口,只提供数据库,技术又没有时间排期,又不想使用 RPA 产生额外的费用,如何进行数据对接呢。那么今天这套方案来给你解忧。明道云如何设置链接自身数据库,读取数据写入到明道云下系统内呢?今天手把手教学模式,一一拆解步骤。

第一步:安装明道私有部署

步骤参考这里-->点击产看安装步骤

第二步:依赖库持久化并扩展依赖库

  • 持久化依赖库:

步骤参考这里-->(点击查看步骤)

  • 安装对应的扩展库

链接 MySQL 请安装 pymysql 扩展库: :

docker exec -t $(docker ps | grep community | awk '{print $1}') bash -c 'pip3 install --target=/usr/local/lib/python3.6/site-packages/ pymysql'

链接 sqlservice 请安装 pymssql 扩展库; :

docker exec -t $(docker ps | grep community | awk '{print $1}') bash -c 'pip3 install --target=/usr/local/lib/python3.6/site-packages/ pymssql'

Oracle Linux 连接配置 可以参考下这里-->(点击查看)

验证是否安装完成后可在宿主机目录 /data/mingdao/script/volume/command/package/python-3.6/site-packages/ 中进行查看。

第三步:应用设计

  • 基础信息级同步任务表

  1. 数据库配置表及设置 自定义动作获取工作表信息

  1. 数据库表及配置动作 获取字段,以及创建自动任务

  1. 字段表

  • 设计业务流及自动化流程

数据库连接是统一的,中间执行不同的脚本,所以抽出来做一个 pbp 业务流,提高复用率。

  • MySQL 连接脚本如下
import pymysql
import json
 
output = {}
output['result'] = ''
# 打开数据库连接
try:
    db = pymysql.connect(host=input['host'], user=input['username'], password=input['passwd'], port=int(input['port']))


    # 使用 cursor() 方法创建一个游标对象 cursor
    db.select_db(input['db'])
    cursor = db.cursor()
    # 使用 execute()  方法执行 SQL 查询 
    output['sql'] = input['sqltext']
    cursor.execute(input['sqltext'])
    cols=cursor.description
    # 使用 fetchone() 方法获取单条数据.
    datas = cursor.fetchall()
    allop=[]
    for data in datas:
        i = 0
        op= {}
        for col in cols:
           op[cols[cols.index(col)][i]] =str(data[cols.index(col)])
        i=i+1
        allop.append(op)


    output['data'] = json.dumps(allop, ensure_ascii=False)
    #print(output)
    # 关闭数据库连接
    db.close()
    output['code'] = 0
    output['msg'] = '连接成功'
except Exception as e:
    output['code'] = 1
    output['msg'] = '连接失败:{}'.format(e)

获取库下面所有的表

select table_name tableName ,ifnull(TABLE_COMMENT,'') as tabledesc,create_time createTime from information_schema.tables where table_schema ='数据库库名' order by create_time desc

获取表的列信息

select column_name as '列名',data_type as '字段类型',column_comment as '字段注释' ,column_key as '是否主键',is_nullable as '是否允许为空',column_default as '默认值' from information_schema.columns  where table_schema='数据库库名' and table_name='表名'
  • SQLService 封装链接脚本如下:
import pymssql
import json
import time
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')   
        return json.JSONEncoder.default(self, obj)
 
output = {}
# 打开数据库连接
# 建议改写下里面添加事务 本章只是简单的案例
try:
    db = pymssql.connect(host=input['host'], user=input['username'], password=input['passwd'], port=int(input['port']))

    # 使用 cursor() 方法创建一个游标对象 cursor
    cursor = db.cursor()
    sql ="use "+input["db"]+"; "+ input['sqltext']
    output['sql'] = sql
    #print(sql)
    cursor.execute(sql)
    cols=cursor.description
    # 使用 fetchone() 方法获取单条数据.
    datas = cursor.fetchall()
    allop=[]
    for data in datas:
        i = 0
        op= {}
        for col in cols:
           op[cols[cols.index(col)][i]] =str(data[cols.index(col)])
        i=i+1
        allop.append(op)

    output['data'] = json.dumps(allop, ensure_ascii=False)
    #print(output)
    # 关闭数据库连接
    db.close()
    output['code'] = 0
    output['msg'] = '连接成功'
except Exception as e:
    output['code'] = 1
    output['msg'] = '连接失败:{}'.format(e)

if output['code'] == 1:
    pass

获取库下面所有的表
SELECT name tableName,'' tabledesc,crdate createTime FROM 替换为库名..SYSOBJECTS WHERE XTYPE='U' ORDER BY crdate

获取表的列信息

use 替换为库名 ; SELECT A.NAME  '列名', CASE WHEN COLUMNPROPERTY( A.ID,A.NAME, 'ISIDENTITY ')=1 THEN 'YES' ELSE '' END '标识', CASE WHEN EXISTS(SELECT 1 FROM SYSOBJECTS WHERE XTYPE= 'PK ' AND PARENT_OBJ=A.ID AND NAME IN ( SELECT NAME FROM SYSINDEXES WHERE INDID IN( SELECT INDID FROM SYSINDEXKEYS WHERE ID = A.ID AND COLID=A.COLID))) THEN 'PRI' ELSE '' END '是否主键', 
B.NAME '字段类型', A.LENGTH '占用位元组数', COLUMNPROPERTY(A.ID,A.NAME, 'PRECISION ') '长度', ISNULL(COLUMNPROPERTY(A.ID,A.NAME, 'SCALE '),0) '小数位', CASE WHEN A.ISNULLABLE=1 THEN 'YES' ELSE '' END '是否允许为空', 
ISNULL(E.TEXT, ' ') '默认值',CONVERT(VARCHAR(50),ISNULL(G.[VALUE], '')) '字段注释' 
FROM SYSCOLUMNS A LEFT JOIN SYSTYPES B ON A.XUSERTYPE=B.XUSERTYPE 
INNER JOIN SYSOBJECTS D ON A.ID=D.ID   AND D.XTYPE= 'U ' AND   D.NAME <> 'DTPROPERTIES ' 
LEFT JOIN SYSCOMMENTS E ON A.CDEFAULT=E.ID 
LEFT JOIN sys.extended_properties G ON A.ID=G.major_id AND A.COLID=G.minor_id   
LEFT JOIN sys.extended_properties F ON D.ID=F.major_id AND F.minor_id=0 
where D.NAME='替换为表名'
ORDER BY A.ID,A.COLORDER

其他业务流程图 获取字段 ,创建自动任务。

对接其他系统或者项目对接的时候日志是不可缺少的,能够快速定位错误,记得加上哦。
MySQL 同步获取演示:

数据同步操作演示.gif

SQLService 同步获演示:
SQLService 同步示例.gif

最后业务部门测试数据也已经写入工作表了整体对接 OK 已完成,小伙伴们快快尝试吧