import time
import sys
import subprocess
## Getting the start time of the job; current_date is used later for the output path and the email (the original script never defined it, so it is added here - the date format is an assumption)
start_time = time.time()
current_date = time.strftime("%Y-%m-%d")
## Importing SparkContext and HiveContext so that Hive queries can be run
from pyspark import SparkContext
from pyspark.sql import HiveContext
## Giving the job a name
sc = SparkContext(appName="Test Code")
sqlContext = HiveContext(sc)
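## Hadoop Writable class names used as the key and value types when reading the sequence file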
BYTE = "org.apache.hadoop.io.BytesWritable"
Text = "org.apache.hadoop.io.Text"
## Defining the sequence file location (HDFS path)
current_files = "[absolute path goes here ]"
## Reading the input sequence file and keeping only the values
file = sc.sequenceFile(current_files, BYTE, Text).values()
## Taking only those columns which are required (index starting from 0)
file_col = file.map(lambda line: line.split(u'\uFFFD'))\
               .map(lambda line: (line[1], line[4], line[11], line[64]))
## Reducing dataset by filtering data
file_filter = file_col.filter(lambda x: x[2] == "[filter value]")
## Converting the RDD to a DataFrame
df = file_filter.toDF()
## Renaming the default columns (_1 to _4) to meaningful names
df_schema = df.selectExpr("_1 as [column 1]", "_2 as [Column 2 ]", "_3 as [Column 3]", "_4 as [Column 4]")
## Registering a temp table so that SQL queries can run with the defined logic
df_schema.registerTempTable("[table_name]")
fs_query = sqlContext.sql("[Select Query ]")
## Storing the result in HDFS: repartition(10) writes 10 output files, and the lambda joins the 4 columns into one ~-separated line per row
fs_query.repartition(10).map(lambda row: str(row[0]) + "~" + str(row[1]) + "~" + str(row[2]) + "~" + str(row[3])).saveAsTextFile("[hdfs path ]/{0}".format(current_date))
## Elapsed time, rounded to 2 decimal places
end_time = round(time.time() - start_time, 2)
print(end_time)
end_time_str = str(end_time)
## Sending a notification email with the run date and elapsed time
cmd = """echo "Python Task for {0} completed in {1} seconds" | mailx -S smtp=[server ip] -s "[Subject of email]" email_id"""
p = subprocess.Popen(cmd.format(current_date, end_time_str), shell=True, stdout=subprocess.PIPE)
output, errors = p.communicate()
print("error is: {0} and output is: {1}".format(errors, output))
Spark Submit
spark-submit --queue [name of queue -if any] --conf spark.ui.enabled=false --num-executors 10 --executor-cores 5 --executor-memory 30g --driver-memory 10g [.py script name]
Here spark.ui.enabled=false disables the Spark UI, so the driver does not try to bind a port for the UI before execution.
Number of executors * executor cores + 1 (one core for the driver) is the total number of vcores used by the program (here 10 * 5 + 1 = 51 cores).
Total executor memory is executor memory * number of executors, i.e. 30g * 10 = 300g for the entire program.
Driver memory is the RAM used on the gateway (edge) node from which the job is submitted.
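As a quick sanity check, the resource arithmetic above can be written out in a few lines of Python (a purely illustrative sketch; the numbers are taken from the spark-submit flags shown above):
## Illustrative resource arithmetic for the spark-submit flags above
num_executors = 10
executor_cores = 5
executor_memory_gb = 30
total_vcores = num_executors * executor_cores + 1              # one extra core for the driver -> 51
total_executor_memory_gb = num_executors * executor_memory_gb  # 30g * 10 = 300g
print(total_vcores, total_executor_memory_gb)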