Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions runpod/endpoint/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
""" Helper functions for the Runpod Endpoint API. """

# Terminal job statuses: once a request reports one of these, its status will
# not change again. "CANCELLED" is included so cancelled jobs count as final.
FINAL_STATES = ["COMPLETED", "FAILED", "TIMED_OUT", "CANCELLED"]

# Exception Messages
UNAUTHORIZED_MSG = "401 Unauthorized | Make sure Runpod API key is set and valid."
Expand All @@ -14,4 +14,4 @@

def is_completed(status: str) -> bool:
    """Return whether ``status`` is a final state for a serverless request.

    Args:
        status: The status string reported for a request.

    Returns:
        True if ``status`` is one of the entries in ``FINAL_STATES``,
        otherwise False.
    """
    # Check against the shared module constant rather than a second inline
    # list, so the set of final states is defined in exactly one place.
    return status in FINAL_STATES
13 changes: 11 additions & 2 deletions runpod/endpoint/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,17 @@ def __init__(self, api_key: Optional[str] = None):
raise RuntimeError(API_KEY_NOT_SET_MSG)

self.rp_session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[408, 429])
self.rp_session.mount("http://", HTTPAdapter(max_retries=retries))
retries = Retry(
total=5,
backoff_factor=1,
status_forcelist=[408, 429, 500, 502, 503, 504],
allowed_methods=frozenset(["GET", "POST"]),
)
adapter = HTTPAdapter(max_retries=retries)
# The production API is served over https; mount on both schemes so the
# retry/backoff policy actually applies to real traffic.
self.rp_session.mount("https://", adapter)
self.rp_session.mount("http://", adapter)

self.headers = {
"Content-Type": "application/json",
Expand Down
20 changes: 12 additions & 8 deletions runpod/serverless/modules/rp_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
log = RunPodLogger()


async def _transmit(client_session: ClientSession, url, job_data):
async def _transmit(client_session: ClientSession, url, job_data, request_id=None):
"""
Wrapper for transmitting results via POST.
"""
Expand All @@ -35,12 +35,18 @@ async def _transmit(client_session: ClientSession, url, job_data):
client_session=client_session, retry_options=retry_options
)

headers = {
"charset": "utf-8",
"Content-Type": "application/x-www-form-urlencoded",
}
# Pass the request id per-request rather than mutating the shared session's
# headers, which would race across concurrently handled jobs.
if request_id is not None:
headers["X-Request-ID"] = request_id

kwargs = {
"data": job_data,
"headers": {
"charset": "utf-8",
"Content-Type": "application/x-www-form-urlencoded",
},
"headers": headers,
"raise_for_status": True,
}

Expand All @@ -55,14 +61,12 @@ async def _handle_result(
A helper function to handle the result, either for sending or streaming.
"""
try:
session.headers["X-Request-ID"] = job["id"]

serialized_job_data = json.dumps(job_data, ensure_ascii=False)

is_stream = "true" if is_stream else "false"
url = url_template.replace("$ID", job["id"]) + f"&isStream={is_stream}"

await _transmit(session, url, serialized_job_data)
await _transmit(session, url, serialized_job_data, request_id=job["id"])
log.debug(f"{log_message}", job["id"])

except ClientError as err:
Expand Down
2 changes: 2 additions & 0 deletions tests/test_serverless/test_modules/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async def test_send_result(self):
headers={
"charset": "utf-8",
"Content-Type": "application/x-www-form-urlencoded",
"X-Request-ID": self.job["id"],
},
raise_for_status=True,
)
Expand Down Expand Up @@ -159,6 +160,7 @@ async def test_stream_result(self):
headers={
"charset": "utf-8",
"Content-Type": "application/x-www-form-urlencoded",
"X-Request-ID": self.job["id"],
},
raise_for_status=True,
)
Expand Down