Event Polling

Overview

This scenario demonstrates best practices for event polling and shows how to use the GET /events event list endpoint to retrieve events.

Recommendation

Webhook delivery is the more performant and recommended integration approach for receiving event data.

Instead of constantly checking or polling for updates (which can lead to unnecessary API calls and usage of your system resources), webhooks allow your system to listen for and receive notifications only when an event actually occurs.

Event polling should only be used in scenarios where your integration systems do not support webhooks.

Note

Event polling retrieves events on a recurring schedule. In practice, a scheduled job or task runner executes code like the examples in this guide at a fixed interval.
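As a rough illustration of the scheduling idea, the sketch below runs a polling callable at a fixed interval using only the Python standard library. The names run_polling_loop and poll_once are hypothetical and not part of the Shipwell API. In production, a cron job or your platform's task runner would usually take the place of this loop.

```python
import time
from typing import Callable


def run_polling_loop(
    poll_once: Callable[[], None],
    interval_seconds: float,
    max_runs: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call poll_once up to max_runs times, pausing interval_seconds between runs.

    Returns the number of runs that completed without raising.
    """
    successful_runs = 0
    for run_number in range(max_runs):
        try:
            poll_once()
            successful_runs += 1
        except Exception as err:  # keep the schedule alive if one run fails
            print(f"Polling run {run_number + 1} failed: {err}")
        if run_number < max_runs - 1:
            sleep(interval_seconds)  # no pause after the final run
    return successful_runs
```

The sleep argument is injectable so that the loop can be exercised in tests without waiting for real time to pass.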

Interactive Code Notebook

Below is an interactive, runnable Google Colab code notebook. The Python notebook demonstrates event polling: how to get events for an event type (e.g. shipment.created, order.created, shipment.updated) within a date/time range, including pagination of events.

Starter Event Polling Python Notebook

Get Events - Basic Example

Event polling retrieves events on a schedule. Before moving on to advanced event polling, it helps to understand the base GET /events API, which is the foundation for event polling. Retrieving one event or many uses the same flexible GET /events event list endpoint: every request follows the same paginated data access pattern, and only the number of results returned in the API response varies. The GET /events endpoint parameters are documented in the API reference.

The base API call to get a paginated list of events is:

Example events list query request
About This Request Payload
  • The example below is the default query for the GET /events event list endpoint. You may specify additional query string parameters to filter the results, change the number of items returned, etc.
  • Set the limit query parameter to cap the number of items returned in the response (e.g. ?limit=25, ?existing_param=abc&limit=50).
  • Generated event ids are string values based on the date of the event. Because the identifiers increase monotonically with the event's creation date, events are lexicographically sortable by id.
  • See the event list API reference for a detailed list of fields or properties for the request and response bodies for event listing/querying.
curl
curl -i -X GET \
  'https://sandbox-api.shipwell.com/events?event_name=shipment.created&limit=20' \
  -H 'Authorization: YOUR_AUTHORIZATION_HEADER'
python
import requests

base_path = ""
host = "sandbox-api.shipwell.com"
target_url = "https://" + host + base_path + "/events"

headers = {
    "Accept": "application/json",  # standard HTTP header for the desired response type
    "Authorization": "YOUR_AUTHORIZATION_HEADER"
}

params = {
  "event_name": "shipment.created",
  "limit": 20
}

response = requests.get(target_url, headers=headers, params=params)
data = response.json()
print(data)
java
import java.io.IOException;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

OkHttpClient client = new OkHttpClient();

String basePath = "";
String host = "sandbox-api.shipwell.com";
String targetUrl = "https://" +
  host +
  basePath +
  "/events";

// Build the URL with query parameters
HttpUrl url = HttpUrl.parse(targetUrl).newBuilder()
        .addQueryParameter("event_name", "shipment.created")
        .addQueryParameter("limit", "20")
        .build();

// Create the request using the URL that carries the query parameters
Request request = new Request.Builder()
  .url(url)
  .header("Authorization", "YOUR_AUTHORIZATION_HEADER")
  .build();

try (Response response = client.newCall(request).execute()) {
  if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
  // Print the JSON response body
  System.out.println(response.body().string());
}
Example events list query response
{
  "data": [
    {
      "id": "01J4HG1MANX4C9MSMJMFMTCE2C",
      "occurred_at": "2024-08-05T14:45:39.820961Z",
      "event_name": "shipment.created",
      "details": {
        // ... event details
        // "1": { // version
        //   "id": "a3f23d02-5f9e-4e07-9a1e-cba90c526ed4",
        //   "mode": "FTL",
        //   ... other properties
        // }
      },
      "source": {
        "publishing_system": "backend",
        "company_id": "a54ef012-77d4-44a0-8ba5-115b09b655be",
        "user_id": "a8497882-4b9d-4919-a4f1-1ce3adb48e63",
        "request_id": "ce1c06c6-ad0f-4d87-a28d-b1223d8d6b21"
      },
      "pending_webhooks": 0,
      "resource_type": "event",
      "self_link": "https://sandbox-api.shipwell.com/events/01J4HG1MANX4C9MSMJMFMTCE2C"
    },
    // ... other events ...
  ],
  "count": 20, // number of items in the "data" array for this response
  "links": {
    "next": "https://sandbox-api.shipwell.com/events?starting_after=...", // next set of results that match the query
    "prev": null, // previous set of results that match the query
    "first": "https://sandbox-api.shipwell.com/events?starting_after=...", // first set of results that match the query (i.e. first page)
    "last": "https://sandbox-api.shipwell.com/events?ending_before=..." // last set of results that match the query (i.e. last page)
  }
}
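
The links.next value in the response can be followed until it is null to walk through every page of results. Below is a minimal sketch of that loop. The names iter_events and fetch_page are hypothetical; fetch_page stands in for whatever function performs the authenticated GET request and returns the parsed JSON body.

```python
from typing import Callable, Dict, Iterator, Optional


def iter_events(first_url: str, fetch_page: Callable[[str], Dict]) -> Iterator[Dict]:
    """Yield every event across pages by following links.next until it is empty."""
    next_url: Optional[str] = first_url
    seen_urls = set()  # guard against a repeated link causing an endless loop
    while next_url and next_url not in seen_urls:
        seen_urls.add(next_url)
        page = fetch_page(next_url)
        for event in page.get("data", []):
            yield event
        # "links" may be missing or null; treat both as "no more pages"
        next_url = (page.get("links") or {}).get("next")
```

With the requests library, fetch_page could be a small function that calls requests.get(url, headers=headers, timeout=30) and returns response.json(). An iterative loop like this avoids deep recursion when a query spans many pages.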

Event Polling - Detailed Example

The following is a detailed example of event polling.

  • In practice, code like the following is adapted to run on a recurring schedule. Set up your event polling/processing code to run every N hours or minutes via a scheduled job or task runner (typically in your ERP (Enterprise Resource Planning) system).
  • For each event name/type that you poll for, store the event name/type, the event_id of the last event you successfully processed, and the date/time you processed it. Store these so that you can look up, for example, the last shipment.created event_id that you processed and when you processed it.
    • A data store (database, file system, cache, or a combination) containing processing data with items like last_processing_date_time (in UTC), event_name, and event_id is useful.
  • Narrow the time window that you retrieve events for by setting a start date/time (occurred_at.gte) and an optional end date/time (occurred_at.lte). Make the window larger than the interval between scheduled polling runs so that consecutive runs overlap. Typically, set this window to 24-48 hours.
    • Example: If you schedule event polling to run on the hour every hour, set the start date/time (occurred_at.gte) 24-48 hours in the past, and also set the event name/type and the last successfully processed event_id for that event name/type.
    • If you have downtime or need to reprocess data, you may adjust the date/time window or remove or reset the start and end dates.
    • The starting_after parameter value is an event_id, so you may rely on it without any date/time filter parameters. Setting the date/time parameters is still useful for debugging and logging output and for isolating the window of data.
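
The bookkeeping described above can be sketched with the standard library's sqlite3 module. The table name polling_checkpoint, the helper function names, and the 24-hour default window are assumptions for illustration only. Any data store that can hold event_name, event_id, and last_processing_date_time would serve.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def open_checkpoint_store(path: str = ":memory:") -> sqlite3.Connection:
    # One row per event name/type, holding the last processed event
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS polling_checkpoint ("
        "event_name TEXT PRIMARY KEY, "
        "event_id TEXT NOT NULL, "
        "last_processing_date_time TEXT NOT NULL)"
    )
    return conn


def save_checkpoint(conn: sqlite3.Connection, event_name: str, event_id: str,
                    processed_at: datetime) -> None:
    # Store the processing time in UTC as an ISO 8601 string
    conn.execute(
        "INSERT OR REPLACE INTO polling_checkpoint VALUES (?, ?, ?)",
        (event_name, event_id, processed_at.astimezone(timezone.utc).isoformat()),
    )
    conn.commit()


def load_checkpoint(conn: sqlite3.Connection, event_name: str) -> Optional[Tuple[str, str]]:
    # Returns (event_id, last_processing_date_time) or None if nothing is stored
    return conn.execute(
        "SELECT event_id, last_processing_date_time "
        "FROM polling_checkpoint WHERE event_name = ?",
        (event_name,),
    ).fetchone()


def build_poll_params(conn: sqlite3.Connection, event_name: str, now: datetime,
                      window_hours: int = 24) -> dict:
    # Time window plus, when available, the last processed event id
    params = {
        "event_name": event_name,
        "occurred_at.gte": (now - timedelta(hours=window_hours)).isoformat(),
        "occurred_at.lte": now.isoformat(),
    }
    checkpoint = load_checkpoint(conn, event_name)
    if checkpoint:
        params["starting_after"] = checkpoint[0]
    return params
```

A polling run would call build_poll_params before the request and save_checkpoint after each event is processed successfully, so that an interrupted run resumes from the last completed event.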
Example event polling
python
# Ensure that requests and tenacity libraries are installed (i.e. pip install requests, pip install tenacity, etc.)
import json
import pprint
import requests
from datetime import datetime, timezone, timedelta
from requests.exceptions import HTTPError, Timeout, RequestException
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse


EVENT_LIMIT_OR_PAGE_SIZE = 2  # return a max of N items per page
EVENT_NAME = "shipment.created"  # event name or type
VERBOSE = True  # verbose logging

base_path = ""
host = "sandbox-api.shipwell.com"
target_url = "https://" + host + base_path + "/events"

header_params = {
    "Accept": "application/json",  # standard HTTP header for the desired response type
    "Authorization": "YOUR_AUTHORIZATION_HEADER"
}


# Helper functions

# Retries 5 times with exponential backoff if an HTTP error or timeout occurs
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(
        multiplier=1, min=2, max=10
    ),  # Exponential backoff (2s, 4s, 8s...)
    retry=retry_if_exception_type((HTTPError, Timeout, RequestException)),
)
def get_request_with_retries(url, headers, params=None):
    try:
        response = requests.get(
            url, headers=headers, params=params, timeout=30
        )  # Client timeout set to 30 seconds
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx, 5xx)
        return response.json()  # Get the JSON response
    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        raise
    except Timeout as timeout_err:
        print(f"Timeout error: {timeout_err}")
        raise
    except RequestException as req_err:
        print(f"Request failed: {req_err}")
        raise


# Customizing retry message for the last attempt
def get_request_with_final_retry(url, headers, params=None):
    try:
        return get_request_with_retries(url, headers, params)
    except RetryError as e:
        if e.last_attempt.attempt_number == 5:
            print("Final retry attempt failed. No more retries.")
        raise e  # Re-raise the error after printing final message


def get_current_iso_8601_time() -> str:
    # Get the current UTC time
    current_utc_time = datetime.now(timezone.utc)

    # Format as ISO 8601 string
    iso_8601_string = current_utc_time.isoformat()  # 2025-10-25T13:45:30.123456+00:00
    return iso_8601_string


def get_previous_iso_8601_time(iso_time: str, hours_before: int) -> str:
    # Replace "Z" with "+00:00" to standardize the input format
    standardized_time = iso_time.replace("Z", "+00:00")

    # Parse the input ISO 8601 string to a datetime object
    input_time = datetime.fromisoformat(standardized_time)

    # Subtract the specified number of hours
    past_time = input_time - timedelta(hours=abs(hours_before))

    # Return as ISO 8601 string
    return past_time.isoformat()


def get_midnight(iso_time: str) -> str:
    # If the input iso_time is "2025-10-25T13:45:30+00:00"
    # then this function will return "2025-10-25T00:00:00+00:00"
    # aka midnight at the start of the same date

    # Replace "Z" with "+00:00" to standardize the input format
    standardized_time = iso_time.replace("Z", "+00:00")

    # Parse the input ISO 8601 string to a datetime object
    input_time = datetime.fromisoformat(standardized_time)

    # Set the time to midnight (00:00:00) for the current date
    midnight = input_time.replace(hour=0, minute=0, second=0, microsecond=0)

    # Return as ISO 8601 string
    return midnight.isoformat()


def get_query_param(url, param_name):
    # Parse the URL
    parsed_url = urlparse(url)

    # Parse the query parameters
    querystring_params = parse_qs(parsed_url.query)

    # Return the parameter value (return None if the parameter is not present)
    # Since parse_qs returns a list for each parameter, we get the first element if it exists
    return querystring_params.get(param_name, [None])[0]


def add_or_replace_query_param(url, param_name, param_value):
    # Only add or replace the parameter if param_value is truthy
    if not param_value:
        return url  # Return the original URL if param_value is falsy

    # Parse the URL
    parsed_url = urlparse(url)

    # Parse the query parameters
    querystring_params = parse_qs(parsed_url.query)

    # Add or replace the specified query parameter
    querystring_params[param_name] = param_value

    # Rebuild the query string
    new_query_string = urlencode(querystring_params, doseq=True)

    # Create a new URL with the updated query string
    new_url = urlunparse(parsed_url._replace(query=new_query_string))

    return new_url


def merge_query_params(url, params):
    # Parse the URL
    parsed_url = urlparse(url)

    # Parse existing query parameters from the URL into a dictionary
    querystring_params = parse_qs(parsed_url.query)

    # Update the dictionary with new parameters,
    # replacing existing values if needed
    for key, value in params.items():
        if value:  # Only include truthy values from params
            # Set value as a list to match parse_qs format
            querystring_params[key] = [value]

    # Rebuild the query string from the updated parameters dictionary
    new_query_string = urlencode(querystring_params, doseq=True)

    # Construct a new URL with the updated query string
    new_url = urlunparse(parsed_url._replace(query=new_query_string))

    return new_url


def process_single_event(event_id, event_name, full_event_data, verbose=False) -> bool:
    # Placeholder for your own processing logic. Typical steps:
    #   - check whether your database already has a record of the event
    #     (currently being processed, or previously processed and
    #     eligible for reprocessing)
    #   - choose which logic to run based on the event name or type
    #     (i.e. save to your own DB, update records, cancel processing,
    #     skip processing, perform workflow X for event name/type Y, etc.)
    #   note: each event has a self_link property to get the full event details
    #   for a non-deleted object if needed
    #       "self_link": 'https://sandbox-api.shipwell.com/.../01J44KYQHY...'
    return True  # report success so the declared bool return type holds


def process_events(
    event_name, start_time, end_time, url, starting_after_event_id=None, verbose=False
) -> bool:
    processing_complete = False
    query_params = {
        "event_name": event_name,
        "limit": EVENT_LIMIT_OR_PAGE_SIZE,
        "occurred_at.gte": start_time,
        "occurred_at.lte": end_time,
    }

    # merge query params into the URL, overriding any existing values
    query_target_url = merge_query_params(url=url, params=query_params)

    # ensure a starting_after value is always set.
    # event ids increase monotonically by date/time, so the all-zero id
    # below sorts before every real event id and the query starts from
    # the earliest matching event in the account.
    # setting "starting_after" also ensures that the data
    # is sorted in ascending order from oldest to newest
    starting_after_event_id_value = (
        starting_after_event_id
        or get_query_param(url=query_target_url, param_name="starting_after")
        or "00000000000000000000000000"
    )
    # modify target URL to add or replace the url param
    query_target_url = add_or_replace_query_param(
        url=query_target_url,
        param_name="starting_after",
        param_value=starting_after_event_id_value,
    )

    try:
        if verbose:
            print(f"Target URL: {query_target_url}")
            print(f"Request Header: {json.dumps(header_params, indent=2)}")
            print(f"Querystring Parameters: {json.dumps(query_params, indent=2)}")

        # query_target_url already contains every query parameter, so no
        # separate params are passed (passing them again would duplicate them)
        response_wrapper = get_request_with_final_retry(
            url=query_target_url, headers=header_params
        )

        if verbose:
            print("Response Body: ")
            pprint.pp(response_wrapper)

        # page of events to process (already sorted by the API in
        # ascending order when starting_after used)
        event_page = response_wrapper.get("data", [])
        # process events in already sorted older to newer order
        for event in event_page:
            # process or reprocess
            process_single_event(
                event_id=event["id"],
                event_name=event["event_name"],
                full_event_data=event,
                verbose=verbose,
            )

        # Handle paginated results (i.e. N results are returned at a time
        # and if you have more than N results then you will need to
        # paginate and follow the results to process all the matching results
        links = response_wrapper.get("links", {})
        #  "links": {"next": "https://sandbox-api.shipwell.com/events/..."}
        next_page_url = links.get("next")
        if next_page_url:
            # if there are more results get the next page of events to process
            print(f"Fetching next page: {next_page_url}")
            # propagate the verbose flag and report failure if any later page fails
            return process_events(
                event_name=event_name,
                start_time=start_time,
                end_time=end_time,
                url=next_page_url,
                verbose=verbose,
            )

        print(f"No additional pages of data: {query_target_url}")
        processing_complete = True
        return processing_complete
    except Exception as err:
        print(f"Request failed after retries: {err}")

    return processing_complete


try:
    # specify a date/time range within 1-48 hours
    end_time = get_current_iso_8601_time()
    # in testing you may want to increase the hours_before value below (i.e. 24 * 365)
    start_time = get_previous_iso_8601_time(iso_time=end_time, hours_before=24)
    # last processed event id in your system
    starting_after_event_id = None

    result = process_events(
        event_name=EVENT_NAME,
        start_time=start_time,
        end_time=end_time,
        url=target_url,
        starting_after_event_id=starting_after_event_id,
        verbose=VERBOSE,
    )
    if result:
        print("Processing complete")

except Exception as err:
    print(f"Request failed after retries: {err}")
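
One way to fill in the process_single_event stub is to make it idempotent, so that a rerun over an overlapping time window does no harm. The sketch below relies on the lexicographic ordering of event ids described earlier and assumes events arrive oldest to newest. The EventProcessorState class and its handler argument are hypothetical, and the in-memory state would be replaced by your own data store.

```python
from typing import Callable, Dict, Optional


class EventProcessorState:
    """Tracks the newest processed event id per event name (in memory only)."""

    def __init__(self) -> None:
        self.last_processed_id: Dict[str, str] = {}

    def process_once(self, event: Dict, handler: Callable[[Dict], None]) -> bool:
        """Run handler for the event unless it was already processed.

        Returns True if the handler ran, False if the event was skipped.
        """
        event_name = event["event_name"]
        event_id = event["id"]
        last_id: Optional[str] = self.last_processed_id.get(event_name)
        # Event ids sort lexicographically by creation date, so a plain string
        # comparison tells us whether this event is at or before the checkpoint.
        if last_id is not None and event_id <= last_id:
            return False
        handler(event)
        # Advance the checkpoint only after the handler succeeds.
        self.last_processed_id[event_name] = event_id
        return True
```

Because the checkpoint advances only after the handler returns, an exception in the handler leaves the event eligible for the next polling run.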
java
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import okhttp3.*;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class EventProcessor {

    private static final OkHttpClient client = new OkHttpClient();
    private static final int EVENT_LIMIT_OR_PAGE_SIZE = 2;
    private static final String EVENT_NAME = "shipment.created";
    private static final boolean VERBOSE = true;
    private static final String HOST = "sandbox-api.shipwell.com";
    private static final String BASE_URL = "https://" + HOST + "/events";
    private static final String AUTH_HEADER = "YOUR_AUTHORIZATION_HEADER";

    // Resilience4j Retry configuration
    private static final RetryConfig retryConfig = RetryConfig.custom()
        .maxAttempts(5)
        .waitDuration(Duration.ofSeconds(2))
        .retryExceptions(IOException.class) // OkHttp timeouts (e.g. SocketTimeoutException) extend IOException
        .build();
    
    private static final RetryRegistry retryRegistry = RetryRegistry.of(retryConfig);
    private static final Retry retry = retryRegistry.retry("eventProcessorRetry");

    public static void main(String[] args) {
        try {
            String endTime = getCurrentIso8601Time();
            String startTime = getPreviousIso8601Time(endTime, 24);
            processEvents(EVENT_NAME, startTime, endTime, BASE_URL, null, VERBOSE);
        } catch (Exception e) {
            System.out.println("Request failed after retries: " + e.getMessage());
        }
    }

    private static void processEvents(String eventName, String startTime, String endTime, String url, 
                                      String startingAfterEventId, boolean verbose) {
        boolean processingComplete = false;
        Map<String, String> queryParams = new HashMap<>();
        queryParams.put("event_name", eventName);
        queryParams.put("limit", String.valueOf(EVENT_LIMIT_OR_PAGE_SIZE));
        queryParams.put("occurred_at.gte", startTime);
        queryParams.put("occurred_at.lte", endTime);

        try {
            String mergedUrl = mergeQueryParams(url, queryParams);
            // Declared final so the lambda below can capture it
            final String queryUrl = addOrReplaceQueryParam(mergedUrl, "starting_after", startingAfterEventId != null ? startingAfterEventId : "00000000000000000000000000");
            
            if (verbose) {
                System.out.println("Target URL: " + queryUrl);
                System.out.println("Headers: " + AUTH_HEADER);
                System.out.println("Query Params: " + queryParams.toString());
            }

            // getRequest throws a checked IOException, so decorate a Callable rather than a Supplier
            java.util.concurrent.Callable<String> retryableCall = Retry.decorateCallable(retry, () -> getRequest(queryUrl));

            String response = retryableCall.call();
            if (verbose && response != null) {
                System.out.println("Response Body: " + response);
            }
            // Process and paginate results here as needed

            processingComplete = true;
        } catch (Exception e) {
            System.out.println("Request failed after retries: " + e.getMessage());
        }
    }

    private static String getRequest(String url) throws IOException {
        Request request = new Request.Builder()
            .url(url)
            .addHeader("Authorization", AUTH_HEADER)
            .addHeader("Accept", "application/json") // standard HTTP header for the desired response type
            .build();

        try (Response response = client.newCall(request).execute()) {
            if (response.isSuccessful()) {
                return response.body().string();
            } else {
                throw new IOException("HTTP error: " + response.code());
            }
        }
    }

    private static String getCurrentIso8601Time() {
        return OffsetDateTime.now(ZoneOffset.UTC).toString();
    }

    private static String getPreviousIso8601Time(String isoTime, int hoursBefore) {
        OffsetDateTime dateTime = OffsetDateTime.parse(isoTime);
        return dateTime.minusHours(hoursBefore).toString();
    }

    private static String addOrReplaceQueryParam(String url, String paramName, String paramValue) throws URISyntaxException {
        URI uri = new URI(url);
        String query = uri.getQuery();
        StringBuilder newQuery = new StringBuilder();
        
        if (query != null) {
            for (String param : query.split("&")) {
                String[] pair = param.split("=");
                if (!pair[0].equals(paramName)) {
                    newQuery.append(param).append("&");
                }
            }
        }
        newQuery.append(paramName).append("=").append(paramValue);

        return new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(), uri.getFragment()).toString();
    }

    private static String mergeQueryParams(String url, Map<String, String> params) throws URISyntaxException {
        URI uri = new URI(url);
        String query = uri.getQuery();
        StringBuilder newQuery = new StringBuilder();

        if (query != null) {
            for (String param : query.split("&")) {
                String[] pair = param.split("=");
                if (!params.containsKey(pair[0])) {
                    newQuery.append(param).append("&");
                }
            }
        }
        for (Map.Entry<String, String> entry : params.entrySet()) {
            newQuery.append(entry.getKey()).append("=").append(entry.getValue()).append("&");
        }
        return new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString().replaceAll("&$", ""), uri.getFragment()).toString();
    }
}
Copyright © Shipwell 2024. All rights reserved.