@@ -33,25 +33,26 @@ def retrieve(file, analyzer):
3333 from storages .backends .s3boto3 import S3Boto3Storage
3434
class S3Boto3StorageWrapper(S3Boto3Storage):
    """S3 storage backend that caches downloaded objects on local disk.

    Objects are downloaded once into a shared cache directory under
    ``MEDIA_ROOT`` and reused by every analyzer that needs them, so each
    file costs at most one HTTP round-trip to S3.
    """

    # Shared cache directory where files are downloaded once and
    # reused by every analyzer that needs them.
    _CACHE_DIR = os.path.join(MEDIA_ROOT, "_s3_cache")

    def retrieve(self, file, analyzer):
        """Return a local filesystem path for *file*, downloading it
        from S3 into the shared cache on first access.

        ``analyzer`` is accepted for interface compatibility but no
        longer influences the cache location (the cache is shared).

        Raises:
            AssertionError: if the object does not exist in S3.
        """
        name = file.name
        _path = os.path.join(self._CACHE_DIR, name)
        if not os.path.exists(_path):
            os.makedirs(os.path.dirname(_path), exist_ok=True)
            if not self.exists(name):
                raise AssertionError
            # Download into a uniquely named temp file, then rename.
            # os.replace is atomic on the same filesystem, so a
            # concurrent worker can never observe a half-written cache
            # entry; the PID suffix prevents two *processes* downloading
            # the same object from clobbering each other's temp file.
            tmp_path = "%s.%d.tmp" % (_path, os.getpid())
            try:
                with self.open(name) as s3_file_object, open(
                    tmp_path, "wb"
                ) as local_file_object:
                    # Stream in 1 MiB chunks instead of read()-ing the
                    # whole object, so large files need not fit in RAM.
                    chunk = s3_file_object.read(1024 * 1024)
                    while chunk:
                        local_file_object.write(chunk)
                        chunk = s3_file_object.read(1024 * 1024)
                # atomic on the same filesystem
                os.replace(tmp_path, _path)
            finally:
                # If the download failed mid-way, drop the partial temp
                # file so it does not accumulate on disk. After a
                # successful os.replace the temp path no longer exists.
                if os.path.exists(tmp_path):
                    try:
                        os.remove(tmp_path)
                    except OSError:
                        pass
        return _path
5657
# Route all default Django file storage through the caching S3 wrapper
# defined in this settings module.
DEFAULT_FILE_STORAGE = "intel_owl.settings.S3Boto3StorageWrapper"
0 commit comments