# Count how often each (host_ip, domain) pair occurs in this data group, then
# turn every "<host_ip>^<domain>" key together with its count into a feature
# tuple via get_domain_features.
# NOTE(review): get_domain_features is looked up when this statement runs, so
# its definition must already have been executed by then -- confirm ordering.
features = (
    sc.parallelize(data_group[idx])
    .map(lambda rec: (rec.host_ip + '^' + rec.domain, 1))
    .reduceByKey(operator.add)
    .map(get_domain_features)
)
def get_domain_features(x):
    """Build the lexical feature tuple for one ``(key, value)`` record.

    Args:
        x: A two-element sequence. ``x[0]`` is a string of the form
            ``"<host>^<name>"``; ``x[1]`` is passed through unchanged (in the
            visible pipeline it is the occurrence count of the key).

    Returns:
        An 11-element tuple::

            (host,
             (name, domain, main_tag),
             x[1],
             len(name),
             number of dot-separated labels in name,
             number of dot-separated labels in domain,
             len(main_tag),
             __Weight(main_tag),
             __Weight(list of digit characters in main_tag),
             __Weight(list of alphabetic characters in main_tag),
             number of '-' characters in main_tag)

        ``domain`` is the tldextract domain and suffix joined with a dot
        (only the non-empty parts), and ``main_tag`` is its first label.

    Raises:
        IndexError: If ``x[0]`` contains no ``'^'`` separator.
    """
    # Split on the first '^' only, so a '^' inside the name part cannot
    # truncate it.
    key_parts = x[0].split('^', 1)
    host = key_parts[0]
    url = key_parts[1]

    ext = tldextract.extract(url)
    # Join only the non-empty parts: a bare suffix stays as-is, and a name
    # without a recognised suffix (e.g. "localhost" or an IP address) no
    # longer picks up a trailing '.' that would inflate the label count.
    # Named attributes are used instead of slicing the result, which only
    # works while ExtractResult is tuple-like.
    domain = ".".join(part for part in (ext.domain, ext.suffix) if part)

    main_tag = domain.split('.')[0]
    # Kept as lists of single characters: that is what __Weight received
    # originally and its requirements are not visible here.
    digits = [ch for ch in main_tag if ch.isdigit()]
    letters = [ch for ch in main_tag if ch.isalpha()]

    return (
        host,
        (url, domain, main_tag),
        x[1],
        len(url),
        url.count('.') + 1,
        domain.count('.') + 1,
        len(main_tag),
        __Weight(main_tag),
        __Weight(digits),
        __Weight(letters),
        main_tag.count('-'),
    )