from operator import add
import numpy as np
import pandas as pd
from sklearn.externals import joblib
from sklearn.neighbors.nearest_centroid import NearestCentroid
from os import path
from .utility_funcs.preprocessing_operations import initInputParameters, readVocabFromFile
from .interface_repository_classifier import Interface_RepoClassifier
from sklearn import preprocessing
import os
from .github_repo import GithubRepo
from pathlib import Path
class RepositoryClassifier(Interface_RepoClassifier):
    """
    classifies GitHub repositories into one of seven categories:
    'DEV', 'HW', 'EDU', 'DOCS', 'WEB', 'DATA', 'OTHER'
    a NearestCentroid model is trained on integer-, vocabulary- and
    language-features extracted from GithubRepo-objects
    """

    def __init__(self, bUseStringFeatures=True):
        """
        constructor which initializes member variables

        :param bUseStringFeatures: if True, word-occurrence features from the
                                   vocabulary are appended to the integer features
        """
        # model state flags
        self.bModelLoaded = False
        self.bModelTrained = False
        self.clf = None
        self.lstMeanValues = None
        self.lstVoc = None
        self.stdScaler = None
        self.lstTrainLabels = None
        self.lstTrainData = None
        self.normalizer = None
        self.bUseCentroids = True
        self.normalizerIntegerAttr = None
        self.lstTrainDataRaw = None
        self.lstStrCategories = ['DEV', 'HW', 'EDU', 'DOCS', 'WEB', 'DATA', 'OTHER']
        self.directory = path.dirname(__file__)
        print(self.directory)
        self.bUseStringFeatures = bUseStringFeatures
        # get the project-directory
        self.strProjectDir = str(Path().resolve().parent)
        print('strProjectDir:', self.strProjectDir)
        self.strModelPath = self.directory + '/model/'
        # create the model-directory if needed; exist_ok avoids the race
        # between the exists()-check and makedirs()
        os.makedirs(self.strModelPath, exist_ok=True)
        # file names for the persisted classifier and its auxiliary data
        self.strModelFileName = 'RepositoryClassifier.pkl'
        self.strLstMeanValuesFileName = 'lstMeanValues.pkl'
        self.strMatIntegerTrainingData = 'matIntegerTrainingData.pkl'
        self.strLstTrainLabels = 'lstTrainLabels.pkl'
        self.strLstTrainData = 'lstTrainData.pkl'
        self.strNormalizer = 'normalizer.pkl'
        self.strNormalizerIntegerAttr = 'normalizerIntegerAttr.pkl'
        self.strLstTrainDataRaw = 'lstTrainDataRaw.pkl'
        self.iNumCategories = len(self.lstStrCategories)
        self.matIntegerTrainingData = []
[docs] def loadTrainingData(self, strProjPathFileNameCSV ='/data/csv/additional_data_sets_cleaned.csv', externalpath=None):
"""
trains the model with a given csv-file. the csv file must have 2 columns URL and CATEGORY.
the URL is given in the form 'https://github.com/owner/repository-name'
the CATEGORY is given by one of these options 'DEV', 'HW', 'EDU', 'DOCS', 'WEB', 'DATA', 'OTHER'
:param strProjPathFileNameCSV: file path relative to the project-path where the csv-file is stored
:return: self.lstTrainData (the scaled and normed data with which the model was trained with),
self.lstTrainLabels (the used training labels)
"""
trainData = None
if externalpath is None:
trainData = pd.read_csv(self.directory + strProjPathFileNameCSV, header=0, delimiter=",")
else:
trainData = pd.read_csv(strProjPathFileNameCSV, header=0, delimiter=",")
iNumTrainData = len(trainData.index)
print("iNumTrainData: ", iNumTrainData)
print('~~~~~~~~~~ EXTRACTING FEATURES ~~~~~~~~~~')
lstGithubRepo = []
for i in range(iNumTrainData):
# fill the list with GithubRepo-Objects
lstGithubRepo.append(GithubRepo.fromURL(trainData["URL"][i]))
# fill the train and the label-data
self.lstTrainData = []
self.lstTrainDataRaw = []
self.lstTrainLabels = []
print('~~~~~~~~~~ CALCULATE THE MEAN VALUES ~~~~~~~~~~')
self.lstMeanValues = [0] * 7
i = 0
for tmpRepo in lstGithubRepo:
# lstMeanValues += tmpGithubRepo.getIntegerFeatures()
self.lstMeanValues = list(map(add, self.lstMeanValues, tmpRepo.getIntegerFeatures()))
# find the according label as an intger for the current repository
# the label is defined in trainData
self.lstTrainLabels.append(self.lstStrCategories.index(trainData["CATEGORY"][i]))
i += 1
# Divide each element with the number of training data
self.lstMeanValues[:] = [x / iNumTrainData for x in self.lstMeanValues]
print('lstMeanValues: ', self.lstMeanValues)
print('~~~~~~~~~~ GET THE VOCABULARY ~~~~~~~~~~')
strVocabPath = self.directory + '/vocab/'
# Create vocab-directory if needed directory
if not os.path.exists(strVocabPath):
os.makedirs(strVocabPath)
strVocabPath += 'vocabList.dump'
self.lstVoc = initInputParameters(strVocabPath, lstGithubRepo)
print('lstVoc: ', self.lstVoc)
print('len(lstVoc): ', len(self.lstVoc))
lstInputFeatures = []
lstInputFeaturesRaw = []
for tmpRepo in lstGithubRepo:
lstIntegerAttributes = tmpRepo.getNormedFeatures(self.lstMeanValues)
lstInputFeaturesRaw = tmpRepo.getIntegerFeatures()
lstInputFeatures = lstIntegerAttributes
self.matIntegerTrainingData.append(tmpRepo.getNormedFeatures(self.lstMeanValues))
if self.bUseStringFeatures:
lstInputFeatures += tmpRepo.getWordOccurences(self.lstVoc)
lstInputFeaturesRaw += tmpRepo.getWordOccurences(self.lstVoc)
lstInputFeatures += tmpRepo.getRepoLanguageAsVector()
lstInputFeaturesRaw += tmpRepo.getRepoLanguageAsVector()
# test using unnormed features
self.lstTrainData.append(lstInputFeatures)
self.lstTrainDataRaw.append(lstInputFeaturesRaw)
print("lstTrainData:")
print(self.lstTrainData)
print("lstTrainLabels:")
print(self.lstTrainLabels)
print('self.matIntegerTrainingData')
print(self.matIntegerTrainingData)
print('~~~~~~~~~~ NORMALIZE ~~~~~~~~~~~~~')
self.normalizer = preprocessing.Normalizer()
self.normalizer.fit(self.lstTrainData)
self.normalizerIntegerAttr = preprocessing.Normalizer()
self.normalizerIntegerAttr.fit(self.matIntegerTrainingData)
self.lstTrainData = self.normalizer.transform(self.lstTrainData)
return self.lstTrainData, self.lstTrainLabels
[docs] def trainModel(self, lstTrainData, lstTrainLabels):
"""
trains the model called self.clf with the given trainData and trainLabels
:param lstTrainData: list
:param lstTrainLabels:
:return:
"""
print('~~~~~~~~~~ TRAIN THE MODEL ~~~~~~~~~~')
# train the nearest neighbour-model
# "the shrink_threshold" parameter has only negative impact on the prediction results
self.clf = NearestCentroid()
# test out other classifiers
# self.clf = KNeighborsClassifier()
# self.clf = SVC()
# self.clf = RadiusNeighborsClassifier(radius=100)
# self.clf = MLPClassifier()
# self.clf = GaussianProcessClassifier()
# self.clf = LogisticRegression()
# self.fit_transform()
self.clf.fit(lstTrainData, lstTrainLabels)
# this will break the machine
# self.plotTheResult(lstTrainData, lstTrainLabels)
self.bModelTrained = True
    def plotTheResult(self,lstTrainData, lstTrainLabels):
        """
        intentionally empty placeholder -> see the plots in the GUI instead

        :param lstTrainData: matrix which was used for training
        :param lstTrainLabels: labels which were used for training
        :return:
        """
        pass
[docs] def exportModelToFile(self):
"""
exports the trained model and the mean values of the input variables to './model/'
the export is done via joblib.dump() to .pkl-file
:return:
"""
if self.bModelTrained:
print('~~~~~~~~~~ SAVE MODEL TO FILE ~~~~~~~')
# http://scikit-learn.org/stable/modules/model_persistence.html
# http://stackoverflow.com/questions/10592605/save-classifier-to-disk-in-scikit-learn
# save the trained classifier to a file
joblib.dump(self.clf, self.strModelPath + self.strModelFileName)
joblib.dump(self.lstMeanValues, self.strModelPath + self.strLstMeanValuesFileName)
joblib.dump(self.matIntegerTrainingData, self.strModelPath + self.strMatIntegerTrainingData)
joblib.dump(self.lstTrainLabels, self.strModelPath + self.strLstTrainLabels)
joblib.dump(self.lstTrainData, self.strModelPath + self.strLstTrainData)
joblib.dump(self.normalizer, self.strModelPath + self.strNormalizer)
joblib.dump(self.normalizerIntegerAttr, self.strModelPath + self.strNormalizerIntegerAttr)
joblib.dump(self.lstTrainDataRaw, self.strModelPath + self.strLstTrainDataRaw)
[docs] def loadModelFromFile(self):
"""
loads / imports the model-object from './model/RepositoryClassifier.pkl'
and the list of the mean values from './model/lstMeanValues.pkl'
:return:
"""
print('~~~~~~~~~~ LOAD THE MODEL ~~~~~~~~~~~')
# load the classifier from the file
self.clf = joblib.load(self.strModelPath + self.strModelFileName)
self.lstMeanValues = joblib.load(self.strModelPath + self.strLstMeanValuesFileName)
# load the integer training data for later plotting
self.matIntegerTrainingData = joblib.load(self.strModelPath + self.strMatIntegerTrainingData)
self.lstTrainLabels = joblib.load(self.strModelPath + self.strLstTrainLabels)
self.lstTrainData = joblib.load(self.strModelPath + self.strLstTrainData)
self.normalizer = joblib.load(self.strModelPath + self.strNormalizer)
self.normalizerIntegerAttr = joblib.dump(self.normalizerIntegerAttr, self.strModelPath + self.strNormalizerIntegerAttr)
self.lstTrainDataRaw = joblib.load(self.strModelPath + self.strLstTrainDataRaw)
print('lstMeanValues: ', self.lstMeanValues)
print('~~~~~~~~~~ GET THE VOCABULARY ~~~~~~~~~~')
strVocabPath = self.directory + '/vocab/'
strVocabPath += 'vocabList.dump'
self.lstVoc = readVocabFromFile(strVocabPath)
# only print out the first 7 and the last 7 entries
# http://stackoverflow.com/questions/646644/how-to-get-last-items-of-a-list-in-python
print('len(self.lstVoc):', len(self.lstVoc))
if len(self.lstVoc) > 14:
print("[", end="")
print(*self.lstVoc[:7], sep=", ", end=" ")
print('...', end=" ")
print(*self.lstVoc[-7:], sep=", ", end="")
print("]")
self.bModelLoaded = True
return self.clf, self.lstMeanValues, self.matIntegerTrainingData, self.lstTrainLabels, self.lstTrainData, self.normalizer, self.normalizerIntegerAttr, self.lstTrainDataRaw
[docs] def predictResultsAndCompare(self, strProjPathFileNameCSV = '/data/csv/manual_classification_appendix_b.csv'): # '/data/csv/additional_data_sets_cleaned.csv'):
"""
loads a csv-file with of layout 'URL, CATEGORY, CATEGORY_ALTERNATIVE_1,CATEGORY_ALTERNATIVE_2'
the URL is given in the format 'https://github.com/owner/repository-name'
the CATEGORY, CATEGORY_ALTERNATIVE_1,CATEGORY_ALTERNATIVE_2 is given by one of these options 'DEV', 'HW', 'EDU',
'DOCS', 'WEB', 'DATA', 'OTHER'
After the predicition phase the result is compared with the given CATEGORY and CATEGORY_ALTERNATIVES
A verification matrix is created and the accuracy is calculated from 0.0 to 1.0
:param strProjPathFileNameCSV: path relative to the project-path where the csv file is stored
:return: the accuracy value (0.0 - 1.0)
"""
if not self.bModelLoaded and not self.bModelTrained:
print('the model hasn\'t been loaded or trained yet')
return
print('~~~~~~~~~~ CREATE VERITY COMP MATRIX ~~~~~~~~')
print('~~~~~~~~~~ PREDICT RESULTS ~~~~~~~~~~')
# classify the result
# read the unlabeled data set from a csv
dtUnlabeledData = pd.read_csv(self.directory + strProjPathFileNameCSV, header=0, delimiter=",") # , nrows=iNumOfPredictions)
# http://stackoverflow.com/questions/15943769/how-to-get-row-count-of-pandas-dataframe
iNumOfPredictions = len(dtUnlabeledData.index)
print('~~~~~~~~~~~ CREATE VERITY MATRIX ~~~~~~~~~~~~')
matPredictionTarget = np.zeros((iNumOfPredictions, self.iNumCategories))
# use a verity matrix to validate the result
matPredictionRes = np.copy(matPredictionTarget)
matPredictionResWithAlt = np.copy(matPredictionTarget)
for i in range(iNumOfPredictions):
# set the verity matrix
strTarget = dtUnlabeledData["CATEGORY"][i]
strTargetAlt1 = dtUnlabeledData["CATEGORY_ALTERNATIVE_1"][i]
strTargetAlt2 = dtUnlabeledData["CATEGORY_ALTERNATIVE_2"][i]
print('strTarget: ', strTarget)
if pd.notnull(strTargetAlt1):
print('strTargetAlt1:', strTargetAlt1)
matPredictionTarget[i, self.lstStrCategories.index(strTargetAlt1)] = 1
if pd.notnull(strTargetAlt2):
print('strTargetAlt2:', strTargetAlt2)
matPredictionTarget[i, self.lstStrCategories.index(strTargetAlt2)] = 1
iLabel, iLabelAlt, lstFinalPercentages, tmpRepo, lstNormedInputFeatures = self.predictCategoryFromURL(dtUnlabeledData["URL"][i])
matPredictionTarget[i, self.lstStrCategories.index(strTarget)] = 1
print()
matPredictionRes[i, iLabel] = 1
matPredictionResWithAlt[i, iLabel] = 1
matPredictionResWithAlt[i, iLabelAlt] = 1
matPredictionResultByCategory = [[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]]
for i in range(0, iNumOfPredictions):
for j in range(0, 7):
if matPredictionRes[i][j] == 1:
if matPredictionRes[i][j] == matPredictionTarget[i][j]:
matPredictionResultByCategory[j][0] += 1
matPredictionResultByCategory[j][1] += 1
print("i, j", i, j, matPredictionResultByCategory)
else:
matPredictionResultByCategory[j][0] += 1
print("i, j, not", i, j, matPredictionResultByCategory)
for i in range (0, len(matPredictionResultByCategory)):
matPredictionResultByCategory[i][2] = matPredictionResultByCategory[i][1] / matPredictionResultByCategory[i][0]
self.__printResult(tmpRepo, iLabel, iLabelAlt)
print('verity matrix for matPredictionTarget:\n ', matPredictionTarget)
print('verity matrix for matPredictionRes:\n ', matPredictionRes)
matCompRes = np.multiply(matPredictionTarget, matPredictionRes)
matCompResAlt = np.multiply(matPredictionTarget, matPredictionResWithAlt)
fPredictionRes = sum(matCompRes.flatten()) / iNumOfPredictions
fPredictionResWithAlt = sum(matCompResAlt.flatten()) / iNumOfPredictions
print('fPredictionRes:', fPredictionRes)
print('fPredictionResWithAlt:', fPredictionResWithAlt)
fAccuracy = fPredictionRes * 100
print('fAccuracy: ', fAccuracy, '%\n')
print('DEV: found:', matPredictionResultByCategory[0][0], ', correct:', matPredictionResultByCategory[0][1], ', reliability:', matPredictionResultByCategory[0][2] * 100, '%',
'\nHW: found:', matPredictionResultByCategory[1][0], ', correct:', matPredictionResultByCategory[1][1], ', reliability:', matPredictionResultByCategory[1][2] * 100, '%',
'\nEDU: found:', matPredictionResultByCategory[2][0], ', correct:', matPredictionResultByCategory[2][1], ', reliability:', matPredictionResultByCategory[2][2] * 100, '%',
'\nDOCS: found:', matPredictionResultByCategory[3][0], ', correct:', matPredictionResultByCategory[3][1], ', reliability:', matPredictionResultByCategory[3][2] * 100, '%',
'\nWEB: found:', matPredictionResultByCategory[4][0], ', correct:', matPredictionResultByCategory[4][1], ', reliability:', matPredictionResultByCategory[4][2] * 100, '%',
'\nDATA: found:', matPredictionResultByCategory[5][0], ', correct:', matPredictionResultByCategory[5][1], ', reliability:', matPredictionResultByCategory[5][2] * 100, '%',
'\nOTHER: found:', matPredictionResultByCategory[6][0], ', correct:', matPredictionResultByCategory[6][1], ', reliability:', matPredictionResultByCategory[6][2] * 100, '%')
return fPredictionRes
[docs] def predictCategoryFromOwnerRepoName(self, strUser, strRepoName):
"""
predicts the category for a repository which is given by the user and repo-name
:param strUser: owner of the repository
:param strRepoName: name of the repository
:return:
"""
tmpRepo = GithubRepo(strUser, strRepoName)
return self.predictCategoryFromGitHubRepoObj(tmpRepo)
[docs] def predictCategoryFromURL(self, strGitHubRepoURL):
"""
loads the features of a given repository by URL and the model predicts its category-label
:param strGitHubRepoURL: url to the repository
:return: label value form 0 - 6, lst of the precentages for the other categories
"""
try:
tmpRepo = GithubRepo.fromURL(strGitHubRepoURL)
except Exception as ex:
raise ex
return self.predictCategoryFromGitHubRepoObj(tmpRepo)
[docs] def predictCategoryFromGitHubRepoObj(self, tmpRepo):
"""
predicts the category for a GithubRepo-Object
:param tmpRepo: GithubRepo-Object
:return: iLabel, iLabelAlt, lstFinalPercentages, tmpRepo, lstNormedInputFeatures
"""
lstNormedInputFeatures = tmpRepo.getNormedFeatures(self.lstMeanValues)
if self.bUseStringFeatures:
lstNormedInputFeatures += tmpRepo.getWordOccurences(self.lstVoc)
lstNormedInputFeatures += tmpRepo.getRepoLanguageAsVector()
# apply pre-processing
lstNormedInputFeatures = np.array(lstNormedInputFeatures).reshape(1, len(lstNormedInputFeatures))
lstNormedInputFeatures = self.normalizer.transform(lstNormedInputFeatures)
# reshape Input Features -> otherwise a deprecation warning occurs
iLabel = int(self.clf.predict(lstNormedInputFeatures))
if self.bUseCentroids is True:
matCentroids = self.clf.centroids_
lstFinalPercentages = self.predictProbaNearestCentroids(matCentroids, lstNormedInputFeatures)
else:
lstFinalPercentages = self.clf.predict_proba(lstNormedInputFeatures)
iLabelAlt = self.getLabelAlternative(lstFinalPercentages)
self.__printResult(tmpRepo, iLabel, iLabelAlt, bPrintWordHits=False)
return iLabel, iLabelAlt, lstFinalPercentages, tmpRepo, lstNormedInputFeatures
[docs] def getLabelAlternative(self, lstFinalPercentages):
"""
gets the first alternative (the seoond result)
:param lstFinalPercentages: percentages lsit for the single categories
:return: integer label which describes the category
"""
# copy the percentages in an additional list
lstFinalPercentagesCopy = []
lstFinalPercentagesCopy = lstFinalPercentages[:]
# get the s
iMaxIndex = lstFinalPercentagesCopy.index(max(lstFinalPercentagesCopy))
lstFinalPercentagesCopy[iMaxIndex] = 0
iSecondMaxIndex = lstFinalPercentagesCopy.index(max(lstFinalPercentagesCopy))
return iSecondMaxIndex
[docs] def predictProbaNearestCentroids(self, matCentroids, lstInputFeatures):
"""
because predictProba was missing in the default functionality for nearest-centroid
the probability is now calculated via the distances to the different centroids
:param matCentroids: matrix of the centroids for each category
:param lstInputFeatures: full normed input feature list for which the prediction is based on
:return:
"""
lstFinalPercentages = []
fDistSum = 0
lstDistances = []
for i, centroid in enumerate(matCentroids):
fDist = np.linalg.norm([lstInputFeatures] - centroid)
lstDistances.append((i, fDist))
fDistSum += fDist
lstDistances.sort(key=lambda x: x[1])
lstPercentages = []
for i, fDist in enumerate(lstDistances):
lstPercentages.append(lstDistances[i][1] / fDistSum)
lstDistancesReordered = []
for i, fPercentage in enumerate(reversed(lstPercentages)):
lstDistancesReordered.append((lstDistances[i][0], fPercentage))
lstDistancesReordered.sort(key=lambda x: x[0])
for i, fPercentage in enumerate(lstDistancesReordered):
lstFinalPercentages.append(fPercentage[1])
print('{:15s} {:3f}'.format(self.lstStrCategories[i], fPercentage[1]))
return lstFinalPercentages
def __printResult(self, tmpRepo, iLabel, iLabelAlt, bPrintWordHits=False):
"""
prints the repository name and its category by using the iLabel
:param tmpRepo: given repository
:param iLabel: previously predicted label
:return:
"""
strStopper1 = "=" * 80
strStopper2 = "-" * 80
print(strStopper1)
if bPrintWordHits is True:
if self.bUseStringFeatures:
lstOccurence = tmpRepo.getWordOccurences(self.lstVoc)
tmpRepo.printFeatureOccurences(tmpRepo.getDicFoundWords())
print('Prediction for ' + tmpRepo.getName() + ', ' + tmpRepo.getUser() + ': ', end="")
print(self.lstStrCategories[iLabel])
if iLabelAlt is not None:
print('Alternative: ', self.lstStrCategories[iLabelAlt])
print(strStopper2)