Authorization: OPA policies + custom claims for user authorization (ABAC)

Context

You use OPA policies to drive authorization decisions in an ABAC-like fashion. In particular, access to certain endpoints is granted depending on whether the user has certain flags or roles in a given custom claim.

Solution

Use the Open Policy Agent Evaluator plugin to evaluate OPA policies directly within Gate, and enrich the user token with custom claims as shown in the Token Enrichment guide.

Step 1 - Run Gate in transparent mode

Deploy Gate in your infrastructure and run it in transparent mode first to verify that everything was deployed correctly.

Step 2 - Add custom claims

Add a custom claim attribute to the JWT token as described in the Token Enrichment guide.

Step 3 - Implement your policy in Rego

To evaluate an OPA policy you need two pieces of input:

  • The Rego policy
  • Input data, passed to the Rego policy as the special input object

Policy input object example

The plugin passes the following input format to OPA.

Note that this format is compatible with OPA-Envoy, so you can reuse your existing OPA policies with Gate.

The following is an example of the input object that Gate makes available to Rego policies during evaluation, in its current Version 1 format:

{
  "version": {
    "gate": "v1.1"
  },
  "request": {
    "http": {
      "headers": {
        "Accept": [
          "application/json, text/plain, */*"
        ],
        "Accept-Encoding": [
          "gzip, compress, deflate, br"
        ],
        "Authorization": [
          "Bearer eyJhb..."
        ],
        "Cookie": [
          "foo=bar"
        ],
        "User-Agent": [
          "axios/1.3.4"
        ]
      },
      "cookies": {
        "foo": "bar"
      },
      "host": "example.com",
      "method": "GET",
      "path": "/some/path",
      "protocol": "HTTP/1.1",
      "query": "option=value",
      "scheme": "https",
      "url": "https://example.com/some/path?option=value"
    },
    "parsed_path": [
      "some",
      "path"
    ],
    "parsed_query": {
      "option": [
        "value"
      ]
    },
    "parsed_token": {
      "header": {
        "alg": "RS256",
        "kid": "pYsNGA"
      },
      "payload": {
        ...
      },
      "signature": "JvVt..."
    },
    "time": "2023-04-09T18:38:41.117405838Z",
    "token": "eyJh..."
  }
}
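
To make the relationship between the raw request and the derived fields concrete, here is a minimal Python sketch that rebuilds parsed_path, parsed_query and parsed_token from a URL and an Authorization header. It only illustrates how the fields relate to each other; it is not Gate's implementation, and the helper names (b64url_decode, parse_token, derived_fields) are hypothetical. The token is decoded without signature verification.

```python
import base64
import json
from urllib.parse import parse_qs, urlsplit


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url JWT segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(data: bytes) -> str:
    """Encode bytes as unpadded base64url, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def parse_token(token: str) -> dict:
    """Split a JWT into header, payload and signature WITHOUT verifying it."""
    header, payload, signature = token.split(".")
    return {
        "header": json.loads(b64url_decode(header)),
        "payload": json.loads(b64url_decode(payload)),
        "signature": signature,
    }


def derived_fields(url: str, authorization: str) -> dict:
    """Build the parsed_* fields of the input object from raw request data."""
    parts = urlsplit(url)
    # Drop the "Bearer " scheme and keep only the token itself.
    _, _, token = authorization.partition(" ")
    return {
        "parsed_path": [p for p in parts.path.split("/") if p],
        "parsed_query": parse_qs(parts.query),
        "parsed_token": parse_token(token),
        "token": token,
    }


# Hypothetical unsigned demo token carrying the claim used later in this guide.
demo_token = ".".join([
    b64url_encode(json.dumps({"alg": "RS256", "kid": "pYsNGA"}).encode()),
    b64url_encode(json.dumps({"user_roles": ["user"]}).encode()),
    "c2ln",  # placeholder signature, never checked in this sketch
])
fields = derived_fields(
    "https://example.com/some/path?option=value", f"Bearer {demo_token}"
)
```

A policy would then read the claim as input.request.parsed_token.payload.user_roles, which corresponds to fields["parsed_token"]["payload"]["user_roles"] in this sketch.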

If the plugin is also configured to execute on responses, the input object contains both the request and response objects. The request object is the same as the one shown above. The whole input object looks as follows:

{
  "version": {
    "gate": "v1.1"
  },
  "request": {
    "http": {
      ...
    }
  },
  "response": {
    "http": {
      "contentType": "application/json",
      "status": 200,
      "body": "{}",
      "headers": {
        "Accept": [
          "application/json, text/plain, */*"
        ],
        "Accept-Encoding": [
          "gzip, compress, deflate, br"
        ],
        "Authorization": [
          "Bearer eyJhb..."
        ],
        "User-Agent": [
          "axios/1.3.4"
        ]
      }
    },
    "time": "2023-04-09T18:38:46.117405838Z",
    "parsed_path": [
      "some",
      "path"
    ],
    "parsed_query": {
      "option": [
        "value"
      ]
    }
  }
}

The evaluation must produce a boolean at the location specified by the policy_decision_path plugin parameter.
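
As an illustration of how a decision path such as /authz/allow maps onto a policy's output, the following Python sketch walks a nested result document segment by segment. It assumes that the path segments correspond to the package name and rule name (as in the example configuration later in this guide, where package authz and rule allow are addressed as /authz/allow). The function name decision_at is hypothetical, and denying on a missing or non-boolean value is a choice made for this sketch, not documented Gate behavior.

```python
from typing import Any


def decision_at(document: dict[str, Any], decision_path: str) -> bool:
    """Return True only if the value found at decision_path is the boolean true."""
    node: Any = document
    for segment in decision_path.strip("/").split("/"):
        if not isinstance(node, dict) or segment not in node:
            return False  # assumption of this sketch: a missing decision denies
        node = node[segment]
    # Anything other than a literal boolean true is treated as a denial.
    return node is True


# Hypothetical evaluation result for a policy in package "authz" with rule "allow".
result = {"authz": {"allow": True}}
allowed = decision_at(result, "/authz/allow")
```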

Step 4 - Deploy your policy with Gate

You have two options to deploy OPA policies with Gate.

The first is to inline policies in the Gate configuration. This approach works best when you have a limited number of policies and performance is critical.

[Diagram: Embedded OPA policy. An HTTP request from the user passes through the load balancer to Gate + OPA inside your system, which decides whether to forward it to the destination endpoint.]

The second approach is to use a remote bundle. The remote bundle can be pulled from any remote host, including the SlashID distribution hub.

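[Diagram: Remote OPA bundle. An HTTP request from the user passes through the load balancer to Gate + OPA inside your system, which decides whether to forward it to the destination endpoint. Gate + OPA sends a bundle request to the bundle service and receives the bundle in return.]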
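
For the remote option, recall that an OPA bundle is a gzipped tarball containing policy and data files. The Python sketch below packs the admin policy used in the Example section into such an archive using only the standard library. The archive path authz/policy.rego and the function name build_bundle are hypothetical, and the sketch does not cover how the bundle is hosted or how Gate is configured to fetch it.

```python
import io
import tarfile

# The policy from the Example section of this guide.
POLICY = '''package authz

import future.keywords.if

default allow := false

allow if input.request.parsed_token.payload.user_roles[_] == "admin"
'''


def build_bundle(files: dict[str, str]) -> bytes:
    """Pack {archive path: file content} into an in-memory .tar.gz archive."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        for name, content in files.items():
            data = content.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)  # tar headers need the size up front
            archive.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


# Hypothetical layout: a single policy file under authz/.
bundle = build_bundle({"authz/policy.rego": POLICY})
```

The resulting bytes can be written to a file such as bundle.tar.gz and served from any HTTP host.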

Example

For the sake of this exercise, let's consider a case where we want to use an OPA policy to check whether a custom claim called user_roles contains the value admin.

Add a custom claim to the JWT token

As described in the Token Enrichment guide, we can define a token mapping function similar to the following to add custom claims to the user object.

from typing import Optional

import jwt  # PyJWT
from fastapi import HTTPException, Response, status  # assuming a FastAPI backend

# DatabaseUser, MapTokenRequest, MapTokenResponse, Handle, vendor_domain,
# TOKEN_SIGNING_KEY and logger are assumed to be defined as in the
# Token Enrichment guide.


def get_user(username: Optional[str]) -> Optional[DatabaseUser]:
    """Look up a user and their roles; returns None for unknown users."""
    try:
        if username == f"user@{vendor_domain}":
            return DatabaseUser(
                username=username,
                user_roles=["user"],
            )
        if username == f"admin@{vendor_domain}":
            return DatabaseUser(
                username=username,
                user_roles=["user", "admin"],
            )
    except Exception:
        # Treat any lookup failure as "user not found".
        pass
    return None


async def map_token(request: MapTokenRequest) -> MapTokenResponse | Response:
    logger.info(f"/map_token: request={request}")

    # Only bearer tokens are mapped; anything else is passed through untouched.
    req_token = request.token
    if not req_token.lower().startswith("bearer "):
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    req_token = req_token[len("bearer ") :]
    logger.info(f"/map_token: req_token={req_token}")
    try:
        token = jwt.decode(req_token, TOKEN_SIGNING_KEY, algorithms=["HS256"])
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Bearer token {req_token} is invalid: {e}",
        )

    username: Optional[str] = token.get("username")
    if username is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Bearer token {req_token} doesn't contain username",
        )

    user = get_user(username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"User {username} is not mapped to SlashID person",
        )

    # Every attribute of the user object (including user_roles) becomes a custom claim.
    custom_claims = vars(user)

    return MapTokenResponse(
        person_id=None,
        handles=[Handle(type="email_address", value=username)],
        custom_claims=custom_claims,
    )
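
To exercise a mapping function like the one above locally, you need a bearer token signed with HS256 that carries a username claim. The sketch below builds one with the Python standard library only. The signing key value, the example.com domain and the function name make_hs256_token are assumptions for illustration; the key must match whatever TOKEN_SIGNING_KEY your backend uses.

```python
import base64
import hashlib
import hmac
import json

# Assumption: placeholder secret, must equal the backend's TOKEN_SIGNING_KEY.
TOKEN_SIGNING_KEY = "local-test-secret"


def b64url(data: bytes) -> str:
    """Encode bytes as unpadded base64url, as required for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_hs256_token(claims: dict, key: str) -> str:
    """Build a compact JWT signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(
        key.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


# Assumption: vendor_domain is "example.com" in the backend.
token = make_hs256_token({"username": "admin@example.com"}, TOKEN_SIGNING_KEY)
authorization_value = f"Bearer {token}"
```

Sending authorization_value as the token field of the mapping request should yield custom claims with user_roles set to ["user", "admin"], provided the assumptions above hold.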

Writing the Rego policy

This is an example JWT token:

{
"aud": "c921477d-27f2-6bc2-4e3f-ab91a1c65f73",
"authenticated_methods": ["api"],
"exp": 1681720646,
"first_token": false,
"groups": [],
"groups_claim_name": "groups",
"iat": 1681634246,
"iss": "https://api.sandbox.slashid.com",
"jti": "2a4dbca88332fd40ac0dd717de6aa760",
"oid": "c921477d-27f2-6bc2-4e3f-ab91a1c65f73",
"person_id": "06414801-354e-7a95-ba08-062f71b3c9de",
"sub": "06414801-354e-7a95-ba08-062f71b3c9de",
"user_roles": ["user"]
}
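
Note that we are using SlashID-issued tokens in this example, but the OPA plugin is issuer-agnostic and can work with any JWT.

The following policy allows a request only if the user_roles claim in the token payload contains admin:

package authz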

import future.keywords.if

default allow := false

allow if input.request.parsed_token.payload.user_roles[_] == "admin"
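
To check your expectations before deploying, the rule can be mirrored in a few lines of Python. This is only a sketch of the rule's logic for array-valued user_roles claims, not a Rego evaluator; the function name allow mirrors the rule name and the sample inputs are hypothetical.

```python
def allow(input_doc: dict) -> bool:
    """Mirror of the rule: true only if the user_roles array contains "admin"."""
    try:
        roles = input_doc["request"]["parsed_token"]["payload"]["user_roles"]
    except (KeyError, TypeError):
        return False  # claim is missing, so the default (false) applies
    if not isinstance(roles, list):
        return False  # this sketch only models array-valued claims
    return "admin" in roles


def input_with_roles(roles: list) -> dict:
    """Build the smallest input object the rule needs (hypothetical helper)."""
    return {"request": {"parsed_token": {"payload": {"user_roles": roles}}}}


user_decision = allow(input_with_roles(["user"]))
admin_decision = allow(input_with_roles(["user", "admin"]))
```

With the example token shown above, whose user_roles claim is ["user"], the request is denied; a token enriched with ["user", "admin"] is allowed.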

Configuring Gate

This is a sample configuration for the OPA plugin in Gate using inline policies:

  plugins:
    - id: translator_up
      type: token-translation-upgrade
      enabled: false
      parameters:
        <<: *slashid_config
        header_with_token: Authorization
        map_token_endpoint: http://backend:8000/map_token
    # OPA plugin with an inline policy that checks whether the bearer token contains
    # the 'admin' string in claim named 'user_roles'
    - id: authz_admin
      type: opa
      enabled: false
      parameters:
        <<: *slashid_config
        policy_decision_path: /authz/allow
        policy: |
          package authz

          import future.keywords.if

          default allow := false

          allow if input.request.parsed_token.payload.user_roles[_] == "admin"
  ...

    - pattern: "*/api/admin_opa"
      target: http://backend:8000
      plugins:
        translator_up:
          enabled: true
        authz_admin:
          enabled: true

Both plugins are disabled by default and enabled only for the */api/admin_opa URL pattern. The token is first enriched with the user_roles claim by the token-translation-upgrade plugin. The OPA plugin then grants access to the /api/admin_opa endpoint only if the policy above evaluates to true.

Conclusion

We have successfully deployed an OPA policy using Gate to implement ABAC on an arbitrary endpoint.