欢迎各位兄弟 发布技术文章

这里的技术是共享的

You are here

简化版示例脚本 —— 把下载部分升级为 多线程分块下载 + 大块写入 + 进度条优化,这样下载速度会显著提升,,,,,你想把 tqdm 的进度条 换成 tkinter 窗口里的进度条,并且在 GUI 界面中显示百分比、文件大小、预计剩余时间、实时速度。 AG chatgpt 有大用 有大大用

在局域网的情况下  chunk_size 大小是多少比较合适

在 局域网(LAN)环境 下,由于网络延迟极低(通常 <1ms)、带宽高(通常 1Gbps 或更高),chunk_size 的设置可以显著优化下载速度。以下是详细建议:


1. 局域网环境下推荐 chunk_size

场景推荐 chunk_size原因
小文件(<10MB)64KB~256KB减少请求次数,避免因分块过多导致协议开销。
大文件(>100MB)1MB~10MB充分利用高带宽,减少 TCP/IP 协议栈的上下文切换开销。
极高速网络(10Gbps)10MB~100MB匹配网络吞吐能力(需确保服务器和客户端内存足够)。




简化版示例脚本 —— 把下载部分升级为 多线程分块下载 + 大块写入 + 进度条优化,这样下载速度会显著提升

 明白了,我给你一个 简化版示例脚本 —— 只保留 多线程分块下载 + 进度条,方便你直接嵌入到原来的大脚本里替换下载函数。

核心逻辑:

  • 获取文件大小

  • Range 头把文件分成多个区块(默认 4 块)

  • 多线程下载,每个线程负责一个区块

  • 边下载边写入临时文件,最后合并

  • tqdm 实时显示下载进度


简化版示例脚本

import os
import requests
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

   
def download_range(url, start, end, temp_file, pbar):
    headers = {"Range": f"bytes={start}-{end}"}
    with requests.get(url, headers=headers, stream=True) as r:
        with open(temp_file, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024*1024):  # 1MB    
                if chunk:
                    f.write(chunk)
                    pbar.update(len(chunk))

   
def download_file_multithread(url, filename, num_threads=4):
    # 获取文件大小    
    resp = requests.head(url)
    file_size = int(resp.headers.get("Content-Length", 0))
    if file_size == 0:
        raise Exception("无法获取文件大小")

   
    # 分块区间    
    part_size = file_size // num_threads
    ranges = []
    for i in range(num_threads):
        start = i * part_size
        end = (i+1) * part_size - 1 if i < num_threads - 1 else file_size - 1    
        ranges.append((start, end))

   
    temp_files = [f"{filename}.part{i}" for i in range(num_threads)]

   
    # 多线程下载    
    with tqdm(total=file_size, unit="B", unit_scale=True, desc=filename) as pbar:
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            for i, (start, end) in enumerate(ranges):
                executor.submit(download_range, url, start, end, temp_files[i], pbar)

   
    # 合并文件    
    with open(filename, "wb") as outfile:
        for temp_file in temp_files:
            with open(temp_file, "rb") as infile:
                outfile.write(infile.read())
            os.remove(temp_file)

   
    print(f"下载完成: {filename}")

   
# 示例调用    
if __name__ == "__main__":
    test_url = "https://speed.hetzner.de/100MB.bin"  # 100MB 测试文件    
    download_file_multithread(test_url, "test.bin", num_threads=4)



这样你就可以直接把原来大脚本里的下载函数替换掉:

  • 原来的 download_with_progress → 用我这个 download_file_multithread

  • UI 的进度条你可以用类似 tqdm 的回调方式来替换。


想把 tqdm 的进度条 换成 tkinter 窗口里的进度条,并且在 GUI 界面中显示百分比、文件大小、预计剩余时间、实时速度。

明白了,你想把 tqdm 的进度条 换成 tkinter 窗口里的进度条,并且在 GUI 界面中显示百分比、文件大小、预计剩余时间、实时速度。

tqdm 是命令行工具,不直接兼容 tkinter,所以我们需要用 tkinter.ttk.Progressbar + Label 来手动更新进度。


示例:tkinter 窗口进度条 + 百分比 + 速度 + 预计时间

import os
import time
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
import tkinter as tk
from tkinter import ttk


   
def download_range(url, start, end, temp_file, progress_data):
    headers = {"Range": f"bytes={start}-{end}"}
    with requests.get(url, headers=headers, stream=True) as r:
        with open(temp_file, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 256):  # 256KB    
                if chunk:
                    f.write(chunk)
                    size = len(chunk)
                    with progress_data["lock"]:
                        progress_data["downloaded"] += size


   
def update_ui(root, progress_bar, label, progress_data, file_size, start_time):
    if not progress_data["done"]:
        with progress_data["lock"]:
            downloaded = progress_data["downloaded"]
        percent = downloaded / file_size * 100    
        elapsed = time.time() - start_time
        speed = downloaded / 1024 / 1024 / elapsed if elapsed > 0 else 0    
        remaining = (file_size - downloaded) / (speed * 1024 * 1024) if speed > 0 else 0    

   
        # 更新进度条和文字    
        progress_bar["value"] = percent
        label.config(text=f"{percent:.2f}%  {downloaded / 1024 / 1024:.2f}MB / {file_size / 1024 / 1024:.2f}MB | "    
                          f"速度: {speed:.2f} MB/s | 剩余: {remaining:.1f}s")

   
        root.after(500, update_ui, root, progress_bar, label, progress_data, file_size, start_time)


   
def download_file_multithread(url, filename, num_threads=4):
    resp = requests.head(url)
    file_size = int(resp.headers.get("Content-Length", 0))
    if file_size == 0:
        raise Exception("无法获取文件大小")

   
    # 分块区间    
    part_size = file_size // num_threads
    ranges = []
    for i in range(num_threads):
        start = i * part_size
        end = (i + 1) * part_size - 1 if i < num_threads - 1 else file_size - 1    
        ranges.append((start, end))

   
    temp_files = [f"{filename}.part{i}" for i in range(num_threads)]

   
    import threading
    progress_data = {"downloaded": 0, "lock": threading.Lock(), "done": False}

   
    # 启动 Tkinter UI    
    root = tk.Tk()
    root.title("文件下载器")

   
    progress_bar = ttk.Progressbar(root, orient="horizontal", length=400, mode="determinate", maximum=100)
    progress_bar.pack(pady=10)

   
    label = tk.Label(root, text="准备下载...")
    label.pack()

   
    start_time = time.time()
    root.after(500, update_ui, root, progress_bar, label, progress_data, file_size, start_time)

   
    # 下载线程    
    def run_download():
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = []
            for i, (start, end) in enumerate(ranges):
                futures.append(executor.submit(download_range, url, start, end, temp_files[i], progress_data))
            for f in futures:
                f.result()
        progress_data["done"] = True    

   
        # 合并文件    
        with open(filename, "wb") as outfile:
            for temp_file in temp_files:
                with open(temp_file, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(temp_file)

   
        label.config(text=f"下载完成: {filename}")

   
    threading.Thread(target=run_download, daemon=True).start()

   
    root.mainloop()


   
# 示例调用    
if __name__ == "__main__":
    test_url = "https://speed.hetzner.de/100MB.bin"  # 100MB 测试文件    
    download_file_multithread(test_url, "test.bin", num_threads=4)



效果

  • 弹出一个 tkinter 窗口。

  • 上方有进度条。

  • 下方有文字:

               
    35.42%  35.42MB / 100.00MB | 速度: 8.23 MB/s | 剩余: 7.3s            

           


           

  • 下载完成后提示:下载完成: test.bin



普通分类: