Entering week 4 post-layoff. Week 2 of pretty much nothing but playing with my kids, doing house chores, exercising and job searching.
I spent like 3 hours in the gym last Friday. The instructor there turned to me and asked, "Tough divorce?" To which I answered, "Very happily married, got laid off from work." He said that would have been his second guess.
Even before this whole crap I had enough cash-flow-yielding investments to just about make rent. My wife makes enough to make sure we will want for nothing, our old folks have our kids' tuition fees covered, and we have some savings anyway.
But the anxiety-laden period between "send a dozen messages and résumés" and having the same "greetings, fellow millennial!" meetings with different sets of tech-illiterate boomers and toddlers is becoming a boring nuisance, one that "having a side project to keep my mind warm" could solve.
Maybe I will fix the Stardew Valley Mods API for Android. I haven't done the C#/.NET thing since uni, and my frontend Java game is weak (at best), but how much could it have changed in the last decade or so? /s
Maybe I will write a MongoDB Runner for Apache Beam. But I'm afraid that won't yield enough street cred to be worth it. Does anyone know what that means?
Maybe I will finally be done consolidating a lifetime of cloud storage into a big-kid glacier-level LTS solution.
Dunno, bored here. Need some 20h/week project I can quit as soon as some job appears to be lining up. Ideas?
I am trying to read data from a Pub/Sub subscription and, once the data is extracted, apply some transformations. Currently the messages arrive in bytes format. I have tried multiple ways to parse the data as JSON into a custom schema, but it fails with this error:
TypeError: __main__.MySchema() argument after ** must be a mapping, not str [while running 'Map to MySchema']
**readPubSub.py**

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing


class MySchema(typing.NamedTuple):
    user_id: str
    event_ts: str
    create_ts: str
    event_id: str
    ifa: str
    ifv: str
    country: str
    chip_balance: str
    game: str
    user_group: str
    user_condition: str
    device_type: str
    device_model: str
    user_name: str
    fb_connect: bool
    is_active_event: bool
    event_payload: str


TOPIC_PATH = "projects/nectar-259905/topics/events"


def run(pubsub_topic):
    options = PipelineOptions(
        streaming=True
    )
    runner = 'DirectRunner'
    print("I reached before pipeline")
    with beam.Pipeline(runner, options=options) as pipeline:
        message = (
            pipeline
            | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')  # .with_output_types(bytes)
            | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
            | 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
            | "Writing to console" >> beam.Map(print))
    print("I reached after pipeline")
    result = message.run()
    result.wait_until_finish()


run(TOPIC_PATH)
```
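The `TypeError` comes from the `'Map to MySchema'` step: after `'UTF-8 bytes to string'`, each element is a `str`, and `**` unpacking only accepts a mapping such as a `dict`, so the string has to be parsed into one first (for example with `json.loads`). A minimal, Beam-free reproduction, using a hypothetical two-field `Event` tuple in place of `MySchema`:

```python
import json
import typing


class Event(typing.NamedTuple):
    # Hypothetical two-field stand-in for MySchema.
    user_id: str
    country: str


raw = b'{"user_id": "102105290400258488", "country": "US"}'
text = raw.decode("utf-8")  # what the 'UTF-8 bytes to string' step emits: a str

error_message = ""
try:
    Event(**text)  # fails: ** unpacking needs a mapping, not a str
except TypeError as err:
    error_message = str(err)

# Parse the JSON text into a dict first, then unpack it into the tuple.
event = Event(**json.loads(text))
print(event)  # Event(user_id='102105290400258488', country='US')
```

In the pipeline that would mean something like `beam.Map(lambda msg: MySchema(**json.loads(msg)))`, assuming the payload really is JSON. Two other things are worth checking. The Beam Python schema documentation registers a `RowCoder` for a `NamedTuple` used as an element type (`beam.coders.registry.register_coder(MySchema, beam.coders.RowCoder)`). And `message` is a PCollection, not a pipeline: the `with beam.Pipeline(...)` block already runs the pipeline and waits for it when the block exits, so the `message.run()` / `wait_until_finish()` lines are probably unnecessary.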
If I skip the schema mapping and just print the decoded strings, as below:
```python
message = (
    pipeline
    | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')  # .with_output_types(bytes)
    | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
    | "Writing to console" >> beam.Map(print))
```
I get this output:
```
{
  'user_id': '102105290400258488',
  'event_ts': '2021-05-29 20:42:52.283 UTC',
  'event_id': 'Game_Request_Declined',
  'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
  'ifv': '3fc6eb8b4d0cf096c47e2252f41',
  'country': 'US',
  'chip_balance': '9140',
  'game': 'gru',
  'user_group': '[1, 36, 529702]',
  'user_condition': '[1, 36]',
  'device_type': 'phone',
  'device_model': 'TCL 5007Z',
  'user_name': 'Minnie',
  'fb_connect': True,
  'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
  'is_active_event': True
}
{
  'user_id': '102105290400258488',
  'event_ts': '2021-05-29 20:54:38.297 UTC',
  'event_id': 'Decline_Game_Request',
  'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
  'ifv': '3fc6eb8b4d0cf096c47e2252f41',
  'country': 'US',
  'chip_balance': '9905',
  'game': 'gru',
  'user_group': '[1, 36, 529702]',
  'user_condition': '[1, 36]',
  'device_type': 'phone',
  'device_model': 'TCL 5007Z',
  'user_name': 'Minnie',
  'fb_connect': True,
  'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
  'is_active_event': True
}
```
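Two details in this output may still bite after adding a parse step. First, the records use single quotes and `True`, which is Python's dict repr rather than JSON; if the payload on the wire really looks like that (for instance, if the publisher sent `str(record)` instead of `json.dumps(record)`), `json.loads` will reject it. Second, neither sample has a `create_ts` key, so `MySchema(**record)` would still raise a `TypeError` for a missing argument. The sketch below uses a hypothetical `parse_event` helper that tries JSON first, falls back to `ast.literal_eval` for Python literals, keeps only known fields and fills missing ones with `None`; typing `create_ts` as `Optional[str]` is an assumption made to allow that.

```python
import ast
import json
import typing
from typing import Optional


class MySchema(typing.NamedTuple):
    user_id: str
    event_ts: str
    # Assumption: create_ts may be absent (it is missing from the sample
    # messages), so it is typed Optional to allow None.
    create_ts: Optional[str]
    event_id: str
    ifa: str
    ifv: str
    country: str
    chip_balance: str
    game: str
    user_group: str
    user_condition: str
    device_type: str
    device_model: str
    user_name: str
    fb_connect: bool
    is_active_event: bool
    event_payload: str


def parse_event(raw: bytes) -> MySchema:
    """Hypothetical helper: turn one Pub/Sub payload into a MySchema record."""
    text = raw.decode("utf-8")
    try:
        # Preferred: the publisher sent real JSON (double quotes, true/false).
        record = json.loads(text)
    except json.JSONDecodeError:
        # Fallback: a Python dict literal (single quotes, True/False),
        # which is what the printed messages look like.
        record = ast.literal_eval(text)
    if not isinstance(record, dict):
        raise ValueError(f"expected an object, got {type(record).__name__}")
    # Keep only known fields and fill missing ones with None, so MySchema(**...)
    # neither rejects unexpected keys nor misses required ones.
    return MySchema(**{name: record.get(name) for name in MySchema._fields})


sample = (
    b"{'user_id': '102105290400258488', 'event_ts': '2021-05-29 20:42:52.283 UTC', "
    b"'event_id': 'Game_Request_Declined', 'country': 'US', 'fb_connect': True, "
    b"'event_payload': '{\"variant\":\"target\"}', 'is_active_event': True}"
)
event = parse_event(sample)
print(event.event_id, event.create_ts, event.fb_connect)  # Game_Request_Declined None True
```

Because it takes the raw bytes, `beam.Map(parse_event)` could replace both the decode step and the `'Map to MySchema'` step. Publishing with `json.dumps(...)` is the cleaner fix; the `literal_eval` fallback is a stopgap, and filling gaps with `None` can hide malformed messages, so it may be worth logging or routing those separately.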
Please let me know if I'm doing something wrong while parsing the data into JSON. I am also looking for examples of data masking and of running SQL within Apache Beam.
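For the data-masking part, one common approach is to pseudonymize identifiers with a keyed hash (HMAC-SHA256) and redact free-text personal fields inside an ordinary function that a `beam.Map` step can call. Which fields count as sensitive (`user_id`, `ifa`, `ifv` hashed; `user_name` redacted) and how the key is supplied are assumptions here; `mask_record` is a hypothetical helper that works on plain dicts.

```python
import hashlib
import hmac
from typing import Dict, Iterable

# Assumption: which fields to protect is a policy decision; adjust as needed.
HASHED_FIELDS = ("user_id", "ifa", "ifv")
REDACTED_FIELDS = ("user_name",)


def mask_record(record: Dict[str, object], key: bytes,
                hashed: Iterable[str] = HASHED_FIELDS,
                redacted: Iterable[str] = REDACTED_FIELDS) -> Dict[str, object]:
    """Hypothetical helper: return a masked copy of one event dict."""
    # Work on a copy: Beam expects transforms not to mutate their input elements.
    masked = dict(record)
    for field in hashed:
        value = masked.get(field)
        if value is not None:
            # Same key + same value -> same token, so masked IDs can still be joined.
            masked[field] = hmac.new(key, str(value).encode("utf-8"),
                                     hashlib.sha256).hexdigest()
    for field in redacted:
        if field in masked:
            masked[field] = "***"
    return masked


record = {"user_id": "102105290400258488", "ifa": "6090a6c7-4422-49b5-8757-ccfdbad",
          "user_name": "Minnie", "country": "US"}
masked = mask_record(record, key=b"replace-with-a-secret-key")
print(masked["user_name"], masked["country"])  # *** US
```

In a pipeline this could be `beam.Map(mask_record, key=secret_key)` on dict elements (`beam.Map` forwards extra arguments to the function; `secret_key` is a placeholder), or `beam.Map(lambda e: MySchema(**mask_record(e._asdict(), secret_key)))` for `MySchema` tuples. For the SQL part, the Python SDK provides `SqlTransform` in `apache_beam.transforms.sql`; it runs Beam SQL through a Java expansion service, so a Java runtime is needed, and it expects a schema-aware PCollection (such as one typed as `MySchema` with a `RowCoder` registered), referred to as `PCOLLECTION` inside the query.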