Skip to content

Login Helper

pyecotrend_ista.login_helper

Login helper for Keycloak.

LoginHelper

LoginHelper(username: str, password: str, totp: str | None = None, session: Session | None = None, logger=None)

Login helper for Keycloak.

Attributes:

  • session (Session) –

    Optional session object for making HTTP requests.

  • username (str) –

    Username for authentication.

  • password (str) –

    Password for authentication.

  • cookie (str) –

    Authentication cookie.

  • auth_code (str) –

    Authorization code.

  • form_action (str) –

    Form action URL for authentication.

Notes

This class provides utility methods for handling authentication and session management using Keycloak.

Parameters:

  • username (str) –

    Username for authentication.

  • password (str) –

    Password for authentication.

  • totp (str, default: None ) –

    Time-based One-Time Password if enabled, by default None.

  • session (Session, default: None ) –

    Optional session object for making HTTP requests, by default None.

  • logger (Logger, default: None ) –

    Logger object for logging messages, by default None.

Source code in src/pyecotrend_ista/login_helper.py
def __init__(
    self,
    username: str,
    password: str,
    totp: str | None = None,
    session: requests.Session | None = None,
    logger=None,
) -> None:  # numpydoc ignore=ES01,EX01
    """Store the credentials and prepare the HTTP session and logger.

    Parameters
    ----------
    username : str
        Username for authentication.
    password : str
        Password for authentication.
    totp : str, optional
        Time-based One-Time Password if enabled, by default None.
    session : requests.Session, optional
        Session used for HTTP requests. A new ``requests.Session`` is created
        when none (or a falsy value) is given, by default None.
    logger : logging.Logger, optional
        Logger used for messages. Falls back to the module logger when none
        is given, by default None.

    Notes
    -----
    The session is modified in place, including a caller-supplied one:
    ``verify`` is set to ``True`` and an adapter with a retry policy is
    mounted for the ``https://`` prefix.
    """
    self.username: str = username
    self.password: str = password
    self.totp: str | None = totp

    # Reuse the caller's session when one is supplied, otherwise create our own.
    self.session = session if session else requests.Session()
    http = self.session

    # Always switch certificate verification on, even for a supplied session.
    http.verify = True

    # Up to 5 retries with a backoff factor of 1 for the listed status codes.
    retry_policy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[502, 503, 504, 408],
    )
    https_adapter = HTTPAdapter(max_retries=retry_policy)
    http.mount("https://", https_adapter)

    self.logger = logger if logger else logging.getLogger(__name__)

refresh_token

refresh_token(refresh_token) -> tuple

Refresh the access token using the provided refresh token.

Parameters:

  • refresh_token (str) –

    The refresh token obtained from previous authentication.

Returns:

  • tuple[str, int, str]

    Tuple containing the refreshed access token, its expiration time in seconds, and the new refresh token.

Source code in src/pyecotrend_ista/login_helper.py
def refresh_token(self, refresh_token) -> tuple:  # numpydoc ignore=ES01,EX01
    """Exchange a refresh token for a new set of tokens.

    Parameters
    ----------
    refresh_token : str
        The refresh token obtained from previous authentication.

    Returns
    -------
    tuple[str, int, str]
        The ``access_token``, ``expires_in`` and ``refresh_token`` fields of
        the JSON response, in that order.

    Notes
    -----
    No status check is performed here; a response body lacking one of the
    three fields results in a ``KeyError``.
    """
    payload = {
        "grant_type": GRANT_TYPE_REFRESH_TOKEN,
        "client_id": CLIENT_ID,  # ecotrend
        "refresh_token": refresh_token,
    }
    token_endpoint = f"{PROVIDER_URL}token"

    response = self._send_request("POST", url=token_endpoint, data=payload)
    body = response.json()

    # Pick the three fields in the order callers expect them.
    wanted = ("access_token", "expires_in", "refresh_token")
    return tuple(body[field] for field in wanted)

get_token

get_token() -> GetTokenResponse

Retrieve access and refresh tokens using the obtained authorization code.

Raises:

  • KeycloakPostError

    If there's an error during the POST request to retrieve tokens.

  • KeycloakInvalidTokenError

    If the response status code is not 200, indicating an invalid token.

Returns:

  • GetTokenResponse

    A TypedDict containing authentication tokens including 'accessToken', 'accessTokenExpiresIn', and 'refreshToken'.

Source code in src/pyecotrend_ista/login_helper.py
def get_token(self) -> GetTokenResponse:  # numpydoc ignore=ES01,EX01
    """Log in and exchange the authorization code for tokens.

    Runs ``self._login()`` first, then posts the resulting ``self.auth_code``
    (plus the TOTP value, when one is set) to the provider's token endpoint.

    Raises
    ------
    KeycloakPostError
        Passed to ``raise_error_from_response`` as the error to raise when
        the token endpoint answers with an unexpected status code.

    KeycloakInvalidTokenError
        If the response passed the check above but its status code is
        still not 200.

    Returns
    -------
    GetTokenResponse
        The decoded JSON body of the token response.

    """
    self._login()

    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "client_id": CLIENT_ID,  # ecotrend
        "redirect_uri": REDIRECT_URI,
        "code": self.auth_code,
    }
    # The one-time password is only sent when the caller configured one.
    if self.totp:
        payload.update({"totp": self.totp})

    token_endpoint = f"{PROVIDER_URL}token"
    response = self._send_request(
        "POST",
        url=token_endpoint,
        data=payload,
        timeout=TIMEOUT,
        allow_redirects=False,
    )

    # Raises for any status outside the helper's default accepted codes.
    raise_error_from_response(response=response, error=KeycloakPostError)

    # Accepted-but-not-200 responses are still treated as a token failure.
    if response.status_code == 200:
        return cast(GetTokenResponse, response.json())
    raise KeycloakInvalidTokenError()

userinfo

userinfo(token) -> Any

Retrieve user information from the Keycloak provider.

This method sends a GET request to the Keycloak userinfo endpoint using the provided token in the Authorization header. It returns the JSON response containing user information.

Parameters:

  • token (str) –

    The access token to be used for authorization.

Returns:

  • Any

    A dictionary containing the user information if the request is successful, or an empty dictionary if the user is a demo user.

Raises:

  • KeycloakOperationError

    If the request fails due to a Keycloak operation error.

Source code in src/pyecotrend_ista/login_helper.py
def userinfo(self, token) -> Any:  # numpydoc ignore=EX01
    """Fetch the user information for an access token.

    Issues a GET request against the provider's ``userinfo`` endpoint with the
    token sent as a ``Bearer`` Authorization header and returns the decoded
    JSON body. No request is made for the demo account.

    Parameters
    ----------
    token : str
        The access token to be used for authorization.

    Returns
    -------
    Any
        The decoded JSON response, or an empty dictionary when the configured
        username equals ``DEMO_USER_ACCOUNT``.

    Raises
    ------
    KeycloakOperationError
        Presumably raised by ``_send_request`` on failure; this method itself
        performs no status check (verify against ``_send_request``).
    """
    if self.username != DEMO_USER_ACCOUNT:
        auth_headers = {"Authorization": f"Bearer {token}"}
        endpoint = f"{PROVIDER_URL}userinfo"
        response = self._send_request("GET", url=endpoint, headers=auth_headers)
        return response.json()

    # Demo account: skip the round trip entirely.
    return {}

logout

logout(token) -> dict | Any | bytes | dict[str, str]

Log out the user session from the identity provider.

Parameters:

  • token (str) –

    Refresh token associated with the user session.

Returns:

  • Union[dict, Any, bytes, dict[str, str]]

    Response data from the logout request. The exact type may vary based on the response content.

Raises:

  • KeycloakPostError

    If an error occurs during the POST request to logout the user.

Source code in src/pyecotrend_ista/login_helper.py
def logout(self, token) -> dict | Any | bytes | dict[str, str]:  # numpydoc ignore=ES01,EX01
    """End the user session at the identity provider.

    Parameters
    ----------
    token : str
        Refresh token associated with the user session.

    Returns
    -------
    Union[dict, Any, bytes, dict[str, str]]
        Whatever ``raise_error_from_response`` returns for the logout
        response; the exact type depends on the response content.

    Raises
    ------
    KeycloakPostError
        Passed to ``raise_error_from_response`` as the error to raise when
        the logout request answers with an unexpected status code.

    """
    logout_endpoint = f"{PROVIDER_URL}logout"
    payload = {
        "client_id": CLIENT_ID,
        "refresh_token": token,
    }

    response = self._send_request("POST", url=logout_endpoint, data=payload)

    # Status evaluation and body decoding are delegated to the shared helper.
    return raise_error_from_response(response, KeycloakPostError)

raise_error_from_response

raise_error_from_response(response: Response, error, expected_codes=None, skip_exists=False) -> dict | Any | bytes | dict[str, str]

Raise an exception for the response.

Parameters:

  • response (Response) –

    The response object.

  • error (dict or Exception) –

    Error object to raise.

  • expected_codes (Sequence[int], default: None ) –

    Status codes that should not raise the exception. Defaults to 200, 201 and 204 when not given.

  • skip_exists (bool, default: False ) –

    Indicates whether a response reporting an already existing object (HTTP 409) should be ignored instead of raising.

Returns:

  • bytes or dict

    Content of the response message.

Raises:

  • KeycloakError

    In case of unexpected status codes.

Notes

Source from https://github.com/marcospereirampj/python-keycloak/blob/c98189ca6951f12f1023ed3370c9aaa0d81e4aa4/src/keycloak/exceptions.py

Source code in src/pyecotrend_ista/login_helper.py
def raise_error_from_response(
    response: requests.Response, error, expected_codes=None, skip_exists=False
) -> dict | Any | bytes | dict[str, str]:  # numpydoc ignore=ES01,EX01
    """Return the response content or raise an exception for the response.

    Parameters
    ----------
    response : Response
        The response object.
    error : dict or Exception
        Exception class to raise, or a mapping from status code to exception
        class (unmapped codes fall back to ``KeycloakOperationError``).
    expected_codes : Sequence[int], optional
        Status codes which should not raise the exception. Defaults to
        200, 201 and 204.
    skip_exists : bool, optional
        When true, a 409 response is reported as ``{"msg": "Already exists"}``
        instead of raising.

    Returns
    -------
    bytes or dict
        For an expected status code: an empty dict for "no content", otherwise
        the decoded JSON body, or the raw content if the body is not JSON.

    Raises
    ------
    KeycloakError
        In case of unexpected status codes. When ``error`` is not a mapping, a
        401 response is always raised as ``KeycloakAuthenticationError``.

    Notes
    -----
    Source from https://github.com/marcospereirampj/python-keycloak/blob/c98189ca6951f12f1023ed3370c9aaa0d81e4aa4/src/keycloak/exceptions.py
    """  # noqa: DAR401,DAR402 pylint: disable=line-too-long
    if expected_codes is None:
        expected_codes = [200, 201, 204]

    status = response.status_code

    if status in expected_codes:
        if status == requests.codes["no_content"]:
            return {}

        try:
            return response.json()
        except ValueError:
            # Body is not JSON; hand back the raw payload.
            return response.content

    if skip_exists and status == 409:
        return {"msg": "Already exists"}

    try:
        message = response.json()["message"]
    except (KeyError, TypeError, ValueError):
        # TypeError covers JSON bodies that are not objects (list, string,
        # number, null): subscripting them must not mask the HTTP error.
        message = response.content

    if isinstance(error, dict):
        error = error.get(status, KeycloakOperationError)
    elif status == 401:
        error = KeycloakAuthenticationError

    raise error(error_message=message, response_code=status, response_body=response.content)