User + Task models
"""
These imports define the key object
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
"""
These object and definitions are used throughout the Jupyter Notebook.
"""
# Setup of key Flask object (app)
app = Flask(__name__)
# Setup SQLAlchemy object and properties for the database (db)
database = 'sqlite:///sqlite.db' # path and filename of database
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = database
app.config['SECRET_KEY'] = 'SECRET_KEY'
db = SQLAlchemy()
# This belongs in place where it runs once per project
db.init_app(app)
""" database dependencies to support sqlite examples """
import datetime
from datetime import datetime
import json
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash, check_password_hash
''' Tutorial: https://www.sqlalchemy.org/library.html#tutorials, try to get into a Python shell and follow along '''
# Define the User class to manage actions in the 'users' table
# -- Object Relational Mapping (ORM) is the key concept of SQLAlchemy
# -- a.) db.Model is like an inner layer of the onion in ORM
# -- b.) User represents data we want to store, something that is built on db.Model
# -- c.) SQLAlchemy ORM is layer on top of SQLAlchemy Core, then SQLAlchemy engine, SQL
# Defining the template for users, class definition template. Used to create objects for type user.
class User(db.Model):
__tablename__ = 'users' # table name is plural, class name is singular
# Define the User schema with "vars" from object
# Attributes used for future defined users
id = db.Column(db.Integer, primary_key=True)
_name = db.Column(db.String(255), unique=False, nullable=False)
_uid = db.Column(db.String(255), unique=True, nullable=False)
_password = db.Column(db.String(255), unique=False, nullable=False)
# constructor of a User object, initializes the instance variables within object (self)
# The constructed used to instantiate an object from our user class
def __init__(self, name, uid, password="123qwerty"):
self._name = name # variables with self prefix become part of the object,
self._uid = uid
self.set_password(password)
# a name getter method, extracts name from object
@property
def name(self):
return self._name
# a setter function, allows name to be updated after initial object creation
@name.setter
def name(self, name):
self._name = name
# a getter method, extracts uid from object
@property
def uid(self):
return self._uid
# a setter function, allows uid to be updated after initial object creation
@uid.setter
def uid(self, uid):
self._uid = uid
# check if uid parameter matches user id in object, return boolean
def is_uid(self, uid):
return self._uid == uid
@property
def password(self):
return self._password[0:10] + "..." # because of security only show 1st characters
# update password, this is conventional method used for setter
def set_password(self, password):
"""Create a hashed password."""
self._password = generate_password_hash(password, method='sha256')
# check password parameter against stored/encrypted password
def is_password(self, password):
"""Check against hashed password."""
result = check_password_hash(self._password, password)
return result
# output content using str(object) is in human readable form
# output content using json dumps, this is ready for API response
def __str__(self):
return json.dumps(self.read())
# CRUD create/add a new record to the table
# returns self or None on error
def create(self):
try:
# creates a person object from User(db.Model) class, passes initializers
db.session.add(self) # add prepares to persist person object to Users table
db.session.commit() # SqlAlchemy "unit of work pattern" requires a manual commit
return self
except IntegrityError:
db.session.remove()
return None
# CRUD read converts self to dictionary
# returns dictionary
def read(self):
return {
"id": self.id,
"name": self.name,
"uid": self.uid,
}
# CRUD update: updates user name, password, phone
# returns self
def update(self, name="", uid="", password=""):
"""only updates values with length"""
if len(name) > 0:
self.name = name
if len(uid) > 0:
self.uid = uid
if len(password) > 0:
self.set_password(password)
db.session.add(self) # performs update when id exists\n",
db.session.commit()
return self
# CRUD delete: remove self
# None
def delete(self):
db.session.delete(self)
db.session.commit()
return None
class Tasks(db.Model):
__tablename__ = 'tasks'
# Define the task schema
# uid key is grabbed from parent table
id = db.Column(db.Integer, primary_key=True)
_taskName = db.Column(db.String(255), unique=False, nullable=False)
_status = db.Column(db.String(255), unique=False, nullable=False)
_uid = db.Column(db.String(255), db.ForeignKey('users._uid'))
# Constructor of a Notes object, initializes of instance variables within object
def __init__(self, uid, taskName, status):
self.uid = uid
self.taskName = taskName
self.status = status
# taskName getter
@property
def taskName(self):
return self._taskName
# taskName setter
@taskName.setter
def taskName(self, taskName):
self._taskName = taskName
# status getter
@property
def status(self):
return self._status
# status setter
@status.setter
def status(self, status):
self._status = status
# a getter method, extracts uid from object
@property
def uid(self):
return self._uid
# a setter function, allows uid to be updated after initial object creation
@uid.setter
def uid(self, uid):
self._uid = uid
# check if uid parameter matches user id in object, return boolean
def is_uid(self, uid):
return self._uid == uid
# output content using str(object) is in human readable form
# output content using json dumps, this is ready for API response
def __str__(self):
return json.dumps(self.read())
# CRUD create/add a new record to the table
# returns self or None on error
def create(self):
try:
# creates a person object from User(db.Model) class, passes initializers
db.session.add(self) # add prepares to persist person object to Users table
db.session.commit() # SqlAlchemy "unit of work pattern" requires a manual commit
return self
except IntegrityError:
db.session.remove()
return None
# CRUD read converts self to dictionary
# returns dictionary
def read(self):
return {
"id": self.id,
"taskName": self.taskName,
"status": self.status,
"uid": self.uid
}
# CRUD update: updates user name, password, phone
# returns self
def update(self, name="", uid="", status=""):
"""only updates values with length"""
if len(name) > 0:
self.name = name
if len(uid) > 0:
self.uid = uid
if len(status) > 0:
self.status = status
db.session.add(self) # performs update when id exists\n",
db.session.add(self) # performs update when id exists\n",
db.session.commit()
return self
# CRUD delete: remove self
# None
def delete(self):
db.session.delete(self)
db.session.commit()
return None
"""Database Creation and Testing """
# Builds working user data
def initUsers():
with app.app_context():
"""Create database and tables"""
db.create_all()
"""Tester data for table"""
u1 = User(name='Theo H', uid='TheoH32', password='theo131!')
u2 = User(name='Jake W', uid='Jabroni', password='pumpkin868')
u3 = User(name='Justin L', uid='!Justin', password='idkwhatmypasswordis')
users = [u1, u2, u3]
"""Builds sample user/note(s) data"""
for user in users:
try:
'''add user to table'''
object = user.create()
print(f"Created new uid {object.uid}")
except: # error raised if object nit created
'''fails with bad or duplicate data'''
print(f"Records exist uid {user.uid}, or error.")
initUsers()
# task data
def initTasks():
with app.app_context():
"""Create database and tables"""
db.create_all()
"""Tester data for table"""
t1 = Tasks(taskName='bball practice', status='100%', uid='TheoH32')
t2 = Tasks(taskName='math homework', status='0%', uid='TheoH32')
t3 = Tasks(taskName='read 30 pages', status='60%', uid='TheoH32')
t4 = Tasks(taskName='go to the gym', status='100%', uid='TheoH32')
t5 = Tasks(taskName='play video games', status='25%', uid='!Justin')
t6 = Tasks(taskName='take a nap', status='100%', uid='!Justin')
t7 = Tasks(taskName='eat lunch', status='0%', uid='!Justin')
t8 = Tasks(taskName='beach picnic', status='100%', uid='Jabroni')
t9 = Tasks(taskName='call Theo', status='100%', uid='Jabroni')
tasks = [t1, t2, t3, t4, t5, t6, t7, t8, t9]
"""Builds sample user/note(s) data"""
for task in tasks:
try:
'''add user to table'''
object = task.create()
print(f"Created new task {object.taskName} for {object.uid} ")
except: # error raised if object nit created
'''fails with bad or duplicate data'''
print(f"Records exist uid {task.taskName}, or error.")
initTasks()
def find_by_uid(uid):
with app.app_context():
user = User.query.filter_by(_uid=uid).first()
return user # returns user object
def create():
# optimize user time to see if uid exists
uid = input("Enter your user id:")
user = find_by_uid(uid)
try:
print("Found\n", user.read())
return
except:
pass # keep going
# request value that ensure creating valid object
name = input("Enter your name:")
password = input("Enter your password")
# Initialize User object before date
user = User(name=name,
uid=uid,
password=password
)
# write object to database
with app.app_context():
try:
object = user.create()
print("Created\n", object.read())
except: # error raised if object not created
print("Unknown error uid {uid}")
##################################################################################<<<<<-------
create()
def create():
# optimize user time to see if uid exists
uid = input("Enter your user id:")
# request value that ensure creating valid object
taskName = input("Enter task name:")
status = input("Enter the status")
# Initialize User object before date
user = Tasks(taskName=taskName,
uid=uid,
status=status
)
# write object to database
with app.app_context():
try:
object = user.create()
print("Created\n", object.read())
except: # error raised if object not created
print("Error")
##################################################################################<<<<<-------
create()
def read():
with app.app_context():
userlist = User.query.all()
tasktable = Tasks.query.all()
user_json = [user.read() for user in userlist]
task_json = [task.read() for task in tasktable]
return user_json + task_json
read()
userInput = input("Enter the uid that you want to check")
def user_read():
with app.app_context():
# defines userlist with json data
userlist = User.query.all()
# i and while loop used to go through each user and find the one with the right uid, if not, i keeps going
i = 0
while (i < len(userlist)):
if userlist[i].uid == userInput:
# if it is found, define it as user_json and break
user_json = userlist[i].read()
break
else:
i = i + 1
print("USER MATCHES:")
print(user_json)
def task_read():
with app.app_context():
# defines task list with json data
tasklist = Tasks.query.all()
i = 0
# task_json needs to be defined before used later
task_json=""
while (i < len(tasklist)):
if tasklist[i].uid == userInput:
# if a task with a matching uid is found, it converts the json data to a string then appends the data and starts on another line with \n
task_json += str(tasklist[i].read()) + "\n"
i=i+1
else:
i = i + 1
print("TASK MATCHES:")
print(task_json)
user_read()
task_read()
def userUpdate():
uid = str(input("Uid of user you want to change?"))
with app.app_context():
user = User.query.filter_by(_uid=uid).first()
name = str(input("New name?"))
uid = str(input("New uid?"))
password = str(input("New password?"))
with app.app_context():
user._name = name
user._uid = uid
user._password = password
user.update()
print("User has been updated")
userUpdate()
def delete():
with app.app_context():
uid = str(input("Enter uid"))
# finds user by uid
user = find_by_uid(uid)
user.delete()
return f"{user.name} at id {user.id} has been deleted"
delete()
import sqlite3
database = 'instance/sqlite.db' # this is location of database
def schema():
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL queries
cursor = conn.cursor()
# Fetch results of Schema
results = cursor.execute("PRAGMA table_info('users')").fetchall()
taskResults = cursor.execute("PRAGMA table_info('tasks')").fetchall()
print("|User Schema|")
# Print the results
for row in results:
print(row)
print("----------------------------------------------")
print("|Task Schema|")
for row in taskResults:
print(row)
# Close the database connection
conn.close()
schema()
import sqlite3
database = 'instance/sqlite.db' # this is location of database
def find_by_uid(uid):
with app.app_context():
user = User.query.filter_by(_uid=uid).first()
return user # returns user object
def create():
# optimize user time to see if uid exists
uid = input("Enter your user id:")
user = find_by_uid(uid)
try:
print("Found\n", user.read())
return
except:
pass # keep going
# request value that ensure creating valid object
name = input("Enter your name:")
password = input("Enter your password")
# Initialize User object before date
user = User(name=name,
uid=uid,
password=password
)
# write object to database
with app.app_context():
try:
object = user.create()
print("Created\n", object.read())
except: # error raised if object not created
print("Unknown error uid {uid}")
def read():
with app.app_context():
userlist = User.query.all()
user_json = [user.read() for user in userlist]
print(user_json)
def update():
uid = str(input("uid of user you want to change?"))
with app.app_context():
user = User.query.filter_by(_uid=uid).first()
name = str(input("New name?"))
uid = str(input("New uid?"))
password = str(input("New password?"))
with app.app_context():
user._name = name
user._uid = uid
user._password = password
user.update()
print("User has been updated")
def delete():
with app.app_context():
uid = str(input("Enter uid"))
# finds user by uid
user = find_by_uid(uid)
user.delete()
return f"{user.name} at id {user.id} has been deleted"
def schema():
# Connect to the database file
conn = sqlite3.connect(database)
# Create a cursor object to execute SQL queries
cursor = conn.cursor()
# Fetch results of Schema
results = cursor.execute("PRAGMA table_info('users')").fetchall()
# Print the results
for row in results:
print(row)
# Close the database connection
conn.close()
def menu():
operation = input("Enter: (C)reate (R)ead (U)pdate or (D)elete or (S)chema")
if operation.lower() == 'c':
create()
elif operation.lower() == 'r':
read()
elif operation.lower() == 'u':
update()
elif operation.lower() == 'd':
delete()
elif operation.lower() == 's':
schema()
elif len(operation)==0: # Escape Key
return
else:
print("Please enter c, r, u, or d")
menu() # repeats, acts like a loop
try:
menu() # start
except:
print("Perform Jupyter 'Run All' prior to starting menu")