By Jatin Madaan

Simple PySpark code to read a sequence file, run a SQL query, and store the result as text files in HDFS:


import time

import sys

import subprocess


## Record the job start time and the current date (used later in the HDFS output path and email)

start_time = time.time()

current_date = time.strftime("%Y-%m-%d")  # date format is an assumption; adjust to match the HDFS path convention


## Import SparkContext and HiveContext so SQL queries can be run

from pyspark import SparkContext

from pyspark.sql import HiveContext


## Create the SparkContext with an application name, plus a HiveContext for SQL

sc = SparkContext(appName="Test Code")

sqlContext = HiveContext(sc)

## Hadoop Writable classes used as the sequence file key and value types

BYTE = "org.apache.hadoop.io.BytesWritable"

Text = "org.apache.hadoop.io.Text"


## Sequence file to read (absolute HDFS path)

current_files = "[absolute path goes here]"


## Read the sequence file as (key, value) pairs and keep only the values

file = sc.sequenceFile(current_files, BYTE, Text).values()


## Split each record on the delimiter and keep only the required columns (index starts from 0)

file_col = file.map(lambda line: line.split(u'\uFFFD')) \
               .map(lambda line: (line[1], line[4], line[11], line[64]))


## Reduce the dataset by filtering on the third selected column

file_filter = file_col.filter(lambda x: x[2] == "[filter value]")


## Convert the RDD to a DataFrame

df = file_filter.toDF()


## Rename the default _1.._4 columns to meaningful names

df_schema = df.selectExpr("_1 as [column 1]", "_2 as [column 2]", "_3 as [column 3]", "_4 as [column 4]")


## Register a temp table so the required SQL logic can run against it

df_schema.registerTempTable("[table_name]")

fs_query = sqlContext.sql("[Select Query]")


## Write the result to HDFS: repartition(10) produces 10 output files, and the lambda joins the 4 columns of each row with "~" as the separator (a DataFrame-writer alternative is sketched after the script)

fs_query.repartition(10).map(lambda row: str(row[0]) + "~" + str(row[1]) + "~" + str(row[2]) + "~" + str(row[3])).saveAsTextFile("[hdfs path]/{0}".format(current_date))


## Elapsed run time in seconds, rounded to 2 decimal places

end_time = round(time.time() - start_time, 2)

print(end_time)

end_time_str = str(end_time)


## Send a completion email via mailx

cmd = """echo "Python task for {0} completed in {1} seconds" | mailx -S smtp=[server ip] -s "[Subject of email]" email_id"""

p = subprocess.Popen(cmd.format(current_date, end_time_str), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

output, errors = p.communicate()

print("error is : {0} and output is {1}".format(errors, output))




Spark Submit

spark-submit --queue [name of queue -if any] --conf spark.ui.enabled=false --num-executors 10 --executor-cores 5 --executor-memory 30g --driver-memory 10g [.py script name]


  • Here spark.ui.enabled=false means the job will not try to bind a port for the Spark UI before execution.

  • Total number of vcores used by the program = (number of executors * executor cores) + 1 for the driver, here 10 * 5 + 1 = 51 (see the sketch after this list).

  • Total executor memory = executor memory * number of executors, i.e. 30g * 10 = 300g for the entire program.

  • Driver memory is RAM used on the gateway (edge) node where the driver runs.
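
To double-check the arithmetic in the bullets above, here is a tiny standalone sketch; the variable names are illustrative only and not part of the job script:

## Resource math for the spark-submit settings above
num_executors = 10
executor_cores = 5
executor_memory_gb = 30

total_vcores = num_executors * executor_cores + 1              # +1 vcore for the driver -> 51
total_executor_memory_gb = num_executors * executor_memory_gb  # 30g * 10 -> 300g

print(total_vcores, total_executor_memory_gb)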



