Python操作sqllite数据库

python分享 (327) 2023-04-17 11:50:47
# -*- coding: utf-8 -*-

import sqlite3
import requests, json, re, os, sys, datetime, time, traceback , random
from bs4 import BeautifulSoup

DB_FILE_PATH = 'G:\\synonym.db'
SHOW_SQL = True

def get_conn(path):
    conn = sqlite3.connect(path)
    if path:
        return conn
    else:
        conn = None
        print('内存上面:[:memory:]')
        return sqlite3.connect(':memory:')

def get_cursor(conn):
    if conn is not None:
        return conn.cursor()
    else:
        return get_conn('').cursor()

def close_all(conn, cu):
    try:
        if cu is not None:
            cu.close()
    finally:
        if cu is not None:
            cu.close()

def drop_table(conn, table):

    if table is not None and table != '':
        sql = 'DROP TABLE IF EXISTS ' + table
        if SHOW_SQL:
            print('执行sql:[{}]'.format(sql))
        cu = get_cursor(conn)
        cu.execute(sql)
        conn.commit()
        print('删除数据库表[{}]成功!'.format(table))
        close_all(conn, cu)
    else:
        print('the [{}] is empty or equal None!'.format(sql))

def create_table(conn, sql):
    '''创建数据库表'''
    if sql is not None and sql != '':
        cu = get_cursor(conn)
        if SHOW_SQL:
            print('执行sql:[{}]'.format(sql))
        cu.execute(sql)
        conn.commit()
        print('创建数据库表成功!')
        close_all(conn, cu)
    else:
        print('the [{}] is empty or equal None!'.format(sql))


def save(conn, sql, data):
    '''插入数据'''
    if sql is not None and sql != '':
        if data is not None:
            cu = get_cursor(conn)
            for d in data:
                #if SHOW_SQL:
                #    print('执行sql:[{}],参数:[{}]'.format(sql, d))
                cu.execute(sql, d)
                conn.commit()
            close_all(conn, cu)
    else:
        print('the [{}] is empty or equal None!'.format(sql))

def saveOne(conn, sql):
    '''插入数据'''
    if sql is not None and sql != '':
        cu = get_cursor(conn)
        if SHOW_SQL:
            print('执行sql:[{}]'.format(sql))
        cu.execute(sql)
        conn.commit()
        close_all(conn, cu)
    else:
        print('the [{}] is empty or equal None!'.format(sql))

def saveInfo(sql,data):
    conn = get_conn(DB_FILE_PATH)
    save(conn,sql,data)

def saveUrl(url,text,orgin):
    conn = get_conn(DB_FILE_PATH)
    sql = 'insert into history_url (href,content,orgin) values (?,?,?)'
    data = [(url,text,orgin)]
    save(conn,sql,data)

def getByUrl(url):
    sql = 'select * from history_url where href = ?'
    return getById(sql, (url,))


def create_url_table():
    create_table_sql = '''CREATE TABLE `history_url` (
                          `href` varchar(20) DEFAULT NULL,
                          `content` text DEFAULT NULL,
                          'orgin' text DEFAULT NUll,
                          `logtime` TIMESTAMP default (datetime('now', 'localtime')),
                           PRIMARY KEY (`href`)
                        )'''
    conn = get_conn(DB_FILE_PATH)
    create_table(conn,create_table_sql)

def getSynoRecord(text):
    sql = 'select * from syno where one = ? or two = ?'
    return getById(sql,(text,text))

def getById(sql,data):
    conn = get_conn(DB_FILE_PATH)
    cu = get_cursor(conn)
    cu.execute(sql, data)
    #print(sql)
    r = cu.fetchone()
    #print(r)
    close_all(conn,cu)
    return r

def downloadImg(imgurl,path):
        if not os.path.exists(path):
            r = requests.get(imgurl)
            r.raise_for_status()
            #使用with语句可以不用自己手动关闭已经打开的文件流
            with open(path,"wb") as f: #开始写文件,wb代表写二进制文件
                f.write(r.content)
            print('下载'+path+'完成')
        else:
            print(path + "文件已存在")

def downloadText(text, path,encoding):
        try:
           with open(path, 'w',encoding=encoding) as file:
              file.write(text)
              file.flush()
        except Exception:
           traceback.print_exc()
           print(path+'下载失败')
        else:
           print(path + '下载成功')

def getHtml(base_url,headers,encoding):
    response = requests.get(base_url,headers=headers, timeout=30)
    response.encoding = encoding
    html = response.text
    soup = BeautifulSoup(html,'lxml')
    return (html,soup)

if __name__ == '__main__':
    #getById('select * from syno where one =? or two= ?',('哀求','哀求'))
    #create_url_table()
    #drop_table(get_conn(DB_FILE_PATH),'history_url')
    saveUrl('test','title','sina')

 

THE END

发表回复