-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathSlidingWindowAggregation.py
More file actions
125 lines (103 loc) · 3.6 KB
/
SlidingWindowAggregation.py
File metadata and controls
125 lines (103 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from typing import Callable, Generic, List, TypeVar
E = TypeVar("E")
class SlidingWindowAggregation(Generic[E]):
"""SlidingWindowAggregation
Api:
1. append value to tail,O(1).
2. pop value from head,O(1).
3. query aggregated value in window,O(1).
"""
__slots__ = ["_stack0", "_stack1", "_stack2", "_stack3", "_e0", "_e1", "_size", "_op", "_e"]
def __init__(self, e: Callable[[], E], op: Callable[[E, E], E]):
"""
Args:
e: unit element
op: merge function
"""
self._stack0 = []
self._stack1 = []
self._stack2 = []
self._stack3 = []
self._e = e
self._e0 = e()
self._e1 = e()
self._size = 0
self._op = op
def append(self, value: E) -> None:
if not self._stack0:
self._push0(value)
self._transfer()
else:
self._push1(value)
self._size += 1
def popleft(self) -> None:
if not self._size:
return
if not self._stack0:
self._transfer()
self._stack0.pop()
self._stack2.pop()
self._e0 = self._stack2[-1] if self._stack2 else self._e()
self._size -= 1
def query(self) -> E:
return self._op(self._e0, self._e1)
def _push0(self, value):
self._stack0.append(value)
self._e0 = self._op(value, self._e0)
self._stack2.append(self._e0)
def _push1(self, value):
self._stack1.append(value)
self._e1 = self._op(self._e1, value)
self._stack3.append(self._e1)
def _transfer(self):
while self._stack1:
self._push0(self._stack1.pop())
while self._stack3:
self._stack3.pop()
self._e1 = self._e()
def __len__(self):
return self._size
if __name__ == "__main__":
from math import gcd
# 滑动窗口gcd
windowGcd = SlidingWindowAggregation(lambda: 0, gcd)
assert windowGcd.query() == 0
windowGcd.append(4)
assert windowGcd.query() == 4
windowGcd.append(6)
assert windowGcd.query() == 2
windowGcd.popleft()
assert windowGcd.query() == 6
assert windowGcd.query() == 6
windowGcd.popleft()
assert windowGcd.query() == 0
windowGcd.popleft()
assert not windowGcd
# 大小为k的滑动窗口最大值
class Solution2:
def maxSlidingWindow(self, nums: List[int], k: int) -> List[int]:
n = len(nums)
res = []
maxWindow = SlidingWindowAggregation(lambda: -int(1e18), max)
for right in range(n):
maxWindow.append(nums[right])
if right >= k:
maxWindow.popleft()
if right >= k - 1:
res.append(maxWindow.query())
return res
# 3097. 或值至少为 K 的最短子数组 II
# https://leetcode.cn/problems/shortest-subarray-with-or-at-least-k-ii/description/
class Solution:
def minimumSubarrayLength(self, nums: List[int], k: int) -> int:
n = len(nums)
res = n + 1
maxWindow = SlidingWindowAggregation(lambda: 0, lambda x, y: x | y)
left, right = 0, 0
for right, v in enumerate(nums):
maxWindow.append(v)
while left <= right and maxWindow.query() >= k:
res = min(res, right - left + 1)
maxWindow.popleft()
left += 1
return res if res != n + 1 else -1