DevSecOps – How to Ensure Application Security within the DevOps Process
How to ensure product security within the DevOps process? What SAST, DAST, and SCA are? How they can contribute to improving security?
Hey, today as Innokrea, we continue the topic of creating a REST API using the FastAPI framework, as well as technologies such as MongoDB or Docker. In the last part, we managed to create a basic folder structure, plan the architecture, set up the database, and containerize the necessary applications. Today, we will try to fill our project with code and test if the added functionalities work. We also encourage you to read the previous article in this series.
Let’s remind ourselves of the folder structure, which partially reflects our planned layered architecture. Each of the folders contains an __init__.py file to mark the folder as a Python package. Our application will be responsible for handling basic login, user registration, and assigning tokens using JWT.
Figure 1 – Folder Structure
We will start by creating a model class User that reflects the structure of a record in the database. To do this, we will use the pydantic library and the BaseModel class. Let’s create a file named models.py in the models module:
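from typing import Optional

from bson import ObjectId
from pydantic import BaseModel

class User(BaseModel):
    # The leading underscore makes pydantic v2 treat _id as a private attribute:
    # it is not validated and is left out of model_dump().
    _id: Optional[ObjectId] = None
    email: str
    role: str
    password_hash: str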
The above model must be consistent with the one initialized in the previous article in the database initialization file db-init.js. Let’s now try to create a connection to the database. In the database module, let’s create a file named connector.py and fill it with the following content:
import os
from typing import Union

from pymongo import MongoClient
from pymongo.database import Database

class Connector:
    # One client is shared by the whole process and created on first use.
    _client: Union[MongoClient, None] = None
    hostname: Union[str, None] = os.getenv('DB_HOSTNAME')
    username: Union[str, None] = os.getenv('DB_USERNAME')
    password: Union[str, None] = os.getenv('DB_PASSWORD')
    db_name: Union[str, None] = os.getenv('DB_NAME')
    # 27017 is MongoDB's default port; valid port numbers are 0-65535.
    port: int = int(os.getenv('DB_PORT', '27017'))

    @classmethod
    def get_db_client(cls) -> MongoClient:
        if cls._client is None:
            cls._client = MongoClient(host=cls.hostname, port=cls.port, username=cls.username, password=cls.password)
        return cls._client

    @classmethod
    def get_db(cls) -> Database:
        return cls.get_db_client().get_database(cls.db_name)
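The connector reads all of its settings from environment variables, so a missing one only shows up when the first connection is attempted. As a minimal sketch, the check below fails early with a readable message. The helper name read_db_settings is hypothetical and not part of the project; only the variable names come from the Connector class.

```python
import os
from typing import Dict, Mapping

# Variable names taken from the Connector class above.
REQUIRED_DB_VARIABLES = ("DB_HOSTNAME", "DB_USERNAME", "DB_PASSWORD", "DB_NAME", "DB_PORT")


def read_db_settings(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the database settings, or raise if a variable is missing or empty."""
    missing = [name for name in REQUIRED_DB_VARIABLES if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    port = int(environ["DB_PORT"])  # raises ValueError for non-numeric input
    if not 0 < port < 65536:
        raise RuntimeError(f"DB_PORT out of range: {port}")
    return {name: environ[name] for name in REQUIRED_DB_VARIABLES}
```

Calling read_db_settings() once at startup would be one way to surface configuration mistakes before the first request arrives.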
Remember to set the variables used by the connector (DB_HOSTNAME, DB_USERNAME, DB_PASSWORD, DB_NAME and DB_PORT) in the docker-compose-dev.yml file. Now, in the repositories folder, let’s create a file named user_repository.py that uses the Database class from the pymongo library.
from typing import Any, Dict, Union

from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.results import InsertOneResult

from app.models.models import User

class UserRepository:
    def __init__(self, database: Database) -> None:
        self.database: Database = database
        self.users_collection: Collection = self.database['users']

    def add_user(self, user: User) -> User:
        user_dict: Dict[str, Any] = user.model_dump()
        insertion_result: InsertOneResult = self.users_collection.insert_one(user_dict)
        # Keep the identifier generated by MongoDB on the returned object.
        user._id = insertion_result.inserted_id
        return user

    def find_user_by_email(self, email: str) -> Union[User, None]:
        user_data: Union[Dict[str, Any], None] = self.users_collection.find_one({"email": email})
        if not user_data:
            return None
        return User(**user_data)
In the class, we define a constructor that takes a ‘database’ argument and two methods: one for creating a user, and the other for retrieving a user object from the MongoDB database.
Now let’s build the business logic layer for the user. We’ll start with exception handling in FastAPI. In case of failure, we’ll raise a custom exception, catch it with a handler, and return an HTTP error response to the user in JSON format. In the exceptions module, let’s create a file named exceptions.py:
from fastapi import HTTPException, Request, status
from fastapi.responses import JSONResponse

# definitions
class DuplicateUserException(HTTPException):
    def __init__(self, status_code: int = status.HTTP_400_BAD_REQUEST, detail: str = "This e-mail is not acceptable"):
        super().__init__(status_code=status_code, detail=detail)

class InvalidEmailFormatException(HTTPException):
    def __init__(self, status_code: int = status.HTTP_400_BAD_REQUEST, detail: str = "This e-mail is not acceptable"):
        super().__init__(status_code=status_code, detail=detail)

class InvalidCredentialsException(HTTPException):
    def __init__(self, status_code: int = status.HTTP_401_UNAUTHORIZED, detail: str = "Incorrect e-mail or password"):
        super().__init__(status_code=status_code, detail=detail)

# handlers
async def handle_duplicate_user_exception(request: Request, exc: DuplicateUserException):
    return JSONResponse(
        status_code=exc.status_code,
        content={'detail': exc.detail}
    )

async def handle_invalid_email_format_exception(request: Request, exc: InvalidEmailFormatException):
    return JSONResponse(
        status_code=exc.status_code,
        content={'detail': exc.detail}
    )

async def handle_invalid_credentials_exception(request: Request, exc: InvalidCredentialsException):
    return JSONResponse(
        status_code=exc.status_code,
        content={'detail': exc.detail}
    )
We also need to create so-called schemas, which are model classes for our requests and responses. In the schemas module, let’s define the following classes using the pydantic library in the schemas.py file:
from pydantic import BaseModel

class NewUserRequest(BaseModel):
    email: str
    password: str

class UserLoginRequest(BaseModel):
    email: str
    password: str

class NewUserResponse(BaseModel):
    email: str
    role: str

class UserLoginResponse(BaseModel):
    access_token: str
Now we have everything we need to properly build our business logic. To do this, we create a file named user_service.py in the services module:
import re
from hashlib import sha256
from typing import Union

from app.exceptions.exceptions import DuplicateUserException, InvalidEmailFormatException, InvalidCredentialsException
from app.models.models import User
from app.repositories.user_repository import UserRepository
from app.schemas.schemas import NewUserRequest

# Simple format check, not a complete e-mail address validator.
email_regex = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b"

def is_email_valid(email: str) -> bool:
    return bool(re.fullmatch(email_regex, email))

class UserService:
    def __init__(self, repository: UserRepository) -> None:
        self.repository: UserRepository = repository

    def create_user(self, data: NewUserRequest, role: str) -> User:
        email: str = data.email
        password: str = data.password
        if not is_email_valid(email):
            raise InvalidEmailFormatException()
        if self.repository.find_user_by_email(email):
            raise DuplicateUserException()
        # Unsalted SHA-256 keeps the example short; it is too fast for real password storage.
        password_hash: str = sha256(password.encode('utf-8')).hexdigest()
        user: User = User(email=email, password_hash=password_hash, role=role)
        user = self.repository.add_user(user)
        return user

    def check_user_credentials(self, email: str, password: str) -> User:
        if not is_email_valid(email):
            raise InvalidEmailFormatException()
        user: Union[User, None] = self.repository.find_user_by_email(email)
        if not user or sha256(password.encode('utf-8')).hexdigest() != user.password_hash:
            raise InvalidCredentialsException()
        return user

    def check_user_exists(self, email: str) -> User:
        if not is_email_valid(email):
            raise InvalidEmailFormatException()
        user: Union[User, None] = self.repository.find_user_by_email(email)
        if not user:
            raise InvalidCredentialsException()
        return user
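The service above hashes passwords with a single round of SHA-256 and no salt. That keeps the tutorial short, but identical passwords produce identical hashes and brute-forcing is cheap. As a hedged sketch of one alternative, the functions below use PBKDF2 from the standard library. The function names, the storage format and the iteration count are assumptions for illustration, not part of the project.

```python
import hashlib
import hmac
import os

# Assumed parameters for illustration; tune the iteration count for your hardware.
PBKDF2_ITERATIONS = 600_000
SALT_BYTES = 16


def hash_password(password: str) -> str:
    """Return 'iterations$salt_hex$digest_hex' computed with PBKDF2-HMAC-SHA256."""
    salt = os.urandom(SALT_BYTES)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
    return f"{PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations_text, salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations_text)
    )
    return hmac.compare_digest(digest, bytes.fromhex(digest_hex))
```

In the service, hash_password would replace the sha256 call in create_user, and verify_password would replace the direct comparison in check_user_credentials. The password_hash field can stay a plain string.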
The service uses the classes from the schemas module and raises an exception whenever a condition is not met, for example when the e-mail format is incorrect or the user already exists in the database. FastAPI does not handle these exceptions yet; we will register the handlers on the FastAPI object later. To generate the token, let’s create another file named token_service.py in the services module:
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Union

import jwt

from app.models.models import User
from app.repositories.user_repository import UserRepository

class TokenService:
    def __init__(self, user_repository: UserRepository) -> None:
        self.user_repository: UserRepository = user_repository
        self.jwt_access_token_secret_key: Union[str, None] = os.getenv('JWT_ACCESS_TOKEN_SECRET_KEY')
        # Defaults to 7 days when the variable is not set.
        self.jwt_access_token_expire_minutes: int = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRE_MINUTES', str(7 * 24 * 60)))
        self.jwt_token_alg: Union[str, None] = os.getenv('JWT_TOKEN_ALG')

    def generate_access_token(self, user_data: User) -> str:
        expire: datetime = datetime.now(timezone.utc) + timedelta(minutes=self.jwt_access_token_expire_minutes)
        data_to_encode: dict[str, Any] = {"email": user_data.email, "role": user_data.role, "exp": expire}
        # Both variables must be set: str(None) would silently become the literal 'None'.
        encoded_jwt: str = jwt.encode(data_to_encode, str(self.jwt_access_token_secret_key), str(self.jwt_token_alg))
        return encoded_jwt
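To show what the generated token contains, here is a minimal standard-library sketch of an HS256 token: two base64url-encoded JSON parts plus an HMAC-SHA256 signature over them. It assumes JWT_TOKEN_ALG is set to HS256, which the article does not state, and the function names are hypothetical. It is an illustration of the format only; the application should keep using the jwt library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, the form used inside JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_hs256(payload: Dict[str, Any], secret: str) -> str:
    """Build header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return signing_input + "." + _b64url(_sign(signing_input, secret))


def decode_hs256(token: str, secret: str) -> Dict[str, Any]:
    """Verify the signature and the 'exp' claim, then return the payload."""
    signing_input, _, signature_text = token.rpartition(".")
    if not hmac.compare_digest(_sign(signing_input, secret), _b64url_decode(signature_text)):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload
```

Note that the payload is only encoded, not encrypted, so anyone holding the token can read the email and role claims. With the jwt library, the verification side is jwt.decode(token, key, algorithms=["HS256"]).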
Before moving on to the controller layer, we need to define dependencies in the dependencies.py file in the dependencies module. FastAPI will use them to inject the services into the endpoints.
from fastapi import Depends
from pymongo.database import Database

from app.database.connector import Connector
from app.repositories.user_repository import UserRepository
from app.services.token_service import TokenService
from app.services.user_service import UserService

def get_user_service(db: Database = Depends(Connector.get_db)) -> UserService:
    return UserService(UserRepository(db))

def get_token_service(db: Database = Depends(Connector.get_db)) -> TokenService:
    return TokenService(UserRepository(db))
Here we have the initialization of services using the database connection through the repository. The layered architecture is visible here.
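One practical payoff of this layering is that the services only rely on the repository's two methods, so a different storage backend can be slotted in. The sketch below is an assumption-laden illustration: FakeUser and InMemoryUserRepository are hypothetical names, and a dataclass stands in for the pydantic User model so the example runs without extra packages.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeUser:
    """Hypothetical stand-in for the User model, with the same three fields."""
    email: str
    role: str
    password_hash: str


class InMemoryUserRepository:
    """Offers the same two methods as UserRepository, backed by a dict instead of MongoDB."""

    def __init__(self) -> None:
        self._users: Dict[str, FakeUser] = {}

    def add_user(self, user: FakeUser) -> FakeUser:
        # Keyed by e-mail, because that is the only lookup the services perform.
        self._users[user.email] = user
        return user

    def find_user_by_email(self, email: str) -> Optional[FakeUser]:
        return self._users.get(email)
```

A repository like this could be passed to UserService in place of the MongoDB-backed one, for example in tests. FastAPI's app.dependency_overrides dictionary is the usual place to swap get_user_service for such a variant.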
Now it’s time to create the controller in the controllers module, with the endpoints for login and registration. We will move the endpoint logic from the app.py file to the user_controller.py file.
import os

from fastapi import APIRouter, Depends, Response, status

from app.dependencies.dependencies import get_user_service, get_token_service
from app.models.models import User
from app.schemas.schemas import NewUserRequest, NewUserResponse, UserLoginRequest, UserLoginResponse
from app.services.token_service import TokenService
from app.services.user_service import UserService

router = APIRouter(
    prefix=os.getenv('API_AUTHENTICATION_PREFIX', '/api'),
    tags=['user']
)

@router.post("/register", response_model=NewUserResponse, status_code=status.HTTP_201_CREATED)
def register(data: NewUserRequest, service: UserService = Depends(get_user_service)):
    # Every account created through this endpoint gets the "user" role.
    user: User = service.create_user(data, "user")
    return NewUserResponse(email=user.email, role=user.role)

@router.post("/login", response_model=UserLoginResponse, status_code=status.HTTP_200_OK)
def login(data: UserLoginRequest, response: Response, user_service: UserService = Depends(get_user_service), token_service: TokenService = Depends(get_token_service)):
    user: User = user_service.check_user_credentials(data.email, data.password)
    access_token: str = token_service.generate_access_token(user)
    response_model: UserLoginResponse = UserLoginResponse(access_token=access_token)
    response.headers.append("Token-Type", "Bearer")
    return response_model
In the controller, we defined a router so that the endpoints no longer live in the app.py file. The injected services handle the business logic, while requests and responses pass through the models defined in schemas.
The last piece of our puzzle is initializing FastAPI and registering a response for each exception raised by the services. This is where we connect each handler with its exception definition on the app object.
from fastapi import FastAPI
from app.exceptions.exceptions import *
from app.controllers.user_controller import router as user_router
app = FastAPI()
app.add_exception_handler(DuplicateUserException, handle_duplicate_user_exception)
app.add_exception_handler(InvalidEmailFormatException, handle_invalid_email_format_exception)
app.add_exception_handler(InvalidCredentialsException, handle_invalid_credentials_exception)
app.include_router(user_router)
To run the application, execute the Docker command:
docker-compose -f docker-compose-dev.yml up --build
After launching, we can test the endpoints using curl (localhost:8000/api):
curl -X POST "http://localhost:8000/api/register" -H "Content-Type: application/json" -d "{\"email\": \"example@example.com\",\"password\":\"securepassword\" }"
curl -X POST "http://localhost:8000/api/login" -H "Content-Type: application/json" -d "{\"email\": \"example@example.com\",\"password\":\"securepassword\" }"
Figure 2 – Queries using curl
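The same two requests can also be sent from Python. The sketch below uses only the standard library; build_json_post and send are hypothetical helper names, and the address is the one used in the curl commands.

```python
import json
import urllib.request
from typing import Any, Dict

BASE_URL = "http://localhost:8000/api"  # same address as in the curl commands


def build_json_post(path: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Build a POST request with a JSON body; nothing is sent yet."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> Dict[str, Any]:
    """Send the request and parse the JSON response (the containers must be running)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the containers up, send(build_json_post("/register", {...})) followed by the same call for "/login" mirrors the curl session. urlopen raises urllib.error.HTTPError for 4xx responses, so a duplicate registration surfaces as an exception rather than a return value.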
The added user should be visible through the admin panel available at localhost:8083.
Figure 3 – Database content after user registration
Figure 4 – Final project structure
We have successfully created a working project with a layered architecture that can be easily extended. We have proposed a division of the project into appropriate modules. In the next, final post on FastAPI, we will learn how to add tests with the pytest library and run them in Docker. If you’re curious, we encourage you to read on!