Python Miniblink 简单使用

已知问题:

  • 代理 IP 有时会失效
  • 每个页面加载完成后会关闭当前页面;但若用多进程同时开启多个窗口,则要等所有窗口都加载完成后才会一起关闭

其他参考:

https://github.com/ynyyn/Miniblink-Python-SimpleDemo
https://github.com/lochen88/MBPython3

mini.py

import ctypes, ctypes.wintypes
from ctypes import *
import threading
import multiprocessing
import time

class wkeProxy(Structure):
    """ctypes layout of the proxy settings struct passed to ``wkeSetViewProxy``.

    Fields, in declaration order: ``type`` (C int), ``hostname`` (100-byte
    char buffer), ``port`` (unsigned short), ``username`` and ``password``
    (50-byte char buffers each).
    """

    _fields_ = [
        ('type', c_int),
        ('hostname', c_char * 100),
        ('port', c_ushort),
        ('username', c_char * 50),
        ('password', c_char * 50),
    ]
def run(url, proxy_ip='1.1.1.1', proxy_port=8088, proxy_type=4,
        proxy_user=None, proxy_password=None):
    """Open ``url`` in a Miniblink window and pump messages until told to quit.

    Loads ``node.dll``, creates a 1024x768 popup web window, installs the
    window-destroy and loading-finish callbacks, applies the proxy, and only
    then starts navigation. The call blocks in a Win32 message loop until
    ``GetMessageW`` returns a value <= 0, which happens once one of the
    callbacks posts a quit message.

    Args:
        url: Address to load, passed to ``wkeLoadURLW``.
        proxy_ip: Proxy host name or IP address.
        proxy_port: Proxy port.
        proxy_type: Proxy kind; see the table in ``set_webview_proxy``
            (4 is WKE_PROXY_SOCKS5).
        proxy_user: Optional proxy user name.
        proxy_password: Optional proxy password.
    """
    mb = ctypes.cdll.LoadLibrary('node.dll')
    mb.wkeInitialize()

    webview = mb.wkeCreateWebWindow(
        0,  # WKE_WINDOW_TYPE_POPUP
        None, 0, 0,
        1024, 768,  # width, height
    )
    mb.wkeMoveToCenter(webview)
    mb.wkeSetWindowTitleW(webview, "Miniblink Python")

    # Install both callbacks before navigating so that no event of the
    # initial load can be missed.
    mb.wkeOnWindowDestroy(webview, handleWindowDestroy, 0)  # window destroyed
    mb.wkeOnLoadingFinish(webview, onLoadingFinish, 0)  # page finished loading

    # Apply the proxy before the first request is issued. Setting it after
    # wkeLoadURLW (as this code used to) lets the load start without it.
    # NOTE(review): presumably the cause of the intermittent proxy failures;
    # confirm against the Miniblink documentation.
    proxy = set_webview_proxy(
        proxy_ip, proxy_port,
        proxy_type=proxy_type, user=proxy_user, password=proxy_password,
    )
    mb.wkeSetViewProxy(webview, byref(proxy))

    mb.wkeShowWindow(webview, True)
    mb.wkeLoadURLW(webview, url)

    # Standard Win32 message pump; `proxy` stays referenced by this frame
    # for as long as the loop runs.
    msg = ctypes.wintypes.MSG()
    lpMsg = ctypes.byref(msg)
    user32 = ctypes.windll.user32
    while user32.GetMessageW(lpMsg, 0, 0, 0) > 0:
        user32.TranslateMessage(lpMsg)
        user32.DispatchMessageW(lpMsg)

# The webview handle is declared pointer-sized (c_void_p) rather than c_int.
@CFUNCTYPE(None, c_void_p, c_void_p, c_void_p, c_int, c_void_p)
def onLoadingFinish(webview, param, url, result, failedReason):
    """Loading-finish callback: ask the current thread's message loop to exit.

    All arguments are ignored; the callback only posts a quit message with
    exit code 0.
    """
    ctypes.windll.user32.PostQuitMessage(0)

# The webview handle is declared pointer-sized (c_void_p) rather than c_int.
@CFUNCTYPE(None, c_void_p, c_void_p)
def handleWindowDestroy(webView, param):
    """Window-destroy callback: ask the current thread's message loop to exit.

    Both arguments are ignored; the callback only posts a quit message with
    exit code 0.
    """
    ctypes.windll.user32.PostQuitMessage(0)


# Build the proxy settings for a webview.
def set_webview_proxy(ip, port, proxy_type=1, user=None, password=None):
    """Return a populated ``wkeProxy`` struct.

    Despite the name, this function does not touch any webview; the caller
    is expected to pass the returned struct on (e.g. via ``byref``).

    Args:
        ip: Proxy host name or IP address (str); stored UTF-8 encoded.
        port: Proxy port; anything ``int()`` accepts, within 0..65535.
        proxy_type: One of
            0: WKE_PROXY_NONE
            1: WKE_PROXY_HTTP
            2: WKE_PROXY_SOCKS4
            3: WKE_PROXY_SOCKS4A
            4: WKE_PROXY_SOCKS5
            5: WKE_PROXY_SOCKS5HOSTNAME
        user: Optional user name (str); ``None`` is stored as empty.
        password: Optional password (str); ``None`` is stored as empty.

    Returns:
        The filled-in ``wkeProxy`` instance.

    Raises:
        ValueError: If ``port`` is outside 0..65535, or (raised by ctypes)
            if an encoded string does not fit its fixed-size buffer.
    """
    port = int(port)
    # c_ushort does no overflow checking and would silently wrap, e.g.
    # 70000 -> 4464, so reject out-of-range ports explicitly.
    if not 0 <= port <= 0xFFFF:
        raise ValueError('proxy port out of range (0-65535): %r' % (port,))

    return wkeProxy(
        type=c_int(proxy_type),
        hostname=ip.encode('utf8'),
        port=c_ushort(port),
        username=b'' if user is None else user.encode('utf8'),
        password=b'' if password is None else password.encode('utf8'),
    )




if __name__ == '__main__':
    # Read one URL per line. Strip all surrounding whitespace (including a
    # trailing '\r' from CRLF files) and skip blank lines so that empty
    # strings are never submitted as URLs.
    with open('urls.txt', 'r') as f:
        url_list = [line.strip() for line in f if line.strip()]

    def _report_failure(exc):
        # Runs in the parent process; without it, an exception raised inside
        # a worker would be dropped because the async results are not read.
        print('run failed:', repr(exc))

    pool = multiprocessing.Pool(processes=1)
    for url in url_list:
        pool.apply_async(run, (url,), error_callback=_report_failure)
    # No more tasks will be submitted; wait for the queued ones to finish.
    pool.close()
    pool.join()

    print('ok')

发表评论

邮箱地址不会被公开。 必填项已用*标注