Login Helper¶
pyecotrend_ista.login_helper ¶
Login helper for Keycloak.
LoginHelper ¶
LoginHelper(username: str , password: str , totp: str | None = None, session: Session | None = None, logger=None)
Login helper for Keycloak.
Attributes:
- session (Session) – Optional session object for making HTTP requests.
- username (str) – Username for authentication.
- password (str) – Password for authentication.
- cookie (str) – Authentication cookie.
- auth_code (str) – Authorization code.
- form_action (str) – Form action URL for authentication.
Notes
This class provides utility methods for handling authentication and session management using Keycloak.
Parameters:
- username (str) – Username for authentication.
- password (str) – Password for authentication.
- totp (str, default: None) – Time-based One-Time Password if enabled, by default None.
- session (Session, default: None) – Optional session object for making HTTP requests, by default None.
- logger (Logger, default: None) – Logger object for logging messages, by default None.
Source code in src/pyecotrend_ista/login_helper.py
def __init__(
    self,
    username: str,
    password: str,
    totp: str | None = None,
    session: requests.Session | None = None,
    logger=None,
) -> None:  # numpydoc ignore=ES01,EX01
    """Store the credentials and prepare the HTTP session and logger.

    Parameters
    ----------
    username : str
        Username for authentication.
    password : str
        Password for authentication.
    totp : str, optional
        Time-based One-Time Password if enabled, by default None.
    session : requests.Session, optional
        Session object used for HTTP requests; a new ``requests.Session`` is
        created when none is given, by default None.
    logger : logging.Logger, optional
        Logger object for logging messages; the module logger is used when
        none is given, by default None.
    """
    self.username: str = username
    self.password: str = password
    self.totp: str | None = totp

    # Use the caller's session if one was handed in, otherwise a fresh one.
    http_session = session or requests.Session()
    self.session = http_session
    # Note: `verify` is set to True on a caller-supplied session as well.
    http_session.verify = True
    # Retry policy applied to every "https://" URL through the mounted adapter.
    retry_policy = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504, 408])
    http_session.mount("https://", HTTPAdapter(max_retries=retry_policy))

    self.logger = logger or logging.getLogger(__name__)
refresh_token ¶
refresh_token(refresh_token) -> tuple
Refresh the access token using the provided refresh token.
Parameters:
- refresh_token (str) – The refresh token obtained from previous authentication.
Returns:
- tuple[str, int, str] – Tuple containing the refreshed access token, its expiration time in seconds, and the new refresh token.
Source code in src/pyecotrend_ista/login_helper.py
def refresh_token(self, refresh_token) -> tuple:  # numpydoc ignore=ES01,EX01
    """Exchange a refresh token for a new set of tokens.

    Parameters
    ----------
    refresh_token : str
        The refresh token obtained from previous authentication.

    Returns
    -------
    tuple[str, int, str]
        The refreshed access token, its expiration time in seconds,
        and the new refresh token, in that order.
    """
    token_url = f"{PROVIDER_URL}token"
    payload = {
        "grant_type": GRANT_TYPE_REFRESH_TOKEN,
        "client_id": CLIENT_ID,  # ecotrend
        "refresh_token": refresh_token,
    }
    resp: requests.Response = self._send_request("POST", url=token_url, data=payload)

    # Pick the three fields out of the JSON body in the documented order.
    tokens = resp.json()
    access_token = tokens["access_token"]
    expires_in = tokens["expires_in"]
    new_refresh_token = tokens["refresh_token"]
    return access_token, expires_in, new_refresh_token
get_token ¶
get_token() -> GetTokenResponse
Retrieve access and refresh tokens using the obtained authorization code.
Raises:
- KeycloakPostError – If there's an error during the POST request to retrieve tokens.
- KeycloakInvalidTokenError – If the response status code is not 200, indicating an invalid token.
Returns:
- GetTokenResponse – A TypedDict containing authentication tokens including 'accessToken', 'accessTokenExpiresIn', and 'refreshToken'.
Source code in src/pyecotrend_ista/login_helper.py
def get_token(self) -> GetTokenResponse:  # numpydoc ignore=ES01,EX01
    """Log in and trade the resulting authorization code for tokens.

    Raises
    ------
    KeycloakPostError
        If there's an error during the POST request to retrieve tokens.
    KeycloakInvalidTokenError
        If the response status code is not 200, indicating an invalid token.

    Returns
    -------
    GetTokenResponse
        A TypedDict containing authentication tokens including 'accessToken',
        'accessTokenExpiresIn', and 'refreshToken'.
    """
    # The login step runs first; `self.auth_code` is read right afterwards.
    self._login()

    form = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "client_id": CLIENT_ID,  # ecotrend
        "redirect_uri": REDIRECT_URI,
        "code": self.auth_code,
        # The one-time password is only sent when one was configured.
        **({"totp": self.totp} if self.totp else {}),
    }

    resp: requests.Response = self._send_request(
        "POST",
        url=f"{PROVIDER_URL}token",
        data=form,
        timeout=TIMEOUT,
        allow_redirects=False,
    )

    raise_error_from_response(response=resp, error=KeycloakPostError)

    # Other accepted codes (201, 204) pass the check above but are still
    # rejected here: only a plain 200 counts as a valid token response.
    if resp.status_code == 200:
        return cast(GetTokenResponse, resp.json())
    raise KeycloakInvalidTokenError()
userinfo ¶
userinfo(token) -> Any
Retrieve user information from the Keycloak provider.
This method sends a GET request to the Keycloak userinfo
endpoint using the provided token in the Authorization header. It returns the JSON response containing user information.
Parameters:
- token (str) – The access token to be used for authorization.
Returns:
- Any – A dictionary containing the user information if the request is successful, or an empty dictionary if the user is a demo user.
Raises:
- KeycloakOperationError – If the request fails due to a Keycloak operation error.
Source code in src/pyecotrend_ista/login_helper.py
def userinfo(self, token) -> Any:  # numpydoc ignore=EX01
    """Fetch the user information from the Keycloak provider.

    A GET request is sent to the provider's `userinfo` endpoint with the given token
    as a Bearer token in the Authorization header, and the JSON body is returned.
    No request is made for the demo account.

    Parameters
    ----------
    token : str
        The access token to be used for authorization.

    Returns
    -------
    Any
        A dictionary containing the user information if the request is successful, or an empty
        dictionary if the user is a demo user.

    Raises
    ------
    KeycloakOperationError
        If the request fails due to a Keycloak operation error.
    """
    if self.username != DEMO_USER_ACCOUNT:
        auth_header = {"Authorization": f"Bearer {token}"}
        endpoint = f"{PROVIDER_URL}userinfo"
        resp: requests.Response = self._send_request("GET", url=endpoint, headers=auth_header)
        return resp.json()

    # The demo account short-circuits to an empty result without any request.
    return {}
logout ¶
logout(token) -> dict | Any | bytes | dict [str , str ]
Log out the user session from the identity provider.
Parameters:
- token (str) – Refresh token associated with the user session.
Returns:
- Union[dict, Any, bytes, dict[str, str]] – Response data from the logout request. The exact type may vary based on the response content.
Raises:
- KeycloakPostError – If an error occurs during the POST request to logout the user.
Source code in src/pyecotrend_ista/login_helper.py
def logout(self, token) -> dict | Any | bytes | dict[str, str]:  # numpydoc ignore=ES01,EX01
    """End the user session at the identity provider.

    Parameters
    ----------
    token : str
        Refresh token associated with the user session.

    Returns
    -------
    Union[dict, Any, bytes, dict[str, str]]
        Response data from the logout request. The exact type may vary based on the response content.

    Raises
    ------
    KeycloakPostError
        If an error occurs during the POST request to logout the user.
    """
    endpoint = f"{PROVIDER_URL}logout"
    form = {
        "client_id": CLIENT_ID,
        "refresh_token": token,
    }
    resp: requests.Response = self._send_request("POST", url=endpoint, data=form)

    # Returns the decoded payload on success and raises for any other status.
    return raise_error_from_response(resp, KeycloakPostError)
raise_error_from_response ¶
raise_error_from_response(response: Response , error, expected_codes=None, skip_exists=False) -> dict | Any | bytes | dict [str , str ]
Raise an exception for the response.
Parameters:
- response (Response) – The response object.
- error (dict or Exception) – Error object to raise.
- expected_codes (Sequence[int], default: None) – Set of expected codes, which should not raise the exception.
- skip_exists (bool, default: False) – Indicates whether the response on already existing object should be ignored.
Returns:
- bytes or dict – Content of the response message.
Raises:
- KeycloakError – In case of unexpected status codes.
Notes
Source from https://github.com/marcospereirampj/python-keycloak/blob/c98189ca6951f12f1023ed3370c9aaa0d81e4aa4/src/keycloak/exceptions.py
Source code in src/pyecotrend_ista/login_helper.py
def raise_error_from_response(
    response: requests.Response, error, expected_codes=None, skip_exists=False
) -> dict | Any | bytes | dict[str, str]:  # numpydoc ignore=ES01,EX01
    """Return the response payload, or raise an exception for the response.

    Parameters
    ----------
    response : Response
        The response object.
    error : dict or Exception
        Error object to raise. A dict is treated as a mapping from status code
        to exception class, falling back to ``KeycloakOperationError``.
    expected_codes : Sequence[int], optional
        Set of expected codes, which should not raise the exception.
        Defaults to ``[200, 201, 204]``.
    skip_exists : bool, optional
        Indicates whether the response on already existing object should be ignored.

    Returns
    -------
    bytes or dict
        Content of the response message: the decoded JSON body, the raw content
        when the body is not JSON, or an empty dict for a "no content" status.

    Raises
    ------
    KeycloakError
        In case of unexpected status codes.

    Notes
    -----
    Source from https://github.com/marcospereirampj/python-keycloak/blob/c98189ca6951f12f1023ed3370c9aaa0d81e4aa4/src/keycloak/exceptions.py
    """  # noqa: DAR401,DAR402 pylint: disable=line-too-long
    if expected_codes is None:
        expected_codes = [200, 201, 204]

    if response.status_code in expected_codes:
        if response.status_code == requests.codes["no_content"]:
            return {}
        try:
            return response.json()
        except ValueError:
            # Body is not JSON: hand back the raw content instead.
            return response.content

    if skip_exists and response.status_code == 409:
        return {"msg": "Already exists"}

    try:
        message = response.json()["message"]
    except (KeyError, TypeError, ValueError):
        # KeyError: JSON object without a "message" key.
        # TypeError: valid JSON that is not an object (list, string, number, null),
        #            so it cannot be indexed with "message".
        # ValueError: body is not JSON at all.
        message = response.content

    if isinstance(error, dict):
        error = error.get(response.status_code, KeycloakOperationError)
    elif response.status_code == 401:
        # For a non-dict `error`, a 401 always maps to the authentication error.
        error = KeycloakAuthenticationError

    raise error(error_message=message, response_code=response.status_code, response_body=response.content)