In the last part, we pushed jobs to the Faktory server. Now we need to write a job runner, or worker, to fetch those jobs and run them. We will write it in Go:
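
As a quick recap, the producer side might look like the sketch below, using the official Go client. This is a minimal sketch, not the exact code from the previous part: the job type csv_to_json matches what the worker registers later in this post, while the sample file path is made up.

package main

import (
	"log"

	faktory "github.com/contribsys/faktory/client"
)

func main() {
	// Open a connection to the Faktory server; the client reads
	// FAKTORY_PROVIDER / FAKTORY_URL from the environment if set.
	cl, err := faktory.Open()
	if err != nil {
		log.Fatal(err)
	}

	// The job type must match what the worker registers ("csv_to_json").
	job := faktory.NewJob("csv_to_json", "/tmp/users.csv") // hypothetical path
	job.Queue = "default"
	if err := cl.Push(job); err != nil {
		log.Fatal(err)
	}
}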

Create Worker

If you haven’t installed Go on your machine yet, I suggest you go through the docs and install the latest version. Let’s create a project folder in our $GOPATH and add a main.go file: ~/go/src/github.com/csvtojson/main.go. The file will have the following code:

package main

import (
	"os"
	// faktory_worker_go is the package we use to consume jobs from Faktory
	worker "github.com/contribsys/faktory_worker_go"
)

// workerFunc handles "csv_to_json" jobs; the only argument is the CSV file path
func workerFunc(ctx worker.Context, args ...interface{}) error {
	filePath := args[0].(string)
	convertToJSON(filePath)
	return nil
}

// main sets up the manager and starts consuming jobs
func main() {
	faktoryURL := "tcp://localhost:7419" // in production, read this from an environment variable
	os.Setenv("FAKTORY_PROVIDER", "MY_FAKTORY_URL")
	os.Setenv("MY_FAKTORY_URL", faktoryURL)
	mgr := worker.NewManager() // main job handler

	// register the job type and the function that executes it
	mgr.Register("csv_to_json", workerFunc)
	mgr.Concurrency = 20 // know your machine before setting this
	mgr.ProcessStrictPriorityQueues("default") // only fetch from the default queue

	// start processing jobs; this call blocks
	mgr.Run()
}
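
A note on the queue setup: ProcessStrictPriorityQueues drains the given queues strictly in order. If you later add more queues and want proportional fetching instead, faktory_worker_go also supports weighted queues. A minimal sketch (the queue names other than default are made up):

	// fetch from queues in proportion to their weights rather than strict order
	mgr.ProcessWeightedPriorityQueues(map[string]int{
		"critical": 3, // hypothetical queue
		"default":  2,
		"bulk":     1, // hypothetical queue
	})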

Now, let’s write the function that takes the path of a CSV file as an argument, processes it, converts every row, and writes the result to an equivalent JSON file. I will add comments in the code but won’t go into the details of the program; if you know Go, this should be easy to follow. Create another file with the same package name: ~/go/src/github.com/csvtojson/converter.go

package main

import (
	"encoding/csv"  // to read the csv file
	"encoding/json" // to marshal the records into json
	"fmt"
	"os"
	"strconv" // to parse the numeric csv fields
)

// UserInfo is the data we get from the csv; every row contains these fields
type UserInfo struct {
	Name       string
	TxCount    int
	TotalGain  float64
	TotalAsset float64
	AssetName  string
}

func convertToJSON(filePath string) {
	csvFile, err := os.Open(filePath)

	if err != nil {
		fmt.Println("Error while opening file: ", err)
		return
	}
	defer csvFile.Close() // this ensures the file is closed when the function returns

	reader := csv.NewReader(csvFile) // reader from the encoding/csv package
	content, err := reader.ReadAll() // read everything into the content variable

	if err != nil {
		fmt.Println("Error while reading csv: ", err)
		return
	}
	
	if len(content) < 1 {
		fmt.Println("Error while reading data: No Data Present")
		return 
	}
	
	// parse every row into a UserInfo record (assumes the csv has no header row)
	var allRecords []UserInfo
	var singleRecord UserInfo
	
	for _, row := range content { 
		singleRecord.Name = row[0]
		singleRecord.TxCount, _ = strconv.Atoi(row[1])
		singleRecord.TotalGain, _ = strconv.ParseFloat(row[2], 64)
		singleRecord.TotalAsset, _ = strconv.ParseFloat(row[3], 64)
		singleRecord.AssetName = row[4]
		allRecords = append(allRecords, singleRecord)
	}
	
	// convert the array into json
	jsonData, err := json.Marshal(allRecords)
	
	if err != nil { 
		fmt.Println("Error while marshal array to json")
		return
	}
	
	// create a jsonFile and insert the data
	jsonFile, err := os.Create(*filePath + ".json")
	
	if err != nil { 
		fmt.Println("Error while writing data to json file: ", err)
		return
	}
	
	defer jsonFile.Close()
	
	jsonFile.Write(jsonData) // the deferred Close above flushes and closes the file
}
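
To make the conversion concrete, here is a hypothetical input file and the output the worker would write next to it (all values are made up):

users.csv:

alice,12,1500.5,20000,BTC
bob,3,42.25,500,ETH

users.csv.json:

[{"Name":"alice","TxCount":12,"TotalGain":1500.5,"TotalAsset":20000,"AssetName":"BTC"},{"Name":"bob","TxCount":3,"TotalGain":42.25,"TotalAsset":500,"AssetName":"ETH"}]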

Let’s build and run the program. From the root directory:

go build
./csvtojson
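
The worker expects a Faktory server listening on localhost:7419. If you followed the previous part you should already have one running; otherwise, assuming you use Docker, the image from the Faktory README can be started like this:

docker run --rm -it -p 7419:7419 -p 7420:7420 contribsys/faktory:latest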

Now this program can pull jobs from the Faktory server and process them. The best thing about this is that we can build workers that are fast and take full advantage of concurrency. This binary runs as a single process that consumes Faktory jobs.

  • Faktory is a standalone binary and needs a worker process to consume jobs (like the one we wrote above).
  • Faktory uses RocksDB as an embedded datastore internally to persist all job data, queues, errors, etc., whereas Sidekiq uses Redis, an external datastore, so every data access involves a network call. Sidekiq itself is written in Ruby.
  • Sidekiq is limited to only one language, Ruby; you can’t execute its jobs with Go, Python, or JavaScript.