Skip to content

Commit f19496e

Browse files
committed
fix: always call close in the streaming generators
1 parent 3ab1c08 commit f19496e

1 file changed

Lines changed: 88 additions & 86 deletions

File tree

src/groq/_streaming.py

Lines changed: 88 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -55,49 +55,50 @@ def __stream__(self) -> Iterator[_T]:
5555
process_data = self._client._process_response_data
5656
iterator = self._iter_events()
5757

58-
for sse in iterator:
59-
if sse.data.startswith("[DONE]"):
60-
break
61-
62-
if sse.event is None:
63-
data = sse.json()
64-
if is_mapping(data) and data.get("error"):
65-
message = None
66-
error = data.get("error")
67-
if is_mapping(error):
68-
message = error.get("message")
69-
if not message or not isinstance(message, str):
70-
message = "An error occurred during streaming"
71-
72-
raise APIError(
73-
message=message,
74-
request=self.response.request,
75-
body=data["error"],
76-
)
77-
78-
yield process_data(data=data, cast_to=cast_to, response=response)
79-
80-
else:
81-
data = sse.json()
82-
83-
if sse.event == "error" and is_mapping(data) and data.get("error"):
84-
message = None
85-
error = data.get("error")
86-
if is_mapping(error):
87-
message = error.get("message")
88-
if not message or not isinstance(message, str):
89-
message = "An error occurred during streaming"
90-
91-
raise APIError(
92-
message=message,
93-
request=self.response.request,
94-
body=data["error"],
95-
)
96-
97-
yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
98-
99-
# As we might not fully consume the response stream, we need to close it explicitly
100-
response.close()
58+
try:
59+
for sse in iterator:
60+
if sse.data.startswith("[DONE]"):
61+
break
62+
63+
if sse.event is None:
64+
data = sse.json()
65+
if is_mapping(data) and data.get("error"):
66+
message = None
67+
error = data.get("error")
68+
if is_mapping(error):
69+
message = error.get("message")
70+
if not message or not isinstance(message, str):
71+
message = "An error occurred during streaming"
72+
73+
raise APIError(
74+
message=message,
75+
request=self.response.request,
76+
body=data["error"],
77+
)
78+
79+
yield process_data(data=data, cast_to=cast_to, response=response)
80+
81+
else:
82+
data = sse.json()
83+
84+
if sse.event == "error" and is_mapping(data) and data.get("error"):
85+
message = None
86+
error = data.get("error")
87+
if is_mapping(error):
88+
message = error.get("message")
89+
if not message or not isinstance(message, str):
90+
message = "An error occurred during streaming"
91+
92+
raise APIError(
93+
message=message,
94+
request=self.response.request,
95+
body=data["error"],
96+
)
97+
98+
yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
99+
finally:
100+
# Always close the underlying HTTP response even if the consumer breaks early.
101+
response.close()
101102

102103
def __enter__(self) -> Self:
103104
return self
@@ -156,49 +157,50 @@ async def __stream__(self) -> AsyncIterator[_T]:
156157
process_data = self._client._process_response_data
157158
iterator = self._iter_events()
158159

159-
async for sse in iterator:
160-
if sse.data.startswith("[DONE]"):
161-
break
162-
163-
if sse.event is None:
164-
data = sse.json()
165-
if is_mapping(data) and data.get("error"):
166-
message = None
167-
error = data.get("error")
168-
if is_mapping(error):
169-
message = error.get("message")
170-
if not message or not isinstance(message, str):
171-
message = "An error occurred during streaming"
172-
173-
raise APIError(
174-
message=message,
175-
request=self.response.request,
176-
body=data["error"],
177-
)
178-
179-
yield process_data(data=data, cast_to=cast_to, response=response)
180-
181-
else:
182-
data = sse.json()
183-
184-
if sse.event == "error" and is_mapping(data) and data.get("error"):
185-
message = None
186-
error = data.get("error")
187-
if is_mapping(error):
188-
message = error.get("message")
189-
if not message or not isinstance(message, str):
190-
message = "An error occurred during streaming"
191-
192-
raise APIError(
193-
message=message,
194-
request=self.response.request,
195-
body=data["error"],
196-
)
197-
198-
yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
199-
200-
# As we might not fully consume the response stream, we need to close it explicitly
201-
await response.aclose()
160+
try:
161+
async for sse in iterator:
162+
if sse.data.startswith("[DONE]"):
163+
break
164+
165+
if sse.event is None:
166+
data = sse.json()
167+
if is_mapping(data) and data.get("error"):
168+
message = None
169+
error = data.get("error")
170+
if is_mapping(error):
171+
message = error.get("message")
172+
if not message or not isinstance(message, str):
173+
message = "An error occurred during streaming"
174+
175+
raise APIError(
176+
message=message,
177+
request=self.response.request,
178+
body=data["error"],
179+
)
180+
181+
yield process_data(data=data, cast_to=cast_to, response=response)
182+
183+
else:
184+
data = sse.json()
185+
186+
if sse.event == "error" and is_mapping(data) and data.get("error"):
187+
message = None
188+
error = data.get("error")
189+
if is_mapping(error):
190+
message = error.get("message")
191+
if not message or not isinstance(message, str):
192+
message = "An error occurred during streaming"
193+
194+
raise APIError(
195+
message=message,
196+
request=self.response.request,
197+
body=data["error"],
198+
)
199+
200+
yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
201+
finally:
202+
# Always close the underlying HTTP response even if the consumer breaks early.
203+
await response.aclose()
202204

203205
async def __aenter__(self) -> Self:
204206
return self

0 commit comments

Comments
 (0)