Getting Started
A simple example of how to use the authentication and authorization middleware with an aiohttp application.
import asyncio
from os import urandom
import aiohttp_auth
from aiohttp import web
from aiohttp_auth import auth, autz
from aiohttp_auth.auth import auth_required
from aiohttp_auth.autz import autz_required
from aiohttp_auth.autz.policy import acl
from aiohttp_auth.permissions import Permission, Group
# In-memory user store for this example, keyed by user identity.
# Each record holds the user's password (plain text, example data only)
# and the list of ACL group names the user belongs to.
db = dict(
    bob=dict(password='bob_password', groups=['guest', 'staff']),
    alice=dict(password='alice_password', groups=['guest']),
)
# Global ACL context: a list of (action, group, permissions) rules.
# NOTE(review): rule order is presumably significant (e.g. the 'guest'
# Deny on 'edit' precedes the 'staff' Allow) -- confirm against the
# aiohttp_auth ACL documentation before reordering.
context = [
    (Permission.Allow, 'guest', {'view'}),
    (Permission.Deny, 'guest', {'edit'}),
    (Permission.Allow, 'staff', {'view', 'edit', 'admin_view'}),
    (Permission.Allow, Group.Everyone, {'view_home'}),
]
# create an ACL authorization policy class
class ACLAutzPolicy(acl.AbstractACLAutzPolicy):
    """The concrete ACL authorization policy.

    Resolves a user's ACL groups from a mapping of user identity to a
    user record that contains a ``'groups'`` entry.
    """

    def __init__(self, db, context=None):
        """Store the user mapping and pass the ACL context to the parent.

        Args:
            db: Mapping of user identity to a user record; each record
                must provide a ``'groups'`` iterable.
            context: Optional ACL context forwarded to the parent class.
        """
        # do not forget to call parent __init__
        super().__init__(context)
        self.db = db

    async def acl_groups(self, user_identity):
        """Return acl groups for given user identity.

        This method should return a set of groups for given user_identity.

        Args:
            user_identity: User identity returned by auth.get_auth.

        Returns:
            Set of acl groups for the user identity. The set is empty
            when the identity is not present in ``self.db``.
        """
        # implement application specific logic here
        user = self.db.get(user_identity)
        if user is None:
            # return empty set of groups for not authenticated users
            # middleware will fill it with Group.Everyone
            return set()
        # Always hand back a fresh set: this matches the documented return
        # type and the empty-set branch above, and keeps callers from
        # mutating the group list stored in the user record.
        return set(user['groups'])
async def login(request):
    """Authenticate a user from query-string credentials.

    Example: http://127.0.0.1:8080/login?username=bob&password=bob_password

    Args:
        request: The incoming aiohttp request; ``username`` and
            ``password`` are read from its query string.

    Returns:
        A ``web.Response`` with the text ``'Ok'`` once the identity has
        been remembered.

    Raises:
        web.HTTPUnauthorized: If the user is unknown or the password does
            not match the stored one.
    """
    # ``request.GET`` was deprecated in favour of ``request.query`` and is
    # not available on newer aiohttp releases. Prefer ``query`` and fall
    # back to ``GET`` so the example runs on either.
    query = getattr(request, 'query', None)
    if query is None:
        query = request.GET

    user_identity = query.get('username')
    password = query.get('password')

    # NOTE: plain-text passwords passed in the URL are for demonstration
    # only.
    user = db.get(user_identity)
    if user is None or password != user['password']:
        raise web.HTTPUnauthorized()

    # remember user identity
    await auth.remember(request, user_identity)
    return web.Response(text='Ok')
# Logging out is restricted to authenticated users: for an
# unauthenticated request the auth_required decorator
# raises web.HTTPUnauthorized.
@auth_required
async def logout(request):
    """Forget the current user identity and reply with 'Ok'."""
    await auth.forget(request)
    response = web.Response(text='Ok')
    return response
# The user must belong to a group that is allowed 'admin_view';
# otherwise autz_required raises web.HTTPForbidden.
@autz_required('admin_view')
async def admin(request):
    """Serve the admin page."""
    page = web.Response(text='Admin Page')
    return page
@autz_required('view_home')
async def home(request):
    """Serve the home page, adding links that depend on the current user.

    The admin link is appended when the user is permitted 'admin_view';
    the logout link is appended when a user identity is present.
    """
    fragments = ['Home page.']

    # Permission check comes first, then the identity lookup.
    if await autz.permit(request, 'admin_view'):
        fragments.append(' Admin page: http://127.0.0.1:8080/admin')

    # A non-None identity means the user is authenticated.
    if await auth.get_auth(request) is not None:
        fragments.append(' Logout: http://127.0.0.1:8080/logout')

    return web.Response(text=''.join(fragments))
@autz_required('view')
async def view(request):
    """Serve the view page; requires the 'view' permission."""
    page = web.Response(text='View Page')
    return page
def init_app(loop):
    """Build the example application with auth/autz policies and routes.

    Args:
        loop: Event loop passed to ``web.Application``.

    Returns:
        The configured ``web.Application``.
    """
    application = web.Application(loop=loop)

    # Auth ticket mechanism: tickets expire after 1 minute (60 seconds),
    # the secret is randomly generated on every start, and the user's IP
    # address is optionally included in the hash.
    secret = urandom(32)
    ticket_auth = auth.CookieTktAuthentication(secret, 60, include_ip=True)

    # ACL authorization policy backed by the module-level db and context.
    acl_autz = ACLAutzPolicy(db, context)

    # setup middlewares in aiohttp fashion
    aiohttp_auth.setup(application, ticket_auth, acl_autz)

    # GET routes, registered in this order.
    routes = (
        ('/', home),
        ('/login', login),
        ('/logout', logout),
        ('/admin', admin),
        ('/view', view),
    )
    for path, handler in routes:
        application.router.add_get(path, handler)

    return application
# Script entry: obtain an event loop, build the application on it and
# start serving.
loop = asyncio.get_event_loop()
app = init_app(loop)
# Bind to localhost only. The example URLs in the handlers use port 8080,
# presumably run_app's default port -- confirm against the aiohttp docs.
web.run_app(app, host='127.0.0.1')