@@ -692,7 +692,7 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
692692 updater = self ._run_refresh ()
693693 updater .get_targetinfo ("non_existent_target" )
694694
695- # Clear statistics for calls and metadata requests
695+ # Clear statistics for open() calls and metadata requests
696696 wrapped_open .reset_mock ()
697697 self .sim .fetch_tracker .metadata .clear ()
698698
@@ -715,6 +715,87 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
715715 expected_calls = [("root" , 2 ), ("timestamp" , None )]
716716 self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
717717
718+
719+ @patch .object (builtins , "open" , wraps = builtins .open )
720+ def test_intermediate_root_cache (self , wrapped_open : MagicMock ) -> None :
721+ """Test that refresh uses the intermediate roots from cache"""
722+ # Add root versions 2, 3
723+ self .sim .root .version += 1
724+ self .sim .publish_root ()
725+ self .sim .root .version += 1
726+ self .sim .publish_root ()
727+
728+ # Make a successful update of valid metadata which stores it in cache
729+ self ._run_refresh ()
730+
731+ # assert that cache lookups happened but data was downloaded from remote
732+ wrapped_open .assert_has_calls (
733+ [
734+ call (os .path .join (self .metadata_dir , "root_history/2.root.json" ), "rb" ),
735+ call (os .path .join (self .metadata_dir , "root_history/3.root.json" ), "rb" ),
736+ call (os .path .join (self .metadata_dir , "root_history/4.root.json" ), "rb" ),
737+ call (os .path .join (self .metadata_dir , "timestamp.json" ), "rb" ),
738+ call (os .path .join (self .metadata_dir , "snapshot.json" ), "rb" ),
739+ call (os .path .join (self .metadata_dir , "targets.json" ), "rb" ),
740+ ]
741+ )
742+ expected_calls = [("root" , 2 ), ("root" , 3 ), ("root" , 4 ), ("timestamp" , None ), ("snapshot" , 1 ), ("targets" , 1 )]
743+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
744+
745+ # Clear statistics for open() calls and metadata requests
746+ wrapped_open .reset_mock ()
747+ self .sim .fetch_tracker .metadata .clear ()
748+
749+ # Run update again, assert that metadata from cache was used (including intermediate roots)
750+ self ._run_refresh ()
751+ wrapped_open .assert_has_calls (
752+ [
753+ call (os .path .join (self .metadata_dir , "root_history/2.root.json" ), "rb" ),
754+ call (os .path .join (self .metadata_dir , "root_history/3.root.json" ), "rb" ),
755+ call (os .path .join (self .metadata_dir , "root_history/4.root.json" ), "rb" ),
756+ call (os .path .join (self .metadata_dir , "timestamp.json" ), "rb" ),
757+ call (os .path .join (self .metadata_dir , "snapshot.json" ), "rb" ),
758+ call (os .path .join (self .metadata_dir , "targets.json" ), "rb" ),
759+ ]
760+ )
761+ expected_calls = [("root" , 4 ), ("timestamp" , None )]
762+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
763+
764+ def test_intermediate_root_cache_poisoning (self ) -> None :
765+ """Test that refresh works as expected when intermediate roots in cache are poisoned"""
766+ # Add root versions 2, 3
767+ self .sim .root .version += 1
768+ self .sim .publish_root ()
769+ self .sim .root .version += 1
770+ self .sim .publish_root ()
771+
772+ # Make a successful update of valid metadata which stores it in cache
773+ self ._run_refresh ()
774+
775+ # Modify cached intermediate root v2 so that it's no longer signed correctly
776+ root_path = os .path .join (self .metadata_dir , "root_history" , "2.root.json" )
777+ md = Metadata .from_file (root_path )
778+ md .signatures .clear ()
779+ md .to_file (root_path )
780+
781+ # Clear statistics for metadata requests
782+ self .sim .fetch_tracker .metadata .clear ()
783+
784+ # Update again, assert that intermediate root v2 was downloaded again
785+ self ._run_refresh ()
786+
787+ expected_calls = [("root" , 2 ), ("root" , 4 ), ("timestamp" , None )]
788+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
789+
790+ # Clear statistics for metadata requests
791+ self .sim .fetch_tracker .metadata .clear ()
792+
793+ # Update again, this time assert that intermediate root v2 was used from cache
794+ self ._run_refresh ()
795+
796+ expected_calls = [("root" , 4 ), ("timestamp" , None )]
797+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
798+
718799 def test_expired_metadata (self ) -> None :
719800 """Verifies that expired local timestamp/snapshot can be used for
720801 updating from remote.
0 commit comments