TG-124 : Remise a niveau des TU

This commit is contained in:
Clément ARNAUDEAU 2017-03-24 15:37:21 +01:00
parent 1b8c71945a
commit 59f1fca610
5 changed files with 45 additions and 36 deletions

View File

@ -25,6 +25,7 @@ class GroupAPI(Resource):
resp_id = args['resp_id'] resp_id = args['resp_id']
sec_id = args['sec_id'] sec_id = args['sec_id']
res_dir = getParam('URL_BASE_DIRECTORY') + name + "/" res_dir = getParam('URL_BASE_DIRECTORY') + name + "/"
mails = []
group = getGroup(name=name) group = getGroup(name=name)
if group is not None: if group is not None:
@ -38,14 +39,14 @@ class GroupAPI(Resource):
rows = query.execute() rows = query.execute()
res = rows.first() res = rows.first()
if res.hash is not None and len(res.hash) > 0: if res.hash is not None and len(res.hash) > 0:
mail = mailsModels.getMailContent("NEW_RESP_OF_GROUP", {"GROUP": group["name"], mail = mailsModels.getMailContent("NEW_RESP_OF_GROUP", {"GROUP": name,
"URL": getParam('OLA_URL') + "registration/" "URL": getParam('OLA_URL') + "registration/"
+ res.hash}) + res.hash})
else: else:
mail = mailsModels.getMailContent("RESP_OF_GROUP", {"GROUP": group["name"], mail = mailsModels.getMailContent("RESP_OF_GROUP", {"GROUP": name,
"URL": getParam('OLA_URL')}) "URL": getParam('OLA_URL')})
send_mail(mail[0], user["email"], mail[1]) mails.append((user["email"], mail))
if "2" not in user['role'].split('-'): if "2" not in user['role'].split('-'):
role = user['role'] + "-2" role = user['role'] + "-2"
query = USER.update().values(role=role).where(USER.c.id == resp_id) query = USER.update().values(role=role).where(USER.c.id == resp_id)
@ -59,14 +60,14 @@ class GroupAPI(Resource):
rows = query.execute() rows = query.execute()
res = rows.first() res = rows.first()
if res.hash is not None and len(res.hash) > 0: if res.hash is not None and len(res.hash) > 0:
mail = mailsModels.getMailContent("NEW_SEC_OF_GROUP", {"GROUP": group["name"], mail = mailsModels.getMailContent("NEW_SEC_OF_GROUP", {"GROUP": name,
"URL": getParam('OLA_URL') + "registration/" "URL": getParam('OLA_URL') + "registration/"
+ res.hash}) + res.hash})
else: else:
mail = mailsModels.getMailContent("SEC_OF_GROUP", {"GROUP": group["name"], mail = mailsModels.getMailContent("SEC_OF_GROUP", {"GROUP": name,
"URL": getParam('OLA_URL')}) "URL": getParam('OLA_URL')})
send_mail(mail[0], user["email"], mail[1]) mails.append((user["email"], mail))
if "1" not in user['role'].split('-'): if "1" not in user['role'].split('-'):
role = user['role'] + "-1" role = user['role'] + "-1"
query = USER.update().values(role=role).where(USER.c.id == sec_id) query = USER.update().values(role=role).where(USER.c.id == sec_id)
@ -76,6 +77,12 @@ class GroupAPI(Resource):
department=department, resp_id=resp_id, sec_id=sec_id, ressources_dir=res_dir) department=department, resp_id=resp_id, sec_id=sec_id, ressources_dir=res_dir)
res = query.execute() res = query.execute()
os.mkdir(res_dir) os.mkdir(res_dir)
for m in mails:
addr = m[0]
mail = m[1]
send_mail(mail[0], addr, mail[1])
return {"GID": res.lastrowid}, 201 return {"GID": res.lastrowid}, 201
def put(self, gid): def put(self, gid):
@ -91,6 +98,7 @@ class GroupAPI(Resource):
resp_id = args['resp_id'] resp_id = args['resp_id']
sec_id = args['sec_id'] sec_id = args['sec_id']
res_dir = getParam('URL_BASE_DIRECTORY') + name + "/" res_dir = getParam('URL_BASE_DIRECTORY') + name + "/"
mails = []
group = getGroup(gid=gid) group = getGroup(gid=gid)
if group is None: if group is None:
@ -115,7 +123,7 @@ class GroupAPI(Resource):
mail = mailsModels.getMailContent("RESP_OF_GROUP", {"GROUP": group["name"], mail = mailsModels.getMailContent("RESP_OF_GROUP", {"GROUP": group["name"],
"URL": getParam('OLA_URL')}) "URL": getParam('OLA_URL')})
send_mail(mail[0], user["email"], mail[1]) mails.append((user["email"], mail))
if "2" not in user['role'].split('-'): if "2" not in user['role'].split('-'):
role = user['role'] + "-2" role = user['role'] + "-2"
query = USER.update().values(role=role).where(USER.c.id == resp_id) query = USER.update().values(role=role).where(USER.c.id == resp_id)
@ -136,7 +144,7 @@ class GroupAPI(Resource):
mail = mailsModels.getMailContent("SEC_OF_GROUP", {"GROUP": group["name"], mail = mailsModels.getMailContent("SEC_OF_GROUP", {"GROUP": group["name"],
"URL": getParam('OLA_URL')}) "URL": getParam('OLA_URL')})
send_mail(mail[0], user["email"], mail[1]) mails.append((user["email"], mail))
if "1" not in user['role'].split('-'): if "1" not in user['role'].split('-'):
role = user['role'] + "-1" role = user['role'] + "-1"
query = USER.update().values(role=role).where(USER.c.id == sec_id) query = USER.update().values(role=role).where(USER.c.id == sec_id)
@ -150,6 +158,11 @@ class GroupAPI(Resource):
if group["ressources_dir"] != res_dir: if group["ressources_dir"] != res_dir:
os.rename(group["ressources_dir"], res_dir) os.rename(group["ressources_dir"], res_dir)
for m in mails:
addr = m[0]
mail = m[1]
send_mail(mail[0], addr, mail[1])
return {"GID": gid}, 200 return {"GID": gid}, 200
def get(self, gid=0, name=""): def get(self, gid=0, name=""):

View File

@ -46,7 +46,7 @@ class UserAPI(Resource):
if psw is None or len(psw) < 8: if psw is None or len(psw) < 8:
return {"ERROR": "Password can't be empty or less than 8 characters !"}, 400 return {"ERROR": "Password can't be empty or less than 8 characters !"}, 400
password = sha256(psw).hexdigest() password = sha256(psw.encode('utf-8')).hexdigest()
if getUser(uid=uid) is None: if getUser(uid=uid) is None:
return {"ERROR": "This user doesn't exists !"}, 405 return {"ERROR": "This user doesn't exists !"}, 405

View File

@ -48,6 +48,6 @@ def getMailContent(mail_type, args):
else: else:
raise Exception("Unknown mail type !") raise Exception("Unknown mail type !")
for key, value in args: for key, value in args.items():
mail[1].replace("#" + key, value) mail[1].replace("#" + key, value)
return mail return mail

View File

@ -12,10 +12,12 @@ class GroupTestCase(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
query = USER.insert().values(login="admin", email="admin@admin.com", role="4", phone="00.00.00.00.00") query = USER.insert().values(email="admin@admin.com", role="4", phone="00.00.00.00.00", name="admin",
hash="toto")
res = query.execute() res = query.execute()
cls.uid = res.lastrowid cls.uid = res.lastrowid
query = USER.insert().values(login="adminx", email="adminx@admin.com", role="3", phone="00.00.00.00.00") query = USER.insert().values(email="adminx@admin.com", role="3", phone="00.00.00.00.00", name="adminx",
hash="zozo")
res = query.execute() res = query.execute()
cls.uid2 = res.lastrowid cls.uid2 = res.lastrowid
@ -25,9 +27,9 @@ class GroupTestCase(unittest.TestCase):
query.execute() query.execute()
query = GROUP.delete().where(group_class.name == "group_test2") query = GROUP.delete().where(group_class.name == "group_test2")
query.execute() query.execute()
query = USER.delete().where(user_class.login == "admin") query = USER.delete().where(user_class.email == "admin@admin.com")
query.execute() query.execute()
query = USER.delete().where(user_class.login == "adminx") query = USER.delete().where(user_class.email == "adminx@admin.com")
query.execute() query.execute()
def setUp(self): def setUp(self):

View File

@ -13,9 +13,9 @@ class UserTestCase(unittest.TestCase):
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
query = USER.delete().where(user_class.login == "admin") query = USER.delete().where(user_class.email == "admin@admin.com")
query.execute() query.execute()
query = USER.delete().where(user_class.login == "adminx") query = USER.delete().where(user_class.email == "admin@admin.com")
query.execute() query.execute()
def setUp(self): def setUp(self):
@ -24,42 +24,41 @@ class UserTestCase(unittest.TestCase):
def tearDown(self): def tearDown(self):
pass pass
def create_user(self, login, role): def create_user(self, email, role, name):
return self.app.post('/api/user', return self.app.post('/api/user',
data=json.dumps( data=json.dumps(
dict( dict(
CASid=login, email=email,
role=role role=role,
name=name
) )
), content_type='application/json') ), content_type='application/json')
def getUserByID(self, UID): def getUserByID(self, UID):
return self.app.get('/api/user/byuid/' + str(UID)) return self.app.get('/api/user/byuid/' + str(UID))
def getUserByLogin(self, login):
return self.app.get('/api/user/bylogin/' + login)
def getUserByEmail(self, email): def getUserByEmail(self, email):
return self.app.get('/api/user/byemail/' + email) return self.app.get('/api/user/byemail/' + email)
def change_user(self, UID, login, role, email, phone): def change_user(self, UID, email, role, phone, name, password):
return self.app.put('/api/user/byuid/' + str(UID), return self.app.put('/api/user/byuid/' + str(UID),
data=json.dumps( data=json.dumps(
dict( dict(
CASid=login,
role=role, role=role,
email=email, email=email,
phone=phone phone=phone,
name=name,
password=password
) )
), content_type='application/json') ), content_type='application/json')
def test_user(self): def test_user(self):
rv = self.create_user('admin', '4') rv = self.create_user('admin@admin.com', '4', 'Admin')
self.assertEqual(rv.status_code, 201, 'Creating user Failed') self.assertEqual(rv.status_code, 201, 'Creating user Failed')
uid = json.loads(rv.data)['UID'] uid = json.loads(rv.data)['UID']
self.assertIsNotNone(uid) self.assertIsNotNone(uid)
rv = self.create_user('admin', '4') rv = self.create_user('admin@admin.com', '4', 'Admin')
self.assertEqual(rv.status_code, 200, 'User is supposed to already exist') self.assertEqual(rv.status_code, 200, 'User is supposed to already exist')
uid2 = json.loads(rv.data)['UID'] uid2 = json.loads(rv.data)['UID']
self.assertEqual(uid, uid2, "The UID must be the same !") self.assertEqual(uid, uid2, "The UID must be the same !")
@ -69,23 +68,18 @@ class UserTestCase(unittest.TestCase):
user = json.loads(rv.data)['USER'] user = json.loads(rv.data)['USER']
self.assertIsNotNone(user) self.assertIsNotNone(user)
rv = self.getUserByLogin("admin") rv = self.getUserByEmail("admin@admin.com")
self.assertEqual(rv.status_code, 200, 'Getting user failed by Login')
user2 = json.loads(rv.data)['USER']
self.assertEqual(user, user2, "User by login must be the same !")
rv = self.getUserByEmail("admin@ola.com")
self.assertEqual(rv.status_code, 200, 'Getting user failed by email') self.assertEqual(rv.status_code, 200, 'Getting user failed by email')
user3 = json.loads(rv.data)['USER'] user3 = json.loads(rv.data)['USER']
self.assertEqual(user, user3, "User by email must be the same !") self.assertEqual(user, user3, "User by email must be the same !")
rv = self.change_user(uid, 'adminx', '3', 'adminx@email.com', '11.11.11.11.11') rv = self.change_user(uid, 'adminx@admin.com', '3', '11.11.11.11.11', 'Adminx', 'password')
self.assertEqual(rv.status_code, 200, 'User modification failed !') self.assertEqual(rv.status_code, 200, 'User modification failed !')
uid3 = json.loads(rv.data)['UID'] uid3 = json.loads(rv.data)['UID']
self.assertEqual(uid, uid3, "UIDs doesn't match !") self.assertEqual(uid, uid3, "UIDs doesn't match !")
rv = self.getUserByLogin("adminx") rv = self.getUserByEmail("adminx@admin.com")
self.assertEqual(rv.status_code, 200, 'Getting modified user failed by Login') self.assertEqual(rv.status_code, 200, 'Getting modified user failed by Email')
user4 = json.loads(rv.data)['USER'] user4 = json.loads(rv.data)['USER']
self.assertIsNotNone(user4, "Modified user shouldn't be None !") self.assertIsNotNone(user4, "Modified user shouldn't be None !")