Authorization: OPA policies + custom claims for user authorization (ABAC)
Context
You use OPA policies to drive authorization decisions in an ABAC-like fashion. In particular, access to certain endpoints is granted depending on whether the user has certain flags or roles in a given custom claim.
Solution
You can use the Open Policy Agent Evaluator plugin to evaluate OPA policies directly within Gate, and you can enrich the user token with custom claims as shown in the Token Enrichment guide.
Step 1 - Run Gate in transparent mode
Before adding any policy, deploy Gate in your infrastructure and run it in transparent mode to confirm that everything was deployed correctly.
Step 2 - Add custom claims
Add a custom claim attribute to the JWT token as described in the Token Enrichment guide.
Step 3 - Implement your policy in Rego
To evaluate an OPA policy you need two pieces of input:
- The Rego policy
- Input data, passed to the Rego policy as the special input object
Policy input object example
The plugin passes the following input format to OPA.
Note that this format is compatible with OPA-Envoy, so you can reuse your existing OPA policies with Gate.
The following is an example of the input object Gate makes available to Rego policies during evaluation, in its current Version 1 format:
{
  "version": {
    "gate": "v1.1"
  },
  "request": {
    "http": {
      "headers": {
        "Accept": [
          "application/json, text/plain, */*"
        ],
        "Accept-Encoding": [
          "gzip, compress, deflate, br"
        ],
        "Authorization": [
          "Bearer eyJhb..."
        ],
        "Cookie": [
          "foo=bar"
        ],
        "User-Agent": [
          "axios/1.3.4"
        ]
      },
      "cookies": {
        "foo": "bar"
      },
      "host": "example.com",
      "method": "GET",
      "path": "/some/path",
      "protocol": "HTTP/1.1",
      "query": "option=value",
      "scheme": "https",
      "url": "https://example.com/some/path?option=value"
    },
    "parsed_path": [
      "some",
      "path"
    ],
    "parsed_query": {
      "option": [
        "value"
      ]
    },
    "parsed_token": {
      "header": {
        "alg": "RS256",
        "kid": "pYsNGA"
      },
      "payload": {
        ...
      },
      "signature": "JvVt..."
    },
    "time": "2023-04-09T18:38:41.117405838Z",
    "token": "eyJh..."
  }
}
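To make the relationship between the raw and the parsed fields concrete, the sketch below derives parsed_path and parsed_query from the url value using Python's standard library. It only illustrates the shape of the data; how Gate computes these fields internally is not covered in this guide, and the function name parse_request_url is hypothetical.

```python
from urllib.parse import parse_qs, urlsplit


def parse_request_url(url: str) -> dict:
    """Split a URL into fields shaped like the `request` input object.

    Hypothetical helper for illustration only; not part of Gate.
    """
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "host": parts.netloc,
        "path": parts.path,
        "query": parts.query,
        # Path segments, skipping the empty strings produced by slashes.
        "parsed_path": [segment for segment in parts.path.split("/") if segment],
        # Each query parameter maps to a list because a key may repeat.
        "parsed_query": parse_qs(parts.query),
    }


fields = parse_request_url("https://example.com/some/path?option=value")
print(fields["parsed_path"])
print(fields["parsed_query"])
```

Because every parsed_query value is a list, a policy that compares a query parameter has to index into or iterate over that list rather than compare against a plain string.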
Note that if the plugin is configured to execute on responses as well then the input object contains both the request and response objects. The request is the same as the object shown above. The whole input object looks as follows:
{
  "version": {
    "gate": "v1.1"
  },
  "request": {
    "http": {
      ...
    }
  },
  "response": {
    "http": {
      "contentType": "application/json",
      "status": 200,
      "body": "{}",
      "headers": {
        "Accept": [
          "application/json, text/plain, */*"
        ],
        "Accept-Encoding": [
          "gzip, compress, deflate, br"
        ],
        "Authorization": [
          "Bearer eyJhb..."
        ],
        "User-Agent": [
          "axios/1.3.4"
        ]
      }
    },
    "time": "2023-04-09T18:38:46.117405838Z",
    "parsed_path": [
      "some",
      "path"
    ],
    "parsed_query": {
      "option": [
        "value"
      ]
    }
  }
}
The output of an evaluation is expected to be a boolean, found at the location specified in the policy_decision_path plugin parameter.
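One way to picture policy_decision_path: OPA exposes rule results as a nested document, so a path such as /authz/allow corresponds to the allow rule in package authz. The sketch below resolves such a path in a plain dictionary and denies unless the value is exactly the boolean true. The function name and the deny-on-anything-else behavior are assumptions made for illustration, not a description of the plugin's internals.

```python
from typing import Any


def read_decision(results: dict, decision_path: str) -> bool:
    """Walk a slash-separated path and return True only for a boolean true."""
    node: Any = results
    for key in decision_path.strip("/").split("/"):
        if not isinstance(node, dict) or key not in node:
            return False  # Undefined decision: treated as deny in this sketch.
        node = node[key]
    return node is True


results = {"authz": {"allow": True}}
print(read_decision(results, "/authz/allow"))
print(read_decision(results, "/authz/missing"))
```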
Step 4 - Deploy your policy with Gate
You have two options to deploy OPA policies with Gate.
The first is to inline policies. This approach works best when you have a limited number of policies and performance is critical.
The second approach is to use a remote bundle. The remote bundle can be pulled from any remote host, including the SlashID distribution hub.
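For the remote option, an OPA bundle is a gzipped tarball that contains the policy files and, optionally, a .manifest file. The following sketch packs a single policy into such an archive in memory using Python's standard library. The file name authz.rego, the revision string, and the helper name build_bundle are illustrative assumptions; where the archive is hosted and how Gate is pointed at it are not shown here.

```python
import io
import json
import tarfile

POLICY = """package authz

import future.keywords.if

default allow := false

allow if input.request.parsed_token.payload.user_roles[_] == "admin"
"""


def build_bundle(files: dict[str, bytes]) -> bytes:
    """Pack files into an in-memory gzipped tarball."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        for name, content in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(content)
            archive.addfile(info, io.BytesIO(content))
    return buffer.getvalue()


bundle = build_bundle(
    {
        # Optional manifest; "roots" declares which paths the bundle owns.
        ".manifest": json.dumps({"revision": "v1", "roots": ["authz"]}).encode(),
        "authz.rego": POLICY.encode(),
    }
)
print(f"bundle size: {len(bundle)} bytes")
```

The resulting bytes can be written to a file such as bundle.tar.gz and served from any HTTP host.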
Example
For the sake of this exercise, let's consider a case where we want to use an OPA policy to check whether a custom claim called user_roles contains the value admin.
Add a custom claim to the JWT token
As described in the Token Enrichment guide, we can define a token mapping function similar to the following to add custom claims to the user object.
Python
# The calls below match FastAPI and PyJWT, which this example assumes.
# DatabaseUser, MapTokenRequest, MapTokenResponse, Handle, vendor_domain,
# TOKEN_SIGNING_KEY and logger are assumed to be defined as in the
# Token Enrichment guide.
from typing import Optional

import jwt
from fastapi import HTTPException, Response, status


def get_user(username: Optional[str]) -> Optional[DatabaseUser]:
    # Stand-in for a real user database lookup.
    if username == f"user@{vendor_domain}":
        return DatabaseUser(
            username=username,
            user_roles=["user"],
        )
    if username == f"admin@{vendor_domain}":
        return DatabaseUser(
            username=username,
            user_roles=["user", "admin"],
        )
    return None


async def map_token(request: MapTokenRequest) -> MapTokenResponse | Response:
    logger.info(f"/map_token: request={request}")
    req_token = request.token
    # Requests without a bearer token are passed through unchanged.
    if not req_token.lower().startswith("bearer "):
        return Response(status_code=status.HTTP_204_NO_CONTENT)
    req_token = req_token[len("bearer "):]
    logger.info(f"/map_token: req_token={req_token}")
    try:
        token = jwt.decode(req_token, TOKEN_SIGNING_KEY, algorithms=["HS256"])
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Bearer token {req_token} is invalid: {e}",
        )
    username: Optional[str] = token.get("username")
    if username is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Bearer token {req_token} doesn't contain username",
        )
    user = get_user(username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"User {username} is not mapped to SlashID person",
        )
    # Every attribute of the user record becomes a custom claim.
    custom_claims = vars(user)
    return MapTokenResponse(
        person_id=None,
        handles=[Handle(type="email_address", value=username)],
        custom_claims=custom_claims,
    )
TypeScript
import * as jwt from "jsonwebtoken";

// TOKEN_SIGNING_KEY and VENDOR_DOMAIN are assumed to be provided elsewhere;
// VENDOR_DOMAIN mirrors vendor_domain in the Python version.
declare const TOKEN_SIGNING_KEY: string;
declare const VENDOR_DOMAIN: string;

interface DatabaseUser {
  username: string;
  user_roles: string[];
}

interface MapTokenRequest {
  token: string;
}

interface Handle {
  type: string;
  value: string;
}

interface MapTokenResponse {
  person_id: null;
  handles: Handle[];
  custom_claims: Record<string, any>;
}

function getUser(username: string | null): DatabaseUser | null {
  // Stand-in for a real user database lookup.
  if (username === `user@${VENDOR_DOMAIN}`) {
    return {
      username: username,
      user_roles: ["user"],
    };
  }
  if (username === `admin@${VENDOR_DOMAIN}`) {
    return {
      username: username,
      user_roles: ["user", "admin"],
    };
  }
  return null;
}

async function mapToken(
  request: MapTokenRequest,
): Promise<MapTokenResponse | { statusCode: number }> {
  let reqToken = request.token;
  // Requests without a bearer token are passed through unchanged.
  if (!reqToken.toLowerCase().startsWith("bearer ")) {
    return { statusCode: 204 };
  }
  reqToken = reqToken.substring("bearer ".length);
  let token: any;
  try {
    token = jwt.verify(reqToken, TOKEN_SIGNING_KEY, { algorithms: ["HS256"] });
  } catch (e) {
    throw {
      statusCode: 401,
      detail: "Bearer token is invalid",
    };
  }
  const username = token.username;
  if (username === undefined) {
    throw {
      statusCode: 401,
      detail: "Bearer token doesn't contain username",
    };
  }
  const user = getUser(username);
  if (user === null) {
    throw {
      statusCode: 401,
      detail: "User is not mapped to SlashID person",
    };
  }
  // Every attribute of the user record becomes a custom claim.
  const customClaims: Record<string, any> = { ...user };
  return {
    person_id: null,
    handles: [{ type: "email_address", value: username }],
    custom_claims: customClaims,
  };
}
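To exercise either mapping function locally you need an HS256-signed token that carries a username claim. The sketch below mints one using only the Python standard library. The signing key and the username are placeholders, the helper names are hypothetical, and in practice a JWT library would normally do this work.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the compact JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def mint_hs256_token(claims: dict, key: bytes) -> str:
    """Build a compact JWT signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


# Placeholder values; use the key your mapping endpoint is configured with.
TEST_KEY = b"test-signing-key"
token = mint_hs256_token({"username": "admin@example.com"}, TEST_KEY)
authorization_header = f"Bearer {token}"
print(authorization_header)
```

The resulting header value is what the mapping function receives in request.token: the bearer prefix is stripped, the signature is verified, and the username claim is used for the lookup.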
Writing the Rego policy
This is the payload of an example JWT token:
{
  "aud": "c921477d-27f2-6bc2-4e3f-ab91a1c65f73",
  "authenticated_methods": ["api"],
  "exp": 1681720646,
  "first_token": false,
  "groups": [],
  "groups_claim_name": "groups",
  "iat": 1681634246,
  "iss": "https://api.sandbox.slashid.com",
  "jti": "2a4dbca88332fd40ac0dd717de6aa760",
  "oid": "c921477d-27f2-6bc2-4e3f-ab91a1c65f73",
  "person_id": "06414801-354e-7a95-ba08-062f71b3c9de",
  "sub": "06414801-354e-7a95-ba08-062f71b3c9de",
  "user_roles": ["user"]
}
We are using SlashID-issued tokens in this example, but the OPA plugin is issuer-agnostic and can work with any JWT token. The following policy allows a request only if the user_roles claim contains admin:
package authz
import future.keywords.if
default allow := false
allow if input.request.parsed_token.payload.user_roles[_] == "admin"
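For readers less familiar with Rego, the rule above can be read roughly as the following Python function. This is only a mental model of the policy's logic for a list-valued user_roles claim under the input format shown earlier, not how OPA evaluates it; the function names are hypothetical.

```python
def allow(policy_input: dict) -> bool:
    """Return True only if the token's user_roles claim contains "admin"."""
    payload = (
        policy_input.get("request", {})
        .get("parsed_token", {})
        .get("payload", {})
    )
    roles = payload.get("user_roles", [])
    # A missing or malformed claim falls through to the default: deny.
    return isinstance(roles, list) and "admin" in roles


def make_input(roles: list) -> dict:
    """Build the minimal slice of the input object that the rule reads."""
    return {"request": {"parsed_token": {"payload": {"user_roles": roles}}}}


print(allow(make_input(["user"])))
print(allow(make_input(["user", "admin"])))
print(allow({"request": {}}))
```

The example token shown earlier carries only the user role, so it would be denied; a token enriched for the admin user would be allowed.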
Configuring Gate
This is a sample configuration for the OPA plugin in Gate using inline policies:
plugins:
  - id: translator_up
    type: token-translation-upgrade
    enabled: false
    parameters:
      <<: *slashid_config
      header_with_token: Authorization
      map_token_endpoint: http://backend:8000/map_token
  # OPA plugin with an inline policy that checks whether the bearer token contains
  # the 'admin' string in claim named 'user_roles'
  - id: authz_admin
    type: opa
    enabled: false
    parameters:
      <<: *slashid_config
      policy_decision_path: /authz/allow
      policy: |
        package authz
        import future.keywords.if
        default allow := false
        allow if input.request.parsed_token.payload.user_roles[_] == "admin"
...
  - pattern: "*/api/admin_opa"
    target: http://backend:8000
    plugins:
      translator_up:
        enabled: true
      authz_admin:
        enabled: true
The token is first enriched with the user_roles claim through the token-translation-upgrade plugin.
Once the token is enriched, the OPA plugin grants access to the /api/admin_opa endpoint only if the policy above evaluates to true.
Conclusion
We have successfully deployed an OPA policy using Gate to implement ABAC on an arbitrary endpoint.