diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/pom.xml trunk/pom.xml
--- 3rdparty-original/piggybank/java/pom.xml	1970-01-01 01:00:00.000000000 +0100
+++ trunk/pom.xml	2013-06-11 01:02:24.676859717 +0200
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<parent>
+		<groupId>eu.dnetlib</groupId>
+		<artifactId>dnet-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>icm-iis-3rdparty-pig-avrostorage</artifactId>
+	<packaging>jar</packaging>
+	<!-- <version>0.0.1-SNAPSHOT</version> -->
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>
+        <dependency>
+			<groupId>org.apache.pig</groupId>
+			<artifactId>pig</artifactId>
+			<version>0.10.0-cdh4.1.2</version>
+		</dependency>
+        <dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.7.4</version>
+		</dependency>
+        <dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>2.0.0-cdh4.1.2</version>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>2.0.0-mr1-cdh4.1.2</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.10</version>
+			<scope>test</scope>
+		</dependency>
+        <dependency>
+			<groupId>org.antlr</groupId>
+			<artifactId>antlr-runtime</artifactId>
+			<version>3.4</version>
+		</dependency>
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>icm-iis-core</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <type>jar</type>
+        </dependency>
+	</dependencies>
+	<build>
+        <plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.4.3</version>
+                <configuration>
+                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <includes>
+                        <include>**/*Test.java</include>
+                        <include>**/avro/Test*.java</include>
+                    </includes>
+                    <excludes>
+                        <exclude>**/AllTests.java</exclude>
+                        <exclude>**/Abstract*Test.java</exclude>
+                        <exclude>**/*$*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+		</plugins>
+	</build>
+	<repositories>
+        <repository>
+			<id>cloudera</id>
+			<name>Cloudera Repository</name>
+			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+	    <!-- This repository contains our patched
+	    version of the "avro" and "avro-mapred" modules (see the dependencies section).
+	    This entry can be removed once the patch to these modules becomes
+	    part of the official Avro release. -->
+	    <repository>
+			<id>dnet-deps</id>
+			<name>dnet dependencies</name>
+			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet-deps</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+			<layout>default</layout>
+		</repository>
+	</repositories>
+</project>
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java	2013-06-11 01:48:03.516933061 +0200
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java	2013-06-11 00:11:55.000000000 +0200
@@ -102,6 +102,7 @@ public class AvroSchema2Pig {
 
                 tupleSchema.setFields(childFields);
                 fieldSchema.setSchema(tupleSchema);
+                visitedRecords.remove(in);
             }
 
         } else if (avroType.equals(Schema.Type.ARRAY)) {
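The single added line above, visitedRecords.remove(in), makes AvroSchema2Pig drop a record from its visited set once the record's tuple schema has been fully built, so the set tracks only the current recursion path rather than every record ever converted. Without the removal, a second, non-recursive occurrence of the same record type (for example, as a sibling field) would be treated as recursion. A minimal sketch of the resulting pattern, using illustrative names rather than the actual AvroSchema2Pig internals:

    import java.util.HashSet;
    import java.util.Set;

    class PathAwareSchemaVisitor {
        // Records on the *current* path from the root to the node being visited.
        private final Set<String> onPath = new HashSet<String>();

        void visitRecord(String recordName) {
            if (!onPath.add(recordName)) {
                return; // the record is its own ancestor: genuine recursion, stop here
            }
            // ... convert the record's fields, recursing into nested records ...
            onPath.remove(recordName); // backtrack, as visitedRecords.remove(in) does above
        }
    }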
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java	2013-06-11 01:48:03.516933061 +0200
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java	2013-06-11 00:12:54.000000000 +0200
@@ -17,6 +17,7 @@
 
 package org.apache.pig.piggybank.storage.avro;
 
+import eu.dnetlib.iis.core.common.AvroUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -32,6 +33,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -105,6 +108,9 @@ public class AvroStorage extends FileInp
     private boolean checkSchema = true; /*whether check schema of input directories*/
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
+    private String inputSchemaFile;
+    private String outputSchemaFile;
+    
     /**
      * Empty constructor. Output schema is derived from pig schema.
      */
@@ -122,7 +128,7 @@ public class AvroStorage extends FileInp
      * @throws IOException
      * @throws ParseException
      */
-    public AvroStorage(String[] parts) throws IOException, ParseException {
+    public AvroStorage(String[] parts) throws IOException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
         outputAvroSchema = null;
         nullable = true;
         checkSchema = true;
@@ -147,13 +153,18 @@ public class AvroStorage extends FileInp
      */
     @Override
     public void setLocation(String location, Job job) throws IOException {
-        if (inputAvroSchema != null) {
-            return;
-        }
         Set<Path> paths = new HashSet<Path>();
         Configuration conf = job.getConfiguration();
         if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
-            setInputAvroSchema(paths, conf);
+            if (inputAvroSchema == null) {
+                if (inputSchemaFile != null) {
+                    FileSystem fs = FileSystem.get(job.getConfiguration());
+                    Path path = fs.makeQualified(new Path(inputSchemaFile));
+                    inputAvroSchema = getSchemaFromFile(path, fs);
+                } else {
+                    setInputAvroSchema(paths, conf);
+                }
+            }
             FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
         } else {
             throw new IOException("Input path \'" + location + "\' is not found");
@@ -416,7 +427,6 @@ public class AvroStorage extends FileInp
      */
     @SuppressWarnings("unchecked")
     protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
-
         /*parse the json object */
         JSONParser parser = new JSONParser();
         JSONObject obj = (JSONObject) parser.parse(jsonString);
@@ -450,7 +460,6 @@ public class AvroStorage extends FileInp
      * @throws ParseException
      */
     protected Map<String, Object> parseStringList(String[] parts) throws IOException {
-
         Map<String, Object> map = new HashMap<String, Object>();
 
         for (int i = 0; i < parts.length; ) {
@@ -477,6 +486,10 @@ public class AvroStorage extends FileInp
                              || name.equalsIgnoreCase("same")
                              || name.equalsIgnoreCase("schema")
                              || name.equalsIgnoreCase("schema_file")
+                             || name.equalsIgnoreCase("input_schema_file")
+                             || name.equalsIgnoreCase("output_schema_file")
+                             || name.equalsIgnoreCase("input_schema_class")
+                             || name.equalsIgnoreCase("output_schema_class")
                              || name.matches("field\\d+")) {
                     /* store value as string */
                     map.put(name, value);
@@ -496,8 +509,7 @@ public class AvroStorage extends FileInp
     /**
      * Initialize output avro schema using input property map
      */
-    protected void init(Map<String, Object> inputs) throws IOException {
-
+    protected void init(Map<String, Object> inputs) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
         /*used to store field schemas */
         List<Field> fields = null;
 
@@ -517,11 +529,18 @@ public class AvroStorage extends FileInp
         }
         else if (inputs.containsKey("schema_file")) {
             Path path = new Path((String) inputs.get("schema_file"));
+            
             AvroStorageLog.details("schemaFile path=" + path.toUri().toString());
             FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
             Schema schema = getSchemaFromFile(path, fs);
             schemaManager = new AvroSchemaManager(schema);
         }
+        else if (inputs.containsKey("input_schema_file")) {
+            inputSchemaFile = (String) inputs.get("input_schema_file");
+        }
+        else if (inputs.containsKey("output_schema_file")) {
+            outputSchemaFile = (String) inputs.get("output_schema_file");
+        }
 
         /* iterate input property map */
         for (Entry<String, Object> entry : inputs.entrySet()) {
@@ -541,6 +560,10 @@ public class AvroStorage extends FileInp
                 nullable = (Boolean) value;
             } else if (name.equalsIgnoreCase("schema")) {
                 outputAvroSchema = Schema.parse((String) value);
+            } else if (name.equalsIgnoreCase("input_schema_class")) {
+                inputAvroSchema = AvroUtils.toSchema((String) value);
+            } else if (name.equalsIgnoreCase("output_schema_class")) {
+                outputAvroSchema = AvroUtils.toSchema((String) value);
             } else if (name.matches("field\\d+")) {
                 /*set schema of dth field */
                 if (fields == null)
@@ -579,6 +602,8 @@ public class AvroStorage extends FileInp
                 fields.add(field);
             } else if (!name.equalsIgnoreCase("data")
                                   && !name.equalsIgnoreCase("schema_file")
+                                  && !name.equalsIgnoreCase("input_schema_file")
+                                  && !name.equalsIgnoreCase("output_schema_file")
                                   && !name.equalsIgnoreCase("debug")) {
                 throw new IOException("Invalid parameter:" + name);
             }
@@ -599,7 +624,6 @@ public class AvroStorage extends FileInp
                 nullable = true;
             }
         }
-
     }
 
     @Override
@@ -609,6 +633,31 @@ public class AvroStorage extends FileInp
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
+        if (outputSchemaFile != null) {
+            FileSystem fs = FileSystem.get(job.getConfiguration());
+            Path path = fs.makeQualified(new Path(outputSchemaFile));
+
+            outputAvroSchema = getSchemaFromFile(path, fs);
+
+            UDFContext context = UDFContext.getUDFContext();
+            Properties property = context.getUDFProperties(ResourceSchema.class);
+        
+            String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+            String key = getSchemaKey();
+            Map<String, String> schemaMap = (prevSchemaStr != null)
+                                                                ? parseSchemaMap(prevSchemaStr)
+                                                                : new HashMap<String, String>();
+            schemaMap.put(key, outputAvroSchema.toString());
+            
+            List<String> schemas = new ArrayList<String>();
+            for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
+                schemas.add(entry.getKey() + AvroStorage.SCHEMA_KEYVALUE_DELIM + entry.getValue());
+            }
+            
+            String newSchemaStr = StringUtils.join(schemas, AvroStorage.SCHEMA_DELIM);
+            property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
+        }
+                
         AvroStorageLog.details("output location=" + location);
         FileOutputFormat.setOutputPath(job, new Path(location));
     }
@@ -621,6 +670,7 @@ public class AvroStorage extends FileInp
         AvroStorageLog.funcCall("Check schema");
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(ResourceSchema.class);
+
         String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
         AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
 
@@ -683,6 +733,7 @@ public class AvroStorage extends FileInp
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(ResourceSchema.class);
         String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+      
         Map<String, String> map = (allSchemaStr != null)  ? parseSchemaMap(allSchemaStr) : null;
 
         String key = getSchemaKey();
@@ -711,7 +762,7 @@ public class AvroStorage extends FileInp
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
-    @Override
+    // @Override disabled: StoreFunc in the Pig version built against (0.10.0-cdh4.1.2) apparently does not declare cleanupOnSuccess
     public void cleanupOnSuccess(String location, Job job) throws IOException {
         // Nothing to do
     }
@@ -719,7 +770,7 @@ public class AvroStorage extends FileInp
     @Override
     public void putNext(Tuple t) throws IOException {
         try {
-            this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
+            this.writer.write(NullWritable.get(), t);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
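Taken together, the AvroStorage changes add four new constructor parameters: input_schema_file and output_schema_file, which read an Avro schema from a file on the job's FileSystem in setLocation and setStoreLocation respectively, and input_schema_class and output_schema_class, which delegate to eu.dnetlib.iis.core.common.AvroUtils.toSchema from the icm-iis-core dependency added in the pom. The patch also simplifies putNext to always write the full tuple, so stored records match the declared output schema instead of being unwrapped when they hold a single field. AvroUtils itself is not part of this diff; the sketch below is a guess at its contract, inferred from the call sites and from the reflection-style exceptions (ClassNotFoundException, InstantiationException, IllegalAccessException) added to the constructor's throws clause. The tests below pass Schema.Type constants such as 'org.apache.avro.Schema.Type.INT', so the helper presumably handles both those and Avro-generated record classes:

    import org.apache.avro.Schema;

    // Hypothetical stand-in for eu.dnetlib.iis.core.common.AvroUtils;
    // the real implementation may differ.
    public final class AvroUtilsSketch {
        public static Schema toSchema(String name)
                throws ClassNotFoundException, InstantiationException, IllegalAccessException {
            final String typePrefix = "org.apache.avro.Schema.Type.";
            if (name.startsWith(typePrefix)) {
                // A primitive schema named by its Schema.Type constant, e.g. INT -> "int".
                return Schema.create(Schema.Type.valueOf(name.substring(typePrefix.length())));
            }
            // Otherwise assume an Avro-generated class exposing the static SCHEMA$ field.
            Class<?> clazz = Class.forName(name);
            try {
                return (Schema) clazz.getDeclaredField("SCHEMA$").get(null);
            } catch (NoSuchFieldException e) {
                throw new InstantiationException(name + " does not expose a SCHEMA$ field");
            }
        }
    }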
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro	1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro	2013-06-11 01:28:15.576901251 +0200
@@ -0,0 +1 @@
+Objavro.schemaÚ{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]} »w„(d`çä×ô§¢0öüŠ¸¼ÊÐÒ»w„(d`çä×ô§¢
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro	1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro	2013-06-11 01:16:41.648882666 +0200
@@ -0,0 +1 @@
+Objavro.schemaÚ{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]} »w„(d`çä×ô§¢0öüŠ¸¼ÊÐÒ»w„(d`çä×ô§¢
\ No newline at end of file
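The new test_simple.avro fixture (byte-identical to expected_testRecordSplit3.avro above) is a standard Avro container file, which is why its header embeds the JSON of the "simple" schema. A small standalone reader, assuming a local copy of the fixture, shows what the binary payload decodes to:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class DumpSimpleAvro {
        public static void main(String[] args) throws IOException {
            // The reader takes the schema from the container file's own header.
            DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                    new File("test_simple.avro"), new GenericDatumReader<GenericRecord>());
            for (GenericRecord rec : reader) {
                System.out.println(rec.get("member_id") + "\t" + rec.get("count"));
            }
            reader.close();
        }
    }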
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc	1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc	2013-06-11 01:16:56.912883076 +0200
@@ -0,0 +1,8 @@
+{
+    "type": "record",
+    "name": "simple",
+    "fields": [ 
+        {"name": "member_id", "type": "int"},
+        {"name": "count", "type": "long"}
+        ]
+}
\ No newline at end of file
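test_simple_record.avsc is the external schema handed to the new input_schema_file and output_schema_file options; in the patch, getSchemaFromFile reads it from the job's FileSystem. A minimal local-file equivalent using Avro's Schema.Parser:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;

    public class ParseAvsc {
        public static void main(String[] args) throws IOException {
            Schema schema = new Schema.Parser().parse(new File("test_simple_record.avsc"));
            System.out.println(schema.toString(true)); // pretty-prints the "simple" record
        }
    }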
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java	2013-06-11 01:48:03.512933062 +0200
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java	2013-06-11 01:23:38.920893841 +0200
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.piggybank.storage.avro.AvroStorage;
 import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-import org.apache.pig.test.Util;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -81,7 +80,9 @@ public class TestAvroStorage {
     final private String testNoMatchedFiles = getInputFile("test_dir{1,2}/file_that_does_not_exist*.avro");
     final private String testArrayFile = getInputFile("test_array.avro");
     final private String testRecordFile = getInputFile("test_record.avro");
+    final private String testSimpleFile = getInputFile("test_simple.avro");
     final private String testRecordSchema = getInputFile("test_record.avsc");
+    final private String testSimpleRecordSchema = getInputFile("test_simple_record.avsc");
     final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
     final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
     final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
@@ -973,6 +974,76 @@ public class TestAvroStorage {
         verifyResults(output, expected);
     }
 
+    @Test
+    public void testOutputSchemaFile() throws IOException {
+        String output1 = outbasedir + "testOutputSchemaFile";
+        String expected1 = basedir + "expected_testRecordSplit3.avro";
+        deleteDirectory(new File(output1));
+        String [] queries = {
+           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " groups = GROUP avro BY member_id;",
+           " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+           " STORE sc INTO '" + output1 +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
+            };
+        testAvroStorage(queries);
+        verifyResults(output1, expected1);
+    }
+    
+    @Test
+    public void testInputSchemaFile() throws IOException {
+        String output1 = outbasedir + "testInputSchemaFile";
+        String expected1 = basedir + "expected_testRecordSplit3.avro";
+        deleteDirectory(new File(output1));
+        String [] queries = {
+           " avro = LOAD '" + testSimpleFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('input_schema_file', '" + testSimpleRecordSchema + "');",
+           " STORE avro INTO '" + output1 +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
+            };
+        testAvroStorage(queries);
+        verifyResults(output1, expected1);
+    }
+  
+    @Test
+    public void testInputSchemaClass() throws IOException {
+        String output = outbasedir + "testInputSchemaClass";
+        String test = getInputFile("expected_testRecursiveRecordReference1.avro");
+        String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + test +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
+                " 'input_schema_class', 'org.apache.avro.Schema.Type.INT' );",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'schema', '\"int\"' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testOutputSchemaClass() throws IOException {
+        String output = outbasedir + "testOutputSchemaClass";
+        String test = getInputFile("expected_testRecursiveRecordReference1.avro");
+        String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + test +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
+                " 'schema', '\"int\"' );",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'output_schema_class', 'org.apache.avro.Schema.Type.INT' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+    
     private static void deleteDirectory (File path) {
         if ( path.exists()) {
             File [] files = path.listFiles();
@@ -1014,8 +1085,8 @@ public class TestAvroStorage {
 
     private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
         // Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
-        if (Util.isHadoop23())
-            return;
+//        if (Util.isHadoop23())
+//            return;
 
         FileSystem fs = FileSystem.getLocal(new Configuration()) ;
 
