Common Task: Join two DataFrames in PySpark

Joining two tables in SQL is very easy:

SELECT * FROM A JOIN B ON A.id = B.id

Or use sqlContext.sql:

sqlContext.sql("SELECT A.*, B.* FROM A JOIN B ON A.id = B.id")

How about joining DataFrames directly in PySpark?

from pyspark.sql.functions import col

df1.alias('a').join(df2.alias('b'), col('b.id') == col('a.id')) \
    .select([col('a.' + xx) for xx in df1.columns] + [col('b.' + yy) for yy in df2.columns])

The tricky part is selecting all the columns after the join; a list comprehension over each DataFrame's columns takes care of it.
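Here is a minimal end-to-end sketch of the same pattern with two toy DataFrames; the data and column names are made up for illustration, and sqlContext is assumed to exist as in the examples above.

from pyspark.sql.functions import col

# hypothetical toy DataFrames sharing an "id" column
df1 = sqlContext.createDataFrame([(1, "x"), (2, "y")], ["id", "val1"])
df2 = sqlContext.createDataFrame([(1, "u"), (2, "v")], ["id", "val2"])

joined = df1.alias('a').join(df2.alias('b'), col('a.id') == col('b.id')) \
    .select([col('a.' + xx) for xx in df1.columns] +
            [col('b.' + yy) for yy in df2.columns])
joined.show()
# Note: both id columns (a.id and b.id) are kept in the result.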

How to select a particular row with a condition in PySpark?

DataFrames in Spark are distributed, which means you can't access the data positionally the way you would with pandas df.loc[].
We need to do a little work to get at a specific row (or rows). Here is an example:

import pyspark
from pyspark.sql import SQLContext

# in the pyspark shell, sc and sqlContext already exist; otherwise create them
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])

myIndex = 1
values = (df.rdd.zipWithIndex()
          # zipWithIndex yields (Row, index) pairs; keep the pair at myIndex
          .filter(lambda pair: pair[1] == myIndex)
          # drop the index and keep the row itself
          .map(lambda pair: pair[0])
          .collect())
print(values[0])  # Row(letter='b', name=2)
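If you'd rather stay in the DataFrame API, a window function can assign an explicit row index to filter on. This is just a sketch: the ordering column ("letter") is an assumption, whereas zipWithIndex above indexes rows by their partition order.

from pyspark.sql import Window
from pyspark.sql import functions as F

# assign a 0-based index ordered by the "letter" column (assumed ordering)
w = Window.orderBy("letter")
df_indexed = df.withColumn("row_idx", F.row_number().over(w) - 1)
df_indexed.filter(F.col("row_idx") == myIndex).show()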

 

Three ways to rename a column after a groupBy/agg operation in PySpark

Group and aggregation operations are very common in data manipulation and analysis, but PySpark names the resulting column in the format aggFunc(colname), which is usually not the column name you'd like to use. The most intuitive attempt would be something like this:

group_df = df.groupby('colname').max('value_column').alias('max_column')

However, this won't change anything, nor will it give you an error. The reason is that we're aliasing the whole DataFrame instead of a column.

Here are three ways to alias just the column created by the groupBy/agg operation. The first is to select the generated column and alias it:

import pyspark.sql.functions as F

group_df = df.groupBy('colname').max('value_column') \
    .select(F.col('max(value_column)').alias('max_column'))

The second method is to use agg instead of calling the max method:

from pyspark.sql.functions import max  # note: this shadows Python's built-in max

df.groupBy('colname').agg(max('value_column').alias('max_column'))

The third method is to use withColumnRenamed:

df.groupBy('colname').max('value_column') \
    .withColumnRenamed('max(value_column)', 'max_column')
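Here is a minimal sketch that checks the second approach end to end; the DataFrame and its column names are made up for illustration, and sqlContext is assumed to exist as before.

import pyspark.sql.functions as F

toy_df = sqlContext.createDataFrame(
    [("a", 1), ("a", 3), ("b", 2)], ["colname", "value_column"])

group_df = toy_df.groupBy("colname").agg(F.max("value_column").alias("max_column"))
group_df.show()
# +-------+----------+
# |colname|max_column|
# +-------+----------+   (row order may vary)
# |      a|         3|
# |      b|         2|
# +-------+----------+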

 

Spark sc.textFile Client.java.handleConnectionTimeout error

Imagine that you have set up your Hadoop environment with Spark and you start to read files on HDFS, but you get an error at the very first step: reading a text file with the SparkContext.

Look at this example:

Check the file on HDFS from the terminal:

hadoop fs -ls /input/wordCount.txt

hadoop fs -cat /input/wordCount.txt

The second command prints the whole text file to the console. Now let's start the Spark shell and do a simple file read:

val textFile = sc.textFile("hdfs://input/wordCount.txt")

textFile.count

Then you get an error with lots of messages containing Client.java.handleConnectionTimeout(814).

The problem is very common and easy to fix: the URL isn't specified correctly. There is a distinction between using one '/' and two '//' after hdfs:. With hdfs://input/wordCount.txt, Spark treats input as the NameNode hostname and times out trying to connect to it. A full HDFS URL breaks down like this:

  • hdfs:// – protocol/scheme
  • localhost – NameNode hostname or IP address (e.g. 10.20.10.0)
  • 9000 – port number (9000 is just an example)
  • /input/wordCount.txt – full path to the file you will load

There are two ways to specify the URL:

  1. hdfs://localhost:9000/input/wordCount.txt (localhost:9000 comes from the fs.defaultFS value in Hadoop's core-site.xml config file).
  2. hdfs:/input/wordCount.txt (a single slash resolves against the default filesystem).
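
For completeness, here is the same read as a quick PySpark sketch; localhost:9000 is only a placeholder for whatever your fs.defaultFS says, and sc is the SparkContext provided by the shell.

# full URL: scheme + NameNode host:port + path
text_file = sc.textFile("hdfs://localhost:9000/input/wordCount.txt")
print(text_file.count())

# single-slash form: resolves against the default filesystem from core-site.xml
text_file = sc.textFile("hdfs:/input/wordCount.txt")
print(text_file.count())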

Convert a columnar URL text file to an HTML bookmark file for import

When you have a list of URLs stored in a columnar text file and want to import them into a browser like Chrome, it is painful to add them one by one. So I created an R script to automate the process by converting a CSV (or any delimiter-separated file) into an HTML file suitable for bookmark import.

Here is the code:

library(gdata)

# Build one bookmark entry in the Netscape bookmark format:
# <DT><A HREF="url" ADD_DATE="..." LAST_MODIFIED="...">name</A>
add_link <- function(name, urllink){
  current_time  <- as.integer(as.POSIXct(Sys.time()))
  modified_time <- as.integer(as.POSIXct(Sys.time()))
  tag_list <- c('<DT><A HREF="', trimws(as.character(urllink)),
                '" ADD_DATE="', current_time,
                '" LAST_MODIFIED="', modified_time, '">',
                trimws(as.character(name)), '</A>')
  A_tag <- paste(tag_list, sep='', collapse='')
  return(A_tag)
}

# Build a bookmark folder header: <DT><H3>foldername</H3>
add_bookmark_folder <- function(foldername){
  tag_list <- c('<DT><H3>', foldername, '</H3>')
  return(paste(tag_list, sep='', collapse=''))
}

## change the following two lines for your use.
bookmark_textfile = "bookmark.txt"       # input file: one "name|url" pair per line
fileOutput <- "bookmart_import.html"     # output file

bookmark_text = read.table(bookmark_textfile, sep='|')
colnames(bookmark_text) <- c("name", "url")

header_string <- paste('
<!-- This is an automatically generated file.
It will be read and overwritten.
DO NOT EDIT! -->
<TITLE>Bookmarks</TITLE>
<H1>Bookmarks</H1>
<DL>', add_bookmark_folder('DSAT'),
'<DL>', sep='\n', collapse='')

end_string <- '
</DL>
</DL>
'

# write strings to html file
write(header_string, fileOutput)

# DSAT folder
for (i in 1:(dim(bookmark_text)[1] - 1)){
  name <- bookmark_text$name[i]
  urllink <- bookmark_text$url[i]
  #print(add_link(name, urllink))
  write(add_link(name, urllink), fileOutput, append=TRUE)
}
write('</DL>', fileOutput, append=TRUE)

# dev folder
dev_folder <- paste(c(add_bookmark_folder('DEV'), '<DL>'), sep='\n', collapse='\n')
write(dev_folder, fileOutput, append=TRUE)
devIndex <- dim(bookmark_text)[1] - 1
for (i in devIndex:dim(bookmark_text)[1]){
  name <- bookmark_text$name[i]
  urllink <- bookmark_text$url[i]
  print(add_link(name, urllink))
  write(add_link(name, urllink), fileOutput, append=TRUE)
}

write(end_string, fileOutput, append=TRUE)

print("bookmark import file successfully completed.")