An efficient way to preprocess large text files on a Linux file system

Linux commands like sed and tr are efficient at text processing. Here is a common way to preprocess a large text file: separate punctuation from the surrounding words and lowercase everything.

cat somefile.txt | sed -e "s/\([.\!?,'\/()]\)/ \1 /g" | tr "[:upper:]" "[:lower:]" > reprocessed.txt
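If you would rather keep the preprocessing inside a script, here is a rough Python equivalent of the same normalization (padding the listed punctuation with spaces, then lowercasing); the file names are just the ones from the example above:

import re

with open('somefile.txt') as src, open('reprocessed.txt', 'w') as dst:
    for line in src:
        # surround each punctuation mark with spaces, mirroring the sed expression
        line = re.sub(r"([.!?,'/()])", r" \1 ", line)
        # lowercase, mirroring the tr step
        dst.write(line.lower())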


The pitfall of the eval function and its safe alternative in Python

Some of you may have used eval and exec in Python as a handy, quick-and-dirty way to take dynamic source code, munge it a little, and then execute it. The core functionality of eval is to evaluate a string as though it were an expression and return the result. In Python, the argument can also be a compiled code object built from a structured representation of code such as an abstract syntax tree (like Lisp forms). Its key uses are:

  • Evaluating a mathematical expression
  • Compiler bootstrapping
  • Scripting (dynamic code)
  • Language tutors

For example:


x = 1
eval('x + 1')  # returns 2
eval('x')      # returns 1

# The most general form, for evaluating statements, is to use code objects
x = 1
y = 2
eval(compile("print('x+y=', x + y)", "compile-sample.py", "single"))
# prints: x+y= 3

However, there is a big safety flaw in eval: it evaluates the code in the expression without considering whether it is safe or not. See the article "The danger of eval". A classic example:


eval(input("Enter an expression: "))
# a malicious user can type __import__('os').system('rm -rf /root/important_data') and eval will run it

The cure for this is to use ast.literal_eval, which raises an exception if the input isn't a valid Python literal, so arbitrary code will never be executed.

The rule of thumb is to use ast.literal_eval whenever you would otherwise reach for eval. But there are some notable differences: ast.literal_eval won't work for expressions such as bitwise operators. For example:


import ast

ast.literal_eval("1 & 1")  # raises ValueError
eval("1 & 1")              # returns 1

ast.literal_eval() only considers a small subset of Python’s syntax to be valid:

The string or node provided may only consist of the following Python literal structures: strings, numbers, tuples, lists, dicts, booleans, and None.
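As a quick illustration of that restriction (the dangerous string is the same hypothetical payload as above):

import ast

ast.literal_eval("[1, 2, {'a': (True, None)}]")  # returns [1, 2, {'a': (True, None)}]
ast.literal_eval("__import__('os').system('rm -rf /root/important_data')")  # raises ValueError, nothing is executed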

Secret ingredient for tuning Random Forest Classifier and XGBoost Tree

Tuning a machine learning model can be time consuming and may still not get you where you want. Here are two widely used settings for a Random Forest classifier and an XGBoost tree model from Kaggle competitions; they can be a good starting point.

Random Forest:


from sklearn.ensemble import RandomForestClassifier

RandomForestClassifier(bootstrap=True, class_weight='balanced',
                       criterion='gini', max_depth=9, max_features=0.6,
                       max_leaf_nodes=None, min_impurity_decrease=0.0,
                       min_impurity_split=None, min_samples_leaf=1,
                       min_samples_split=2, min_weight_fraction_leaf=0.0,
                       n_estimators=500, n_jobs=-1, oob_score=False, random_state=0,
                       verbose=0, warm_start=False)
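A minimal usage sketch, where X_train, y_train and X_test are placeholders for your own data:

clf = RandomForestClassifier(n_estimators=500, max_depth=9, max_features=0.6,
                             class_weight='balanced', n_jobs=-1, random_state=0)
clf.fit(X_train, y_train)                 # train on your features/labels
proba = clf.predict_proba(X_test)[:, 1]   # probability of the positive class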

XGBoost Tree:

params = {'objective': 'binary:logistic',
          'eta': 0.3,              # analogous to learning_rate in GBM
          'tree_method': 'hist',
          'grow_policy': 'lossguide',
          'max_leaves': 256,       # roughly 2^max_depth
          'max_depth': 5,          # typically 3-10; 5 chosen based on testing
          'subsample': 0.8,        # a lower value helps prevent overfitting
          'colsample_bytree': 0.7,
          'colsample_bylevel': 0.7,
          'min_child_weight': 0,   # not the same as min_samples_leaf in GBM
          'alpha': 4,              # L1 regularization
          'lambda': 10,            # L2 regularization, to prevent overfitting
          'scale_pos_weight': 27,  # for an imbalanced positive/negative ratio
          'eval_metric': 'auc',
          'nthread': 8,
          'random_state': 99,
          'silent': True}
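A minimal sketch of feeding these params into the native XGBoost API; X_train, y_train, X_valid, y_valid are placeholders for your own split:

import xgboost as xgb

dtrain = xgb.DMatrix(X_train, label=y_train)
dvalid = xgb.DMatrix(X_valid, label=y_valid)
model = xgb.train(params, dtrain, num_boost_round=1000,
                  evals=[(dvalid, 'valid')], early_stopping_rounds=50)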

Common task: joining two dataframes in PySpark

Joining two tables in SQL is very easy:

SELECT * FROM A JOIN B ON A.id = B.id

Or use sqlContext.sql:

sqlContext.sql("SELECT A.*, B.* FROM A JOIN B ON A.id = B.id")

How about joining the two dataframes directly in PySpark?

from pyspark.sql.functions import col

df1.alias('a').join(df2.alias('b'), col('b.id') == col('a.id')) \
   .select([col('a.' + xx) for xx in df1.columns] +
           [col('b.' + yy) for yy in df2.columns])

The tricky part is selecting all the columns after the join; a list comprehension over each dataframe's columns takes care of it.
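When the join key has the same name in both dataframes, a shorter form avoids the duplicated id column altogether (a sketch using the same df1 and df2):

df1.join(df2, on='id', how='inner')  # keeps a single 'id' column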

How to select a particular row with a condition in PySpark?

Dataframes in Spark are distributed, which means you can't access the data positionally the way you would with pandas df.loc[].
We need to do a little work to get a specific row (or rows). Here is an example that picks a row by its index:

import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
myIndex = 1
values = (df.rdd.zipWithIndex()                           # pair each Row with its index
          .filter(lambda row_idx: row_idx[1] == myIndex)  # keep the row at myIndex
          .map(lambda row_idx: row_idx[0])                # drop the index again
          .collect())
print(values[0])  # Row(letter='b', name=2)
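If what you actually need is the rows matching a condition on a column value, rather than a positional index, the ordinary filter is simpler (a sketch on the same df):

from pyspark.sql.functions import col

df.filter(col('name') == 2).collect()  # [Row(letter='b', name=2)]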

 

Three ways to rename a column after a groupby/agg operation in PySpark

Group and aggregation operations are very common in data manipulation and analysis, but PySpark names the resulting column in the format aggFunc(colname), which is usually not the column name you'd like to use. The most intuitive attempt would be something like this:

group_df = df.groupby('colname').max('value_column').alias('max_column')

However, this won't change anything, nor will it give you an error. The reason is that we're aliasing the whole dataframe instead of a column.

Here are three ways to alias just the column created by the groupby/agg operation. The first is to select the generated column and alias it:

import pyspark.sql.functions as F

group_df = df.groupBy('colname').max('value_column') \
             .select(F.col('max(value_column)').alias('max_column'))

The second method is to use agg instead of calling the max method:

from pyspark.sql.functions import max

df.groupBy('colname').agg(max('value_column').alias('max_column'))

The third method is to use withColumnRenamed:

df.groupBy('colname').max('value_column') \
  .withColumnRenamed('max(value_column)', 'max_column')
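The agg form also scales naturally to several aggregations at once; a minimal sketch on the same df and value_column:

import pyspark.sql.functions as F

df.groupBy('colname').agg(
    F.max('value_column').alias('max_column'),
    F.avg('value_column').alias('avg_column'),
    F.count('value_column').alias('row_count'))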

 

Spark sc.textFile Client.java.handleConnectionTimeout error

Imagine that you have set up your Hadoop environment with Spark and you start to read files from HDFS, but you get an error at the very first step: reading a text file with the SparkContext.

Look at this example:

First, check that the file exists on HDFS from the terminal:

hadoop fs -ls /input/wordCount.txt

hadoop fs -cat /input/wordCount.txt

The second command prints the whole text file to the console. Now let's start the Spark shell and do a simple file read.

val textFile = sc.textFile("hdfs://input/wordCount.txt")

textFile.count

Then you get an error with a lot of messages containing Client.java.handleConnectionTimeout(814).

The problem is very common and easy to fix: the URL wasn't specified correctly. There is a distinction between using one '/' and two '//' after hdfs:. The pieces of a full URL are:

  • hdfs:// – protocol type
  • localhost – host name or IP address (e.g. 10.20.10.0)
  • 9000 – port number (9000 is just an example)
  • /input/wordCount.txt – complete path to the file you will load

There are two ways to specify the URL:

  1. hdfs://localhost:9000/input/wordCount.txt (localhost:9000 comes from the fs.defaultFS value in Hadoop's core-site.xml config file)
  2. hdfs:/input/wordCount.txt
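For reference, the corrected read looks like this in the PySpark shell (where sc is already defined), assuming fs.defaultFS is hdfs://localhost:9000 as in the example above; the Scala shell takes exactly the same URL:

text_file = sc.textFile("hdfs://localhost:9000/input/wordCount.txt")
print(text_file.count())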