Event Polling
Overview
This scenario demonstrates best practices for event polling and how to use the GET /events event list endpoint to retrieve events.
Recommendation
Webhook delivery is the more performant and recommended integration approach to receive event data.
Instead of constantly checking or polling for updates (which can lead to unnecessary API calls and usage of your system resources), webhooks allow your system to listen for and receive notifications only when an event actually occurs.
Event polling should only be used in scenarios where your integration systems do not support webhooks.
Note
Event polling is based on retrieving events on a recurring schedule. In practice, event polling is executed via a scheduled job or task runner that runs code like the examples below on that recurring schedule.
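For illustration only, the following is a minimal sketch of such a recurring schedule in Python. The poll_and_process_events() function is a hypothetical placeholder for the polling and processing code shown later on this page; in production you would typically rely on your ERP's job scheduler, cron, or a task runner rather than an in-process loop.
Example recurring polling schedule (illustrative)
import time

POLL_INTERVAL_SECONDS = 60 * 60  # run one polling pass every hour (adjust as needed)

def poll_and_process_events():
    # Placeholder: call GET /events, page through the results, and process each event
    pass

# A simple long-running loop; a scheduled job or task runner is the usual production choice
while True:
    poll_and_process_events()
    time.sleep(POLL_INTERVAL_SECONDS)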
Interactive Code Notebook
Below is an interactive, runnable Google Colab code notebook that allows you to run code and see the results. The Python notebook demonstrates event polling and how to get events for an event type (e.g. shipment.created, order.created, shipment.updated, etc.) within a date/time range, while also covering pagination of events.
- Shipwell Google Colab Notebook for Event Polling
Get Events - Basic Example
Event polling is based on retrieving events on a schedule. Before getting to the advanced event polling example, understanding the base GET /events API will help build the foundation for event polling. Retrieving one or multiple events in the API uses the same flexible GET /events event list endpoint.
Obtaining one or more events follows the same paginated data access pattern and only varies in the number of results returned in the API response. The GET /events API route or endpoint parameters are documented here in the API reference.
The base API call to get a paginated list of events is:
Example events list query request
About This Request Payload
- The example below is the default query for the GET /events event list endpoint. You may specify additional query/querystring parameters to filter results, change the number of items returned, etc.
- Set the limit query parameter to set the maximum number of items returned in the response (i.e. ?limit=25, ?existing_param=abc&limit=50).
- Generated event ids are string values based on the date of the event and are lexicographically sortable by event id, since the event identifiers are monotonically increasing by the create date of the event.
- See the event list API reference for a detailed list of fields or properties for the request and response bodies for event listing/querying.
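To illustrate the sortability note above, here is a brief sketch (with made-up event id values) showing that plain string comparison of event ids yields chronological order:
# Illustrative (made-up) event ids; newer events receive lexicographically larger ids
event_ids = [
    "01J4HG1MANX4C9MSMJMFMTCE2C",  # newer
    "01J3ZZZZZZ0000000000000000",  # older (placeholder value)
]

# Sorting the id strings orders events from oldest to newest
print(sorted(event_ids))

# The lexicographically largest id belongs to the most recent event
print(max(event_ids))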
curl -i -X GET \
'https://sandbox-api.shipwell.com/events?event_name=shipment.created&limit=20' \
-H 'Authorization: YOUR_AUTHORIZATION_HEADER'
import requests
base_path = ""
host = "sandbox-api.shipwell.com"
target_url = "https://" + host + base_path + "/events"
headers = {
"Accept-Type": "application/json",
"Authorization": "YOUR_AUTHORIZATION_HEADER"
}
params = {
"event_name": "shipment.created",
"limit": 20
}
response = requests.get(target_url, headers=headers, params=params)
data = response.json()
print(data)
import java.io.IOException;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

OkHttpClient client = new OkHttpClient();
String basePath = "";
String host = "sandbox-api.shipwell.com";
String targetUrl = "https://" + host + basePath + "/events";

// Build the URL with query parameters
HttpUrl url = HttpUrl.parse(targetUrl).newBuilder()
    .addQueryParameter("event_name", "shipment.created")
    .addQueryParameter("limit", "20")
    .build();

// Create the request using the URL that includes the query parameters
Request request = new Request.Builder()
    .url(url)
    .header("Authorization", "YOUR_AUTHORIZATION_HEADER")
    .build();

try (Response response = client.newCall(request).execute()) {
    if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
    System.out.println(response.body().string());
}
Example events list query response
{
"data": [
{
"id":"01J4HG1MANX4C9MSMJMFMTCE2C",
"occurred_at":"2024-08-05T14:45:39.820961Z",
"event_name":"shipment.created",
"details":{
// ... event details
// "1":{ // version
// "id":"a3f23d02-5f9e-4e07-9a1e-cba90c526ed4",
// "mode":"FTL",
// ... other properties
// }
},
"source":{
"publishing_system":"backend",
"company_id":"a54ef012-77d4-44a0-8ba5-115b09b655be",
"user_id":"a8497882-4b9d-4919-a4f1-1ce3adb48e63",
"request_id":"ce1c06c6-ad0f-4d87-a28d-b1223d8d6b21"
},
"pending_webhooks":0,
"resource_type":"event",
"self_link":"https://sandbox-api.shipwell.com/events01J4HG1MANX4C9MSMJMFMTCE2C"
},
// ... other events ...
],
"count": 20, // number of items in the "data" array for this response
"links": {
"next": "https://sandbox-api.shipwell.com/events?starting_after=...", // next set of results that match the query
"prev": null, // previous set of results that match the query
"first": "https://sandbox-api.shipwell.com/events?starting_after=...", // first set of results that match the query (i.e. first page)
"last": "https://sandbox-api.shipwell.com/events?ending_before=..." // last set of results that match the query (i.e. last page)
}
}
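Pagination is driven by the links object in the response. As a minimal sketch (reusing the request setup from the basic Python example above), you can follow links.next until it is null to retrieve every page of matching events; because event ids are lexicographically sortable, the largest id seen is also the most recent event retrieved.
Example paginated events retrieval (illustrative)
import requests

headers = {
    "Accept-Type": "application/json",
    "Authorization": "YOUR_AUTHORIZATION_HEADER"
}
params = {"event_name": "shipment.created", "limit": 20}
url = "https://sandbox-api.shipwell.com/events"

last_event_id_seen = None
while url:
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    page = response.json()
    for event in page.get("data", []):
        # Replace this with your own event handling logic
        print(event["id"], event["event_name"], event["occurred_at"])
        # Event ids sort lexicographically by create date, so max() tracks the newest
        last_event_id_seen = max(last_event_id_seen or event["id"], event["id"])
    # "links.next" already contains the full querystring for the next page,
    # so explicit params are only needed on the first request
    url = page.get("links", {}).get("next")
    params = None

print(f"Newest event id seen: {last_event_id_seen}")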
Event Polling - Detailed Example
The following is a detailed example of event polling.
- In practice, code like the following will be adapted to run on a recurring schedule. Set up your event polling/processing code to run every N hours or minutes via a scheduled job or task runner (typically in your ERP (Enterprise Resource Planning) system).
- For each event name/type that you poll for, store the event name/type, the event_id, and the date/time of the last event you successfully processed, in a manner where you may retrieve, for example, the last shipment.created event_id that you processed and when you processed it.
  - A data store (database, file system, cache, or a combination) containing processing data with items like last_processing_date_time (in UTC), event_name, and event_id is useful; a sketch of such a store follows this list.
- Narrow down the time window that you are retrieving events for by setting a start date/time (occurred_at.gte) and an optional end date/time (occurred_at.lte), while also making this window larger than the duration between scheduled event polling job or task runs. Typically, set this window of time to 24-48 hours.
  - Example: If you schedule event polling to run on the hour every hour, then set the start date/time (occurred_at.gte) in the past while also setting the event name/type and the last successfully processed event_id for that event name/type.
  - If you have downtime or need to reprocess data, you may adjust the date/time windows, or remove or reset the start and end dates.
  - The starting_after parameter value is an event_id, so while you may rely on it without using any date/time filter parameters, it is useful to set the date/time parameters to help with debugging and logging output and to isolate the window of data.
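The following is a minimal sketch of such a processing-state store, assuming SQLite and an illustrative table/column layout (event_polling_state, last_event_id, last_processing_date_time). These names are assumptions for this sketch, not part of the Shipwell API, and any database, file system, or cache works equally well.
Example processing-state data store (illustrative)
import sqlite3
from datetime import datetime, timezone

# Illustrative schema: one row per event name/type with the last processed event
conn = sqlite3.connect("event_polling_state.db")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS event_polling_state (
        event_name TEXT PRIMARY KEY,
        last_event_id TEXT,
        last_processing_date_time TEXT  -- ISO 8601, UTC
    )
    """
)

def get_last_processed(event_name):
    # Returns (last_event_id, last_processing_date_time) or (None, None)
    row = conn.execute(
        "SELECT last_event_id, last_processing_date_time FROM event_polling_state WHERE event_name = ?",
        (event_name,),
    ).fetchone()
    return row if row else (None, None)

def save_last_processed(event_name, event_id):
    # Upsert the last processed event id and the current UTC time for this event name/type
    conn.execute(
        """
        INSERT INTO event_polling_state (event_name, last_event_id, last_processing_date_time)
        VALUES (?, ?, ?)
        ON CONFLICT(event_name) DO UPDATE SET
            last_event_id = excluded.last_event_id,
            last_processing_date_time = excluded.last_processing_date_time
        """,
        (event_name, event_id, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()

# Usage: read the stored id before polling and pass it as starting_after;
# write it back after each successfully processed event (or page of events).
last_event_id, last_run = get_last_processed("shipment.created")
print(last_event_id, last_run)
save_last_processed("shipment.created", "01J4HG1MANX4C9MSMJMFMTCE2C")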
Example event polling
# Ensure that requests and tenacity libraries are installed (i.e. pip install requests, pip install tenacity, etc.)
import json
import pprint
import requests
from datetime import datetime, timezone, timedelta
from requests.exceptions import HTTPError, Timeout, RequestException
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
EVENT_LIMIT_OR_PAGE_SIZE = 2 # return a max of N items per page
EVENT_NAME = "shipment.created" # event name or type
VERBOSE = True # verbose logging
base_path = ""
host = "sandbox-api.shipwell.com"
target_url = "https://" + host + base_path + "/events"
header_params = {
"Accept-Type": "application/json",
"Authorization": "YOUR_AUTHORIZATION_HEADER"
}
# Helper functions
# Retries 5 times with exponential backoff if an HTTP error or timeout occurs
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(
multiplier=1, min=2, max=10
), # Exponential backoff (2s, 4s, 8s...)
retry=retry_if_exception_type((HTTPError, Timeout, RequestException)),
)
def get_request_with_retries(url, headers, params=None):
try:
response = requests.get(
url, headers=headers, params=params, timeout=30
) # Client timeout set to 30 seconds
response.raise_for_status() # Raise HTTPError for bad responses (4xx, 5xx)
return response.json() # Get the JSON response
except HTTPError as http_err:
print(f"HTTP error occurred: {http_err}")
raise
except Timeout as timeout_err:
print(f"Timeout error: {timeout_err}")
raise
except RequestException as req_err:
print(f"Request failed: {req_err}")
raise
# Customizing retry message for the last attempt
def get_request_with_final_retry(url, headers, params=None):
try:
return get_request_with_retries(url, headers, params)
except RetryError as e:
if e.last_attempt.attempt_number == 5:
print("Final retry attempt failed. No more retries.")
raise e # Re-raise the error after printing final message
def get_current_iso_8601_time() -> str:
# Get the current UTC time
current_utc_time = datetime.now(timezone.utc)
# Format as ISO 8601 string
iso_8601_string = current_utc_time.isoformat() # 2025-10-25T13:45:30.123456+00:00
return iso_8601_string
def get_previous_iso_8601_time(iso_time: str, hours_before: int) -> str:
# Replace "Z" with "+00:00" to standardize the input format
standardized_time = iso_time.replace("Z", "+00:00")
# Parse the input ISO 8601 string to a datetime object
input_time = datetime.fromisoformat(standardized_time)
# Subtract the specified number of hours
past_time = input_time - timedelta(hours=abs(hours_before))
# Return as ISO 8601 string
return past_time.isoformat()
def get_midnight(iso_time: str) -> str:
    # If the input iso_time is "2025-10-25T13:45:30+00:00"
    # then this function will return "2025-10-25T00:00:00+00:00",
    # i.e. midnight (start of day) of the same date
# Replace "Z" with "+00:00" to standardize the input format
standardized_time = iso_time.replace("Z", "+00:00")
# Parse the input ISO 8601 string to a datetime object
input_time = datetime.fromisoformat(standardized_time)
# Set the time to midnight (00:00:00) for the current date
midnight = input_time.replace(hour=0, minute=0, second=0, microsecond=0)
# Return as ISO 8601 string
return midnight.isoformat()
def get_query_param(url, param_name):
# Parse the URL
parsed_url = urlparse(url)
# Parse the query parameters
querystring_params = parse_qs(parsed_url.query)
# Return the parameter value (return None if the parameter is not present)
# Since parse_qs returns a list for each parameter, we get the first element if it exists
return querystring_params.get(param_name, [None])[0]
def add_or_replace_query_param(url, param_name, param_value):
# Only add or replace the parameter if param_value is truthy
if not param_value:
return url # Return the original URL if param_value is falsy
# Parse the URL
parsed_url = urlparse(url)
# Parse the query parameters
querystring_params = parse_qs(parsed_url.query)
# Add or replace the specified query parameter
querystring_params[param_name] = param_value
# Rebuild the query string
new_query_string = urlencode(querystring_params, doseq=True)
# Create a new URL with the updated query string
new_url = urlunparse(parsed_url._replace(query=new_query_string))
return new_url
def merge_query_params(url, params):
# Parse the URL
parsed_url = urlparse(url)
# Parse existing query parameters from the URL into a dictionary
querystring_params = parse_qs(parsed_url.query)
# Update the dictionary with new parameters,
# replacing existing values if needed
for key, value in params.items():
if value: # Only include truthy values from params
# Set value as a list to match parse_qs format
querystring_params[key] = [value]
# Rebuild the query string from the updated parameters dictionary
new_query_string = urlencode(querystring_params, doseq=True)
# Construct a new URL with the updated query string
new_url = urlunparse(parsed_url._replace(query=new_query_string))
return new_url
def process_single_event(event_id, event_name, full_event_data, verbose=False) -> bool:
    # Check whether your database already has a record of this event:
    # you may be currently processing it, have previously processed it,
    # or have conditions under which you want to reprocess it.
    # Generally, determine which logic to run based on the event name/type
    # and whether you have already processed the event
    # (i.e. save to your own DB, update records, cancel processing,
    # skip processing, perform workflow X for event name/type Y, etc.)
    # note: each event has a self_link property to get the full event details
    # for a non-deleted object if needed
    # "self_link": 'https://sandbox-api.shipwell.com/.../01J44KYQHY...'
    return True  # placeholder: return whether the event was processed successfully
def process_events(
event_name, start_time, end_time, url, starting_after_event_id=None, verbose=False
) -> bool:
processing_complete = False
query_params = {
"event_name": event_name,
"limit": EVENT_LIMIT_OR_PAGE_SIZE,
"occurred_at.gte": start_time,
"occurred_at.lte": end_time,
}
    # merge/override existing query param values in the URL
query_target_url = merge_query_params(url=url, params=query_params)
# ensures starting_after value is set
# event ids are monotonically increasing by date/time
# so this will always return the earliest id
# in the account.
# setting "starting_after" also ensures that the data
# is sorted in ascending order from oldest to newest
starting_after_event_id_value = (
starting_after_event_id
or get_query_param(url=query_target_url, param_name="starting_after")
or "00000000000000000000000000"
)
# modify target URL to add or replace the url param
query_target_url = add_or_replace_query_param(
url=query_target_url,
param_name="starting_after",
param_value=starting_after_event_id_value,
)
try:
if verbose:
print(f"Target URL: {query_target_url}")
print(f"Request Header: {json.dumps(header_params, indent=2)}")
print(f"Querystring Parameters: {json.dumps(query_params, indent=2)}")
        # query params are already encoded in query_target_url, so they are not
        # passed again here (passing them twice would duplicate the querystring)
        response_wrapper = get_request_with_final_retry(
            url=query_target_url, headers=header_params
        )
if verbose:
print("Response Body: ")
pprint.pp(response_wrapper)
# page of events to process (already sorted by the API in
# ascending order when starting_after used)
event_page = response_wrapper.get("data", [])
# process events in already sorted older to newer order
for event in event_page:
# process or reprocess
process_single_event(
event_id=event["id"],
event_name=event["event_name"],
full_event_data=event,
verbose=verbose,
)
# Handle paginated results (i.e. N results are returned at a time
# and if you have more than N results then you will need to
        # paginate and follow the results to process all the matching results)
links = response_wrapper.get("links", {})
# "links": {"next": "https://sandbox-api.shipwell.com/events/..."}
next_page_url = links.get("next")
if next_page_url:
# if there are more results get the next page of events to process
print(f"Fetching next page: {next_page_url}")
            processing_complete = process_events(
                event_name=event_name,
                start_time=start_time,
                end_time=end_time,
                url=next_page_url,
                verbose=verbose,
            )
else:
print(f"No additional pages of data: {query_target_url}")
processing_complete = True
return processing_complete
except Exception as err:
print(f"Request failed after retries: {err}")
return processing_complete
try:
# specify a date/time range within 1-48 hours
end_time = get_current_iso_8601_time()
# in testing you may want to increase the hours_before value below (i.e. -24 * 365)
start_time = get_previous_iso_8601_time(iso_time=end_time, hours_before=-24)
# last processed event id in your system
starting_after_event_id = None
result = process_events(
event_name=EVENT_NAME,
start_time=start_time,
end_time=end_time,
url=target_url,
starting_after_event_id=starting_after_event_id,
verbose=VERBOSE,
)
if result:
print("Processing complete")
except Exception as err:
print(f"Request failed after retries: {err}")
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import okhttp3.*;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
public class EventProcessor {
private static final OkHttpClient client = new OkHttpClient();
private static final int EVENT_LIMIT_OR_PAGE_SIZE = 2;
private static final String EVENT_NAME = "shipment.created";
private static final boolean VERBOSE = true;
private static final String HOST = "sandbox-api.shipwell.com";
private static final String BASE_URL = "https://" + HOST + "/events";
private static final String AUTH_HEADER = "YOUR_AUTHORIZATION_HEADER";
// Resilience4j Retry configuration
private static final RetryConfig retryConfig = RetryConfig.custom()
.maxAttempts(5)
.waitDuration(Duration.ofSeconds(2))
        // OkHttp timeouts surface as IOExceptions (e.g. SocketTimeoutException)
        .retryExceptions(IOException.class, UncheckedIOException.class)
.build();
private static final RetryRegistry retryRegistry = RetryRegistry.of(retryConfig);
private static final Retry retry = retryRegistry.retry("eventProcessorRetry");
public static void main(String[] args) {
try {
String endTime = getCurrentIso8601Time();
String startTime = getPreviousIso8601Time(endTime, 24);
processEvents(EVENT_NAME, startTime, endTime, BASE_URL, null, VERBOSE);
} catch (Exception e) {
System.out.println("Request failed after retries: " + e.getMessage());
}
}
private static void processEvents(String eventName, String startTime, String endTime, String url,
String startingAfterEventId, boolean verbose) {
boolean processingComplete = false;
Map<String, String> queryParams = new HashMap<>();
queryParams.put("event_name", eventName);
queryParams.put("limit", String.valueOf(EVENT_LIMIT_OR_PAGE_SIZE));
queryParams.put("occurred_at.gte", startTime);
queryParams.put("occurred_at.lte", endTime);
try {
            String mergedUrl = mergeQueryParams(url, queryParams);
            // The URL must be effectively final so it can be captured by the lambda below
            final String queryUrl = addOrReplaceQueryParam(mergedUrl, "starting_after",
                    startingAfterEventId != null ? startingAfterEventId : "00000000000000000000000000");
            if (verbose) {
                System.out.println("Target URL: " + queryUrl);
                System.out.println("Headers: " + AUTH_HEADER);
                System.out.println("Query Params: " + queryParams.toString());
            }
            // Wrap the checked IOException from getRequest so the call fits the Supplier
            // functional interface; UncheckedIOException is retried per the retry config above
            Supplier<String> retryableSupplier = Retry.decorateSupplier(retry, () -> {
                try {
                    return getRequest(queryUrl);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            String response = retryableSupplier.get();
if (verbose && response != null) {
System.out.println("Response Body: " + response);
}
// Process and paginate results here as needed
processingComplete = true;
} catch (Exception e) {
System.out.println("Request failed after retries: " + e.getMessage());
}
}
private static String getRequest(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.addHeader("Authorization", AUTH_HEADER)
.addHeader("Accept-Type", "application/json")
.build();
try (Response response = client.newCall(request).execute()) {
if (response.isSuccessful()) {
return response.body().string();
} else {
throw new IOException("HTTP error: " + response.code());
}
}
}
private static String getCurrentIso8601Time() {
return OffsetDateTime.now(ZoneOffset.UTC).toString();
}
private static String getPreviousIso8601Time(String isoTime, int hoursBefore) {
OffsetDateTime dateTime = OffsetDateTime.parse(isoTime);
return dateTime.minusHours(hoursBefore).toString();
}
private static String addOrReplaceQueryParam(String url, String paramName, String paramValue) throws URISyntaxException {
URI uri = new URI(url);
String query = uri.getQuery();
StringBuilder newQuery = new StringBuilder();
if (query != null) {
for (String param : query.split("&")) {
String[] pair = param.split("=");
if (!pair[0].equals(paramName)) {
newQuery.append(param).append("&");
}
}
}
newQuery.append(paramName).append("=").append(paramValue);
return new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(), uri.getFragment()).toString();
}
private static String mergeQueryParams(String url, Map<String, String> params) throws URISyntaxException {
URI uri = new URI(url);
String query = uri.getQuery();
StringBuilder newQuery = new StringBuilder();
if (query != null) {
for (String param : query.split("&")) {
String[] pair = param.split("=");
if (!params.containsKey(pair[0])) {
newQuery.append(param).append("&");
}
}
}
for (Map.Entry<String, String> entry : params.entrySet()) {
newQuery.append(entry.getKey()).append("=").append(entry.getValue()).append("&");
}
return new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString().replaceAll("&$", ""), uri.getFragment()).toString();
}
}
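Putting the pieces together, here is a hedged sketch of one complete scheduled polling pass. It builds on the Python example above (process_events, get_current_iso_8601_time, get_previous_iso_8601_time) and on the illustrative get_last_processed / save_last_processed helpers from the processing-state store sketch earlier; it is a sketch under those assumptions, not a definitive implementation.
Example scheduled polling pass (illustrative)
# Runs one complete polling pass; schedule this via a job scheduler, cron, or task runner.
# Assumes process_events, get_current_iso_8601_time, and get_previous_iso_8601_time from
# the Python example above, plus the illustrative get_last_processed / save_last_processed
# helpers from the processing-state store sketch.

EVENT_NAME = "shipment.created"
EVENTS_URL = "https://sandbox-api.shipwell.com/events"

def run_polling_pass():
    # Load the last successfully processed event id for this event name/type
    last_event_id, _last_run = get_last_processed(EVENT_NAME)

    # Use a 24-48 hour window that is larger than the interval between scheduled runs
    end_time = get_current_iso_8601_time()
    start_time = get_previous_iso_8601_time(iso_time=end_time, hours_before=-24)

    completed = process_events(
        event_name=EVENT_NAME,
        start_time=start_time,
        end_time=end_time,
        url=EVENTS_URL,
        starting_after_event_id=last_event_id,
        verbose=False,
    )

    if completed:
        # Persist progress only after the pass completes successfully. In a full
        # implementation, record the newest event id seen while processing;
        # "newest_event_id_seen" below is a placeholder name.
        # save_last_processed(EVENT_NAME, newest_event_id_seen)
        print("Polling pass complete")

run_polling_pass()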