Traversing and copying files from complex directory structures on Azure using Microsoft Fabric Data Pipeline.

In an earlier article, I walked through the details of copying the contents of one directory on Azure to another using a Fabric Data Factory pipeline. A major issue with that approach was that it maintained the source directory structure while copying the contents over to the destination.

In some use cases that might not be desirable. You may want to put the contents of the source into a flat directory structure at the destination.

For example, for a directory structure like this

you might prefer not to maintain such a complex structure and instead place all the files in a single directory at the destination.

In this article, I will demonstrate how this can be accomplished using a combination of Azure functions and Fabric Data Factory pipeline.

In case you aren't aware of Azure Functions, you may want to get an idea about them before you read ahead. You can refer to Azure Functions here.

The Setup

On Azure, synapasefilesystem is the source container, temporarycontainer is the destination container, and the directory at the destination where all the files from the source will be copied over is called Destination.

To get started, create a new Azure Function application in Visual Studio. Ensure that the Authorization level is set to Function and the following dependencies are added to the project.

Azure Function

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;
using Microsoft.WindowsAzure.Storage.Blob;

In the local.settings.json file, add a key called AzureWebJobsStorage and set its value to the connection string of the Azure storage account.
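A minimal local.settings.json sketch, with a placeholder connection string (substitute the actual connection string of your storage account; any other settings depend on your Functions project):

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=<storage-account-name>;AccountKey=<storage-account-key>;EndpointSuffix=core.windows.net"
  }
}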

Next, create a new class called MyBlobClass with the following properties.

 public class MyBlobClass
 {
     public string FileName { get; set; }
     public DateTime FileModifiedDate { get; set; }
 }

Add the following function to the Main class

  [FunctionName("GetFiles")]
  public static async Task<System.Net.Http.HttpResponseMessage> Run([HttpTrigger(Microsoft.Azure.WebJobs.Extensions.Http.AuthorizationLevel.Function, "get", "post", Route = null)] System.Net.Http.HttpRequestMessage req, Microsoft.Azure.WebJobs.Host.TraceWriter log, ExecutionContext context)
  {

      var blobconnection = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
      CloudBlobClient _blobClient = Microsoft.WindowsAzure.Storage.CloudStorageAccount.Parse(blobconnection).CreateCloudBlobClient();
      var container = _blobClient.GetContainerReference("synapasefilesystem");
      BlobContinuationToken continuationToken = null;
      var response = container.ListBlobsSegmented(string.Empty, true, BlobListingDetails.None, new int?(), continuationToken, null, null);
      var blobNames = new List<string>();
      List<MyBlobClass> _items = new List<MyBlobClass>();

      foreach (var blob in response.Results)
      {
          MyBlobClass MyBlobs = new MyBlobClass();
          MyBlobs.FileName = (((CloudBlob)blob).Name);
          MyBlobs.FileModifiedDate = (((CloudBlob)blob).Properties.LastModified.Value.UtcDateTime);
          _items.Add(MyBlobs);
      }

      string json_all = JsonConvert.SerializeObject(_items);
      var responseMessage = new System.Net.Http.HttpResponseMessage(HttpStatusCode.OK)
      {
          Content = new System.Net.Http.StringContent(json_all, System.Text.Encoding.UTF8, "application/json")
      };

      return responseMessage;
  }

The Main class of the Azure Function has an asynchronous function called GetFiles. Run is the name of the method that gets executed when the function is triggered, and it returns a Task of type HttpResponseMessage. The function responds to both GET and POST HTTP methods; we won't require POST in this case, so it's fine if it is never invoked.

Breakdown of the function code

 var blobconnection = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
 CloudBlobClient _blobClient = Microsoft.WindowsAzure.Storage.CloudStorageAccount.Parse(blobconnection).CreateCloudBlobClient();
 var container = _blobClient.GetContainerReference("synapasefilesystem");

The above code snippet reads the storage connection string, creates a blob client, and gets a reference to the source container called synapasefilesystem.

Next, in the following code, the blobs in the container are listed and a list of MyBlobClass items is populated.

  var response = container.ListBlobsSegmented(string.Empty, true, BlobListingDetails.None, new int?(), continuationToken, null, null);
  var blobNames = new List<string>();
  List<MyBlobClass> _items = new List<MyBlobClass>();

  foreach (var blob in response.Results)
  {
      MyBlobClass MyBlobs = new MyBlobClass();
      MyBlobs.FileName = (((CloudBlob)blob).Name);
      MyBlobs.FileModifiedDate = (((CloudBlob)blob).Properties.LastModified.Value.UtcDateTime);
      _items.Add(MyBlobs);

  }

Subject to successful authentication, the response object returns the blob listing of the container. Details such as the blob name and last modified date are added as items to a list named _items of type MyBlobClass, which was declared earlier with the properties FileName and FileModifiedDate. The function then returns this list of objects, with the name and modified date of each file from the source, through the HTTP response.
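Note that ListBlobsSegmented returns the listing in segments, and the function above reads only the first segment. That is fine for small containers; for a container with a very large number of blobs, a variation that loops on the continuation token could be used instead. A minimal sketch of that variation (not part of the original function):

  // Keep listing segments until the continuation token comes back null,
  // so containers with more blobs than a single segment are fully listed.
  BlobContinuationToken continuationToken = null;
  do
  {
      var segment = container.ListBlobsSegmented(string.Empty, true, BlobListingDetails.None, new int?(), continuationToken, null, null);
      foreach (var blob in segment.Results)
      {
          _items.Add(new MyBlobClass
          {
              FileName = ((CloudBlob)blob).Name,
              FileModifiedDate = ((CloudBlob)blob).Properties.LastModified.Value.UtcDateTime
          });
      }
      continuationToken = segment.ContinuationToken;
  } while (continuationToken != null);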

Once done, deploy the Azure Function to Azure. Fetch the app keys and the function URL, as they will need to be referenced in the Fabric Data Pipeline.

The Response would look like this
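Since the function serializes a List of MyBlobClass objects, the body is a JSON array of FileName and FileModifiedDate pairs. A representative shape (the file, folder and date values below are placeholders, not actual contents of the container):

[
  {
    "FileName": "Folder1/SubFolderA/file1.csv",
    "FileModifiedDate": "2024-01-15T10:30:00Z"
  },
  {
    "FileName": "Folder1/SubFolderA",
    "FileModifiedDate": "2024-01-14T09:00:00Z"
  },
  {
    "FileName": "file2.csv",
    "FileModifiedDate": "2024-01-16T08:12:00Z"
  }
]

Entries like Folder1/SubFolderA (no dot) correspond to folders, which is exactly what the Filter activity below will discard.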

Fabric Data Pipeline

Create a new Data pipeline in Fabric and add a new Azure function activity to the pipeline. Create a connection to the deployed Azure function.

Set the relevant properties in the Settings section of the Azure Function activity: the connection created above, the function name (GetFiles) and the method (GET). In this pipeline the activity is named Loop The Container, which is how it is referenced in the expressions below.

The Azure function returns the entire structure of the source Azure directory in the Response object, so the folders/subfolders will need to be filtered out of the Response.

To do that, add a new Filter activity and, under the Settings section, set the Items property to

@json(activity('Loop The Container').output.Response)

and the condition property to

@contains(item().FileName, '.')

The logic behind the above is to keep only the items in the Response whose FileName contains a "." (dot), indicating that they are files; only these items should be copied over to the destination, while the folder entries are discarded.

Next, add a For Each activity and set its Items property to the output of the Filter activity, named Filter Only Files here

@activity('Filter Only Files').output.value

Then, under the For Each activity, add a Copy Data activity. In the source File path section, set the container name to the source container and set the directory property to the following expression.

@if(greater(lastIndexOf(item().FileName, '/'), 0), substring(item().FileName, 0, lastIndexOf(item().FileName, '/')),'')

The above expression extracts only the directory path from the FileName in the Response, excluding the file name itself.

To set the file name in the source property, use the following expression

@substring(item().FileName, add(lastIndexOf(item().FileName,'/'), 1), sub(length(item().FileName), add(lastIndexOf(item().FileName, '/'), 1)))

Under the Destination property, set the destination Container and the destination directory and for the file name set the expression to

@substring(item().FileName, add(lastIndexOf(item().FileName,'/'), 1), sub(length(item().FileName), add(lastIndexOf(item().FileName, '/'), 1)))

which is the same as the file name expression used for the source settings earlier.
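To illustrate how these expressions work together, consider a hypothetical FileName value (the path below is a placeholder, not an actual file from the source container):

item().FileName            : Folder1/SubFolderA/file1.csv
directory expression       : Folder1/SubFolderA
file name expression       : file1.csv

For a file sitting at the container root (a FileName with no /), the @if guard in the directory expression returns an empty string and the file name expression returns the full FileName.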

Once all set, the pipeline should have four activities: the Azure Function activity, the Filter activity, and the For Each activity with the Copy Data activity inside it.

When executed, the pipeline would dump all the files from the source directory into a single Destination directory on Azure, traversing the entire source directory structure recursively.

That's all.... Thank you for reading!!!


Written by

Sachin Nandanwar