@@ -1430,6 +1430,36 @@ check_blackmap_by_relfilenode(RelFileNode relfilenode)
14301430 return true;
14311431}
14321432
1433+ /*
1434+ * This function takes relowner, relnamespace, reltablespace as arguments,
1435+ * prepares the searching key of the global blackmap for us.
1436+ */
1437+ static void
1438+ prepare_blackmap_search_key (BlackMapEntry * keyitem , QuotaType type ,
1439+ Oid relowner , Oid relnamespace , Oid reltablespace )
1440+ {
1441+ Assert (keyitem != NULL );
1442+ memset (keyitem , 0 , sizeof (BlackMapEntry ));
1443+ if (type == ROLE_QUOTA || type == ROLE_TABLESPACE_QUOTA )
1444+ keyitem -> targetoid = relowner ;
1445+ else if (type == NAMESPACE_QUOTA || type == NAMESPACE_TABLESPACE_QUOTA )
1446+ keyitem -> targetoid = relnamespace ;
1447+ else
1448+ ereport (ERROR ,
1449+ (errcode (ERRCODE_INTERNAL_ERROR ),
1450+ errmsg ("[diskquota] unknown quota type: %d" , type )));
1451+
1452+ if (type == ROLE_TABLESPACE_QUOTA || type == NAMESPACE_TABLESPACE_QUOTA )
1453+ keyitem -> tablespaceoid = reltablespace ;
1454+ else
1455+ {
1456+ /* refer to add_quota_to_blacklist */
1457+ keyitem -> tablespaceoid = InvalidOid ;
1458+ }
1459+ keyitem -> databaseoid = MyDatabaseId ;
1460+ keyitem -> targettype = type ;
1461+ }
1462+
14331463/*
14341464 * Given table oid, check whether quota limit
14351465 * of table's schema or table's owner are reached.
@@ -1454,32 +1484,7 @@ check_blackmap_by_reloid(Oid reloid)
14541484 LWLockAcquire (diskquota_locks .black_map_lock , LW_SHARED );
14551485 for (QuotaType type = 0 ; type < NUM_QUOTA_TYPES ; ++ type )
14561486 {
1457- if (type == ROLE_QUOTA || type == ROLE_TABLESPACE_QUOTA )
1458- {
1459- keyitem .targetoid = ownerOid ;
1460- }
1461- else if (type == NAMESPACE_QUOTA || type == NAMESPACE_TABLESPACE_QUOTA )
1462- {
1463- keyitem .targetoid = nsOid ;
1464- }
1465- else
1466- {
1467- ereport (ERROR ,
1468- (errcode (ERRCODE_INTERNAL_ERROR ),
1469- errmsg ("[diskquota] unknown quota type: %d" , type )));
1470- }
1471- if (type == ROLE_TABLESPACE_QUOTA || type == NAMESPACE_TABLESPACE_QUOTA )
1472- {
1473- keyitem .tablespaceoid = tablespaceoid ;
1474- }
1475- else
1476- {
1477- /* refer to add_quota_to_blacklist */
1478- keyitem .tablespaceoid = InvalidOid ;
1479- }
1480- keyitem .databaseoid = MyDatabaseId ;
1481- keyitem .targettype = type ;
1482- memset (& keyitem .relfilenode , 0 , sizeof (RelFileNode ));
1487+ prepare_blackmap_search_key (& keyitem , type , ownerOid , nsOid , tablespaceoid );
14831488 entry = hash_search (disk_quota_black_map ,
14841489 & keyitem ,
14851490 HASH_FIND , & found );
@@ -1686,6 +1691,13 @@ refresh_blackmap(PG_FUNCTION_ARGS)
16861691 hashctl .hcxt = CurrentMemoryContext ;
16871692 hashctl .hash = tag_hash ;
16881693
1694+ /*
1695+ * Since uncommitted relations' information and the global blackmap entries
1696+ * are cached in shared memory. The memory regions are guarded by lightweight
1697+ * locks. In order not to hold multiple locks at the same time, We add blackmap
1698+ * entries into the local_blackmap below and then flush the content of the
1699+ * local_blackmap to the global blackmap at the end of this UDF.
1700+ */
16891701 local_blackmap = hash_create ("local_blackmap" ,
16901702 1024 , & hashctl ,
16911703 HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION );
@@ -1754,22 +1766,8 @@ refresh_blackmap(PG_FUNCTION_ARGS)
17541766
17551767 for (QuotaType type = 0 ; type < NUM_QUOTA_TYPES ; ++ type )
17561768 {
1757- /*
1758- * Check that if the current relation should be blocked.
1759- * FIXME: The logic of preparing the blackmap searching
1760- * key is identical to check_blackmap_by_reloid(), we can
1761- * make it into a static helper function.
1762- */
1763- memset (& keyitem , 0 , sizeof (BlackMapEntry ));
1764- if (type == ROLE_QUOTA || type == ROLE_TABLESPACE_QUOTA )
1765- keyitem .targetoid = relowner ;
1766- else if (type == NAMESPACE_QUOTA || type == NAMESPACE_TABLESPACE_QUOTA )
1767- keyitem .targetoid = relnamespace ;
1768- if (type == ROLE_TABLESPACE_QUOTA || type == NAMESPACE_TABLESPACE_QUOTA )
1769- keyitem .tablespaceoid = reltablespace ;
1770- keyitem .databaseoid = MyDatabaseId ;
1771- keyitem .targettype = type ;
1772-
1769+ /* Check that if the current relation should be blocked. */
1770+ prepare_blackmap_search_key (& keyitem , type , relowner , relnamespace , reltablespace );
17731771 blackmapentry = hash_search (local_blackmap ,
17741772 & keyitem , HASH_FIND , & found );
17751773 if (found && blackmapentry )
@@ -1835,16 +1833,14 @@ refresh_blackmap(PG_FUNCTION_ARGS)
18351833 memset (& blocked_filenode_keyitem , 0 , sizeof (BlackMapEntry ));
18361834 memcpy (& blocked_filenode_keyitem .relfilenode , & relfilenode , sizeof (RelFileNode ));
18371835
1838- LWLockAcquire (diskquota_locks .black_map_lock , LW_EXCLUSIVE );
1839- blocked_filenode_entry = hash_search (disk_quota_black_map ,
1836+ blocked_filenode_entry = hash_search (local_blackmap ,
18401837 & blocked_filenode_keyitem ,
18411838 HASH_ENTER_NULL , & found );
18421839 if (!found && blocked_filenode_entry )
18431840 {
18441841 memcpy (& blocked_filenode_entry -> auxblockinfo , & keyitem , sizeof (BlackMapEntry ));
18451842 blocked_filenode_entry -> segexceeded = blackmapentry -> segexceeded ;
18461843 }
1847- LWLockRelease (diskquota_locks .black_map_lock );
18481844 }
18491845 }
18501846 /*
@@ -1855,8 +1851,85 @@ refresh_blackmap(PG_FUNCTION_ARGS)
18551851 }
18561852 }
18571853 }
1854+ else
1855+ {
1856+ /*
1857+ * We cannot fetch the relation from syscache. It may be an uncommitted relation.
1858+ * Let's try to fetch it from relation_cache.
1859+ */
1860+ DiskQuotaRelationCacheEntry * relation_cache_entry ;
1861+ bool found ;
1862+ LWLockAcquire (diskquota_locks .relation_cache_lock , LW_SHARED );
1863+ relation_cache_entry = hash_search (relation_cache , & active_oid ,
1864+ HASH_FIND , & found );
1865+ if (found && relation_cache_entry )
1866+ {
1867+ Oid relnamespace = relation_cache_entry -> namespaceoid ;
1868+ Oid reltablespace = relation_cache_entry -> rnode .node .spcNode ;
1869+ Oid relowner = relation_cache_entry -> owneroid ;
1870+ BlackMapEntry keyitem ;
1871+ for (QuotaType type = 0 ; type < NUM_QUOTA_TYPES ; ++ type )
1872+ {
1873+ /* Check that if the current relation should be blocked. */
1874+ prepare_blackmap_search_key (& keyitem , type , relowner , relnamespace , reltablespace );
1875+ blackmapentry = hash_search (local_blackmap , & keyitem , HASH_FIND , & found );
1876+
1877+ if (found && blackmapentry )
1878+ {
1879+ List * oid_list = NIL ;
1880+ ListCell * cell = NULL ;
1881+
1882+ /* Collect the relation oid together with its auxiliary relations' oid. */
1883+ oid_list = lappend_oid (oid_list , active_oid );
1884+ for (int auxoidcnt = 0 ; auxoidcnt < relation_cache_entry -> auxrel_num ; ++ auxoidcnt )
1885+ oid_list = lappend_oid (oid_list , relation_cache_entry -> auxrel_oid [auxoidcnt ]);
1886+
1887+ foreach (cell , oid_list )
1888+ {
1889+ bool found ;
1890+ GlobalBlackMapEntry * blocked_filenode_entry ;
1891+ BlackMapEntry blocked_filenode_keyitem ;
1892+ Oid curr_oid = lfirst_oid (cell );
1893+
1894+ relation_cache_entry = hash_search (relation_cache ,
1895+ & curr_oid , HASH_FIND , & found );
1896+ if (found && relation_cache_entry )
1897+ {
1898+ memset (& blocked_filenode_keyitem , 0 , sizeof (BlackMapEntry ));
1899+ memcpy (& blocked_filenode_keyitem .relfilenode ,
1900+ & relation_cache_entry -> rnode .node , sizeof (RelFileNode ));
1901+
1902+ blocked_filenode_entry = hash_search (local_blackmap ,
1903+ & blocked_filenode_keyitem ,
1904+ HASH_ENTER_NULL , & found );
1905+ if (!found && blocked_filenode_entry )
1906+ {
1907+ memcpy (& blocked_filenode_entry -> auxblockinfo , & keyitem , sizeof (BlackMapEntry ));
1908+ blocked_filenode_entry -> segexceeded = blackmapentry -> segexceeded ;
1909+ }
1910+ }
1911+ }
1912+ }
1913+ }
1914+ }
1915+ LWLockRelease (diskquota_locks .relation_cache_lock );
1916+ }
18581917 }
18591918
1919+ /* Flush the content of local_blackmap to the global blackmap. */
1920+ LWLockAcquire (diskquota_locks .black_map_lock , LW_EXCLUSIVE );
1921+ hash_seq_init (& hash_seq , local_blackmap );
1922+ while ((blackmapentry = hash_seq_search (& hash_seq )) != NULL )
1923+ {
1924+ bool found ;
1925+ GlobalBlackMapEntry * new_entry ;
1926+ new_entry = hash_search (disk_quota_black_map , & blackmapentry -> keyitem ,
1927+ HASH_ENTER_NULL , & found );
1928+ if (!found && new_entry )
1929+ memcpy (new_entry , blackmapentry , sizeof (GlobalBlackMapEntry ));
1930+ }
1931+ LWLockRelease (diskquota_locks .black_map_lock );
1932+
18601933 SPI_finish ();
18611934 PG_RETURN_VOID ();
18621935}
0 commit comments