2016-06-01 33 views
9

Tôi đang làm việc với Apache NiFi 0.5.1 trên một kịch bản Groovy để thay thế các giá trị Json đến với các giá trị được chứa trong tệp ánh xạ. Các tập tin bản đồ trông như thế này (nó là một .txt đơn giản):Apache NiFi ExecuteScript: Tập lệnh Groovy để thay thế các giá trị Json thông qua một tệp ánh xạ

Header1;Header2;Header3 
A;some text;A2 

Tôi đã bắt đầu với những điều sau:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
     { inputStream, outputStream -> 

      def content = """ 
{ 
    "field1": "A" 
    "field2": "A", 
    "field3": "A" 

}""" 

      def slurped = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurped) 
      builder.content.field1 = "A" 
      builder.content.field2 = "some text" 
      builder.content.field3 = "A2" 
      outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
     } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS) 

bước đầu tiên này chỉ hoạt động tốt, mặc dù nó là mã hóa cứng và nó là xa là lý tưởng. Suy nghĩ ban đầu của tôi là sử dụng ReplaceTextWithMapping để có thể thực hiện các thay thế, tuy nhiên nó không hoạt động tốt với các tệp ánh xạ phức tạp (ví dụ: multi-columns). Tôi muốn thực hiện điều này hơn nữa, nhưng tôi không chắc chắn làm thế nào để đi về nó. Trước hết, thay vì chuyển toàn bộ JSON bị bẻ khóa, tôi muốn đọc flowfile đến. Làm thế nào là có thể trong NiFi? Trước khi chạy kịch bản như là một phần của ExecuteScript, tôi đã xuất ra tệp .Json với nội dung thông qua UpdateAttribute nơi filename = myResultingJSON.json. Hơn nữa, tôi biết làm thế nào để tải một tập tin .txt với Groovy (String mappingContent= new File('/path/to/file').getText('UTF-8'), tuy nhiên làm thế nào để sử dụng các tập tin nạp để thực hiện các thay do đó kết quả JSON tôi sẽ trông như thế này:

{ 
    "field1": "A" 
    "field2": "some text", 
    "field3": "A2" 
} 

Cảm ơn bạn đã bạn giúp đỡ,

I.

EDIT:

sửa đổi đầu tiên với kịch bản không cho phép tôi đọc từ InputStream:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 

import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
     { inputStream, outputStream -> 

      def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8) 

      def slurped = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurped) 
      builder.content.field1 = "A" 
      builder.content.field2 = "some text" 
      builder.content.field3 = "A2" 
      outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
     } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS) 

Tôi đã sau đó chuyển sang thử nghiệm cách tiếp cận với ConfigSlurper và viết một lớp chung trước khi tiêm logic vào Groovy ExecuteScript:

class TestLoadingMappings { 

    static void main(String[] args) { 

     def content = ''' 
     {"field2":"A", 
     "field3": "A" 
     } 
     ''' 

     println "This is the content of the JSON file" + content 

     def slurped = new JsonSlurper().parseText(content) 
     def builder = new JsonBuilder(slurped) 

     println "This is the content of my builder " + builder 

     def propertiesFile = new File("D:\\myFile.txt") 
     Properties props = new Properties() 
     props.load(new FileInputStream(propertiesFile)) 
     def config = new ConfigSlurper().parse(props).flatten() 

     println "This is the content of my config " + config 

     config.each { k, v -> 
      if (builder[k]) { 
       builder[k] = v 
      } 
     } 
     println(builder.toPrettyString()) 
    } 

} 

Tôi đang trở lại với một groovy.lang.MissinPropertyException và điều này là bởi vì ánh xạ không đơn giản. Tất cả các trường/thuộc tính (từ trường1 đến trường3) đi vào InpuStream với cùng giá trị (ví dụ) và điều này có nghĩa là mỗi trường 2, ví dụ, có giá trị đó bạn có thể chắc chắn rằng nó sẽ hợp lệ cho hai thuộc tính kia. Tuy nhiên, tôi không thể có một trường ánh xạ ánh xạ "field2": "someText" bởi vì ánh xạ thực được điều khiển bởi giá trị đầu tiên trong tệp ánh xạ. Ở đây một ví dụ:

{ 
     "field1": "A" 
     "field2": "A", 
     "field3": "A" 

} 

Trong tập tin bản đồ của tôi, tôi có:

A;some text;A2 

Tuy nhiên field1 cần lập bản đồ đến A (giá trị đầu tiên trong file) hoặc giữ nguyên, nếu bạn muốn. Trường 2 cần ánh xạ tới giá trị trong cột cuối cùng (A2) và cuối cùng Field3 cần ánh xạ tới 'một số văn bản' trong cột ở giữa.

Bạn có thể trợ giúp việc này không? Đó là điều tôi có thể đạt được với Groovy và ExecuteScript. Nếu cần tôi có thể chia các tập tin cấu hình thành hai.

Ngoài ra, tôi đã xem nhanh tùy chọn khác (PutDistributedMapCache) và tôi không chắc mình đã hiểu cách tải cặp khóa-giá trị vào bộ nhớ cache bản đồ được phân phối. Dường như bạn sẽ cần phải có một DistributedMapCacheClient và tôi không chắc chắn liệu điều này có thể dễ dàng thực hiện.

Cảm ơn bạn!

EDIT 2:

Một số tiến bộ khác, tôi có bây giờ làm việc lập bản đồ, nhưng không chắc chắn lý do tại sao nó không thành công khi đọc dòng thứ hai của các thuộc tính file:

"A" someText 
"A2" anotherText 

class TestLoadingMappings { 

    static void main(String[] args) { 

     def content = ''' 
     {"field2":"A", 
     "field3":"A" 
     } 
     ''' 

     println "This is the content of the JSON file" + content 

     def slurper = new JsonSlurper().parseText(content) 
     def builder = new JsonBuilder(slurper) 

     println "This is the content of my builder " + builder 

     assert builder.content.field2 == "A" 
     assert builder.content.field3 == "A" 

     def propertiesFile = new File('D:\\myTest.txt') 
     Properties props = new Properties() 
     props.load(new FileInputStream(propertiesFile)) 
     println "This is the content of the properties " + props 
     def config = new ConfigSlurper().parse(props).flatten() 

     config.each { k, v -> 
      if (builder.content.field2) { 

       builder.content.field2 = config[k] 
      } 
      if (builder.content.field3) { 

       builder.content.field3 = config[k] 
      } 

      println(builder.toPrettyString()) 
      println "This is my builder " + builder 
     } 
    } 
} 

Tôi đang trở lại với: This is my builder {"field2":"someText","field3":"someText"}

Bất kỳ ý tưởng nào tại sao?

Cảm ơn bạn rất nhiều

EDIT 3 (Đã chuyển từ bên dưới)

Tôi đã viết như sau:

import groovy.json.JsonBuilder 
    import groovy.json.JsonSlurper 

    class TestLoadingMappings { 

     static void main(String[] args) { 

      def content = 
      ''' 
      {"field2":"A", 
      "field3":"A" 
      } 
      ''' 
      def slurper = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurper) 

      println "This is the content of my builder " + builder 

      def propertiesFile = new File('D:\\properties.txt') 
      Properties props = new Properties() 
      props.load(new FileInputStream(propertiesFile)) 
      def conf = new ConfigSlurper().parse(props).flatten() 

      conf.each { k, v -> 
      if (builder.content[k]) { 
       builder.content[k] = v 
      } 
      println("This prints the resulting JSON :" + builder.toPrettyString()) 
     } 
    } 
} 

Tuy nhiên, tôi đã phải thay đổi cấu trúc của các tập tin bản đồ như sau :

"field1"="substitutionText" 
"field2"="substitutionText2" 

Sau đó tôi đã 'kết hợp' Trình cấu hình vào thực thi eScript kịch bản, như sau:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import org.apache.commons.io.IOUtils 
import org.apache.nifi.processor.io.StreamCallback 

import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
     { inputStream, outputStream -> 

      def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 

      def slurped = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurped) 
      outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 

      def propertiesFile = new File(''D:\\properties.txt') 
      Properties props = new Properties() 
      props.load(new FileInputStream(propertiesFile)) 
      def conf = new ConfigSlurper().parse(props).flatten(); 

      conf.each { k, v -> 
       if (builder.content[k]) { 
        builder.content[k] = v 
       } 
      } 
      outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8)) 
     } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS) 

Vấn đề có vẻ là một thực tế mà tôi có thể không thực sự tái tạo logic trong các tập tin bản đồ gốc bằng cách sử dụng một cái gì đó tương tự như tạo ra cho trong TestLoadingMappings tôi. Như đã đề cập trong ý kiến ​​của tôi/chỉnh sửa trước đó, các bản đồ nên làm việc theo cách này:

field2 = nếu A sau đó thay thế cho "một số văn bản"

field3 = nếu A sau đó thay thế để A2

.. .

field2 = B sau đó thay thế cho "một số văn bản khác"

field3 = B sau đó thay thế để B2

và con trai trên.

Tóm lại, ánh xạ được điều khiển bởi giá trị đến trong InputStream (thay đổi), điều kiện ánh xạ tới các giá trị khác nhau tùy thuộc vào thuộc tính JSON. Bạn có thể đề nghị một cách tốt hơn để đạt được ánh xạ này thông qua một Groovy/ExecuteScript không? Tôi có tính linh hoạt trong việc sửa đổi tệp ánh xạ, bạn có thể thấy cách tôi có thể thay đổi nó để đạt được ánh xạ mong muốn không?

Cảm ơn

Trả lời

9

Tôi có một số ví dụ về cách đọc trong một tập tin lưu lượng chứa JSON:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

Bạn đã có những cấu trúc bên phải phía trên; về cơ bản, bạn có thể sử dụng biến "inputStream" đó trong phần đóng để đọc nội dung tập tin luồng đến. Nếu bạn muốn đọc nó trong cùng một lúc (mà bạn có thể sẽ cần phải làm cho JSON), bạn có thể sử dụng IOUtils.toString() theo sau là một JsonSlurper, như được thực hiện trong các ví dụ trong các liên kết ở trên.

Đối với tập tin bản đồ của bạn, đặc biệt nếu JSON của bạn là "phẳng", bạn có thể có một file Java Properties, lập bản đồ tên của lĩnh vực này với giá trị mới:

field2 = một số văn bản

field3 = A2

Khám phá ConfigSlurper để đọc trong tệp thuộc tính.

Khi bạn đã nhập tệp JSON vào và đọc trong tệp ánh xạ, bạn có thể lấy các trường riêng lẻ của JSON sử dụng ký pháp mảng thay vì ký hiệu thành viên trực tiếp. Vì vậy, chúng ta hãy nói rằng tôi đọc các thuộc tính vào một ConfigSlurper, và tôi muốn ghi đè lên bất kỳ tài sản hiện có trong JSON đầu vào của tôi (được gọi là "json" cho ví dụ) với một từ tệp thuộc tính. Điều đó có thể trông giống như sau:

config.parse(props).flatten().each { k,v -> 
    if(json[k]) { 
    json[k] = v 
    } 
} 

Sau đó bạn có thể tiếp tục với outputStream.write() của mình.

Thay vì đọc bản đồ của bạn từ một tệp, bạn cũng có thể tải nó vào bộ nhớ cache được phân phối qua bộ xử lý PutDistributedMapCache. Bạn có thể đọc từ một DistributedCacheMapServer trong ExecuteScript của bạn, tôi có một ví dụ ở đây:

http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html

Nếu bản đồ của bạn rất phức tạp, bạn có thể muốn sử dụng bộ vi xử lý TransformJSON, mà sẽ có sẵn trong phiên bản tiếp theo của NiFi (0.7.0). Trường hợp Jira liên quan là ở đây:

https://issues.apache.org/jira/browse/NIFI-361

EDIT:

Để đối phó với chỉnh sửa của bạn, tôi đã không nhận ra bạn có nhiều quy tắc cho các giá trị khác nhau. Trong trường hợp này, một tệp thuộc tính có lẽ không phải là cách tốt nhất để biểu diễn ánh xạ. Thay vào đó, bạn có thể sử dụng JSON:

{ 
    "field2": { 
     "A": "some text", 
     "B": "some other text" 
     }, 
    "field3": { 
     "A": "A2", 
     "B": "B2" 
     } 
} 

Sau đó, bạn có thể sử dụng JSONSlurper để đọc trong tệp ánh xạ. Dưới đây là ví dụ về cách sử dụng tệp ánh xạ ở trên:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import org.apache.commons.io.IOUtils 
import org.apache.nifi.processor.io.StreamCallback 

import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

def mappingJson = new File('/Users/mburgess/mappings.json').text 

flowFile = session.write(flowFile, { inputStream, outputStream -> 

    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
    def inJson = new JsonSlurper().parseText(content) 
    def mappings = new JsonSlurper().parseText(mappingJson) 

    inJson.each {k,v -> 
     inJson[k] = mappings[k][v] 
    } 
    outputStream.write(inJson.toString().getBytes(StandardCharsets.UTF_8)) 
} as StreamCallback) 

session.transfer(flowFile, REL_SUCCESS) 
+0

Matt, cảm ơn bạn rất nhiều vì đã giúp bạn. Vui lòng xem phản hồi/chỉnh sửa của tôi trong bài đăng gốc. Tôi đã thực hiện một vài nỗ lực tại kịch bản Groovy + ConfigSlurper và tôi đang đấu tranh để có ánh xạ mong muốn. Cảm ơn bạn – paranza

+0

Cảm ơn Matt, tôi đã tự mình viết lại bản đồ và có định dạng Json không giúp được gì. Cám ơn bạn một lần nữa. – paranza

Các vấn đề liên quan