每日小结

#Python 文件锁与单文件数据库

最近遇到一个需求是需要克隆大量的 Git 仓库到本地,然后丢到 Python 里做数据分析处理。

具体来说是有一个url列表,需要逐个克隆下来进行一些分析,分析过程大概就是从中抽取一些内容,分析完这个仓库基本就没用了;

仓库大小不一,有的很大,有的很小,有的很多分支,有的很少分支,有的很多commit,有的很少commit;

url可能有重复,可能有无效Git url,同一个仓库可能有多个不同url;

我的(额外)需求:

  • 尽可能快速的克隆,可以同时克隆多个;
  • 我有多个 Project 都有类似需求。为了节省时间和磁盘空间(梯子贵贵),需要尽可能保证每个仓库最多只克隆一次,后面其他 Project 如果再遇到相同url,直接用已有的;
  • 由于磁盘空间有限,当磁盘空间不足时,把最少使用的仓库删除,类似LRU;
  • url可能有无效的,需要记录下来,以便后续不再反复重试浪费时间;
  • 分析仓库的 Python 代码很可能会使用到大量并行手段,所以需要并发控制。

以上。

#分析

  1. url摘要部分:作为内部使用的仓库ID,也用于简单合并重复url。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    def _digest(self, git_url: str):
    git_url = git_url.strip().lower()
    git_url = git_url.rstrip("/")
    if git_url.endswith(".git"):
    git_url = git_url[:-4]
    if git_url.startswith("https://"):
    group, repo = git_url.split("/")[-2:]
    else:
    group, repo = git_url.split(":")[-1].split("/")
    return f"{group}+{repo}"
  2. 仓库克隆的位置:为了方便管理,所有仓库克隆到同一个目录(/data/repos/)下,每个仓库一个子目录。

  3. 系统需要维护的信息有:

  • 已克隆的仓库列表;
  • 访问频次信息,即每个仓库的访问次数,此处没有使用 LRU;
  • 无效的url;

因为可能有多个 Project 同时运行,这些信息需要跨线程/进程、在整个OS的级别共享,即需要支持所谓的 Process safe

一般来说,这其实是一个数据库领域的典型需求,我们也可以通过增加一个单独的数据库服务解决,但是太麻烦,也不符合 Python 作为胶水脚本语言的定位;

相对于使用完整的数据库,一个更简单的办法就是利用文件系统,文件系统一直是全局唯一,且伴随着OS默默运行的一个服务; 文件系统提供了很多方便的接口,例如使用文件锁来实现跨进程的并发控制;

1
2
3
4
5
6
7
8
9
10
11
12
import fcntl

class FileLock:
def __init__(self, file, mode="w"):
self.file = open(file, "rb" if self.mode == "r" else "wb")
self.mode = mode

def __enter__(self):
fcntl.flock(self.file, fcntl.LOCK_SH if self.mode == "r" else fcntl.LOCK_EX)

def __exit__(self, exc_type, exc_val, exc_tb):
fcntl.flock(self.file, fcntl.LOCK_UN)

通过文件锁,我们可以快速封装一个简单的单文件数据库;以下代码使用了一个 JSON 文件来存储任意数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class FileDB:
def __init__(self, db_file):
self.db_file = db_file
self.lock_file = db_file + ".lock"

class Transaction:
def __init__(self, db_file, lock_file, mode):
self.db_file = db_file
self.lock_file = FileLock(lock_file, mode)

def __enter__(self):
self.lock_file.__enter__()
self._load()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.lock_file.mode == "w":
self._dump()
self.lock_file.__exit__(exc_type, exc_val, exc_tb)

def _load(self):
self.data = {}
if os.path.exists(self.db_file):
with open(self.db_file, "r") as f:
self.data = json.load(f)

def _dump(self):
with tempfile.NamedTemporaryFile("w", delete=False) as f:
json.dump(self.data)
os.rename(f.name, self.db_file)

def lock_read(self):
return self.Transaction(self.db_file, self.lock_file, "r")

def lock(self):
return self.Transaction(self.db_file, self.lock_file, "w")

为了方便 Python 用户使用,以上代码把读写操作封装成了一个 Transaction,用户只需要在 with 语句中使用即可;

1
2
3
4
5
6
7
db = FileDB("db.json")

with db.lock() as tx:
tx.data["key"] = "value"

with db.lock_read() as tx:
print(tx.data["key"])

最后,使用这个简单的 FileDB 来实现我们的需求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
from git import Repo

class GitRepoManager:
def __init__(self, db_file):
self.file_db = FileDB(db_file)

def clone(self, git_url) -> Optional[Repo]:
key = self._digest(git_url)
repo_path = os.path.join(self.repo_base, key)
repo_lock = os.path.join(self.repo_base, key + ".lock")

with self.file_db.lock() as trx:
# The repo is already cloned and in the cache
if key in trx["lrucache"]:
trx["lrucache"][key] += 1
return Repo(repo_path)

# The url is bad
if key in trx["badcache"]:
return None

if len(trx["lrucache"]) >= self.max_size:
# remove the least used item
least_used = min(trx["lrucache"].items(), key=lambda x: x[1])
least_key = least_used[0]
least_path = os.path.join(self.repo_base, least_key)
least_lock = os.path.join(self.repo_base, least_key + ".lock")
if os.path.exists(least_path):
shutil.rmtree(least_path)
if os.path.exists(least_lock):
os.unlink(least_lock)
del trx["lrucache"][least_key]

# acquire a file lock of the repo
# if the lock is acquired, we are the only process/thread that is cloning the repo
# otherwise, we wait for the other process to finish cloning
with FileLock(repo_lock):
# The repo has been cloned by another process
# We can just open it
if os.path.exists(repo_path):
inst = Repo(repo_path)
return inst

with self.file_db.lock() as trx:
# The url is bad
if key in trx["badcache"]:
return None

# Clone the repo
try:
inst = Repo.clone_from(git_url, repo_path)
except Exception as e:
# Clone failed, bad git url
logger.error(e)
with self.file_db.lock() as trx:
trx["badcache"].add(key)
return None

# Clone succeeded
with self.file_db.lock() as trx:
if key not in trx["lrucache"]:
trx["lrucache"][key] = 1
else:
trx["lrucache"][key] += 1
return Repo(repo_path)

def _digest(self, git_url: str):
git_url = git_url.strip().lower()
git_url = git_url.rstrip("/")
if git_url.endswith(".git"):
git_url = git_url[:-4]
if git_url.startswith("https://"):
group, repo = git_url.split("/")[-2:]
else:
group, repo = git_url.split(":")[-1].split("/")
return f"{group}+{repo}"

大体上就是这样,这个 FileDB 可以用来实现很多简单的数据库需求,当然,如果你的需求更复杂,还是使用一个完整的数据库服务更好。