-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_manager.py
More file actions
340 lines (262 loc) · 14.2 KB
/
file_manager.py
File metadata and controls
340 lines (262 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import database
import logging
import os
import shutil
import zipfile
import tarfile
import threading
import hashlib
from pathlib import Path
from enum import Enum
from backupchan_server import models, nameformat, utility
class FileManagerError(Exception):
    """Raised for any file-management failure: missing backups or targets,
    pre-existing destinations, unsupported archives, or path traversal."""
class BackupUploadMode(Enum):
    """How an incoming set of uploaded files is placed on disk."""

    DIRECTORY = 0    # one directory, moved as-is
    ARCHIVE = 1      # one archive file, extracted into place
    SINGLE_FILE = 2  # one regular file, moved as-is
    MULTI_FILE = 3   # several regular files, moved into one directory
def is_archive(filename: str) -> bool:
    """Return True when *filename* is an existing regular file that is a
    recognizable tar or zip archive."""
    if not os.path.isfile(filename):
        return False
    return tarfile.is_tarfile(filename) or zipfile.is_zipfile(filename)
def safe_tar_extract(tar: tarfile.TarFile, path: str):
    """Extract *tar* into *path*, refusing members that would escape it.

    Validates every member before extracting anything so we don't get any
    path traversal garbage.

    Raises:
        FileManagerError: if any member resolves to a location outside *path*.
    """
    base = os.path.abspath(path)
    for member in tar.getmembers():
        member_dest = os.path.abspath(os.path.join(path, member.name))
        # Bug fix: the previous `startswith` prefix test compared raw
        # strings, so destination "/dest" wrongly accepted a member
        # resolving to sibling "/dest-evil/...". commonpath compares
        # whole path components instead.
        try:
            escapes = os.path.commonpath([base, member_dest]) != base
        except ValueError:
            # Different drives (Windows) or otherwise incomparable paths.
            escapes = True
        if escapes:
            raise FileManagerError("Path traversal detected in tar file")
    tar.extractall(path)
def extract_archive(fs_location: str, filename: str):
    """Extract archive *filename* (zip or tar) into directory *fs_location*.

    Raises:
        FileManagerError: if the file is neither a zip nor a tar archive.
    """
    # Removed an unused `suffixes = Path(filename).suffixes` local:
    # format detection is done by content sniffing, not by extension.
    if zipfile.is_zipfile(filename):
        # ZipFile.extractall already strips absolute paths and ".."
        # components, so no extra traversal check is needed here.
        with zipfile.ZipFile(filename, "r") as zip_file:
            zip_file.extractall(fs_location)
    elif tarfile.is_tarfile(filename):
        # Tar needs an explicit traversal check before extraction.
        with tarfile.open(filename, "r:*") as tar_file:
            safe_tar_extract(tar_file, fs_location)
    else:
        raise FileManagerError("Unsupported archive format")
def get_fs_location(location: str, name_template: str, backup_id: str, backup_creation_str: str, manual: bool) -> str:
    """Render the name template for a backup and join it onto *location*."""
    rendered_name = nameformat.parse(name_template, backup_id, backup_creation_str, manual)
    return utility.join_path(location, rendered_name)
def get_backup_fs_location(backup: models.Backup, target: models.BackupTarget, recycle_bin_path: str) -> str:
    """Return where *backup* lives on disk, honoring its recycled state."""
    base_dir = recycle_bin_path if backup.is_recycled else target.location
    return get_fs_location(base_dir, target.name_template, backup.id, backup.created_at.isoformat(), backup.manual)
def find_single_backup_file(base_path: str) -> Path:
    """Locate the stored file of a single-file backup.

    *base_path* is the extension-less path produced by the name template;
    the stored file is that name plus whatever extension chain the
    original upload carried.

    Returns:
        Path of the matching file. (The previous ``str | None`` annotation
        was wrong: this function always returned a Path or raised.)

    Raises:
        FileManagerError: if no matching file exists.
    """
    base = Path(base_path)
    for candidate in base.parent.iterdir():
        if not candidate.is_file():
            continue
        # Bug fix: comparing only Path.stem missed multi-suffix files
        # such as "<name>.tar.gz" (stem is "<name>.tar"), even though
        # callers store files with the full suffix chain appended.
        if candidate.name == base.name or candidate.name.startswith(base.name + "."):
            return candidate
    raise FileManagerError(f"Could not find backup file in base path {base_path}")
def get_directory_size(path: Path) -> int:
    """Sum the sizes (in bytes) of all regular files under *path*, recursively."""
    return sum(
        candidate.stat().st_size
        for dirpath, _, names in os.walk(path)
        for candidate in (Path(dirpath) / name for name in names)
        if candidate.is_file()
    )
def file_hash(path: str) -> str:
    """Return the SHA-256 hex digest of the file at *path*, streamed in 8 KiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        for chunk in iter(lambda: stream.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()
def directory_hash(path: str) -> str:
    """Return a SHA-256 hex digest over a whole directory tree.

    For every file, in sorted walk order, folds in the path relative to
    *path* followed by that file's content hash — so both renames and
    content changes alter the digest, and the result is deterministic.
    """
    digest = hashlib.sha256()
    for root, _, filenames in sorted(os.walk(path)):
        for name in sorted(filenames):
            full_path = utility.join_path(root, name)
            digest.update(os.path.relpath(full_path, path).encode())
            digest.update(file_hash(full_path).encode())
    return digest.hexdigest()
class FileManager:
    """Carries out the on-disk side of backup management: uploading,
    deleting, moving, recycling and measuring backups.

    All mutating operations hold a reentrant lock, so public methods may
    call one another and may be used from multiple threads of one process.
    """

    def __init__(self, db: database.Database, recycle_bin_path: str):
        self.db = db
        self.recycle_bin_path = recycle_bin_path
        # RLock because methods call each other while holding the lock
        # (e.g. delete_target_backups -> delete_backup).
        self.lock = threading.RLock()
        self.logger = logging.getLogger(__name__)

    def add_backup(self, backup_id: str, filenames: list[str]):
        """Move the uploaded *filenames* into the backup's final location.

        Raises:
            FileManagerError: if no files were given, the destination
                already exists, or the file set is incompatible with the
                target type (e.g. a directory for a single-file target).
        """
        with self.lock:
            self.logger.info("Start add backup operation. Backup id: {%s} filenames: %s", backup_id, filenames)

            #
            # Checks
            #
            backup, target = self.get_backup_and_target(backup_id)

            # Bug fix: validate before touching filenames[0] below —
            # previously an empty list raised IndexError (not the
            # intended FileManagerError) for single-file targets.
            if len(filenames) == 0:
                raise FileManagerError("No files specified")

            fs_location = get_backup_fs_location(backup, target, self.recycle_bin_path)

            # If it's single-file, append the extension as well.
            if target.target_type == models.BackupType.SINGLE:
                fs_location += Path(filenames[0]).suffix
            if os.path.exists(fs_location):
                raise FileManagerError(f"Path {fs_location} already exists")
            self.logger.info("Will be put in %s", fs_location)

            if len(filenames) == 1:
                file = filenames[0]
                if os.path.isdir(file):
                    upload_mode = BackupUploadMode.DIRECTORY
                elif is_archive(file) and target.target_type == models.BackupType.MULTI:
                    upload_mode = BackupUploadMode.ARCHIVE
                else:
                    upload_mode = BackupUploadMode.SINGLE_FILE
            else:
                if any(os.path.isdir(f) for f in filenames):
                    raise FileManagerError("Directories not allowed in multi-file upload")
                upload_mode = BackupUploadMode.MULTI_FILE

            if upload_mode in (BackupUploadMode.DIRECTORY, BackupUploadMode.MULTI_FILE) and target.target_type == models.BackupType.SINGLE:
                raise FileManagerError("Cannot upload directory or multiple files to a single-file target")

            #
            # Actual operation
            #
            self.logger.info("Checks passed. Now uploading")

            # Regardless of type, create the directory if it doesn't exist
            if target.target_type == models.BackupType.MULTI:
                os.makedirs(fs_location, exist_ok=True)
            else:
                os.makedirs(target.location, exist_ok=True)

            if upload_mode == BackupUploadMode.SINGLE_FILE:
                shutil.move(filenames[0], fs_location)
            elif upload_mode == BackupUploadMode.MULTI_FILE:
                for f in filenames:
                    shutil.move(f, fs_location)
            elif upload_mode == BackupUploadMode.ARCHIVE:
                extract_archive(fs_location, filenames[0])
            elif upload_mode == BackupUploadMode.DIRECTORY:
                shutil.move(filenames[0], fs_location)

            self.logger.info("Finish upload")

    def delete_backup(self, backup_id: str):
        """Remove a backup's files from disk (live or recycled location)."""
        with self.lock:
            backup, target = self.get_backup_and_target(backup_id)
            self.logger.info("Deleting backup {%s}", backup_id)
            fs_location = get_backup_fs_location(backup, target, self.recycle_bin_path)
            if target.target_type == models.BackupType.SINGLE:
                # Single-file backups carry an extension the name template
                # doesn't know, so look for files by the rendered name.
                # NOTE(review): Path.glob with a plain name matches only
                # that exact filename unless the template yields glob
                # wildcards — confirm extensions are actually matched here.
                base_location = self.recycle_bin_path if backup.is_recycled else target.location
                for path in Path(base_location).glob(nameformat.parse(target.name_template, backup.id, backup.created_at.isoformat(), backup.manual)):
                    path.unlink()
            else:
                shutil.rmtree(fs_location)

    def delete_target_backups(self, target_id: str):
        """Delete every backup belonging to *target_id* from disk."""
        with self.lock:
            self.get_target(target_id)  # raises if the target doesn't exist
            self.logger.info("Deleting all backups for target {%s}", target_id)
            backups = self.db.list_backups_target(target_id)
            for backup in backups:
                self.delete_backup(backup.id)

    def update_backup_locations(self, target: models.BackupTarget, new_name_template: str, new_location: str, old_name_template: str, old_location: str):
        """Move every backup of *target* to match a changed location and/or
        name template, then remove the old directory if it ends up empty."""
        with self.lock:
            self.logger.info("Starting move backups in target {%s}. Name template: '%s' -> '%s', location: '%s' -> '%s'", target.id, old_name_template, new_name_template, old_location, new_location)
            self.db.validate_target(target.name, new_name_template, new_location, target.id, None)
            os.makedirs(new_location, exist_ok=True)
            for backup in self.db.list_backups_target(target.id):
                old_fs_location = get_fs_location(old_location, old_name_template, backup.id, backup.created_at.isoformat(), backup.manual)
                new_fs_location = get_fs_location(new_location, new_name_template, backup.id, backup.created_at.isoformat(), backup.manual)
                if backup.is_recycled:
                    # Recycled backups live in the bin regardless of the
                    # target location; only a template change matters.
                    old_fs_location = get_fs_location(self.recycle_bin_path, old_name_template, backup.id, backup.created_at.isoformat(), backup.manual)
                    new_fs_location = get_fs_location(self.recycle_bin_path, new_name_template, backup.id, backup.created_at.isoformat(), backup.manual)
                if old_fs_location == new_fs_location:
                    self.logger.info("Skip moving backup {%s} (same source and destination)", backup.id)
                    # Bug fix: previously this branch only logged and then
                    # fell through to shutil.move anyway; actually skip.
                    continue
                if target.target_type == models.BackupType.SINGLE:
                    old_fs_location = find_single_backup_file(old_fs_location)
                    new_fs_location += "".join(Path(old_fs_location).suffixes)
                self.logger.info("Move %s -> %s", old_fs_location, new_fs_location)
                shutil.move(old_fs_location, new_fs_location)
            if os.path.isdir(old_location) and not any(os.scandir(old_location)):
                self.logger.info("Old location directory empty, removing")
                os.rmdir(old_location)
            self.logger.info("Finished moving")

    def recycle_backup(self, backup_id: str):
        """Move a backup from its target location into the recycle bin."""
        # Consistency fix: backup_id was annotated `int` here while every
        # sibling method takes `str`.
        with self.lock:
            backup, target = self.get_backup_and_target(backup_id)
            self.logger.info("Recycle backup {%s}", backup_id)
            self.recycle_bin_mkdir()
            # Doing this manually since the backup might be marked as recycled or not. This module shouldn't care.
            backup_location = get_fs_location(target.location, target.name_template, backup_id, backup.created_at.isoformat(), backup.manual)
            recycle_location = get_fs_location(self.recycle_bin_path, target.name_template, backup_id, backup.created_at.isoformat(), backup.manual)
            if target.target_type == models.BackupType.SINGLE:
                backup_location = find_single_backup_file(backup_location)
                recycle_location += "".join(Path(backup_location).suffixes)
            self.logger.info("Move %s -> %s", backup_location, recycle_location)
            shutil.move(backup_location, recycle_location)
            self.logger.info("Finished recycling")

    def unrecycle_backup(self, backup_id: str):
        """Move a backup out of the recycle bin back to its target location."""
        with self.lock:
            backup, target = self.get_backup_and_target(backup_id)
            self.logger.info("Unrecycle backup {%s}", backup_id)
            backup_location = get_fs_location(self.recycle_bin_path, target.name_template, backup_id, backup.created_at.isoformat(), backup.manual)
            original_location = get_fs_location(target.location, target.name_template, backup_id, backup.created_at.isoformat(), backup.manual)
            if target.target_type == models.BackupType.SINGLE:
                backup_location = find_single_backup_file(backup_location)
                original_location += "".join(Path(backup_location).suffixes)
            self.logger.info("Move %s -> %s", backup_location, original_location)
            shutil.move(backup_location, original_location)
            self.logger.info("Finished unrecycling")

    def get_backup_hash(self, backup_id: str) -> str:
        """Return the SHA-256 hex digest of a backup (single file or directory tree)."""
        with self.lock:
            backup, target = self.get_backup_and_target(backup_id)
            backup_location = get_backup_fs_location(backup, target, self.recycle_bin_path)
            if target.target_type == models.BackupType.SINGLE:
                backup_location = find_single_backup_file(backup_location)
                return file_hash(backup_location)
            return directory_hash(backup_location)

    def create_backup_archive(self, backup_id: str, output_file: str):
        """Pack a backup's directory into a .tar.xz archive at *output_file*.

        Only works with multi-file targets.

        Raises:
            FileManagerError: if the backup belongs to a single-file target.
        """
        # Consistency fix: hold the lock like every other file operation,
        # so the backup can't be moved/deleted mid-archive (RLock, so the
        # nested acquisition by helpers is fine).
        with self.lock:
            backup, target = self.get_backup_and_target(backup_id)
            if target.target_type != models.BackupType.MULTI:
                raise FileManagerError("Cannot create archive from single-file backup")
            self.logger.info("Create archive of backup {%s} as '%s'", backup.id, output_file)
            fs_location = get_backup_fs_location(backup, target, self.recycle_bin_path)
            with tarfile.open(output_file, "w:xz") as tar_file:
                basename = os.path.basename(fs_location)
                tar_file.add(fs_location, arcname=basename)
            self.logger.info("Finished creating archive")

    def recycle_bin_mkdir(self):
        """Ensure the recycle bin directory exists."""
        with self.lock:
            os.makedirs(self.recycle_bin_path, exist_ok=True)

    #
    # Statistics
    #
    # They return the value in bytes.
    # These recalculate the size every time they're called, see filesize column
    # in backups table for general filesize measuring.
    #
    def get_backup_size(self, backup_id: str) -> int:
        """On-disk size of one backup, in bytes.

        Raises:
            FileManagerError: if a multi-file backup's directory is missing.
        """
        backup, target = self.get_backup_and_target(backup_id)
        fs_location = get_backup_fs_location(backup, target, self.recycle_bin_path)
        if target.target_type == models.BackupType.SINGLE:
            # find_single_backup_file raises if the file is gone.
            fs_location = find_single_backup_file(fs_location)
            return Path(fs_location).stat().st_size
        if not os.path.exists(fs_location):
            raise FileManagerError(f"Backup {backup_id} does not exist on-disk")
        return get_directory_size(Path(fs_location))

    def get_target_size(self, target_id: str) -> int:
        """Combined on-disk size of all backups of *target_id*, in bytes."""
        self.get_target(target_id)  # validation only; raises if missing
        backups = self.db.list_backups_target(target_id)
        return self.get_backup_list_size(backups)

    def get_backup_list_size(self, backups: list[models.Backup]) -> int:
        """Sum of the on-disk sizes of the given backups, in bytes."""
        return sum(self.get_backup_size(backup.id) for backup in backups)

    #
    # These allow accessing things from the database while making sure nothing's broken
    #
    def get_backup_and_target(self, backup_id: str) -> tuple[models.Backup, models.BackupTarget]:
        """Fetch a backup and its target, raising FileManagerError if either is missing."""
        backup = self.db.get_backup(backup_id)
        if backup is None:
            raise FileManagerError(f"Backup {backup_id} does not exist")
        target = self.db.get_target(backup.target_id)
        if target is None:
            raise FileManagerError(f"Backup {backup_id} points to nonexistent target")
        return backup, target

    def get_target(self, target_id: str) -> models.BackupTarget:
        """Fetch a target, raising FileManagerError if it does not exist."""
        target = self.db.get_target(target_id)
        if target is None:
            raise FileManagerError(f"Target {target_id} does not exist")
        return target