The following BaseApi and CleanRoom classes include method definitions for all of the operations in the tutorial. You may not need to perform all of these operations to programmatically execute your workflows. What you include will depend on your business case and what your team would prefer to manage via the UI vs. the API. This is a comprehensive view of what is required to enable an end-to-end run of a clean room question.
After these classes are defined, there is a cell to indicate which creds_path to use when they are invoked. Point the creds_path to the location where your YAML file is stored.
Establish a BaseApi Class
BaseApi Class: The BaseApi class defines all of the underlying methods you might use to perform actions related to Clean Room objects when calling the Clean Room API.
The CleanRoom class invokes the BaseApi class, so you will typically only reference the CleanRoom class in your operational code.
# Example BaseApi class definition
class BaseApi:
    """A reusable base API class for authentication and API requests.

    Credentials are loaded from a YAML file that must provide the keys
    ``HABU_ORGANIZATION``, ``HABU_API_CLIENT`` and ``HABU_API_SECRET``.
    A bearer token is obtained lazily on the first request and refreshed
    automatically shortly before it expires.
    """

    # Base URL of the API. Endpoint paths are appended directly to it, so the
    # trailing slash is required.
    API_URL = 'https://api.habu.com/'
    # Default per-request timeout in seconds; without one, a stalled
    # connection would block the caller indefinitely.
    DEFAULT_TIMEOUT = 60
    # Refresh the token this many seconds before its reported expiry so that a
    # token which is still valid when checked does not expire in flight.
    TOKEN_REFRESH_LEEWAY_SECONDS = 30

    def __init__(self, creds_path, timeout=DEFAULT_TIMEOUT, debug=False):
        """
        Initializes the API client by loading credentials from a YAML file.

        :param creds_path: Path to the YAML file containing the credentials.
        :param timeout: Timeout in seconds passed to every HTTP request.
            Pass None to wait indefinitely.
        :param debug: When True, print request and response details for
            troubleshooting. The output can include sensitive payloads
            (for example credential details), so it is off by default.
        :raises KeyError: If a required credential key is missing from the file.
        """
        with open(creds_path, 'r') as f:
            creds = yaml.safe_load(f)

        # Read every required key first so that a missing key fails before any
        # environment variable has been modified.
        organization = creds['HABU_ORGANIZATION']
        api_client = creds['HABU_API_CLIENT']
        api_secret = creds['HABU_API_SECRET']

        # Set environment variables.
        os.environ['HABU_ORGANIZATION'] = organization
        os.environ['HABU_API'] = self.API_URL
        os.environ['HABU_API_CLIENT'] = api_client
        os.environ['HABU_API_SECRET'] = api_secret

        # Store credentials as instance variables.
        self.organization = organization
        self.api_url = os.environ['HABU_API']
        self.api_client = api_client
        self.api_secret = api_secret

        # Request behavior.
        self.timeout = timeout
        self.debug = debug

        # Initialize authentication variables. The expiry starts at "now" (UTC),
        # so the first call to get_token() always performs a login.
        self.access_token = None
        self.expire_time = datetime.datetime.now(datetime.timezone.utc)

    def login(self):
        """
        Authenticate and generate a new bearer token.

        :return: Tuple of (access token, timezone-aware UTC expiry datetime).
        :raises Exception: If the token endpoint does not answer with HTTP 200
            or the response does not contain an access token.
        """
        url = f"{self.api_url}v1/oauth/token"  # OAuth token endpoint

        # The grant type is sent as URL-encoded form data.
        form = {"grant_type": "client_credentials"}

        # The client id and secret are sent via HTTP Basic authentication.
        auth_basic = base64.b64encode(
            f"{self.api_client}:{self.api_secret}".encode()
        ).decode('utf-8')
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": f"Basic {auth_basic}",
        }

        response = requests.post(url, data=form, headers=headers, timeout=self.timeout)
        if response.status_code != 200:
            raise Exception(f"Failed to authenticate. Status Code: {response.status_code}, Response Body: {response.text}")

        payload = response.json()
        self.access_token = payload.get("accessToken")
        if not self.access_token:
            raise Exception("Access token was not returned in the response.")

        # Default expiration is 3600 seconds (1 hour) when the API omits it.
        expires_in = payload.get("expiresIn", 3600)
        self.expire_time = (
            datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(seconds=expires_in)
        )
        return self.access_token, self.expire_time

    def get_token(self):
        """Ensures that the token is available and valid.

        Logs in again when no token is held or when the held token expires
        within TOKEN_REFRESH_LEEWAY_SECONDS.

        :return: A bearer token string.
        """
        leeway = datetime.timedelta(seconds=self.TOKEN_REFRESH_LEEWAY_SECONDS)
        if (
            not self.access_token
            or datetime.datetime.now(datetime.timezone.utc) >= self.expire_time - leeway
        ):
            print("Access token expired or not available. Logging in again...")
            self.login()
        return self.access_token

    def _request(self, method, url, body=None):
        """Send an authenticated request and return the decoded JSON response.

        :param method: HTTP method name, e.g. "GET", "POST" or "PUT".
        :param url: Absolute URL to call.
        :param body: Optional JSON-serializable request body.
        :return: The parsed JSON response, or None when the response has no body.
        :raises requests.HTTPError: If the response status indicates an error.
        """
        headers = {'Authorization': f'Bearer {self.get_token()}'}

        if self.debug:
            print(f"{method} Request URL: {url}")
            print(f"{method} Request Body: {body}")

        response = requests.request(
            method, url, headers=headers, json=body, timeout=self.timeout
        )

        if self.debug:
            print(f"Response Status Code: {response.status_code}")
            print(f"Response Body: {response.text}")

        # If the response is not successful, raise an error.
        response.raise_for_status()

        # Successful responses may legitimately be empty (e.g. 204 No Content);
        # decoding an empty body as JSON would raise.
        if not response.content:
            return None
        return response.json()

    def post(self, url, body=None):
        """Sends a POST request to the API.

        :param url: Absolute URL to call.
        :param body: Optional JSON-serializable request body.
        :return: The parsed JSON response, or None when the response has no body.
        """
        return self._request("POST", url, body)

    def get(self, url):
        """Sends a GET request to the API.

        :param url: Absolute URL to call.
        :return: The parsed JSON response, or None when the response has no body.
        """
        return self._request("GET", url)

    def put(self, url, body=None):
        """Sends a PUT request to the API.

        :param url: Absolute URL to call.
        :param body: Optional JSON-serializable request body.
        :return: The parsed JSON response, or None when the response has no body.
        """
        return self._request("PUT", url, body)
Establish a CleanRoom Class
CleanRoom Class: This class will include all of the methods we will use to perform actions via the external API.
# Example CleanRoom class definition
class CleanRoom(BaseApi):
    """Clean Room operations exposed by the external API.

    Every method builds the endpoint URL and delegates to the inherited
    ``post``, ``put`` or ``get`` method. The constructor is inherited from
    BaseApi unchanged, so instances are created with ``CleanRoom(creds_path)``.
    """

    def _url(self, *segments, **query):
        """Build an absolute ``v1`` endpoint URL.

        Path segments and query values are percent-encoded so that an ID
        containing reserved characters cannot alter the request path or
        query string.

        :param segments: Path segments that follow ``v1/``, in order.
        :param query: Optional query-string parameters.
        :return: The absolute URL as a string.
        """
        # Imported locally so this class does not depend on a file-level import.
        from urllib.parse import quote, urlencode

        path = "/".join(quote(str(segment), safe="") for segment in segments)
        url = f"{self.api_url}v1/{path}"
        if query:
            url = f"{url}?{urlencode(query)}"
        return url

    # POST operations

    def create_clean_rooms(self, cleanroom_metadata):
        """
        Create a clean room with a dynamic payload.
        :param cleanroom_metadata: Dictionary containing all clean room parameters.
        :return: API response as JSON
        """
        return self.post(self._url("cleanrooms"), body=cleanroom_metadata)

    def create_org_credentials(self, organization_credential_details):
        """
        Create an organization credential with a dynamic payload.
        :param organization_credential_details: Dictionary containing all credential parameters.
        :return: API response as JSON
        """
        return self.post(
            self._url("organization-credentials"),
            body=organization_credential_details,
        )

    def create_data_connections(self, data_connection_details):
        """
        Create a data connection with a dynamic payload.
        :param data_connection_details: Dictionary containing all data connection parameters.
        :return: API response as JSON
        """
        return self.post(self._url("data-connections"), body=data_connection_details)

    def create_map_field_configurations(self, data_connection_id, field_configuration_details):
        """
        Perform field mapping on a data connection level.
        :param data_connection_id: The data connection id for the relevant mapping configuration.
        :param field_configuration_details: Dictionary containing all field configuration parameters.
        :return: API response as JSON
        """
        return self.post(
            self._url("data-connections", data_connection_id, "field-configurations"),
            body=field_configuration_details,
        )

    def create_cleanroom_datasets(self, cleanroom_id, data_connection_id, ramp_id_consent):
        """
        Provision a dataset to a clean room based on its data connection ID.
        :param cleanroom_id: The clean room id for the clean room to which the datasets will be provisioned.
        :param data_connection_id: The data connection id for the dataset; sent as the dataConnectionId query parameter.
        :param ramp_id_consent: Request body confirming you attest to responsible use of RampID in clean rooms; use if RampID is present.
        :return: API response as JSON
        """
        return self.post(
            self._url(
                "cleanrooms", cleanroom_id, "datasets", "configure",
                dataConnectionId=data_connection_id,
            ),
            body=ramp_id_consent,
        )

    def create_cleanroom_flows(self, cleanroom_id, flow_details):
        """
        Create a Flow within a specified clean room.
        :param cleanroom_id: The clean room id for the clean room in which the Flow will be created.
        :param flow_details: Request body describing the Flow to create.
        :return: API response as JSON
        """
        return self.post(
            self._url("cleanrooms", cleanroom_id, "cleanroom-flows"),
            body=flow_details,
        )

    def create_questions(self, question_details):
        """
        Create a question based on provided details.
        :param question_details: The specifications for the question to create.
        :return: API response as JSON
        """
        return self.post(self._url("question"), body=question_details)

    def create_cleanroom_question_datasets_owners(self, cleanroom_question_id, assign_cleanroom_question_dataset_requests):
        """
        Assign ownership of dataset macros in a question.
        :param cleanroom_question_id: The id for the specific question in the cleanroom to which you are assigning owners.
        :param assign_cleanroom_question_dataset_requests: Details for which partners to assign to which dataset macros.
        :return: API response as JSON
        """
        return self.post(
            self._url("cleanroom-questions", cleanroom_question_id, "datasets-ownership"),
            body=assign_cleanroom_question_dataset_requests,
        )

    def create_cleanroom_question_datasets(self, cleanroom_question_id, cleanroom_question_dataset_details):
        """
        Manage datasets for a clean room question. Opts data in for question execution.
        :param cleanroom_question_id: ID for the desired question in the clean room.
        :param cleanroom_question_dataset_details: Body for the details required to assign a particular dataset to a particular clean room question.
        :return: API response as JSON
        """
        return self.post(
            self._url("cleanroom-questions", cleanroom_question_id, "datasets"),
            body=cleanroom_question_dataset_details,
        )

    def create_run(self, cleanroom_question_id, create_run_parameters):
        """
        Trigger a run of a clean room question.
        :param cleanroom_question_id: ID for the clean room question to run.
        :param create_run_parameters: Request body with the parameters for the run.
        :return: API response as JSON
        """
        return self.post(
            self._url("cleanroom-questions", cleanroom_question_id, "create-run"),
            body=create_run_parameters,
        )

    # PUT operations

    def put_cleanroom_datasets(self, cleanroom_id, dataset_id, cleanroom_dataset_details):
        """
        Update the configuration for a provisioned dataset in a clean room.
        :param cleanroom_id: ID for the relevant clean room.
        :param dataset_id: ID for the relevant dataset in the clean room.
        :param cleanroom_dataset_details: Configuration parameters for the update you want to make to relevant fields.
        :return: API response as JSON
        """
        return self.put(
            self._url("cleanrooms", cleanroom_id, "datasets", dataset_id),
            body=cleanroom_dataset_details,
        )

    # GET operations

    def get_clean_rooms(self):
        """Retrieve a list of available clean rooms for an organization."""
        return self.get(self._url("cleanrooms"))

    def get_credential_source_options(self):
        """Retrieve a list of available credential source options for an organization."""
        return self.get(self._url("credential-sources"))

    def get_data_source_options(self):
        """Retrieve a list of available data source options for an organization."""
        return self.get(self._url("data-sources"))

    def get_data_connection_field_configurations(self, data_connection_id):
        """Retrieve a list of fields to configure for a specified data connection."""
        return self.get(
            self._url("data-connections", data_connection_id, "field-configurations")
        )

    def get_cleanroom_partners(self, cleanroom_id):
        """
        Retrieve the list of clean room partners with IDs for assignment.
        :param cleanroom_id: Relevant clean room id for the partner set you'd like to know about.
        :return: API response as JSON
        """
        return self.get(self._url("cleanrooms", cleanroom_id, "partners"))

    def get_clean_room_questions(self, cleanroom_id):
        """Retrieve the list of clean room questions for a given clean room."""
        return self.get(self._url("cleanrooms", cleanroom_id, "cleanroom-questions"))

    def get_clean_room_question_details(self, cleanroom_question_id):
        """Retrieve the metadata for a specific clean room question."""
        return self.get(self._url("cleanroom-questions", cleanroom_question_id))

    def get_question_run_status(self, cleanroom_question_run_id):
        """Retrieve the status for a specific clean room question run."""
        return self.get(self._url("cleanroom-question-runs", cleanroom_question_run_id))

    def get_question_runs(self, cleanroom_question_id, limit=10, offset=0):
        """
        Retrieve a page of question runs for a clean room question.
        :param cleanroom_question_id: ID for the clean room question.
        :param limit: Value of the limit query parameter (defaults to 10).
        :param offset: Value of the offset query parameter (defaults to 0).
        :return: API response as JSON
        """
        return self.get(
            self._url(
                "cleanroom-questions", cleanroom_question_id, "cleanroom-question-runs",
                limit=limit, offset=offset,
            )
        )

    def get_question_run_data(self, run_id):
        """Retrieve the run data for a given question run in a clean room."""
        return self.get(self._url("cleanroom-question-runs", run_id, "data"))
Define your creds_path
creds_path: For use with the above BaseApi class (and the CleanRoom class that extends it):
# REPLACE the placeholder below with the path to your own YAML file containing your credentials.
creds_path = "<YOUR CREDS PATH>"

# Both clients are built from the same credentials file. CleanRoom extends
# BaseApi, so cleanroom_api also offers the generic get/post/put methods.
api_client = BaseApi(creds_path=creds_path)
cleanroom_api = CleanRoom(creds_path=creds_path)