
Scraping the company-information dictionary hidden in Qichacha (企查查) pages with Python

Published: 2021-10-01 23:01:54



Reposted from: 国庆节,企查查我来啦~ (user_from_future's blog on CSDN)
https://blog.csdn.net/user_from_future/article/details/120576842

Qichacha (qcc.com) lets you look up companies without logging in, but only a handful of times before a login is forced. Since those few free searches are not worth much, the original author wrote a logged-in crawler instead. As everyone knows, the crucial login parameter is the Cookie: in the browser's developer tools, pick any XHR (XMLHttpRequest/Ajax) request and copy its Cookie value (right-click → Copy value is the safest way, since selecting the text by hand can pick up stray Chinese characters). A minimal sketch of using such a hand-copied cookie directly with requests (the cookie values below are placeholders):
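
import requests  # standalone sketch; the full script below has its own imports

raw_cookie = 'QCCSESSID=xxxxxxxx; qcc_did=xxxxxxxx'  # placeholder, not a real session
r = requests.get('https://www.qcc.com/web/search?key=浙江阿瓦隆科技有限公司',
                 headers={'referer': 'https://www.qcc.com/',
                          'user-agent': 'Mozilla/5.0',
                          'cookie': raw_cookie})
print(r.status_code)  # 200 with a valid session cookie

I merged the original two files into a single script and added some annotations (which may or may not be entirely accurate) to the cookie-extraction part. Below is the code I put together: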

# _*_ coding:utf-8 _*_
# FileName: get_qcc_company.py
# IDE: PyCharm
# Rookie code, forever bug-free!

# https://www.qcc.com/

import base64
import json
import os
import sqlite3
import sys
import time
from random import uniform
from urllib import parse

import requests
import urllib3
from bs4 import BeautifulSoup
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # suppress HTTPS certificate warnings


def dpapi_decrypt(encrypted):
    import ctypes
    import ctypes.wintypes

    class DATA_BLOB(ctypes.Structure):  # generic in/out blob layout used by the Win32 crypto API
        _fields_ = [('cbData', ctypes.wintypes.DWORD),  # DWORD (4-byte) length of the data
                    ('pbData', ctypes.POINTER(ctypes.c_char))]  # pointer to the data itself

    p = ctypes.create_string_buffer(encrypted, len(encrypted))  # C string buffer holding the ciphertext
    blobin = DATA_BLOB(ctypes.sizeof(p), p)  # input DATA_BLOB wrapping the ciphertext
    blobout = DATA_BLOB()  # output DATA_BLOB that will receive the plaintext
    # the five middle arguments: optional description, optional extra-entropy blob,
    # reserved, optional prompt struct (None = no UI), and flags
    retval = ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(blobin), None, None, None, None, 0, ctypes.byref(blobout))
    if not retval:
        raise ctypes.WinError()
    result = ctypes.string_at(blobout.pbData, blobout.cbData)  # copy out the decrypted bytes
    ctypes.windll.kernel32.LocalFree(blobout.pbData)  # free the buffer allocated by CryptUnprotectData
    return result
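
# Optional sanity check (my addition, not part of the original post): dpapi_decrypt
# can be verified by round-tripping through the matching CryptProtectData call.
# A minimal sketch, Windows-only; "dpapi_encrypt" is a hypothetical helper name:
#
# def dpapi_encrypt(plain):
#     import ctypes
#     import ctypes.wintypes
#
#     class DATA_BLOB(ctypes.Structure):
#         _fields_ = [('cbData', ctypes.wintypes.DWORD),
#                     ('pbData', ctypes.POINTER(ctypes.c_char))]
#
#     p = ctypes.create_string_buffer(plain, len(plain))
#     blobin = DATA_BLOB(ctypes.sizeof(p), p)
#     blobout = DATA_BLOB()
#     if not ctypes.windll.crypt32.CryptProtectData(
#             ctypes.byref(blobin), None, None, None, None, 0, ctypes.byref(blobout)):
#         raise ctypes.WinError()
#     result = ctypes.string_at(blobout.pbData, blobout.cbData)
#     ctypes.windll.kernel32.LocalFree(blobout.pbData)
#     return result
#
# assert dpapi_decrypt(dpapi_encrypt(b'hello')) == b'hello'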


def aes_decrypt(encrypted_txt):
    with open(os.path.join(os.environ['LOCALAPPDATA'],
                           r"Google\Chrome\User Data\Local State"), encoding='utf-8', mode="r") as f:  # Chrome's Local State file
        jsn = json.loads(f.readline())  # parse it as JSON
    encoded_key = jsn["os_crypt"]["encrypted_key"]  # base64-encoded, DPAPI-protected master key
    encrypted_key = base64.b64decode(encoded_key.encode())  # base64-decode it
    encrypted_key = encrypted_key[5:]  # strip the 5-byte b'DPAPI' prefix
    key = dpapi_decrypt(encrypted_key)  # DPAPI-decrypt the AES master key
    nonce = encrypted_txt[3:15]  # the 12-byte GCM nonce that follows the 3-byte b'v10' prefix
    cipher = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend())  # AES-GCM cipher
    decryptor = cipher.decryptor()
    return decryptor.update(encrypted_txt[15:])  # decrypt everything after prefix + nonce; the caller strips the 16-byte tag
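
# Layout of a "v10" cookie blob as the function above assumes it
# (my summary, not from the original post):
#
#     b'v10' | 12-byte GCM nonce | AES-256-GCM ciphertext | 16-byte auth tag
#     [0:3]    [3:15]              [15:-16]                 [-16:]
#
# A hedged round-trip sketch with a throwaway key (not Chrome's real master key),
# using the AESGCM helper from the same cryptography package:
#
# from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# demo_key = AESGCM.generate_key(bit_length=256)
# demo_nonce = os.urandom(12)
# demo_blob = b'v10' + demo_nonce + AESGCM(demo_key).encrypt(demo_nonce, b'cookie-value', None)
# assert AESGCM(demo_key).decrypt(demo_nonce, demo_blob[15:], None) == b'cookie-value'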


def chrome_decrypt(encrypted_txt):
    if sys.platform == 'win32':  # Windows only
        try:
            # pick the decryption scheme from the blob's prefix
            if encrypted_txt[:4] == b'\x01\x00\x00\x00':  # plain DPAPI blob (pre-Chrome-80)
                decrypted_txt = dpapi_decrypt(encrypted_txt)
                return decrypted_txt.decode()
            elif encrypted_txt[:3] == b'v10':  # AES-GCM blob (Chrome 80 and later)
                decrypted_txt = aes_decrypt(encrypted_txt)
                return decrypted_txt[:-16].decode()  # drop the trailing 16-byte GCM tag
        except OSError:
            return None
    else:
        raise OSError('cookie decryption is only implemented for Windows')


def get_cookies_from_chrome(domain):
    sql = 'SELECT name, encrypted_value AS value FROM cookies WHERE host_key LIKE ?'  # parameterized cookie query
    filename = os.path.join(os.environ['USERPROFILE'], r'AppData\Local\Google\Chrome\User Data\default\Cookies')  # path to the local cookie database
    con = sqlite3.connect(filename)  # open the cookie database with sqlite3
    con.row_factory = sqlite3.Row  # lets rows be addressed by column name
    cur = con.cursor()
    cur.execute(sql, (f'%{domain}%',))
    cookie = ''  # accumulate "name=value;" pairs
    for row in cur:
        if row['value'] is not None:
            name = row['name']  # cookie name
            value = chrome_decrypt(row['value'])  # decrypted cookie value
            if value is not None:
                cookie += name + '=' + value + ';'
    return cookie
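
# Note (my addition): newer Chrome builds (roughly Chrome 96+) moved the cookie
# database to "User Data\Default\Network\Cookies"; if the path above no longer
# exists, a hedged fallback would be:
#
# filename = os.path.join(os.environ['USERPROFILE'],
#                         r'AppData\Local\Google\Chrome\User Data\Default\Network\Cookies')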


# Format a Unix timestamp as YYYY-MM-DD; the sentinel 253392422400 renders as "9999-09-09",
# and a falsy value (0/None) means "无固定期限" (no fixed term)
str_time = lambda _: _ == 253392422400 and "9999-09-09" or _ and time.strftime("%Y-%m-%d", time.localtime(_)) or "无固定期限"
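# e.g. str_time(1633046400) -> "2021-10-01" (local time), str_time(0) -> "无固定期限"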


# Parse request parameters from either a "k=v&k=v" query string
# or newline-separated "k: v" header-style text
def parse_parameters(string: str):
    parameters = {}
    string = string.strip().replace(' ', '')
    if ':' not in string and '&' in string:
        for _ in string.split('&'):
            try:
                parameters[_.split('=')[0]] = _.split('=')[1]
            except IndexError:
                parameters[_.split('=')[0]] = ''
    else:
        for _ in string.split('\n'):
            _ = _.strip()
            try:
                parameters[_.split(':')[0]] = _.split(':')[1]
            except IndexError:
                parameters[_.split(':')[0]] = ''
    return parameters
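
# e.g. parse_parameters('a=1&b=2')  -> {'a': '1', 'b': '2'}
#      parse_parameters('key: foo') -> {'key': 'foo'}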


# Split a raw "name=value; name=value" cookie string into a dict
def parse_cookies(cookie_value: str):
    cookies_dict = {}
    for c in cookie_value.replace(' ', '').split(';'):
        try:
            cookies_dict[c.split('=')[0]] = c.split('=')[1]
        except IndexError:
            cookies_dict[c.split('=')[0]] = ''
    return cookies_dict
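
# e.g. parse_cookies('QCCSESSID=abc; qcc_did=xyz') -> {'QCCSESSID': 'abc', 'qcc_did': 'xyz'}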


# Pretty-print data as JSON without escaping non-ASCII characters
def dump_json(text: (str, list, tuple, dict)):
    return json.dumps(text, ensure_ascii=False, indent=4)


# Sleep for a random interval so requests don't go out too quickly
def random_sleep(a=1, b=2):
    sleep_time = uniform(a, b)
    time.sleep(sleep_time)


doMain = 'qcc.com'  # Qichacha domain
search_url = "https://www." + doMain + "/web/search" + "?"  # Qichacha search endpoint
headers = {
    "referer": "https://www.qcc.com/",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
}

input_cookie = ''  # optionally paste a hand-copied cookie string here
cookies = input_cookie or get_cookies_from_chrome(doMain)  # otherwise read it from Chrome's cookie database
while not (parse_cookies(cookies).get("QCCSESSID") and parse_cookies(cookies).get("qcc_did")):  # both session cookies are required
    input('Please log in to Qichacha (企查查) in your browser, then press Enter!')
    cookies = input_cookie or get_cookies_from_chrome(doMain)  # re-read the cookie database after logging in


# Fetch one company's detail dictionary by its exact name
def get_company(company_name):
    parameters = f"""
        key: {company_name}
    """
    parameters = parse_parameters(parameters)
    r = requests.get(search_url + parse.urlencode(parameters), headers=headers, cookies=parse_cookies(cookies))
    if r.ok:
        soup = BeautifulSoup(r.text, "html.parser")
        table = soup.find("table", attrs={"class": "ntable ntable-list"})
        if table is None:
            return f"未搜寻到公司 “{company_name}” !"
        for tr in table.find_all("tr"):
            info = tr.find_all("td")[2].find("div")
            if info.find("a").find("span") is None:  # skip rows without a company-name link
                continue
            name_ = info.find("a").find("span").text.replace('(', '(').replace(')', ')')  # normalize full-width parentheses
            url = info.find("a")["href"]
            no_kh_things = name_.replace(name_[name_.find('('): name_.rfind(')') + 1], '')  # name with any parenthesized part removed
            no_kh = name_.replace('(', '').replace(')', '')  # name with just the parentheses stripped
            if company_name != no_kh_things and company_name != no_kh:  # require an exact name match
                continue
            r = requests.get(url, headers=headers, cookies=parse_cookies(cookies))
            if r.ok:
                r.encoding = 'utf-8'
                soup = BeautifulSoup(r.text, "html.parser")
                script = soup.find_all('script')
                for s in script:
                    if 'window.__INITIAL_STATE__' in s.text:  # the page embeds its data dictionary in this inline script
                        script = s.text
                        break
                else:
                    return ('Please clear the Chrome cache, log in to Qichacha again, and rerun the program! '
                            'If this message keeps appearing, manually copy the cookie value of any XHR request '
                            'into the input_cookie variable!')
                detail = json.loads(script[script.find('{'): script.rfind('};') + 1])["company"]["companyDetail"]  # slice the embedded JSON object out of the script text
                return {
                    "企业名称": detail["Name"],  # company name
                    "信息更新时间": str_time(detail["UpdatedDate"]),  # last-updated date
                    "法定代表人": detail["Oper"]["Name"],  # legal representative
                    "登记状态": detail["Status"],  # registration status
                    "统一社会信用代码": detail["CreditCode"],  # unified social credit code
                    "工商注册号": detail["No"],  # business registration number
                    "组织机构代码": detail["OrgNo"],  # organization code
                    "纳税人识别号": detail["TaxNo"],  # taxpayer identification number
                    "纳税人资质": detail.get("TaxpayerType", ''),  # taxpayer qualification
                    "注册资本": detail["RegistCapi"],  # registered capital
                    "实缴资本": detail["RecCap"],  # paid-in capital
                    "登记机关": detail["BelongOrg"],  # registration authority
                    "成立日期": str_time(detail["TermStart"]),  # date of establishment
                    "核准日期": str_time(detail["CheckDate"]),  # approval date
                    "营业期限": str_time(detail["TermStart"]) + "至" + str_time(detail["TeamEnd"]),  # business term; "TeamEnd" (sic) is the key name in the page JSON
                    "注册地址": detail["Address"],  # registered address
                    "宗旨和业务范围": detail["Scope"],  # business scope
                    "企业类型": detail["EconKind"],  # company type
                    "所属行业": detail["Industry"]["SubIndustry"],  # industry
                    "所属地区": detail["Area"]["Province"],  # province
                    "人员规模": detail["profile"]["Info"],  # staff size
                    "参保人数": next((_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"), ''),  # insured-employee count
                    "英文名": detail["EnglishName"],  # English name
                    "曾用名": detail["OriginalName"] and [_["Name"] for _ in detail["OriginalName"]] or []  # former names
                }
            return f"获取公司 “{name_}” 详情信息失败!"
        return f"未搜寻到公司 “{company_name}” !"
    return "搜索失败!"


if __name__ == '__main__':
    print(dump_json(get_company('浙江阿瓦隆科技有限公司')))
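    # Optionally persist the result to disk; the filename here is my choice,
    # not the original author's:
    # with open('company.json', 'w', encoding='utf-8') as f:
    #     f.write(dump_json(get_company('浙江阿瓦隆科技有限公司')))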

Source: https://blog.csdn.net/qq_39603829/article/details/120581606
