由于我使用
虚拟机的时候,虚拟机和本机电脑直接的复制粘贴总是出异常,需要将虚拟机重新关闭打开然后才能正常很烦人,然后找了好几个共享工具都不够简单,然后就有了这个工具
这个是局域网临时使用的共享工具
所有人都打开这个软件直接拖入需要共享的文件进去别人自动收到这条信息,就可以直接下载,
需要什么功能自己添加就行,源码在下面
虚拟机必须桥接,nat3.0之前的版本不行,要不然只能你看见下虚拟机的文件,但是虚拟机访问不了你
功能很简单就是分享然后下载,临时使用下
看到很多人用这个,我还以为就我自己随便凑合下,原来这东西还是有很多人需要的啊,争取维护好他,我感觉这东西不用这么复杂
一个分享下载就ok了,大道至简果然,但是你使用过程会发现不是想的那么简单各种问题需要处理,AI很强大真的,我就写了个最简单的
基础版本然后需要各种的功能就直接让AI操作了省时省力,linux,苹果我没试过,但是AI他写了脚本有需要的朋友可以尝试打包试试,AI
评估说他们是可正常通信的也就是分享下载,我没试过没有设备,v7.0以后有了(Linux/macOS/Windows/Universal )平台的脚本,大家可以打包试试
源码都自带了大家有啥可以直接问AI然他帮你干活,多简单
v7.3文件夹上传下载和打开文件夹下载文件夹中的文件功能,
修复一些bug,
rgbhttps://xbl.lanzoue.com/itzsN3kr5r1ergb密码:3nh2
v7.3文件夹上传下载和打开文件夹下载文件夹中的文件功能,
PS:来个朋友帮忙测试下,这个版本无法和之前的版本互通文件夹功能,因为之前的工具中没有这个变量,如果要用文件夹这个功能就必须使用当前版本或者是之后的更高的版本
有问题就反馈,我看到,到时候在修复
rgbhttps://xbl.lanzoue.com/iJyIc3knufgdrgb密码:8470
v7.1 修复2个bug
修复一:文件名显示异常问题
- Windows 10 使用 v5.0 分享文件后,在 Windows 11 的 v7.0 上会下载带 file_id_ 前缀的文件
- 当 Windows 10 删除原文件并重新分享同名文件时,Windows 11 上仍显示旧的带前缀文件名必须在 Windows 10 上修改文件名后重新分享,Windows 11 才能显示正确文件名
修复二:删除同步不及时问题
问题现象:
- Windows 10 删除已分享文件后,本地列表立即消失
- 但 Windows 11 上该文件仍显示在列表中,且可继续下载即使关闭并重新打开 Windows 11 的软件,文件依然存在
解决方案:
- 优化删除通知处理逻辑:区分自动接收和手动保存的文件
- 自动接收的文件会随源端删除而从列表中隐藏
- 增强广播机制,增加重试次数和发送间隔优化心跳检测,更及时地发现离线节点
修复效果:
- 删除操作在 10-15 秒内同步到所有在线节点
- 被删除的文件在对方设备上自动消失手动保存到其他位置的文件不受影响
rgbhttps://xbl.lanzoue.com/iuJLW3juf2yjrgb密码:c6kh
v7.0 重点优化响应速度和操作流畅度,
- 批量上传后台化,减少卡顿。
- 同步刷新去掉无效重绘。
- 拖拽下载改后台,不锁界面。
- 退出清理后台执行,关闭更快。
- 接收文件名恢复原名,兼容旧命名。新增 Linux/macOS/Windows/Universal 单文件脚本
PS:老规矩打包里面有源码和成品,我只有win只能打包win其他平台不懂这是ai写的我就开始写了个简陋的第一版本
纯脚本下载地址(Linux/macOS/Windows/Universal ):
https://xbl.lanzoue.com/icm0n3jpuigh
win成品和源码下载地址:
下载:https://xbl.lanzoue.com/iV8A93jpu9ah 密码:hatq
v5.0 统一界面风格,修复无法删除分享时候文件是只读模式的bug
rgbhttps://xbl.lanzoue.com/igaaN3jm5hfcrgb密码:2y9b
v4.0上传可以选择
rgbhttps://xbl.lanzoue.com/iCVUh3jliflirgb密码:8yn2
v3.0增加虚拟机nat模式也可以互相分享下载
rgbhttps://xbl.lanzoue.com/b0hdr381argb密码:43ob
添加了检查功能,下载体验,有问题给我反馈,
V2.0 20260301
rgbhttps://xbl.lanzoue.com/ihnLr3jiruzirgb密码:1oq0
v1.0
rgbhttps://xbl.lanzoue.com/ifRQ13jfb2wdrgb密码:cxub
下面是1.0源码后续所有源码都在下载成品里面
[Python]
纯文本查看 复制代码import hashlibimport jsonimport osimport shutilimport socketimport sysimport threadingimport timeimport uuidfrom datetime import datetimefrom http import HTTPStatusfrom http.server import BaseHTTPRequestHandler, ThreadingHTTPServerfrom pathlib import Pathfrom urllib.parse import quote, unquote, urlparsefrom urllib.request import Request, urlopenfrom PyQt5.QtCore import QObject, QSize, Qt, QUrl, pyqtSignalfrom PyQt5.QtGui import QColor, QDragfrom PyQt5.QtWidgets import ( QApplication, QCheckBox, QDialog, QFileDialog, QFrame, QHBoxLayout, QLabel, QListWidget, QListWidgetItem, QMainWindow, QMessageBox, QPlainTextEdit, QPushButton, QVBoxLayout, QWidget,)UDP_DISCOVERY_PORT = 45678UDP_BUFFER_SIZE = 65535HELLO_INTERVAL_SEC = 3PEER_TIMEOUT_SEC = 12HTTP_TIMEOUT_SEC = 10def now_ts() -> float: return time.time()def fmt_time(ts: float) -> str: return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")def fmt_size(size: int) -> str: units = ["B", "KB", "MB", "GB", "TB"] n = float(size) for unit in units: if n < 1024 or unit == units[-1]: if unit == "B": return f"{int(n)} {unit}" return f"{n:.2f} {unit}" n /= 1024.0 return f"{size} B"def safe_name(name: str) -> str: cleaned = "".join(c if c.isalnum() or c in "._- ()[]{}" else "_" for c in name) cleaned = cleaned.strip() return cleaned or "file"def detect_local_ip() -> str: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: sock.connect(("8.8.8.8", 80)) return sock.getsockname()[0] except OSError: return "127.0.0.1" finally: sock.close()def find_free_port() -> int: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("0.0.0.0", 0)) port = sock.getsockname()[1] sock.close() return portdef sha1_file(path: Path) -> str: h = hashlib.sha1() with path.open("rb") as f: while True: chunk = f.read(1024 * 1024) if not chunk: break h.update(chunk) return h.hexdigest()class ShareCore(QObject): files_changed = pyqtSignal() stats_changed = pyqtSignal(dict) log_event = pyqtSignal(str) def __init__(self): super().__init__() self.lock = threading.RLock() self.stop_event = threading.Event() self.node_id = uuid.uuid4().hex[:12] self.node_name = socket.gethostname() self.local_ip = detect_local_ip() self.http_port = find_free_port() self.base_dir = Path.home() / ".lan_soft_share" self.shared_dir = self.base_dir / "shared" self.mirror_dir = self.base_dir / "mirror" self.auto_sync_default_dir = self.base_dir / "auto_sync_downloads" self.settings_file = self.base_dir / "settings.json" self.shared_dir.mkdir(parents=True, exist_ok=True) self.mirror_dir.mkdir(parents=True, exist_ok=True) self.auto_sync_default_dir.mkdir(parents=True, exist_ok=True) self.auto_sync_enabled = False self.auto_sync_dir = self.auto_sync_default_dir self.cleanup_shared_on_exit = True self._load_settings() self.auto_sync_dir.mkdir(parents=True, exist_ok=True) self.files = {} self.peers = {} self.local_file_paths = {} self.downloading = set() self.udp_socket = None self.http_server = None self.http_thread = None self.threads = [] def start(self): self._start_http() self._start_udp() self._spawn(self._udp_recv_loop, "udp-recv") self._spawn(self._hello_loop, "hello") self._spawn(self._peer_gc_loop, "peer-gc") self._emit_stats() self.log( "已启动,拖拽文件到窗口即可共享。" f" 自动接收={'开启' if self.auto_sync_enabled else '关闭'}。" ) if self.auto_sync_enabled: self._trigger_auto_sync_backfill() def stop(self): with self.lock: local_file_ids = [ fid for fid, meta in self.files.items() if meta.get("is_local") ] cleanup_on_exit = bool(self.cleanup_shared_on_exit) self._broadcast_files_removed(local_file_ids) self.stop_event.set() if self.udp_socket: try: self.udp_socket.close() except OSError: pass if self.http_server: try: self.http_server.shutdown() except OSError: pass try: self.http_server.server_close() except OSError: pass for t in self.threads: t.join(timeout=1.2) if self.http_thread: self.http_thread.join(timeout=1.2) if cleanup_on_exit: self._cleanup_shared_dir() def log(self, text: str): self.log_event.emit(f"[{datetime.now().strftime('%H:%M:%S')}] {text}") def _load_settings(self): if not self.settings_file.exists(): return try: data = json.loads(self.settings_file.read_text(encoding="utf-8")) self.auto_sync_enabled = bool(data.get("auto_sync_enabled", False)) auto_dir = str(data.get("auto_sync_dir", "")).strip() if auto_dir: self.auto_sync_dir = Path(auto_dir).expanduser() self.cleanup_shared_on_exit = bool(data.get("cleanup_shared_on_exit", True)) except Exception: # Fall back to defaults if settings are malformed. self.auto_sync_enabled = False self.auto_sync_dir = self.auto_sync_default_dir self.cleanup_shared_on_exit = True def _save_settings(self): try: payload = { "auto_sync_enabled": self.auto_sync_enabled, "auto_sync_dir": str(self.auto_sync_dir), "cleanup_shared_on_exit": self.cleanup_shared_on_exit, } self.settings_file.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") except Exception as e: self.log(f"保存设置失败: {e}") def set_auto_sync_enabled(self, enabled: bool): enabled = bool(enabled) with self.lock: self.auto_sync_enabled = enabled self._save_settings() self._emit_stats() self.log(f"自动接收已{'开启' if enabled else '关闭'}") if enabled: self._trigger_auto_sync_backfill() def set_auto_sync_dir(self, folder: str) -> bool: target = Path(folder).expanduser() try: target.mkdir(parents=True, exist_ok=True) except Exception as e: self.log(f"无法使用该目录: {e}") return False with self.lock: self.auto_sync_dir = target self._save_settings() self._emit_stats() self.log(f"自动接收目录: {target}") if self.auto_sync_enabled: self._trigger_auto_sync_backfill() return True def set_cleanup_shared_on_exit(self, enabled: bool): enabled = bool(enabled) with self.lock: self.cleanup_shared_on_exit = enabled self._save_settings() self._emit_stats() self.log(f"退出自动清理共享副本已{'开启' if enabled else '关闭'}") def _trigger_auto_sync_backfill(self): with self.lock: if not self.auto_sync_enabled: return candidates = [] for fid, meta in self.files.items(): if meta.get("is_local"): continue local_path = meta.get("local_path", "") if local_path and os.path.exists(local_path): continue if meta.get("status") in ("remote", "error"): candidates.append(fid) for fid in candidates: self._spawn( lambda file_id=fid: self._download_remote( file_id, target_dir=self.auto_sync_dir, reason="auto-receive" ), f"auto-{fid[:8]}", ) def get_stats(self) -> dict: with self.lock: return { "node_name": self.node_name, "node_id": self.node_id, "local_ip": self.local_ip, "http_port": self.http_port, "peer_count": len(self.peers), "file_count": len(self.files), "auto_sync_enabled": self.auto_sync_enabled, "auto_sync_dir": str(self.auto_sync_dir), "cleanup_shared_on_exit": self.cleanup_shared_on_exit, } def get_files_snapshot(self): with self.lock: items = list(self.files.values()) return sorted(items, key=lambda x: x.get("added_at", 0), reverse=True) def share_paths(self, paths): changed = False for p in paths: path = Path(p) if not path.exists() or not path.is_file(): continue if self._share_single(path): changed = True if changed: self.files_changed.emit() self._emit_stats() def _share_single(self, path: Path) -> bool: size = path.stat().st_size sha1 = sha1_file(path) file_id = f"{sha1}_{size}" with self.lock: if file_id in self.files and self.files[file_id].get("is_local"): self.log(f"已共享过: {path.name}") return False target_name = f"{file_id}_{safe_name(path.name)}" target_path = self.shared_dir / target_name if not target_path.exists(): shutil.copy2(str(path), str(target_path)) meta = { "file_id": file_id, "name": path.name, "size": size, "sha1": sha1, "added_at": now_ts(), "owner_id": self.node_id, "owner_name": self.node_name, "owner_host": self.local_ip, "owner_port": self.http_port, "status": "ready", "is_local": True, "local_path": str(target_path), "sources": [{"host": self.local_ip, "port": self.http_port}], } with self.lock: self.files[file_id] = meta self.local_file_paths[file_id] = str(target_path) self._broadcast( { "type": "NEW_FILE", "node_id": self.node_id, "node_name": self.node_name, "http_port": self.http_port, "meta": { "file_id": file_id, "name": path.name, "size": size, "sha1": sha1, "added_at": meta["added_at"], "owner_id": self.node_id, "owner_name": self.node_name, }, } ) self.log(f"已共享: {path.name} ({fmt_size(size)})") return True def _start_http(self): core = self class Handler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): return def _send_json(self, data, status=HTTPStatus.OK): payload = json.dumps(data, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(payload))) self.end_headers() self.wfile.write(payload) def do_GET(self): parsed = urlparse(self.path) if parsed.path == "/index": self._send_json(core._local_index()) return if parsed.path.startswith("/file/"): file_id = unquote(parsed.path[len("/file/") :]) with core.lock: fpath = core.local_file_paths.get(file_id) if not fpath or not os.path.exists(fpath): self._send_json({"error": "not_found"}, status=HTTPStatus.NOT_FOUND) return size = os.path.getsize(fpath) self.send_response(HTTPStatus.OK) self.send_header("Content-Type", "application/octet-stream") self.send_header("Content-Length", str(size)) self.end_headers() with open(fpath, "rb") as f: while True: chunk = f.read(1024 * 1024) if not chunk: break self.wfile.write(chunk) return self._send_json({"error": "unknown_endpoint"}, status=HTTPStatus.NOT_FOUND) self.http_server = ThreadingHTTPServer(("0.0.0.0", self.http_port), Handler) self.http_thread = threading.Thread(target=self.http_server.serve_forever, daemon=True) self.http_thread.start() def _local_index(self): with self.lock: out = [] for f in self.files.values(): if not f.get("is_local"): continue out.append( { "file_id": f["file_id"], "name": f["name"], "size": f["size"], "sha1": f["sha1"], "added_at": f["added_at"], "owner_id": self.node_id, "owner_name": self.node_name, } ) return out def _start_udp(self): self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.udp_socket.bind(("", UDP_DISCOVERY_PORT)) self.udp_socket.settimeout(1.0) def _spawn(self, fn, name): t = threading.Thread(target=fn, name=name, daemon=True) t.start() self.threads.append(t) def _broadcast(self, payload: dict): if not self.udp_socket: return data = json.dumps(payload, ensure_ascii=False).encode("utf-8") try: self.udp_socket.sendto(data, ("255.255.255.255", UDP_DISCOVERY_PORT)) except OSError: pass def _broadcast_files_removed(self, file_ids): if not file_ids: return payload = { "type": "FILES_REMOVED", "node_id": self.node_id, "node_name": self.node_name, "http_port": self.http_port, "owner_id": self.node_id, "file_ids": list(file_ids), "ts": now_ts(), } # Best effort: send a few times before stopping network threads. for _ in range(2): self._broadcast(payload) time.sleep(0.05) def _cleanup_shared_dir(self): removed = 0 try: for p in self.shared_dir.iterdir(): if p.is_file(): try: p.unlink() removed += 1 except OSError: pass except OSError: return if removed > 0: self.log(f"退出清理完成:删除共享副本 {removed} 个。") def _hello_loop(self): while not self.stop_event.is_set(): self._broadcast( { "type": "HELLO", "node_id": self.node_id, "node_name": self.node_name, "http_port": self.http_port, "ts": now_ts(), } ) self.stop_event.wait(HELLO_INTERVAL_SEC) def _peer_gc_loop(self): while not self.stop_event.is_set(): stats_changed = False files_changed = False cutoff = now_ts() - PEER_TIMEOUT_SEC dead_endpoints = [] with self.lock: dead = [pid for pid, p in self.peers.items() if p["last_seen"] < cutoff] for pid in dead: p = self.peers.get(pid) or {} dead_endpoints.append((p.get("host"), int(p.get("http_port", 0)))) self.peers.pop(pid, None) stats_changed = True for host, port in dead_endpoints: if not host or port