Skip to content

Commit

Permalink
Sandersaarond/client tidy and test (#77)
Browse files Browse the repository at this point in the history
* Fixup to client

* Add setuptools to pyproject where needed

* Client tweaks + tests

* Fixup function, tests, still chatty about warnings

* Improve test coverage of client

* Tidy up, modularize out testing a little bit, add some typing

* Fix docker credential format

* correct user_id to tenant_id for uniformity

* more python client cleanup
  • Loading branch information
SandersAaronD authored Jul 29, 2024
1 parent e9c811d commit 6e87b5f
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 82 deletions.
2 changes: 1 addition & 1 deletion docker-compose-jupyter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ services:
ports:
- "8888:8888"
environment:
- GF_AI_TRAINING_CREDS=83bcaff6228b39bbe431af5e19fb4368e2a03dd3@1337@http://ai-training-api:8000
- GF_AI_TRAINING_CREDS=83bcaff6228b39bbe431af5e19fb4368e2a03dd3:1337@http://ai-training-api:8000
2 changes: 1 addition & 1 deletion docker-compose-pytorch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ services:
ports:
- "8888:8888"
environment:
- GF_AI_TRAINING_CREDS=83bcaff6228b39bbe431af5e19fb4368e2a03dd3@1337@http://ai-training-api:8000
- GF_AI_TRAINING_CREDS=83bcaff6228b39bbe431af5e19fb4368e2a03dd3:1337@http://ai-training-api:8000
3 changes: 2 additions & 1 deletion o11y/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ authors = [{name = "Grafana Labs", email = "info@grafana.com"}]
readme = "README.md"
requires-python = ">= 3.8"
dependencies = [
"requests>=2.31.0"
"requests>=2.31.0",
"setuptools>=67.8.0",
]

[build-system]
Expand Down
160 changes: 111 additions & 49 deletions o11y/src/o11y/_internal/client.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,126 @@
# Contains a python object representing the metadata client
# This should handle anything related to the job itself, like registering the job, updating metadata, etc
# This should not be used for logging, metrics, etc
from typing import Optional, Tuple
import warnings
import requests
import json
import logging
import os
import logging
from .util.validate_url import validate_url
from .. import logger
import time
import warnings
from typing import Any, Dict, Optional, Tuple

import requests
from urllib.parse import urlparse

from .. import logger

class Client:
# Create a client with no registered process and no credentials, then attempt
# to load credentials from the GF_AI_TRAINING_CREDS environment variable.
def __init__(self):
self.logger = logging.getLogger(__name__)
# No process has been registered yet.
self.process_uuid = None
self.user_metadata = None
# Connection details start out unset; set_credentials() is called below.
self.url = None
self.token = None
self.tenant_id = None
# We are going to assume that the user has set the credentials in the environment
# There are other flows but it's the easiest one
# os.environ.get returns None when the variable is missing; that value is
# passed straight through to set_credentials().
login_string = os.environ.get('GF_AI_TRAINING_CREDS')
self.set_credentials(login_string)

def set_credentials(self, login_string):
if not login_string or type(login_string) != str:
logger.error("No login string provided, please set GF_AI_TRAINING_CREDS environment variable")
def set_credentials(self, login_string: Optional[str]) -> bool:
self.logger.info(f"Setting credentials with login string: {login_string}")
if not login_string or not isinstance(login_string, str):
self.logger.warning("No login string provided or invalid type")
warnings.warn("No login string provided, please set GF_AI_TRAINING_CREDS environment variable")
return False
# Count @ characters in the login string, should be 2
if login_string.count("@") != 2:
logger.error("Invalid login string format")

try:
token, tenant_id, uri = self._parse_login_string(login_string)
self.logger.info(f"Parsed login string - Token: {token[:5]}..., User ID: {tenant_id}, URI: {uri}")
uri = self._validate_credentials(token, tenant_id, uri)
self._set_credentials(token, tenant_id, uri)
self.logger.info(f"Credentials set - URL: {self.url}, User ID: {self.tenant_id}, Token: {self.token[:5]}...")
return True
except Exception as e:
self.logger.error(f"Error setting credentials: {str(e)}")
warnings.warn(f"Invalid login string: {str(e)}")
return False

token, tenant_id, url = login_string.split("@")
if not url.startswith("http://") and not url.startswith("https://"):
url = "http://" + url

self.url = url
def _parse_login_string(self, login_string: str) -> Tuple[str, str, str]:
    """Split a ``token:tenant_id@uri`` login string into its three parts.

    Args:
        login_string: Credentials in the form ``token:tenant_id@uri``.

    Returns:
        A ``(token, tenant_id, uri)`` tuple with surrounding whitespace
        stripped from every component.

    Raises:
        ValueError: If the string does not contain exactly one ``@``, the
            credentials part does not contain exactly one ``:``, or any of
            the three components is empty after stripping.
    """
    parts = login_string.split('@')
    if len(parts) != 2:
        raise ValueError("Invalid login string format. Expected format: token:tenant_id@uri")

    credentials, uri = parts
    cred_parts = credentials.split(':')
    if len(cred_parts) != 2:
        raise ValueError("Invalid credentials format. Expected format: token:tenant_id")

    token, tenant_id = (part.strip() for part in cred_parts)
    uri = uri.strip()

    # Reject blank components here: otherwise inputs such as ":@" or "tok:1@"
    # parse "successfully" and the caller stores unusable credentials.
    if not token or not tenant_id:
        raise ValueError("Invalid credentials format. Expected format: token:tenant_id")
    if not uri:
        raise ValueError("Invalid login string format. Expected format: token:tenant_id@uri")

    return token, tenant_id, uri

def _validate_credentials(self, token: str, tenant_id: str, uri: str) -> str:
    """Sanity-check parsed credentials and return a usable http(s) URI.

    Args:
        token: The API token. It is accepted for signature symmetry with the
            other credential helpers and is not inspected here.
        tenant_id: The tenant identifier; a warning is issued when it is not
            purely numeric, but the value is not rejected.
        uri: The server address, with or without an explicit scheme.

    Returns:
        ``uri`` unchanged when it already starts with ``http://`` or
        ``https://``; ``"https://" + uri`` when it carries no explicit
        scheme; otherwise ``https://`` followed by the original network
        location and path (a warning is issued in that case).
    """
    if not tenant_id.isdigit():
        warnings.warn("Invalid tenant_id: must be purely numeric")

    parsed_uri = urlparse(uri)

    # Only trust urlparse's scheme when it is actually followed by "://".
    # For a bare "host:8000", urlparse (Python 3.9+) reports scheme="host"
    # and path="8000", which previously produced "https://8000".
    prefix, separator, _ = uri.partition("://")
    has_explicit_scheme = (
        bool(separator)
        and bool(parsed_uri.scheme)
        and prefix.lower() == parsed_uri.scheme
    )

    if not has_explicit_scheme:
        return "https://" + uri

    if parsed_uri.scheme not in ("http", "https"):
        warnings.warn(f"Invalid URI scheme '{parsed_uri.scheme}'. Using https instead.")
        return "https://" + parsed_uri.netloc + parsed_uri.path

    return uri

# Store the given token, tenant id and URI on the instance. No validation is
# performed here; the values are assigned exactly as passed in.
def _set_credentials(self, token: str, tenant_id: str, uri: str) -> None:
# The URI is kept under the attribute name `url`.
self.url = uri
self.token = token
self.tenant_id = tenant_id
return True

def register_process(self, data):
# If the process is currently registered, clear everything from it
self.logger.info(f"Registering process with data: {data}")
if self.process_uuid:
self.logger.info(f"Clearing existing process UUID: {self.process_uuid}")
self.process_uuid = None
self.user_metadata = None

if not self.tenant_id or not self.token:
self.logger.error("User ID or token is not set. Make sure to call set_credentials first.")
return False

headers = {
'Authorization': f'Bearer {self.tenant_id}:{self.token}',
'Content-Type': 'application/json'
}
self.logger.info(f"Request headers: Authorization: Bearer {self.tenant_id[:5]}..:{self.token[:5]}...")

response = requests.post(f'{self.url}/api/v1/process/new', headers=headers, data=json.dumps(data))
if response.status_code != 200:
logging.error(f'Failed to register with error: {response.text}')
return False
url = f'{self.url}/api/v1/process/new'
self.logger.info(f"Sending request to URL: {url}")

try:
response = requests.post(url, headers=headers, json=data)
self.logger.info(f"Response status code: {response.status_code}")
self.logger.info(f"Response content: {response.text}")

if response.status_code != 200:
self.logger.error(f'Failed to register with error: {response.text}')
return False

process_uuid = response.json()['data']['process_uuid']
except:
logging.error(f'Failed to register with error: {response.text}')
self.logger.info(f"Received process UUID: {process_uuid}")
except Exception as e:
self.logger.error(f"Exception during process registration: {str(e)}")
return False

self.process_uuid = process_uuid
self.user_metadata = data['user_metadata']
self.logger.info(f"Process registered successfully. UUID: {self.process_uuid}")
return True

# Update user_metadata information
def update_metadata(self, process_uuid, user_metadata):
def update_metadata(self, process_uuid: str, user_metadata: Dict[str, Any]) -> bool:
if not process_uuid:
logging.error("No process registered, unable to update metadata")
logger.error("No process registered, unable to update metadata")
return False
headers = {
'Authorization': f'Bearer {self.tenant_id}:{self.token}',
Expand All @@ -77,40 +129,41 @@ def update_metadata(self, process_uuid, user_metadata):
data = {
'user_metadata': user_metadata
}
response = requests.post(f'{self.url}/api/v1/process/{process_uuid}/update-metadata', headers=headers, data=json.dumps(data))
url = f'{self.url}/api/v1/process/{process_uuid}/update-metadata'
response = requests.post(url, headers=headers, json=data)

if response.status_code != 200:
logging.error(f'Failed to update metadata: {response.text}')
logger.error(f'Failed to update metadata: {response.text}')
return False
return True

# Report a state change to the process
# POST /api/v1/process/{uuid}/state
# Options are “succeeded” and “failed”
def report_state(self, state):
def report_state(self, state: str) -> bool:
if not self.process_uuid:
logging.error("No process registered, unable to report state")
logger.error("No process registered, unable to report state")
return False
headers = {
'Authorization': f'Bearer {self.token}',
'Authorization': f'Bearer {self.tenant_id}:{self.token}',
'Content-Type': 'application/json'
}
data = {
'state': state
}
response = requests.post(f'{self.url}/api/v1/process/{self.process_uuid}/state', headers=headers, data=json.dumps(data))
url = f'{self.url}/api/v1/process/{self.process_uuid}/state'
response = requests.post(url, headers=headers, json=data)

if response.status_code != 200:
logging.error(f'Failed to report state: {response.text}')
logger.error(f'Failed to report state: {response.text}')
return False
return True

def send_model_metrics(self, log, *, x_axis=None):
def send_model_metrics(self, log: Dict[str, Any], *, x_axis: Optional[Dict[str, Any]] = None) -> bool:
if not self.process_uuid:
logging.error("No process registered, unable to send logs")
logger.error("No process registered, unable to send logs")
return False

timestamp = str(time.time_ns())

metadata = {
metadata: Dict[str, Any] = {
"process_uuid": self.process_uuid,
"type": "model-metrics"
}
Expand All @@ -137,15 +190,24 @@ def send_model_metrics(self, log, *, x_axis=None):
]
}

response = requests.post(
f'{self.url}/api/v1/process/{self.process_uuid}/model-metrics',
headers={
'Authorization': f'Bearer {self.token}', 'Content-Type': 'application/json'
},
data=json.dumps(json_data)
)
url = f'{self.url}/api/v1/process/{self.process_uuid}/model-metrics'

if response.status_code != 200:
logging.error(f'Failed to log model metric: {response.text}')
headers = {
'Authorization': f'Bearer {self.tenant_id}:{self.token}',
'Content-Type': 'application/json'
}

try:
response = requests.post(
url,
headers=headers,
json=json_data
)

if response.status_code != 200:
logger.error(f'Failed to log model metric. Status code: {response.status_code}, Response: {response.text}')
return False
return True
except requests.exceptions.RequestException as e:
logger.exception(f"An error occurred while sending the request: {e}")
return False
return True
51 changes: 21 additions & 30 deletions o11y/src/o11y/exported/log.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,41 @@
from typing import Dict, Union, Optional
from .. import client
from .. import logger

def log(log, *, x_axis=None):
def log(log: Dict[str, Union[int, float]], *, x_axis: Optional[Dict[str, Union[int, float]]] = None) -> bool:
"""
Sends a log to the Loki server
:param log: The log message
:return: None
Sends a log to the Loki server.
Args:
log (Dict[str, Union[int, float]]): The log message as a dictionary with string keys and numeric values.
x_axis (Optional[Dict[str, Union[int, float]]], optional): A single-item dictionary representing the x-axis. Defaults to None.
Returns:
bool: True if the log was sent successfully, False otherwise.
"""
# Check that log is a dict with all keys strings
if not isinstance(log, dict):
logger.error("Log must be a dict")
return False
for key in log.keys():
if not isinstance(key, str):
logger.error("Keys in log must be strings")
return False

# Check that all values are numbers
for key in log.keys():
if not isinstance(log[key], (int, float)):
logger.error("Values in log must be numbers")
return False

if not all(isinstance(key, str) and isinstance(value, (int, float)) for key, value in log.items()):
logger.error("Log must contain only string keys and numeric values")
return False

if x_axis is None:
client.send_model_metrics(log)
return
return bool(client.send_model_metrics(log))

# Check if x_axis exists
if not isinstance(x_axis, dict) or len(x_axis) != 1:
logger.error("x_axis must be a dict with one key")
return False

# Check that x_axis' single key is a string, and its value is a number
x_key = list(x_axis.keys())[0]
x_value = x_axis[x_key]
if not isinstance(x_key, str):
logger.error("x_axis key must be a string")
return False
if not isinstance(x_value, (int, float)):
logger.error("x_axis value must be a number")
x_key, x_value = next(iter(x_axis.items()))

if not isinstance(x_key, str) or not isinstance(x_value, (int, float)):
logger.error("x_axis must have a string key and a numeric value")
return False

# Check that this key is not already in the log line
if x_key in log.keys() and x_value != log[x_key]:

if x_key in log and x_value != log[x_key]:
logger.error("x_axis key must not be in your metrics, or must have the same value")
return False

client.send_model_metrics(log, x_axis=x_axis)
return bool(client.send_model_metrics(log, x_axis=x_axis))
Loading

0 comments on commit 6e87b5f

Please sign in to comment.