I am trying to run the ExecuteScript processor in Apache NiFi using Python, but I'm having a problem passing the flow file to the next processor in my data flow.
If I run the standalone snippet that only creates and writes the flow file, it works and I can read the flow file in the next processor. But when I try to enrich it, the flow file is simply not passed on. No error is generated, and I have no clue how to proceed. I am a bit new to Python and NiFi and would appreciate your help with this particular issue.
Below is the code I am using; as you can see, it's very simple. I just want to create a flow file and write some string to it using some logic, but no luck so far.
```python
import urllib2
import json
import datetime
import csv
import time
import sys
import traceback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.python.core.util import StringUtil

class WriteContentCallback(OutputStreamCallback):
    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

page_id = "dsssssss"
access_token = "sdfsdfsf%sdfsdf"

def scrapeFacebookPageFeedStatus(page_id, access_token):
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback("Hello there this is my data"))
    flowFile = session.write()
    session.transfer(flowFile, REL_SUCCESS)
    print "\nDone!\n%s Statuses Processed in %s" % \
        (num_processed, datetime.datetime.now() - scrape_starttime)

if __name__ == '__main__':
    scrapeFacebookPageFeedStatus(page_id, access_token)
```

Best answer
I believe the problem is the check for __main__:
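```python
if __name__ == '__main__':
    scrapeFacebookPageFeedStatus(page_id, access_token)
```

When ExecuteScript runs the script, `__name__` is not `'__main__'` (`__builtin__` was the actual module name in my experiment), so `scrapeFacebookPageFeedStatus` is never called. No flow file is created or transferred, which is also why no error appears. You could either remove that check, or add a different one if you want to preserve your separate testing path.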
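As a minimal sketch of the "different check" option, the script below tests whether the name `session` is defined instead of relying on `__name__`. It assumes the script runs under NiFi's Jython engine, where ExecuteScript binds `session` and `REL_SUCCESS` before the script starts; a plain `python script.py` run has no such binding and falls through to the standalone path. `build_content`, `emit_flow_file` and `IN_NIFI` are hypothetical names for illustration, and the placeholder string stands in for the real enrichment logic.

```python
page_id = "dsssssss"              # placeholder values from the question
access_token = "sdfsdfsf%sdfsdf"


def build_content(page_id, access_token):
    # Hypothetical stand-in for the real scraping/enrichment logic;
    # a real version would use page_id and access_token to fetch data.
    return "Hello there this is my data for page %s" % page_id


def emit_flow_file(session, rel_success, callback_cls, content):
    # Create a flow file, write the content through the callback, then
    # transfer it so the next processor in the flow receives it.
    flow_file = session.create()
    flow_file = session.write(flow_file, callback_cls(content))
    session.transfer(flow_file, rel_success)
    return flow_file


# ExecuteScript binds `session` (and REL_SUCCESS, log, ...) before the script
# runs; a standalone run does not, so test for that name instead of __name__.
try:
    session
    IN_NIFI = True
except NameError:
    IN_NIFI = False

if IN_NIFI:
    # These Java classes only resolve inside NiFi's Jython engine.
    from org.apache.nifi.processor.io import OutputStreamCallback
    from org.python.core.util import StringUtil

    class WriteContentCallback(OutputStreamCallback):
        def __init__(self, content):
            self.content_text = content

        def process(self, outputStream):
            outputStream.write(StringUtil.toBytes(self.content_text))

    emit_flow_file(session, REL_SUCCESS, WriteContentCallback,
                   build_content(page_id, access_token))
elif __name__ == '__main__':
    # Standalone test path: no NiFi session, so just show the content.
    print(build_content(page_id, access_token))
```

Once the call actually runs, the question's no-argument `session.write()` and the undefined `num_processed` and `scrape_starttime` would raise errors, so the sketch leaves them out. Keeping the NiFi calls in `emit_flow_file` also lets the logic be exercised outside NiFi with a stand-in session object.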